From 6c48d5be6cf2b0792b5645246b5792cd881027ae Mon Sep 17 00:00:00 2001 From: Jerome Date: Sun, 10 Apr 2022 09:40:32 +1000 Subject: [PATCH 1/3] Xin 181 178 (#80) * add skeleton forensics * remove duplicated penalty check in verify header --- consensus/XDPoS/engines/engine_v2/engine.go | 4 ++ .../XDPoS/engines/engine_v2/forensics.go | 52 +++++++++++++++++++ .../XDPoS/engines/engine_v2/verifyHeader.go | 25 ++++----- consensus/XDPoS/utils/errors.go | 3 +- .../engine_v2_tests/verify_header_test.go | 13 ++--- 5 files changed, 70 insertions(+), 27 deletions(-) create mode 100644 consensus/XDPoS/engines/engine_v2/forensics.go diff --git a/consensus/XDPoS/engines/engine_v2/engine.go b/consensus/XDPoS/engines/engine_v2/engine.go index c1eb7ebbf5..bdd0c18472 100644 --- a/consensus/XDPoS/engines/engine_v2/engine.go +++ b/consensus/XDPoS/engines/engine_v2/engine.go @@ -57,6 +57,8 @@ type XDPoS_v2 struct { HookReward func(chain consensus.ChainReader, state *state.StateDB, parentState *state.StateDB, header *types.Header) (map[string]interface{}, error) HookPenalty func(chain consensus.ChainReader, number *big.Int, parentHash common.Hash, candidates []common.Address) ([]common.Address, error) + + forensics *Forensics } func New(config *params.XDPoSConfig, db ethdb.Database, waitPeriodCh chan int) *XDPoS_v2 { @@ -110,6 +112,8 @@ func New(config *params.XDPoSConfig, db ethdb.Database, waitPeriodCh chan int) * timeoutTimer.OnTimeoutFn = engine.OnCountdownTimeout engine.periodicJob() + + engine.AttachForensics() return engine } diff --git a/consensus/XDPoS/engines/engine_v2/forensics.go b/consensus/XDPoS/engines/engine_v2/forensics.go new file mode 100644 index 0000000000..29108c69a4 --- /dev/null +++ b/consensus/XDPoS/engines/engine_v2/forensics.go @@ -0,0 +1,52 @@ +package engine_v2 + +import ( + "github.com/XinFinOrg/XDPoSChain/common" + "github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/utils" + "github.com/XinFinOrg/XDPoSChain/log" +) + +type ForensicProof struct { + QcWithSmallerRound utils.QuorumCert + QcWithLargerRound utils.QuorumCert + DivergingHash common.Hash + HashesTillSmallerRoundQc []common.Hash + HashesTillLargerRoundQc []common.Hash + AcrossEpochs bool + QcWithSmallerRoundAddresses []common.Address + QcWithLargerRoundAddresses []common.Address +} + +type Forensics struct { + ReceiverCh <-chan utils.QuorumCert + Abort chan<- struct{} +} + +// Initiate a forensics process +func (x *XDPoS_v2) AttachForensics() { + receiver := make(chan utils.QuorumCert) + abort := make(chan struct{}) + + go func() { + for { + // A real event arrived, process interesting content + select { + case quorumCert := <-receiver: + x.ProcessForensics(quorumCert) + case <-abort: + return + } + } + }() + x.forensics = &Forensics{ + ReceiverCh: receiver, + Abort: abort, + } +} + +func (x *XDPoS_v2) SendForensicProof() { +} + +func (x *XDPoS_v2) ProcessForensics(quorumCert utils.QuorumCert) { + log.Info("Received a QC in forensics", "QC", quorumCert) +} diff --git a/consensus/XDPoS/engines/engine_v2/verifyHeader.go b/consensus/XDPoS/engines/engine_v2/verifyHeader.go index 2b78022fe6..535221aea3 100644 --- a/consensus/XDPoS/engines/engine_v2/verifyHeader.go +++ b/consensus/XDPoS/engines/engine_v2/verifyHeader.go @@ -101,7 +101,14 @@ func (x *XDPoS_v2) verifyHeader(chain consensus.ChainReader, header *types.Heade if len(header.Validators)%common.AddressLength != 0 { return utils.ErrInvalidCheckpointSigners } - isLegit, err := x.isValidatorsLegit(chain, header) + + _, localPenalties, err := x.calcMasternodes(chain, header.Number, header.ParentHash) + if err != nil { + log.Error("[verifyHeader] Fail to calculate master nodes list with penalty", "Number", header.Number, "Hash", header.Hash()) + return err + } + + isLegit, err := x.isValidatorsLegit(chain, header, localPenalties) if err != nil { log.Error("[verifyHeader] Error while trying to check if the validators are legit", "Hash", header.Hash(), "Number", header.Number, "ValidatorsLength", len(header.Validators)) return err @@ -109,17 +116,6 @@ func (x *XDPoS_v2) verifyHeader(chain consensus.ChainReader, header *types.Heade if !isLegit { return utils.ErrValidatorsNotLegit } - - _, penalties, err := x.calcMasternodes(chain, header.Number, header.ParentHash) - if err != nil { - log.Error("[verifyHeader] Fail to calculate master nodes list with penalty", "Number", header.Number, "Hash", header.Hash()) - return err - } - - if !utils.CompareSignersLists(common.ExtractAddressFromBytes(header.Penalties), penalties) { - return utils.ErrPenaltyListDoesNotMatch - } - } else { if len(header.Validators) != 0 { log.Warn("[verifyHeader] Validators shall not have values in non-epochSwitch block", "Hash", header.Hash(), "Number", header.Number, "header.Validators", header.Validators) @@ -167,16 +163,15 @@ func (x *XDPoS_v2) verifyHeader(chain consensus.ChainReader, header *types.Heade } // Verify the header validators address is legit by checking against its snapshot masternode list minutes the penalty list, we also ensure the order matches -func (x *XDPoS_v2) isValidatorsLegit(chain consensus.ChainReader, header *types.Header) (bool, error) { +func (x *XDPoS_v2) isValidatorsLegit(chain consensus.ChainReader, header *types.Header, penalties []common.Address) (bool, error) { snap, err := x.getSnapshot(chain, header.Number.Uint64(), false) if err != nil { log.Error("[isValidatorsLegit] Error while trying to get snapshot", "BlockNumber", header.Number.Int64(), "Hash", header.Hash().Hex(), "error", err) return false, err } // snap.NextEpochMasterNodes - penaltyList := common.ExtractAddressFromBytes(header.Penalties) penaltyMap := make(map[common.Address]bool) - for _, item := range penaltyList { + for _, item := range penalties { penaltyMap[item] = true } diff --git a/consensus/XDPoS/utils/errors.go b/consensus/XDPoS/utils/errors.go index 8ebd5438fd..2dfbb6d538 100644 --- a/consensus/XDPoS/utils/errors.go +++ b/consensus/XDPoS/utils/errors.go @@ -91,8 +91,7 @@ var ( ErrCoinbaseAndValidatorMismatch = errors.New("Validaotor and coinbase address in header does not match") ErrNotItsTurn = errors.New("Not validator's turn to mine this block") - ErrPenaltyListDoesNotMatch = errors.New("Incoming block penalty list does not match") - ErrRoundInvalid = errors.New("Invalid Round, it shall be bigger than QC round") + ErrRoundInvalid = errors.New("Invalid Round, it shall be bigger than QC round") ErrAlreadyMined = errors.New("Already mined") ) diff --git a/consensus/tests/engine_v2_tests/verify_header_test.go b/consensus/tests/engine_v2_tests/verify_header_test.go index 5a698379ae..3c6a1e7fc3 100644 --- a/consensus/tests/engine_v2_tests/verify_header_test.go +++ b/consensus/tests/engine_v2_tests/verify_header_test.go @@ -80,18 +80,13 @@ func TestShouldVerifyBlock(t *testing.T) { err = adaptor.VerifyHeader(blockchain, invalidValidatorsSignerBlock, true) assert.Equal(t, utils.ErrInvalidCheckpointSigners, err) - invalidPenaltiesExistBlock := blockchain.GetBlockByNumber(901).Header() - invalidPenaltiesExistBlock.Penalties = common.Hex2BytesFixed("123131231", 20) - err = adaptor.VerifyHeader(blockchain, invalidPenaltiesExistBlock, true) - assert.Equal(t, utils.ErrPenaltyListDoesNotMatch, err) - // non-epoch switch invalidValidatorsExistBlock := blockchain.GetBlockByNumber(902).Header() invalidValidatorsExistBlock.Validators = []byte{123} err = adaptor.VerifyHeader(blockchain, invalidValidatorsExistBlock, true) assert.Equal(t, utils.ErrInvalidFieldInNonEpochSwitch, err) - invalidPenaltiesExistBlock = blockchain.GetBlockByNumber(902).Header() + invalidPenaltiesExistBlock := blockchain.GetBlockByNumber(902).Header() invalidPenaltiesExistBlock.Penalties = common.Hex2BytesFixed("123131231", 20) err = adaptor.VerifyHeader(blockchain, invalidPenaltiesExistBlock, true) assert.Equal(t, utils.ErrInvalidFieldInNonEpochSwitch, err) @@ -163,10 +158,8 @@ func TestShouldVerifyBlock(t *testing.T) { // Make the validators not legit by adding something to the penalty validatorsNotLegit := blockchain.GetBlockByNumber(901).Header() - penalties := []common.Address{acc1Addr} - for _, v := range penalties { - validatorsNotLegit.Penalties = append(validatorsNotLegit.Penalties, v[:]...) - } + + validatorsNotLegit.Validators = append(validatorsNotLegit.Validators, acc1Addr[:]...) err = adaptor.VerifyHeader(blockchain, validatorsNotLegit, true) assert.Equal(t, utils.ErrValidatorsNotLegit, err) } From 8fde52c512eb7ad2da385cdea8119851210a0827 Mon Sep 17 00:00:00 2001 From: wgr523 Date: Fri, 22 Apr 2022 00:12:44 +0800 Subject: [PATCH 2/3] Xin 145 (#82) * add HandleProposedBlock() in procFutureBlocks() * add proposedBlockHandler for downloader --- cmd/XDC/chaincmd.go | 5 +-- core/blockchain.go | 15 ++++++++- eth/downloader/downloader.go | 55 ++++++++++++++++++------------- eth/downloader/downloader_test.go | 10 ++++-- eth/fetcher/fetcher.go | 1 + eth/handler.go | 17 +++++++--- les/handler.go | 10 ++++-- 7 files changed, 80 insertions(+), 33 deletions(-) diff --git a/cmd/XDC/chaincmd.go b/cmd/XDC/chaincmd.go index 3c7db19eaa..4bf9e27198 100644 --- a/cmd/XDC/chaincmd.go +++ b/cmd/XDC/chaincmd.go @@ -19,13 +19,14 @@ package main import ( "encoding/json" "fmt" - "github.com/XinFinOrg/XDPoSChain/core/rawdb" "os" "runtime" "strconv" "sync/atomic" "time" + "github.com/XinFinOrg/XDPoSChain/core/rawdb" + "github.com/XinFinOrg/XDPoSChain/cmd/utils" "github.com/XinFinOrg/XDPoSChain/common" "github.com/XinFinOrg/XDPoSChain/console" @@ -368,7 +369,7 @@ func copyDb(ctx *cli.Context) error { chain, chainDb := utils.MakeChain(ctx, stack) syncmode := *utils.GlobalTextMarshaler(ctx, utils.SyncModeFlag.Name).(*downloader.SyncMode) - dl := downloader.New(syncmode, chainDb, new(event.TypeMux), chain, nil, nil) + dl := downloader.New(syncmode, chainDb, new(event.TypeMux), chain, nil, nil, nil) // Create a source peer to satisfy downloader requests from db, err := rawdb.NewLevelDBDatabase(ctx.Args().First(), ctx.GlobalInt(utils.CacheFlag.Name), 256, "") diff --git a/core/blockchain.go b/core/blockchain.go index 9ca580c17b..bb597cbb6b 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -974,7 +974,20 @@ func (bc *BlockChain) procFutureBlocks() { // Insert one by one as chain insertion needs contiguous ancestry between blocks for i := range blocks { - bc.InsertChain(blocks[i : i+1]) + _, err := bc.InsertChain(blocks[i : i+1]) + // let consensus engine handle the last block (e.g. for voting) + if i == len(blocks)-1 && err == nil { + engine, ok := bc.Engine().(*XDPoS.XDPoS) + if ok { + go func() { + header := blocks[i].Header() + err = engine.HandleProposedBlock(bc, header) + if err != nil { + log.Info("[procFutureBlocks] handle proposed block has error", "err", err, "block hash", header.Hash(), "number", header.Number) + } + }() + } + } } } } diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 17fba736da..5e32e6b6d8 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -36,6 +36,9 @@ import ( "github.com/XinFinOrg/XDPoSChain/params" ) +// proposeBlockHandlerFn is a callback type to handle a block by the consensus +type proposeBlockHandlerFn func(header *types.Header) error + var ( MaxHashFetch = 512 // Amount of hashes to be fetched per retrieval request MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request @@ -114,7 +117,8 @@ type Downloader struct { blockchain BlockChain // Callbacks - dropPeer peerDropFn // Drops a peer for misbehaving + dropPeer peerDropFn // Drops a peer for misbehaving + handleProposedBlock proposeBlockHandlerFn // Consensus v2 specific: Hanle new proposed block // Status synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing @@ -199,31 +203,32 @@ type BlockChain interface { } // New creates a new downloader to fetch hashes and blocks from remote peers. -func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader { +func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn, handleProposedBlock proposeBlockHandlerFn) *Downloader { if lightchain == nil { lightchain = chain } dl := &Downloader{ - mode: mode, - stateDB: stateDb, - mux: mux, - queue: newQueue(), - peers: newPeerSet(), - rttEstimate: uint64(rttMaxEstimate), - rttConfidence: uint64(1000000), - blockchain: chain, - lightchain: lightchain, - dropPeer: dropPeer, - headerCh: make(chan dataPack, 1), - bodyCh: make(chan dataPack, 1), - receiptCh: make(chan dataPack, 1), - bodyWakeCh: make(chan bool, 1), - receiptWakeCh: make(chan bool, 1), - headerProcCh: make(chan []*types.Header, 1), - quitCh: make(chan struct{}), - stateCh: make(chan dataPack), - stateSyncStart: make(chan *stateSync), + mode: mode, + stateDB: stateDb, + mux: mux, + queue: newQueue(), + peers: newPeerSet(), + rttEstimate: uint64(rttMaxEstimate), + rttConfidence: uint64(1000000), + blockchain: chain, + lightchain: lightchain, + dropPeer: dropPeer, + handleProposedBlock: handleProposedBlock, + headerCh: make(chan dataPack, 1), + bodyCh: make(chan dataPack, 1), + receiptCh: make(chan dataPack, 1), + bodyWakeCh: make(chan bool, 1), + receiptWakeCh: make(chan bool, 1), + headerProcCh: make(chan []*types.Header, 1), + quitCh: make(chan struct{}), + stateCh: make(chan dataPack), + stateSyncStart: make(chan *stateSync), syncStatsState: stateSyncStats{ processed: core.GetTrieSyncProgress(stateDb), }, @@ -1393,7 +1398,13 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error { log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) return errInvalidChain } - + if d.handleProposedBlock != nil { + header := blocks[len(blocks)-1].Header() + err := d.handleProposedBlock(header) + if err != nil { + log.Info("[downloader] handle proposed block has error", "err", err, "block hash", header.Hash(), "number", header.Number) + } + } return nil } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 09c59c4ca7..03da4920a7 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -19,13 +19,14 @@ package downloader import ( "errors" "fmt" - "github.com/XinFinOrg/XDPoSChain/core/rawdb" "math/big" "sync" "sync/atomic" "testing" "time" + "github.com/XinFinOrg/XDPoSChain/core/rawdb" + "github.com/XinFinOrg/XDPoSChain/common" "github.com/XinFinOrg/XDPoSChain/consensus/ethash" "github.com/XinFinOrg/XDPoSChain/core" @@ -97,7 +98,7 @@ func newTester() *downloadTester { tester.stateDb = rawdb.NewMemoryDatabase() tester.stateDb.Put(genesis.Root().Bytes(), []byte{0x00}) - tester.downloader = New(FullSync, tester.stateDb, new(event.TypeMux), tester, nil, tester.dropPeer) + tester.downloader = New(FullSync, tester.stateDb, new(event.TypeMux), tester, nil, tester.dropPeer, tester.handleProposedBlock) return tester } @@ -457,6 +458,11 @@ func (dl *downloadTester) dropPeer(id string) { dl.downloader.UnregisterPeer(id) } +// an empty handleProposedBlock function +func (dl *downloadTester) handleProposedBlock(header *types.Header) error { + return nil +} + // Config retrieves the blockchain's chain configuration. func (dl *downloadTester) Config() *params.ChainConfig { return params.TestChainConfig } diff --git a/eth/fetcher/fetcher.go b/eth/fetcher/fetcher.go index edd8132f7c..7b8389bf41 100644 --- a/eth/fetcher/fetcher.go +++ b/eth/fetcher/fetcher.go @@ -57,6 +57,7 @@ type bodyRequesterFn func([]common.Hash) error // headerVerifierFn is a callback type to verify a block's header for fast propagation. type headerVerifierFn func(header *types.Header) error +// proposeBlockHandlerFn is a callback type to handle a block by the consensus type proposeBlockHandlerFn func(header *types.Header) error // blockBroadcasterFn is a callback type for broadcasting a block to connected peers. diff --git a/eth/handler.go b/eth/handler.go index a55c2af1bb..6b21d5f9e1 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -206,15 +206,24 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne if len(manager.SubProtocols) == 0 { return nil, errIncompatibleConfig } + + var handleProposedBlock func(header *types.Header) error + if config.XDPoS != nil { + handleProposedBlock = func(header *types.Header) error { + return engine.(*XDPoS.XDPoS).HandleProposedBlock(blockchain, header) + } + } else { + handleProposedBlock = func(header *types.Header) error { + return nil + } + } + // Construct the different synchronisation mechanisms - manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer) + manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer, handleProposedBlock) validator := func(header *types.Header) error { return engine.VerifyHeader(blockchain, header, true) } - handleProposedBlock := func(header *types.Header) error { - return engine.(*XDPoS.XDPoS).HandleProposedBlock(blockchain, header) - } heighter := func() uint64 { return blockchain.CurrentBlock().NumberU64() diff --git a/les/handler.go b/les/handler.go index 05462a77a9..ba2c774b44 100644 --- a/les/handler.go +++ b/les/handler.go @@ -21,12 +21,13 @@ import ( "encoding/binary" "errors" "fmt" - "github.com/XinFinOrg/XDPoSChain/core/rawdb" "math/big" "net" "sync" "time" + "github.com/XinFinOrg/XDPoSChain/core/rawdb" + "github.com/XinFinOrg/XDPoSChain/common" "github.com/XinFinOrg/XDPoSChain/consensus" "github.com/XinFinOrg/XDPoSChain/core" @@ -205,7 +206,7 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, protoco } if lightSync { - manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, nil, blockchain, removePeer) + manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, nil, blockchain, removePeer, manager.handleProposedBlock) manager.peers.notify((*downloaderPeerNotify)(manager)) manager.fetcher = newLightFetcher(manager) } @@ -218,6 +219,11 @@ func (pm *ProtocolManager) removePeer(id string) { pm.peers.Unregister(id) } +// an empty handleProposedBlock function +func (pm *ProtocolManager) handleProposedBlock(header *types.Header) error { + return nil +} + func (pm *ProtocolManager) Start(maxPeers int) { pm.maxPeers = maxPeers From eb35d4e32e592c96ec462b4631287f4f1a75e4c5 Mon Sep 17 00:00:00 2001 From: Jerome Date: Sat, 23 Apr 2022 10:33:56 +1000 Subject: [PATCH 3/3] Add set committed QC function in forensics (#83) --- consensus/XDPoS/engines/engine_v2/engine.go | 36 ++++--- .../XDPoS/engines/engine_v2/forensics.go | 80 +++++++++++---- .../XDPoS/engines/engine_v2/testing_utils.go | 4 + .../tests/engine_v2_tests/forensics_test.go | 99 +++++++++++++++++++ 4 files changed, 183 insertions(+), 36 deletions(-) create mode 100644 consensus/tests/engine_v2_tests/forensics_test.go diff --git a/consensus/XDPoS/engines/engine_v2/engine.go b/consensus/XDPoS/engines/engine_v2/engine.go index bdd0c18472..b2a42f2223 100644 --- a/consensus/XDPoS/engines/engine_v2/engine.go +++ b/consensus/XDPoS/engines/engine_v2/engine.go @@ -107,13 +107,13 @@ func New(config *params.XDPoSConfig, db ethdb.Database, waitPeriodCh chan int) * }, highestVotedRound: utils.Round(0), highestCommitBlock: nil, + forensics: NewForensics(), } // Add callback to the timer timeoutTimer.OnTimeoutFn = engine.OnCountdownTimeout engine.periodicJob() - engine.AttachForensics() return engine } @@ -822,41 +822,41 @@ func (x *XDPoS_v2) verifyQC(blockChainReader consensus.ChainReader, quorumCert * } // Update local QC variables including highestQC & lockQuorumCert, as well as commit the blocks that satisfy the algorithm requirements -func (x *XDPoS_v2) processQC(blockChainReader consensus.ChainReader, quorumCert *utils.QuorumCert) error { +func (x *XDPoS_v2) processQC(blockChainReader consensus.ChainReader, incomingQuorumCert *utils.QuorumCert) error { log.Trace("[ProcessQC][Before]", "HighQC", x.highestQuorumCert) // 1. Update HighestQC - if quorumCert.ProposedBlockInfo.Round > x.highestQuorumCert.ProposedBlockInfo.Round { - x.highestQuorumCert = quorumCert + if incomingQuorumCert.ProposedBlockInfo.Round > x.highestQuorumCert.ProposedBlockInfo.Round { + x.highestQuorumCert = incomingQuorumCert } // 2. Get QC from header and update lockQuorumCert(lockQuorumCert is the parent of highestQC) - proposedBlockHeader := blockChainReader.GetHeaderByHash(quorumCert.ProposedBlockInfo.Hash) + proposedBlockHeader := blockChainReader.GetHeaderByHash(incomingQuorumCert.ProposedBlockInfo.Hash) if proposedBlockHeader == nil { - log.Error("[processQC] Block not found using the QC", "quorumCert.ProposedBlockInfo.Hash", quorumCert.ProposedBlockInfo.Hash, "quorumCert.ProposedBlockInfo.Number", quorumCert.ProposedBlockInfo.Number) - return fmt.Errorf("Block not found, number: %v, hash: %v", quorumCert.ProposedBlockInfo.Number, quorumCert.ProposedBlockInfo.Hash) + log.Error("[processQC] Block not found using the QC", "quorumCert.ProposedBlockInfo.Hash", incomingQuorumCert.ProposedBlockInfo.Hash, "incomingQuorumCert.ProposedBlockInfo.Number", incomingQuorumCert.ProposedBlockInfo.Number) + return fmt.Errorf("Block not found, number: %v, hash: %v", incomingQuorumCert.ProposedBlockInfo.Number, incomingQuorumCert.ProposedBlockInfo.Hash) } if proposedBlockHeader.Number.Cmp(x.config.V2.SwitchBlock) > 0 { // Extra field contain parent information - quorumCert, round, _, err := x.getExtraFields(proposedBlockHeader) + proposedBlockQuorumCert, round, _, err := x.getExtraFields(proposedBlockHeader) if err != nil { return err } - if x.lockQuorumCert == nil || quorumCert.ProposedBlockInfo.Round > x.lockQuorumCert.ProposedBlockInfo.Round { - x.lockQuorumCert = quorumCert + if x.lockQuorumCert == nil || proposedBlockQuorumCert.ProposedBlockInfo.Round > x.lockQuorumCert.ProposedBlockInfo.Round { + x.lockQuorumCert = proposedBlockQuorumCert } proposedBlockRound := &round // 3. Update commit block info - _, err = x.commitBlocks(blockChainReader, proposedBlockHeader, proposedBlockRound) + _, err = x.commitBlocks(blockChainReader, proposedBlockHeader, proposedBlockRound, incomingQuorumCert) if err != nil { - log.Error("[processQC] Fail to commitBlocks", "proposedBlockRound", proposedBlockRound) + log.Error("[processQC] Error while to commitBlocks", "proposedBlockRound", proposedBlockRound) return err } } // 4. Set new round - if quorumCert.ProposedBlockInfo.Round >= x.currentRound { - err := x.setNewRound(blockChainReader, quorumCert.ProposedBlockInfo.Round+1) + if incomingQuorumCert.ProposedBlockInfo.Round >= x.currentRound { + err := x.setNewRound(blockChainReader, incomingQuorumCert.ProposedBlockInfo.Round+1) if err != nil { - log.Error("[processQC] Fail to setNewRound", "new round to set", quorumCert.ProposedBlockInfo.Round+1) + log.Error("[processQC] Fail to setNewRound", "new round to set", incomingQuorumCert.ProposedBlockInfo.Round+1) return err } } @@ -893,7 +893,7 @@ func (x *XDPoS_v2) getSyncInfo() *utils.SyncInfo { } //Find parent and grandparent, check round number, if so, commit grandparent(grandGrandParent of currentBlock) -func (x *XDPoS_v2) commitBlocks(blockChainReader consensus.ChainReader, proposedBlockHeader *types.Header, proposedBlockRound *utils.Round) (bool, error) { +func (x *XDPoS_v2) commitBlocks(blockChainReader consensus.ChainReader, proposedBlockHeader *types.Header, proposedBlockRound *utils.Round, incomingQc *utils.QuorumCert) (bool, error) { // XDPoS v1.0 switch to v2.0, skip commit if big.NewInt(0).Sub(proposedBlockHeader.Number, big.NewInt(2)).Cmp(x.config.V2.SwitchBlock) <= 0 { return false, nil @@ -930,6 +930,10 @@ func (x *XDPoS_v2) commitBlocks(blockChainReader consensus.ChainReader, proposed Round: round, } log.Debug("Successfully committed block", "Committed block Hash", x.highestCommitBlock.Hash, "Committed round", x.highestCommitBlock.Round) + // Perform forensics related operation + var headerQcToBeCommitted []types.Header + headerQcToBeCommitted = append(headerQcToBeCommitted, *parentBlock, *proposedBlockHeader) + go x.forensics.SetCommittedQCs(headerQcToBeCommitted, *incomingQc) return true, nil } // Everything else, fail to commit diff --git a/consensus/XDPoS/engines/engine_v2/forensics.go b/consensus/XDPoS/engines/engine_v2/forensics.go index 29108c69a4..ef6e3cf07e 100644 --- a/consensus/XDPoS/engines/engine_v2/forensics.go +++ b/consensus/XDPoS/engines/engine_v2/forensics.go @@ -1,11 +1,19 @@ package engine_v2 import ( + "fmt" + "github.com/XinFinOrg/XDPoSChain/common" + "github.com/XinFinOrg/XDPoSChain/consensus" "github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/utils" + "github.com/XinFinOrg/XDPoSChain/core/types" "github.com/XinFinOrg/XDPoSChain/log" ) +const ( + NUM_OF_FORENSICS_PARENTS = 2 +) + type ForensicProof struct { QcWithSmallerRound utils.QuorumCert QcWithLargerRound utils.QuorumCert @@ -17,36 +25,68 @@ type ForensicProof struct { QcWithLargerRoundAddresses []common.Address } +// Forensics instance. Placeholder for future properties to be added type Forensics struct { - ReceiverCh <-chan utils.QuorumCert - Abort chan<- struct{} + HighestCommittedQCs []utils.QuorumCert } // Initiate a forensics process -func (x *XDPoS_v2) AttachForensics() { - receiver := make(chan utils.QuorumCert) - abort := make(chan struct{}) +func NewForensics() *Forensics { + return &Forensics{} +} - go func() { - for { - // A real event arrived, process interesting content - select { - case quorumCert := <-receiver: - x.ProcessForensics(quorumCert) - case <-abort: - return +/* + Entry point for processing forensics. + Triggered once processQC is successfully. + Forensics runs in a seperate go routine as its no system critical + Link to the flow diagram: https://hashlabs.atlassian.net/wiki/spaces/HASHLABS/pages/97878029/Forensics+Diagram+flow +*/ +func (f *Forensics) ProcessForensics(chain consensus.ChainReader, incomingQC utils.QuorumCert) { + log.Info("Received a QC in forensics", "QC", incomingQC) +} + +// Set the forensics committed QCs list. The order is from grandparent to current header. i.e it shall follow the QC in its header as follow [hcqc1, hcqc2, hcqc3] +func (f *Forensics) SetCommittedQCs(headers []types.Header, incomingQC utils.QuorumCert) error { + // highestCommitQCs is an array, assign the parentBlockQc and its child as well as its grandchild QC into this array for forensics purposes. + if len(headers) != NUM_OF_FORENSICS_PARENTS { + log.Error("[SetCommittedQcs] Received input length not equal to 2", len(headers)) + return fmt.Errorf("Received headers length not equal to 2 ") + } + + var committedQCs []utils.QuorumCert + for i, h := range headers { + var decodedExtraField utils.ExtraFields_v2 + // Decode the qc1 and qc2 + err := utils.DecodeBytesExtraFields(h.Extra, &decodedExtraField) + if err != nil { + log.Error("[SetCommittedQCs] Fail to decode extra when committing QC to forensics", "Error", err, "Index", i) + return err + } + if i != 0 { + if decodedExtraField.QuorumCert.ProposedBlockInfo.Hash != headers[i-1].Hash() { + log.Error("[SetCommittedQCs] Headers shall be on the same chain and in the right order", "ParentHash", h.ParentHash.Hex(), "headers[i-1].Hash()", headers[i-1].Hash().Hex()) + return fmt.Errorf("Headers shall be on the same chain and in the right order") + } else if i == len(headers)-1 { // The last header shall be pointed by the incoming QC + if incomingQC.ProposedBlockInfo.Hash != h.Hash() { + log.Error("[SetCommittedQCs] incomingQc is not pointing at the last header received", "hash", h.Hash().Hex(), "incomingQC.ProposedBlockInfo.Hash", incomingQC.ProposedBlockInfo.Hash.Hex()) + return fmt.Errorf("incomingQc is not pointing at the last header received") + } } } - }() - x.forensics = &Forensics{ - ReceiverCh: receiver, - Abort: abort, + + committedQCs = append(committedQCs, *decodedExtraField.QuorumCert) } + f.HighestCommittedQCs = append(committedQCs, incomingQC) + return nil } -func (x *XDPoS_v2) SendForensicProof() { +// Last step of forensics which sends out detailed proof to report service. +func (f *Forensics) SendForensicProof() { } -func (x *XDPoS_v2) ProcessForensics(quorumCert utils.QuorumCert) { - log.Info("Received a QC in forensics", "QC", quorumCert) +// Find the blockInfo of the block -2 distance away from the QC. Note: We using block number which means not necessary on the same chain as QC received +func (f *Forensics) findParentsQc(chain consensus.ChainReader, currentQc utils.QuorumCert, distanceFromCurrrentQc int64) { +} + +func (f *Forensics) findCommonSigners(currentQc utils.QuorumCert, higherQc utils.QuorumCert) { } diff --git a/consensus/XDPoS/engines/engine_v2/testing_utils.go b/consensus/XDPoS/engines/engine_v2/testing_utils.go index 148652a1eb..cc60c41584 100644 --- a/consensus/XDPoS/engines/engine_v2/testing_utils.go +++ b/consensus/XDPoS/engines/engine_v2/testing_utils.go @@ -73,3 +73,7 @@ func (x *XDPoS_v2) HygieneTimeoutPoolFaker() { func (x *XDPoS_v2) GetTimeoutPoolKeyListFaker() []string { return x.timeoutPool.PoolObjKeysList() } + +func (x *XDPoS_v2) GetForensicsFaker() *Forensics { + return x.forensics +} diff --git a/consensus/tests/engine_v2_tests/forensics_test.go b/consensus/tests/engine_v2_tests/forensics_test.go new file mode 100644 index 0000000000..52eca4ab6b --- /dev/null +++ b/consensus/tests/engine_v2_tests/forensics_test.go @@ -0,0 +1,99 @@ +package engine_v2_tests + +import ( + "math/big" + "testing" + "time" + + "github.com/XinFinOrg/XDPoSChain/accounts" + "github.com/XinFinOrg/XDPoSChain/accounts/abi/bind/backends" + "github.com/XinFinOrg/XDPoSChain/consensus/XDPoS" + "github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/utils" + "github.com/XinFinOrg/XDPoSChain/core/types" + "github.com/XinFinOrg/XDPoSChain/params" + "github.com/stretchr/testify/assert" +) + +func TestProcessQcShallSetForensicsCommittedQc(t *testing.T) { + blockchain, _, currentBlock, signer, signFn, _ := PrepareXDCTestBlockChainForV2Engine(t, 905, params.TestXDPoSMockChainConfig, 0) + engineV2 := blockchain.Engine().(*XDPoS.XDPoS).EngineV2 + + // Assuming we are getting block 906 which have QC pointing at block 905 + blockInfo := &utils.BlockInfo{ + Hash: currentBlock.Hash(), + Round: utils.Round(5), + Number: big.NewInt(905), + } + voteForSign := &utils.VoteForSign{ + ProposedBlockInfo: blockInfo, + GapNumber: 450, + } + voteSigningHash := utils.VoteSigHash(voteForSign) + + // Set round to 5 + engineV2.SetNewRoundFaker(blockchain, utils.Round(5), false) + // Create two vote messages which will not reach vote pool threshold + signedHash, err := signFn(accounts.Account{Address: signer}, voteSigningHash.Bytes()) + assert.Nil(t, err) + voteMsg := &utils.Vote{ + ProposedBlockInfo: blockInfo, + Signature: signedHash, + GapNumber: 450, + } + + err = engineV2.VoteHandler(blockchain, voteMsg) + assert.Nil(t, err) + signedHash = SignHashByPK(acc1Key, voteSigningHash.Bytes()) + voteMsg = &utils.Vote{ + ProposedBlockInfo: blockInfo, + Signature: signedHash, + GapNumber: 450, + } + err = engineV2.VoteHandler(blockchain, voteMsg) + assert.Nil(t, err) + + // Create another vote which is signed by someone not from the master node list + randomSigner, randomSignFn, err := backends.SimulateWalletAddressAndSignFn() + assert.Nil(t, err) + randomlySignedHash, err := randomSignFn(accounts.Account{Address: randomSigner}, voteSigningHash.Bytes()) + assert.Nil(t, err) + voteMsg = &utils.Vote{ + ProposedBlockInfo: blockInfo, + Signature: randomlySignedHash, + GapNumber: 450, + } + err = engineV2.VoteHandler(blockchain, voteMsg) + assert.Nil(t, err) + + // Create a vote message that should trigger vote pool hook and increment the round to 6 + signedHash = SignHashByPK(acc3Key, voteSigningHash.Bytes()) + voteMsg = &utils.Vote{ + ProposedBlockInfo: blockInfo, + Signature: signedHash, + GapNumber: 450, + } + + err = engineV2.VoteHandler(blockchain, voteMsg) + assert.Nil(t, err) + + time.Sleep(5000 * time.Millisecond) + assert.Equal(t, 3, len(engineV2.GetForensicsFaker().HighestCommittedQCs)) +} + +func TestSetCommittedQCsInOrder(t *testing.T) { + blockchain, _, currentBlock, _, _, _ := PrepareXDCTestBlockChainForV2Engine(t, 905, params.TestXDPoSMockChainConfig, 0) + forensics := blockchain.Engine().(*XDPoS.XDPoS).EngineV2.GetForensicsFaker() + + var headers []types.Header + var decodedExtraField utils.ExtraFields_v2 + // Decode the qc1 and qc2 + err := utils.DecodeBytesExtraFields(currentBlock.Header().Extra, &decodedExtraField) + assert.Nil(t, err) + err = forensics.SetCommittedQCs(append(headers, *blockchain.GetHeaderByNumber(903), *blockchain.GetHeaderByNumber(902)), *decodedExtraField.QuorumCert) + assert.NotNil(t, err) + assert.Equal(t, "Headers shall be on the same chain and in the right order", err.Error()) + + err = forensics.SetCommittedQCs(append(headers, *blockchain.GetHeaderByNumber(903), *blockchain.GetHeaderByNumber(904)), *decodedExtraField.QuorumCert) + assert.Nil(t, err) + assert.Equal(t, 3, len(forensics.HighestCommittedQCs)) +}