* add HandleProposedBlock() in procFutureBlocks()

* add proposedBlockHandler for downloader
This commit is contained in:
wgr523 2022-04-22 00:12:44 +08:00 committed by GitHub
parent 6c48d5be6c
commit 8fde52c512
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 80 additions and 33 deletions

View file

@ -19,13 +19,14 @@ package main
import (
"encoding/json"
"fmt"
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
"os"
"runtime"
"strconv"
"sync/atomic"
"time"
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
"github.com/XinFinOrg/XDPoSChain/cmd/utils"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/console"
@ -368,7 +369,7 @@ func copyDb(ctx *cli.Context) error {
chain, chainDb := utils.MakeChain(ctx, stack)
syncmode := *utils.GlobalTextMarshaler(ctx, utils.SyncModeFlag.Name).(*downloader.SyncMode)
dl := downloader.New(syncmode, chainDb, new(event.TypeMux), chain, nil, nil)
dl := downloader.New(syncmode, chainDb, new(event.TypeMux), chain, nil, nil, nil)
// Create a source peer to satisfy downloader requests from
db, err := rawdb.NewLevelDBDatabase(ctx.Args().First(), ctx.GlobalInt(utils.CacheFlag.Name), 256, "")

View file

@ -974,7 +974,20 @@ func (bc *BlockChain) procFutureBlocks() {
// Insert one by one as chain insertion needs contiguous ancestry between blocks
for i := range blocks {
bc.InsertChain(blocks[i : i+1])
_, err := bc.InsertChain(blocks[i : i+1])
// let consensus engine handle the last block (e.g. for voting)
if i == len(blocks)-1 && err == nil {
engine, ok := bc.Engine().(*XDPoS.XDPoS)
if ok {
go func() {
header := blocks[i].Header()
err = engine.HandleProposedBlock(bc, header)
if err != nil {
log.Info("[procFutureBlocks] handle proposed block has error", "err", err, "block hash", header.Hash(), "number", header.Number)
}
}()
}
}
}
}
}

View file

@ -36,6 +36,9 @@ import (
"github.com/XinFinOrg/XDPoSChain/params"
)
// proposeBlockHandlerFn is a callback type to handle a block by the consensus
type proposeBlockHandlerFn func(header *types.Header) error
var (
MaxHashFetch = 512 // Amount of hashes to be fetched per retrieval request
MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request
@ -114,7 +117,8 @@ type Downloader struct {
blockchain BlockChain
// Callbacks
dropPeer peerDropFn // Drops a peer for misbehaving
dropPeer peerDropFn // Drops a peer for misbehaving
handleProposedBlock proposeBlockHandlerFn // Consensus v2 specific: Hanle new proposed block
// Status
synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
@ -199,31 +203,32 @@ type BlockChain interface {
}
// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn, handleProposedBlock proposeBlockHandlerFn) *Downloader {
if lightchain == nil {
lightchain = chain
}
dl := &Downloader{
mode: mode,
stateDB: stateDb,
mux: mux,
queue: newQueue(),
peers: newPeerSet(),
rttEstimate: uint64(rttMaxEstimate),
rttConfidence: uint64(1000000),
blockchain: chain,
lightchain: lightchain,
dropPeer: dropPeer,
headerCh: make(chan dataPack, 1),
bodyCh: make(chan dataPack, 1),
receiptCh: make(chan dataPack, 1),
bodyWakeCh: make(chan bool, 1),
receiptWakeCh: make(chan bool, 1),
headerProcCh: make(chan []*types.Header, 1),
quitCh: make(chan struct{}),
stateCh: make(chan dataPack),
stateSyncStart: make(chan *stateSync),
mode: mode,
stateDB: stateDb,
mux: mux,
queue: newQueue(),
peers: newPeerSet(),
rttEstimate: uint64(rttMaxEstimate),
rttConfidence: uint64(1000000),
blockchain: chain,
lightchain: lightchain,
dropPeer: dropPeer,
handleProposedBlock: handleProposedBlock,
headerCh: make(chan dataPack, 1),
bodyCh: make(chan dataPack, 1),
receiptCh: make(chan dataPack, 1),
bodyWakeCh: make(chan bool, 1),
receiptWakeCh: make(chan bool, 1),
headerProcCh: make(chan []*types.Header, 1),
quitCh: make(chan struct{}),
stateCh: make(chan dataPack),
stateSyncStart: make(chan *stateSync),
syncStatsState: stateSyncStats{
processed: core.GetTrieSyncProgress(stateDb),
},
@ -1393,7 +1398,13 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
return errInvalidChain
}
if d.handleProposedBlock != nil {
header := blocks[len(blocks)-1].Header()
err := d.handleProposedBlock(header)
if err != nil {
log.Info("[downloader] handle proposed block has error", "err", err, "block hash", header.Hash(), "number", header.Number)
}
}
return nil
}

View file

@ -19,13 +19,14 @@ package downloader
import (
"errors"
"fmt"
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
"math/big"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/consensus/ethash"
"github.com/XinFinOrg/XDPoSChain/core"
@ -97,7 +98,7 @@ func newTester() *downloadTester {
tester.stateDb = rawdb.NewMemoryDatabase()
tester.stateDb.Put(genesis.Root().Bytes(), []byte{0x00})
tester.downloader = New(FullSync, tester.stateDb, new(event.TypeMux), tester, nil, tester.dropPeer)
tester.downloader = New(FullSync, tester.stateDb, new(event.TypeMux), tester, nil, tester.dropPeer, tester.handleProposedBlock)
return tester
}
@ -457,6 +458,11 @@ func (dl *downloadTester) dropPeer(id string) {
dl.downloader.UnregisterPeer(id)
}
// an empty handleProposedBlock function
func (dl *downloadTester) handleProposedBlock(header *types.Header) error {
return nil
}
// Config retrieves the blockchain's chain configuration.
func (dl *downloadTester) Config() *params.ChainConfig { return params.TestChainConfig }

View file

@ -57,6 +57,7 @@ type bodyRequesterFn func([]common.Hash) error
// headerVerifierFn is a callback type to verify a block's header for fast propagation.
type headerVerifierFn func(header *types.Header) error
// proposeBlockHandlerFn is a callback type to handle a block by the consensus
type proposeBlockHandlerFn func(header *types.Header) error
// blockBroadcasterFn is a callback type for broadcasting a block to connected peers.

View file

@ -206,15 +206,24 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne
if len(manager.SubProtocols) == 0 {
return nil, errIncompatibleConfig
}
var handleProposedBlock func(header *types.Header) error
if config.XDPoS != nil {
handleProposedBlock = func(header *types.Header) error {
return engine.(*XDPoS.XDPoS).HandleProposedBlock(blockchain, header)
}
} else {
handleProposedBlock = func(header *types.Header) error {
return nil
}
}
// Construct the different synchronisation mechanisms
manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)
manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer, handleProposedBlock)
validator := func(header *types.Header) error {
return engine.VerifyHeader(blockchain, header, true)
}
handleProposedBlock := func(header *types.Header) error {
return engine.(*XDPoS.XDPoS).HandleProposedBlock(blockchain, header)
}
heighter := func() uint64 {
return blockchain.CurrentBlock().NumberU64()

View file

@ -21,12 +21,13 @@ import (
"encoding/binary"
"errors"
"fmt"
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
"math/big"
"net"
"sync"
"time"
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/core"
@ -205,7 +206,7 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, protoco
}
if lightSync {
manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, nil, blockchain, removePeer)
manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, nil, blockchain, removePeer, manager.handleProposedBlock)
manager.peers.notify((*downloaderPeerNotify)(manager))
manager.fetcher = newLightFetcher(manager)
}
@ -218,6 +219,11 @@ func (pm *ProtocolManager) removePeer(id string) {
pm.peers.Unregister(id)
}
// an empty handleProposedBlock function
func (pm *ProtocolManager) handleProposedBlock(header *types.Header) error {
return nil
}
func (pm *ProtocolManager) Start(maxPeers int) {
pm.maxPeers = maxPeers