From 8fde52c512eb7ad2da385cdea8119851210a0827 Mon Sep 17 00:00:00 2001 From: wgr523 Date: Fri, 22 Apr 2022 00:12:44 +0800 Subject: [PATCH] Xin 145 (#82) * add HandleProposedBlock() in procFutureBlocks() * add proposedBlockHandler for downloader --- cmd/XDC/chaincmd.go | 5 +-- core/blockchain.go | 15 ++++++++- eth/downloader/downloader.go | 55 ++++++++++++++++++------------- eth/downloader/downloader_test.go | 10 ++++-- eth/fetcher/fetcher.go | 1 + eth/handler.go | 17 +++++++--- les/handler.go | 10 ++++-- 7 files changed, 80 insertions(+), 33 deletions(-) diff --git a/cmd/XDC/chaincmd.go b/cmd/XDC/chaincmd.go index 3c7db19eaa..4bf9e27198 100644 --- a/cmd/XDC/chaincmd.go +++ b/cmd/XDC/chaincmd.go @@ -19,13 +19,14 @@ package main import ( "encoding/json" "fmt" - "github.com/XinFinOrg/XDPoSChain/core/rawdb" "os" "runtime" "strconv" "sync/atomic" "time" + "github.com/XinFinOrg/XDPoSChain/core/rawdb" + "github.com/XinFinOrg/XDPoSChain/cmd/utils" "github.com/XinFinOrg/XDPoSChain/common" "github.com/XinFinOrg/XDPoSChain/console" @@ -368,7 +369,7 @@ func copyDb(ctx *cli.Context) error { chain, chainDb := utils.MakeChain(ctx, stack) syncmode := *utils.GlobalTextMarshaler(ctx, utils.SyncModeFlag.Name).(*downloader.SyncMode) - dl := downloader.New(syncmode, chainDb, new(event.TypeMux), chain, nil, nil) + dl := downloader.New(syncmode, chainDb, new(event.TypeMux), chain, nil, nil, nil) // Create a source peer to satisfy downloader requests from db, err := rawdb.NewLevelDBDatabase(ctx.Args().First(), ctx.GlobalInt(utils.CacheFlag.Name), 256, "") diff --git a/core/blockchain.go b/core/blockchain.go index 9ca580c17b..bb597cbb6b 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -974,7 +974,20 @@ func (bc *BlockChain) procFutureBlocks() { // Insert one by one as chain insertion needs contiguous ancestry between blocks for i := range blocks { - bc.InsertChain(blocks[i : i+1]) + _, err := bc.InsertChain(blocks[i : i+1]) + // let consensus engine handle the last block (e.g. for voting) + if i == len(blocks)-1 && err == nil { + engine, ok := bc.Engine().(*XDPoS.XDPoS) + if ok { + go func() { + header := blocks[i].Header() + err = engine.HandleProposedBlock(bc, header) + if err != nil { + log.Info("[procFutureBlocks] handle proposed block has error", "err", err, "block hash", header.Hash(), "number", header.Number) + } + }() + } + } } } } diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 17fba736da..5e32e6b6d8 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -36,6 +36,9 @@ import ( "github.com/XinFinOrg/XDPoSChain/params" ) +// proposeBlockHandlerFn is a callback type to handle a block by the consensus +type proposeBlockHandlerFn func(header *types.Header) error + var ( MaxHashFetch = 512 // Amount of hashes to be fetched per retrieval request MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request @@ -114,7 +117,8 @@ type Downloader struct { blockchain BlockChain // Callbacks - dropPeer peerDropFn // Drops a peer for misbehaving + dropPeer peerDropFn // Drops a peer for misbehaving + handleProposedBlock proposeBlockHandlerFn // Consensus v2 specific: Hanle new proposed block // Status synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing @@ -199,31 +203,32 @@ type BlockChain interface { } // New creates a new downloader to fetch hashes and blocks from remote peers. -func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader { +func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn, handleProposedBlock proposeBlockHandlerFn) *Downloader { if lightchain == nil { lightchain = chain } dl := &Downloader{ - mode: mode, - stateDB: stateDb, - mux: mux, - queue: newQueue(), - peers: newPeerSet(), - rttEstimate: uint64(rttMaxEstimate), - rttConfidence: uint64(1000000), - blockchain: chain, - lightchain: lightchain, - dropPeer: dropPeer, - headerCh: make(chan dataPack, 1), - bodyCh: make(chan dataPack, 1), - receiptCh: make(chan dataPack, 1), - bodyWakeCh: make(chan bool, 1), - receiptWakeCh: make(chan bool, 1), - headerProcCh: make(chan []*types.Header, 1), - quitCh: make(chan struct{}), - stateCh: make(chan dataPack), - stateSyncStart: make(chan *stateSync), + mode: mode, + stateDB: stateDb, + mux: mux, + queue: newQueue(), + peers: newPeerSet(), + rttEstimate: uint64(rttMaxEstimate), + rttConfidence: uint64(1000000), + blockchain: chain, + lightchain: lightchain, + dropPeer: dropPeer, + handleProposedBlock: handleProposedBlock, + headerCh: make(chan dataPack, 1), + bodyCh: make(chan dataPack, 1), + receiptCh: make(chan dataPack, 1), + bodyWakeCh: make(chan bool, 1), + receiptWakeCh: make(chan bool, 1), + headerProcCh: make(chan []*types.Header, 1), + quitCh: make(chan struct{}), + stateCh: make(chan dataPack), + stateSyncStart: make(chan *stateSync), syncStatsState: stateSyncStats{ processed: core.GetTrieSyncProgress(stateDb), }, @@ -1393,7 +1398,13 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error { log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err) return errInvalidChain } - + if d.handleProposedBlock != nil { + header := blocks[len(blocks)-1].Header() + err := d.handleProposedBlock(header) + if err != nil { + log.Info("[downloader] handle proposed block has error", "err", err, "block hash", header.Hash(), "number", header.Number) + } + } return nil } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 09c59c4ca7..03da4920a7 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -19,13 +19,14 @@ package downloader import ( "errors" "fmt" - "github.com/XinFinOrg/XDPoSChain/core/rawdb" "math/big" "sync" "sync/atomic" "testing" "time" + "github.com/XinFinOrg/XDPoSChain/core/rawdb" + "github.com/XinFinOrg/XDPoSChain/common" "github.com/XinFinOrg/XDPoSChain/consensus/ethash" "github.com/XinFinOrg/XDPoSChain/core" @@ -97,7 +98,7 @@ func newTester() *downloadTester { tester.stateDb = rawdb.NewMemoryDatabase() tester.stateDb.Put(genesis.Root().Bytes(), []byte{0x00}) - tester.downloader = New(FullSync, tester.stateDb, new(event.TypeMux), tester, nil, tester.dropPeer) + tester.downloader = New(FullSync, tester.stateDb, new(event.TypeMux), tester, nil, tester.dropPeer, tester.handleProposedBlock) return tester } @@ -457,6 +458,11 @@ func (dl *downloadTester) dropPeer(id string) { dl.downloader.UnregisterPeer(id) } +// an empty handleProposedBlock function +func (dl *downloadTester) handleProposedBlock(header *types.Header) error { + return nil +} + // Config retrieves the blockchain's chain configuration. func (dl *downloadTester) Config() *params.ChainConfig { return params.TestChainConfig } diff --git a/eth/fetcher/fetcher.go b/eth/fetcher/fetcher.go index edd8132f7c..7b8389bf41 100644 --- a/eth/fetcher/fetcher.go +++ b/eth/fetcher/fetcher.go @@ -57,6 +57,7 @@ type bodyRequesterFn func([]common.Hash) error // headerVerifierFn is a callback type to verify a block's header for fast propagation. type headerVerifierFn func(header *types.Header) error +// proposeBlockHandlerFn is a callback type to handle a block by the consensus type proposeBlockHandlerFn func(header *types.Header) error // blockBroadcasterFn is a callback type for broadcasting a block to connected peers. diff --git a/eth/handler.go b/eth/handler.go index a55c2af1bb..6b21d5f9e1 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -206,15 +206,24 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne if len(manager.SubProtocols) == 0 { return nil, errIncompatibleConfig } + + var handleProposedBlock func(header *types.Header) error + if config.XDPoS != nil { + handleProposedBlock = func(header *types.Header) error { + return engine.(*XDPoS.XDPoS).HandleProposedBlock(blockchain, header) + } + } else { + handleProposedBlock = func(header *types.Header) error { + return nil + } + } + // Construct the different synchronisation mechanisms - manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer) + manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer, handleProposedBlock) validator := func(header *types.Header) error { return engine.VerifyHeader(blockchain, header, true) } - handleProposedBlock := func(header *types.Header) error { - return engine.(*XDPoS.XDPoS).HandleProposedBlock(blockchain, header) - } heighter := func() uint64 { return blockchain.CurrentBlock().NumberU64() diff --git a/les/handler.go b/les/handler.go index 05462a77a9..ba2c774b44 100644 --- a/les/handler.go +++ b/les/handler.go @@ -21,12 +21,13 @@ import ( "encoding/binary" "errors" "fmt" - "github.com/XinFinOrg/XDPoSChain/core/rawdb" "math/big" "net" "sync" "time" + "github.com/XinFinOrg/XDPoSChain/core/rawdb" + "github.com/XinFinOrg/XDPoSChain/common" "github.com/XinFinOrg/XDPoSChain/consensus" "github.com/XinFinOrg/XDPoSChain/core" @@ -205,7 +206,7 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, protoco } if lightSync { - manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, nil, blockchain, removePeer) + manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, nil, blockchain, removePeer, manager.handleProposedBlock) manager.peers.notify((*downloaderPeerNotify)(manager)) manager.fetcher = newLightFetcher(manager) } @@ -218,6 +219,11 @@ func (pm *ProtocolManager) removePeer(id string) { pm.peers.Unregister(id) } +// an empty handleProposedBlock function +func (pm *ProtocolManager) handleProposedBlock(header *types.Header) error { + return nil +} + func (pm *ProtocolManager) Start(maxPeers int) { pm.maxPeers = maxPeers