mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-06-19 21:31:37 +00:00
Parallel process block from fetcher
This commit is contained in:
parent
5709546dfd
commit
9f36d37558
13 changed files with 423 additions and 88 deletions
|
|
@ -58,7 +58,7 @@ type Engine interface {
|
|||
// VerifyHeader checks whether a header conforms to the consensus rules of a
|
||||
// given engine. Verifying the seal may be done optionally here, or explicitly
|
||||
// via the VerifySeal method.
|
||||
VerifyHeader(chain ChainReader, header *types.Header, seal bool) error
|
||||
VerifyHeader(chain ChainReader, header *types.Header, fullVerify bool) error
|
||||
|
||||
// VerifyHeaders is similar to VerifyHeader, but verifies a batch of headers
|
||||
// concurrently. The method returns a quit channel to abort the operations and
|
||||
|
|
|
|||
|
|
@ -75,6 +75,13 @@ type CacheConfig struct {
|
|||
TrieNodeLimit int // Memory limit (MB) at which to flush the current in-memory trie to disk
|
||||
TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk
|
||||
}
|
||||
type ResultProcessBlock struct {
|
||||
logs []*types.Log
|
||||
receipts []*types.Receipt
|
||||
state *state.StateDB
|
||||
proctime time.Duration
|
||||
usedGas uint64
|
||||
}
|
||||
|
||||
// BlockChain represents the canonical chain given a database with a genesis
|
||||
// block. The Blockchain manages chain imports, reverts, chain reorganisations.
|
||||
|
|
@ -115,14 +122,16 @@ type BlockChain struct {
|
|||
currentBlock atomic.Value // Current head of the block chain
|
||||
currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!)
|
||||
|
||||
stateCache state.Database // State database to reuse between imports (contains state cache)
|
||||
bodyCache *lru.Cache // Cache for the most recent block bodies
|
||||
bodyRLPCache *lru.Cache // Cache for the most recent block bodies in RLP encoded format
|
||||
blockCache *lru.Cache // Cache for the most recent entire blocks
|
||||
futureBlocks *lru.Cache // future blocks are blocks added for later processing
|
||||
|
||||
quit chan struct{} // blockchain quit channel
|
||||
running int32 // running must be called atomically
|
||||
stateCache state.Database // State database to reuse between imports (contains state cache)
|
||||
bodyCache *lru.Cache // Cache for the most recent block bodies
|
||||
bodyRLPCache *lru.Cache // Cache for the most recent block bodies in RLP encoded format
|
||||
blockCache *lru.Cache // Cache for the most recent entire blocks
|
||||
futureBlocks *lru.Cache // future blocks are blocks added for later processing
|
||||
resultProcess *lru.Cache
|
||||
calculatingBlock *lru.Cache
|
||||
downloadingBlock *lru.Cache
|
||||
quit chan struct{} // blockchain quit channel
|
||||
running int32 // running must be called atomically
|
||||
// procInterrupt must be atomically called
|
||||
procInterrupt int32 // interrupt signaler for block processing
|
||||
wg sync.WaitGroup // chain processing wait group for shutting down
|
||||
|
|
@ -152,21 +161,26 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
|
|||
blockCache, _ := lru.New(blockCacheLimit)
|
||||
futureBlocks, _ := lru.New(maxFutureBlocks)
|
||||
badBlocks, _ := lru.New(badBlockLimit)
|
||||
|
||||
resultProcess, _ := lru.New(blockCacheLimit)
|
||||
preparingBlock, _ := lru.New(blockCacheLimit)
|
||||
downloadingBlock, _ := lru.New(blockCacheLimit)
|
||||
bc := &BlockChain{
|
||||
chainConfig: chainConfig,
|
||||
cacheConfig: cacheConfig,
|
||||
db: db,
|
||||
triegc: prque.New(),
|
||||
stateCache: state.NewDatabase(db),
|
||||
quit: make(chan struct{}),
|
||||
bodyCache: bodyCache,
|
||||
bodyRLPCache: bodyRLPCache,
|
||||
blockCache: blockCache,
|
||||
futureBlocks: futureBlocks,
|
||||
engine: engine,
|
||||
vmConfig: vmConfig,
|
||||
badBlocks: badBlocks,
|
||||
chainConfig: chainConfig,
|
||||
cacheConfig: cacheConfig,
|
||||
db: db,
|
||||
triegc: prque.New(),
|
||||
stateCache: state.NewDatabase(db),
|
||||
quit: make(chan struct{}),
|
||||
bodyCache: bodyCache,
|
||||
bodyRLPCache: bodyRLPCache,
|
||||
blockCache: blockCache,
|
||||
futureBlocks: futureBlocks,
|
||||
resultProcess: resultProcess,
|
||||
calculatingBlock: preparingBlock,
|
||||
downloadingBlock: downloadingBlock,
|
||||
engine: engine,
|
||||
vmConfig: vmConfig,
|
||||
badBlocks: badBlocks,
|
||||
}
|
||||
bc.SetValidator(NewBlockValidator(chainConfig, bc, engine))
|
||||
bc.SetProcessor(NewStateProcessor(chainConfig, bc, engine))
|
||||
|
|
@ -1049,6 +1063,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty
|
|||
for i, block := range chain {
|
||||
headers[i] = block.Header()
|
||||
seals[i] = true
|
||||
bc.downloadingBlock.Add(block.Hash(), true)
|
||||
}
|
||||
abort, results := bc.engine.VerifyHeaders(bc, headers, seals)
|
||||
defer close(abort)
|
||||
|
|
@ -1168,7 +1183,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty
|
|||
}
|
||||
switch status {
|
||||
case CanonStatTy:
|
||||
log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()),
|
||||
log.Debug("Inserted new block from downloader", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()),
|
||||
"txs", len(block.Transactions()), "gas", block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart)))
|
||||
|
||||
coalescedLogs = append(coalescedLogs, logs...)
|
||||
|
|
@ -1180,7 +1195,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty
|
|||
bc.gcproc += proctime
|
||||
|
||||
case SideStatTy:
|
||||
log.Debug("Inserted forked block", "number", block.Number(), "hash", block.Hash(), "diff", block.Difficulty(), "elapsed",
|
||||
log.Debug("Inserted forked block from downloader", "number", block.Number(), "hash", block.Hash(), "diff", block.Difficulty(), "elapsed",
|
||||
common.PrettyDuration(time.Since(bstart)), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()))
|
||||
|
||||
blockInsertTimer.UpdateSince(bstart)
|
||||
|
|
@ -1189,7 +1204,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty
|
|||
stats.processed++
|
||||
stats.usedGas += usedGas
|
||||
stats.report(chain, i, bc.stateCache.TrieDB().Size())
|
||||
if bc.chainConfig.XDPoS != nil {
|
||||
if status == CanonStatTy && bc.chainConfig.XDPoS != nil {
|
||||
// epoch block
|
||||
if (chain[i].NumberU64() % bc.chainConfig.XDPoS.Epoch) == 0 {
|
||||
CheckpointCh <- 1
|
||||
|
|
@ -1206,11 +1221,212 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty
|
|||
}
|
||||
// Append a single chain head event if we've progressed the chain
|
||||
if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
|
||||
log.Debug("New ChainHeadEvent ", "number", lastCanon.NumberU64(), "hash", lastCanon.Hash())
|
||||
events = append(events, ChainHeadEvent{lastCanon})
|
||||
}
|
||||
return 0, events, coalescedLogs, nil
|
||||
}
|
||||
|
||||
func (bc *BlockChain) InsertBlock(block *types.Block) error {
|
||||
events, logs, err := bc.insertBlock(block)
|
||||
bc.PostChainEvents(events, logs)
|
||||
return err
|
||||
}
|
||||
|
||||
func (bc *BlockChain) PrepareBlock(block *types.Block) (err error) {
|
||||
defer log.Debug("Done prepare block ", "number", block.NumberU64(), "hash", block.Hash(), "validator", block.Header().Validator, "err", err)
|
||||
if _, check := bc.resultProcess.Get(block.Hash()); check {
|
||||
log.Debug("Stop prepare a block because the result cached", "number", block.NumberU64(), "hash", block.Hash(), "validator", block.Header().Validator)
|
||||
return nil
|
||||
}
|
||||
if _, check := bc.calculatingBlock.Get(block.Hash()); check {
|
||||
log.Debug("Stop prepare a block because inserting", "number", block.NumberU64(), "hash", block.Hash(), "validator", block.Header().Validator)
|
||||
return nil
|
||||
}
|
||||
err = bc.engine.VerifyHeader(bc, block.Header(), false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
result, err := bc.getResultBlock(block, false)
|
||||
if err == nil {
|
||||
bc.resultProcess.Add(block.Hash(), result)
|
||||
return nil
|
||||
} else if err == ErrKnownBlock {
|
||||
return nil
|
||||
} else if err == ErrStopPreparingBlock {
|
||||
log.Debug("Stop prepare a block because calculating", "number", block.NumberU64(), "hash", block.Hash(), "validator", block.Header().Validator)
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (bc *BlockChain) getResultBlock(block *types.Block, verifiedM2 bool) (*ResultProcessBlock, error) {
|
||||
var calculatedBlock *CalculatedBlock
|
||||
if verifiedM2 {
|
||||
if result, check := bc.resultProcess.Get(block.HashNoValidator()); check {
|
||||
log.Debug("Get result block from cache ", "number", block.NumberU64(), "hash", block.Hash(), "hash no validator", block.HashNoValidator())
|
||||
return result.(*ResultProcessBlock), nil
|
||||
}
|
||||
log.Debug("Not found cache prepare block ", "number", block.NumberU64(), "hash", block.Hash(), "validator", block.HashNoValidator())
|
||||
if calculatedBlock, _ := bc.calculatingBlock.Get(block.HashNoValidator()); calculatedBlock != nil {
|
||||
calculatedBlock.(*CalculatedBlock).stop = true
|
||||
}
|
||||
}
|
||||
calculatedBlock = &CalculatedBlock{block, false}
|
||||
bc.calculatingBlock.Add(block.HashNoValidator(), calculatedBlock)
|
||||
// Start the parallel header verifier
|
||||
// If the chain is terminating, stop processing blocks
|
||||
if atomic.LoadInt32(&bc.procInterrupt) == 1 {
|
||||
log.Debug("Premature abort during blocks processing")
|
||||
return nil, ErrBlacklistedHash
|
||||
}
|
||||
// If the header is a banned one, straight out abort
|
||||
if BadHashes[block.Hash()] {
|
||||
bc.reportBlock(block, nil, ErrBlacklistedHash)
|
||||
return nil, ErrBlacklistedHash
|
||||
}
|
||||
// Wait for the block's verification to complete
|
||||
bstart := time.Now()
|
||||
err := bc.Validator().ValidateBody(block)
|
||||
switch {
|
||||
case err == ErrKnownBlock:
|
||||
// Block and state both already known. However if the current block is below
|
||||
// this number we did a rollback and we should reimport it nonetheless.
|
||||
if bc.CurrentBlock().NumberU64() >= block.NumberU64() {
|
||||
return nil, ErrKnownBlock
|
||||
}
|
||||
case err == consensus.ErrPrunedAncestor:
|
||||
// Block competing with the canonical chain, store in the db, but don't process
|
||||
// until the competitor TD goes above the canonical TD
|
||||
currentBlock := bc.CurrentBlock()
|
||||
localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
|
||||
externTd := new(big.Int).Add(bc.GetTd(block.ParentHash(), block.NumberU64()-1), block.Difficulty())
|
||||
if localTd.Cmp(externTd) > 0 {
|
||||
return nil, err
|
||||
}
|
||||
// Competitor chain beat canonical, gather all blocks from the common ancestor
|
||||
var winner []*types.Block
|
||||
|
||||
parent := bc.GetBlock(block.ParentHash(), block.NumberU64()-1)
|
||||
for !bc.HasState(parent.Root()) {
|
||||
winner = append(winner, parent)
|
||||
parent = bc.GetBlock(parent.ParentHash(), parent.NumberU64()-1)
|
||||
}
|
||||
for j := 0; j < len(winner)/2; j++ {
|
||||
winner[j], winner[len(winner)-1-j] = winner[len(winner)-1-j], winner[j]
|
||||
}
|
||||
log.Debug("Number block need calculated again", "number", block.NumberU64(), "hash", block.Hash().Hex(), "winners", len(winner))
|
||||
// Import all the pruned blocks to make the state available
|
||||
_, _, _, err := bc.insertChain(winner)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
case err != nil:
|
||||
bc.reportBlock(block, nil, err)
|
||||
return nil, err
|
||||
}
|
||||
// Create a new statedb using the parent block and report an
|
||||
// error if it fails.
|
||||
var parent = bc.GetBlock(block.ParentHash(), block.NumberU64()-1)
|
||||
state, err := state.New(parent.Root(), bc.stateCache)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Process block using the parent state as reference point.
|
||||
receipts, logs, usedGas, err := bc.processor.ProcessBlockNoValidator(calculatedBlock, state, bc.vmConfig)
|
||||
process := time.Since(bstart)
|
||||
if err != nil {
|
||||
if err != ErrStopPreparingBlock {
|
||||
bc.reportBlock(block, receipts, err)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
// Validate the state using the default validator
|
||||
err = bc.Validator().ValidateState(block, parent, state, receipts, usedGas)
|
||||
if err != nil {
|
||||
bc.reportBlock(block, receipts, err)
|
||||
return nil, err
|
||||
}
|
||||
proctime := time.Since(bstart)
|
||||
log.Debug("Caculate new block", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()),
|
||||
"txs", len(block.Transactions()), "gas", block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart)), "process", process)
|
||||
return &ResultProcessBlock{receipts: receipts, logs: logs, state: state, proctime: proctime, usedGas: usedGas}, nil
|
||||
}
|
||||
|
||||
// insertChain will execute the actual chain insertion and event aggregation. The
|
||||
// only reason this method exists as a separate one is to make locking cleaner
|
||||
// with deferred statements.
|
||||
func (bc *BlockChain) insertBlock(block *types.Block) ([]interface{}, []*types.Log, error) {
|
||||
var (
|
||||
stats = insertStats{startTime: mclock.Now()}
|
||||
events = make([]interface{}, 0, 1)
|
||||
coalescedLogs []*types.Log
|
||||
)
|
||||
if _, check := bc.downloadingBlock.Get(block.Hash()); check {
|
||||
log.Debug("Stop fetcher a block because downloading", "number", block.NumberU64(), "hash", block.Hash())
|
||||
return events, coalescedLogs, nil
|
||||
}
|
||||
result, err := bc.getResultBlock(block, true)
|
||||
if err != nil {
|
||||
return events, coalescedLogs, err
|
||||
}
|
||||
defer bc.resultProcess.Remove(block.HashNoValidator())
|
||||
bc.wg.Add(1)
|
||||
defer bc.wg.Done()
|
||||
// Write the block to the chain and get the status.
|
||||
bc.chainmu.Lock()
|
||||
defer bc.chainmu.Unlock()
|
||||
if bc.HasBlockAndState(block.Hash(), block.NumberU64()) {
|
||||
return events, coalescedLogs, nil
|
||||
}
|
||||
status, err := bc.WriteBlockWithState(block, result.receipts, result.state)
|
||||
|
||||
if err != nil {
|
||||
return events, coalescedLogs, err
|
||||
}
|
||||
switch status {
|
||||
case CanonStatTy:
|
||||
log.Debug("Inserted new block from fetcher", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()),
|
||||
"txs", len(block.Transactions()), "gas", block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(block.ReceivedAt)))
|
||||
|
||||
coalescedLogs = append(coalescedLogs, result.logs...)
|
||||
events = append(events, ChainEvent{block, block.Hash(), result.logs})
|
||||
|
||||
// Only count canonical blocks for GC processing time
|
||||
bc.gcproc += result.proctime
|
||||
|
||||
case SideStatTy:
|
||||
log.Debug("Inserted forked block from fetcher", "number", block.Number(), "hash", block.Hash(), "diff", block.Difficulty(), "elapsed",
|
||||
common.PrettyDuration(time.Since(block.ReceivedAt)), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()))
|
||||
|
||||
blockInsertTimer.Update(result.proctime)
|
||||
events = append(events, ChainSideEvent{block})
|
||||
}
|
||||
stats.processed++
|
||||
stats.usedGas += result.usedGas
|
||||
stats.report(types.Blocks{block}, 0, bc.stateCache.TrieDB().Size())
|
||||
if status == CanonStatTy && bc.chainConfig.XDPoS != nil {
|
||||
// epoch block
|
||||
if (block.NumberU64() % bc.chainConfig.XDPoS.Epoch) == 0 {
|
||||
CheckpointCh <- 1
|
||||
}
|
||||
// prepare set of masternodes for the next epoch
|
||||
if (block.NumberU64() % bc.chainConfig.XDPoS.Epoch) == (bc.chainConfig.XDPoS.Epoch - bc.chainConfig.XDPoS.Gap) {
|
||||
err := bc.UpdateM1()
|
||||
if err != nil {
|
||||
log.Error("Error when update masternodes set. Stopping node", "err", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
}
|
||||
// Append a single chain head event if we've progressed the chain
|
||||
if status == CanonStatTy && bc.CurrentBlock().Hash() == block.Hash() {
|
||||
events = append(events, ChainHeadEvent{block})
|
||||
log.Debug("New ChainHeadEvent from fetcher ", "number", block.NumberU64(), "hash", block.Hash())
|
||||
}
|
||||
return events, coalescedLogs, nil
|
||||
}
|
||||
|
||||
// insertStats tracks and reports on block insertion.
|
||||
type insertStats struct {
|
||||
queued, processed, ignored int
|
||||
|
|
|
|||
|
|
@ -36,4 +36,6 @@ var (
|
|||
ErrNotXDPoS = errors.New("XDPoS not found in config")
|
||||
|
||||
ErrNotFoundM1 = errors.New("list M1 not found ")
|
||||
|
||||
ErrStopPreparingBlock = errors.New("stop calculate a block not vrified M2")
|
||||
)
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
|
||||
// Copyright 2015 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
|
|
@ -40,6 +41,10 @@ type StateProcessor struct {
|
|||
bc *BlockChain // Canonical block chain
|
||||
engine consensus.Engine // Consensus engine used for block rewards
|
||||
}
|
||||
type CalculatedBlock struct {
|
||||
block *types.Block
|
||||
stop bool
|
||||
}
|
||||
|
||||
// NewStateProcessor initialises a new StateProcessor.
|
||||
func NewStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine) *StateProcessor {
|
||||
|
|
@ -69,9 +74,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
|
|||
if p.config.DAOForkSupport && p.config.DAOForkBlock != nil && p.config.DAOForkBlock.Cmp(block.Number()) == 0 {
|
||||
misc.ApplyDAOHardFork(statedb)
|
||||
}
|
||||
|
||||
InitSignerInTransactions(p.config, header, block.Transactions())
|
||||
// Iterate over and process the individual transactions
|
||||
for i, tx := range block.Transactions() {
|
||||
statedb.Prepare(tx.Hash(), block.Hash(), i)
|
||||
receipt, _, err := ApplyTransaction(p.config, p.bc, nil, gp, statedb, header, tx, usedGas, cfg)
|
||||
|
|
@ -83,7 +86,44 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
|
|||
}
|
||||
// Finalize the block, applying any consensus engine specific extras (e.g. block rewards)
|
||||
p.engine.Finalize(p.bc, header, statedb, block.Transactions(), block.Uncles(), receipts)
|
||||
return receipts, allLogs, *usedGas, nil
|
||||
}
|
||||
|
||||
func (p *StateProcessor) ProcessBlockNoValidator(cBlock *CalculatedBlock, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, error) {
|
||||
block := cBlock.block
|
||||
var (
|
||||
receipts types.Receipts
|
||||
usedGas = new(uint64)
|
||||
header = block.Header()
|
||||
allLogs []*types.Log
|
||||
gp = new(GasPool).AddGas(block.GasLimit())
|
||||
)
|
||||
// Mutate the the block and state according to any hard-fork specs
|
||||
if p.config.DAOForkSupport && p.config.DAOForkBlock != nil && p.config.DAOForkBlock.Cmp(block.Number()) == 0 {
|
||||
misc.ApplyDAOHardFork(statedb)
|
||||
}
|
||||
if cBlock.stop {
|
||||
return nil, nil, 0, ErrStopPreparingBlock
|
||||
}
|
||||
InitSignerInTransactions(p.config, header, block.Transactions())
|
||||
if cBlock.stop {
|
||||
return nil, nil, 0, ErrStopPreparingBlock
|
||||
}
|
||||
// Iterate over and process the individual transactions
|
||||
receipts = make([]*types.Receipt, block.Transactions().Len())
|
||||
for i, tx := range block.Transactions() {
|
||||
statedb.Prepare(tx.Hash(), block.Hash(), i)
|
||||
receipt, _, err := ApplyTransaction(p.config, p.bc, nil, gp, statedb, header, tx, usedGas, cfg)
|
||||
if err != nil {
|
||||
return nil, nil, 0, err
|
||||
}
|
||||
if cBlock.stop {
|
||||
return nil, nil, 0, ErrStopPreparingBlock
|
||||
}
|
||||
receipts[i] = receipt
|
||||
}
|
||||
// Finalize the block, applying any consensus engine specific extras (e.g. block rewards)
|
||||
p.engine.Finalize(p.bc, header, statedb, block.Transactions(), block.Uncles(), receipts)
|
||||
return receipts, allLogs, *usedGas, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -652,6 +652,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
|
|||
}
|
||||
// If the transaction pool is full, discard underpriced transactions
|
||||
if uint64(len(pool.all)) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
|
||||
log.Debug("Add transaction to pool full", "hash", hash, "nonce", tx.Nonce())
|
||||
// If the new transaction is underpriced, don't accept it
|
||||
if pool.priced.Underpriced(tx, pool.locals) {
|
||||
log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice())
|
||||
|
|
@ -879,9 +880,6 @@ func (pool *TxPool) addTx(tx *types.Transaction, local bool) error {
|
|||
|
||||
// addTxs attempts to queue a batch of transactions if they are valid.
|
||||
func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) []error {
|
||||
for _, tx := range txs {
|
||||
types.CacheSigner(pool.signer, tx)
|
||||
}
|
||||
pool.mu.Lock()
|
||||
defer pool.mu.Unlock()
|
||||
|
||||
|
|
|
|||
|
|
@ -42,5 +42,5 @@ type Validator interface {
|
|||
// of gas used in the process and return an error if any of the internal rules
|
||||
// failed.
|
||||
type Processor interface {
|
||||
Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, error)
|
||||
ProcessBlockNoValidator(block *CalculatedBlock, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, error)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -125,6 +125,30 @@ func (h *Header) HashNoNonce() common.Hash {
|
|||
})
|
||||
}
|
||||
|
||||
// HashNoNonce returns the hash which is used as input for the proof-of-work search.
|
||||
func (h *Header) HashNoValidator() common.Hash {
|
||||
return rlpHash([]interface{}{
|
||||
h.ParentHash,
|
||||
h.UncleHash,
|
||||
h.Coinbase,
|
||||
h.Root,
|
||||
h.TxHash,
|
||||
h.ReceiptHash,
|
||||
h.Bloom,
|
||||
h.Difficulty,
|
||||
h.Number,
|
||||
h.GasLimit,
|
||||
h.GasUsed,
|
||||
h.Time,
|
||||
h.Extra,
|
||||
h.MixDigest,
|
||||
h.Nonce,
|
||||
h.Validators,
|
||||
[]byte{},
|
||||
h.Penalties,
|
||||
})
|
||||
}
|
||||
|
||||
// Size returns the approximate memory used by all internal contents. It is used
|
||||
// to approximate and limit the memory consumption of various caches.
|
||||
func (h *Header) Size() common.StorageSize {
|
||||
|
|
@ -337,6 +361,9 @@ func (b *Block) Body() *Body { return &Body{b.transactions, b.uncles} }
|
|||
func (b *Block) HashNoNonce() common.Hash {
|
||||
return b.header.HashNoNonce()
|
||||
}
|
||||
func (b *Block) HashNoValidator() common.Hash {
|
||||
return b.header.HashNoValidator()
|
||||
}
|
||||
|
||||
// Size returns the true RLP encoded storage size of the block, either by encoding
|
||||
// and returning it, or returning a previsouly cached value.
|
||||
|
|
@ -455,4 +482,4 @@ func (self blockSorter) Swap(i, j int) {
|
|||
}
|
||||
func (self blockSorter) Less(i, j int) bool { return self.by(self.blocks[i], self.blocks[j]) }
|
||||
|
||||
func Number(b1, b2 *Block) bool { return b1.header.Number.Cmp(b2.header.Number) < 0 }
|
||||
func Number(b1, b2 *Block) bool { return b1.header.Number.Cmp(b2.header.Number) < 0 }
|
||||
|
|
@ -1,3 +1,4 @@
|
|||
|
||||
// Copyright 2014 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
|
|
@ -203,29 +204,28 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
|
|||
return nil
|
||||
}
|
||||
|
||||
appendM2HeaderHook := func(block *types.Block) (*types.Block, error) {
|
||||
appendM2HeaderHook := func(block *types.Block) (*types.Block, bool, error) {
|
||||
eb, err := eth.Etherbase()
|
||||
if err != nil {
|
||||
log.Error("Cannot get etherbase for append m2 header", "err", err)
|
||||
return block, fmt.Errorf("etherbase missing: %v", err)
|
||||
return block, false, fmt.Errorf("etherbase missing: %v", err)
|
||||
}
|
||||
m1, err := c.RecoverSigner(block.Header())
|
||||
if err != nil {
|
||||
return block, fmt.Errorf("can't get block creator: %v", err)
|
||||
return block, false, fmt.Errorf("can't get block creator: %v", err)
|
||||
}
|
||||
m2, err := c.GetValidator(m1, eth.blockchain, block.Header())
|
||||
if err != nil {
|
||||
return block, fmt.Errorf("can't get block validator: %v", err)
|
||||
return block, false, fmt.Errorf("can't get block validator: %v", err)
|
||||
}
|
||||
if m2 == eb {
|
||||
wallet, _ := eth.accountManager.Find(accounts.Account{Address: eb})
|
||||
header := block.Header()
|
||||
sighash, _ := wallet.SignHash(accounts.Account{Address: eb}, XDPoS.SigHash(header).Bytes())
|
||||
header.Validator = sighash
|
||||
block = types.NewBlockWithHeader(header).WithBody(block.Transactions(), block.Uncles())
|
||||
return types.NewBlockWithHeader(header).WithBody(block.Transactions(), block.Uncles()), true, nil
|
||||
}
|
||||
|
||||
return block, nil
|
||||
return block, false, nil
|
||||
}
|
||||
|
||||
eth.protocolManager.fetcher.SetSignHook(signHook)
|
||||
|
|
@ -301,8 +301,8 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
|
|||
if foudationWalletAddr == (common.Address{}) {
|
||||
log.Error("Foundation Wallet Address is empty", "error", foudationWalletAddr)
|
||||
}
|
||||
start := time.Now()
|
||||
if number > 0 && number-rCheckpoint > 0 && foudationWalletAddr != (common.Address{}) {
|
||||
start := time.Now()
|
||||
// Get signers in blockSigner smartcontract.
|
||||
addr := common.HexToAddress(common.BlockSigners)
|
||||
// Get reward inflation.
|
||||
|
|
@ -334,8 +334,8 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
|
|||
}
|
||||
}
|
||||
}
|
||||
log.Debug("Time Calculated HookReward ", "block", header.Number.Uint64(), "time", common.PrettyDuration(time.Since(start)))
|
||||
}
|
||||
log.Debug("Time Calculated HookReward ", "block", header.Number.Uint64(), "time", common.PrettyDuration(time.Since(start)))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -354,8 +354,8 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
|
|||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
eth.txPool.IsMasterNode = func(address common.Address) bool {
|
||||
}
|
||||
eth.txPool.IsMasterNode = func(address common.Address) bool {
|
||||
currentHeader := eth.blockchain.CurrentHeader()
|
||||
snap, err := c.GetSnapshot(eth.blockchain, currentHeader)
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -62,8 +62,10 @@ type blockBroadcasterFn func(block *types.Block, propagate bool)
|
|||
// chainHeightFn is a callback type to retrieve the current chain height.
|
||||
type chainHeightFn func() uint64
|
||||
|
||||
// chainInsertFn is a callback type to insert a batch of blocks into the local chain.
|
||||
type chainInsertFn func(blocks types.Blocks) (int, error)
|
||||
// blockInsertFn is a callback type to insert a batch of blocks into the local chain.
|
||||
type blockInsertFn func(block *types.Block) error
|
||||
|
||||
type blockPrepareFn func(block *types.Block) error
|
||||
|
||||
// peerDropFn is a callback type for dropping a peer detected as malicious.
|
||||
type peerDropFn func(id string)
|
||||
|
|
@ -135,8 +137,9 @@ type Fetcher struct {
|
|||
verifyHeader headerVerifierFn // Checks if a block's headers have a valid proof of work
|
||||
broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers
|
||||
chainHeight chainHeightFn // Retrieves the current chain's height
|
||||
insertChain chainInsertFn // Injects a batch of blocks into the chain
|
||||
dropPeer peerDropFn // Drops a peer for misbehaving
|
||||
insertBlock blockInsertFn // Injects a batch of blocks into the chain
|
||||
prepareBlock blockPrepareFn
|
||||
dropPeer peerDropFn // Drops a peer for misbehaving
|
||||
|
||||
// Testing hooks
|
||||
announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list
|
||||
|
|
@ -144,11 +147,11 @@ type Fetcher struct {
|
|||
fetchingHook func([]common.Hash) // Method to call upon starting a block (eth/61) or header (eth/62) fetch
|
||||
completingHook func([]common.Hash) // Method to call upon starting a block body fetch (eth/62)
|
||||
signHook func(*types.Block) error
|
||||
appendM2HeaderHook func(*types.Block) (*types.Block, error)
|
||||
appendM2HeaderHook func(*types.Block) (*types.Block, bool, error)
|
||||
}
|
||||
|
||||
// New creates a block fetcher to retrieve blocks based on hash announcements.
|
||||
func New(getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertChain chainInsertFn, dropPeer peerDropFn) *Fetcher {
|
||||
func New(getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertBlock blockInsertFn, prepareBlock blockPrepareFn, dropPeer peerDropFn) *Fetcher {
|
||||
knownBlocks, _ := lru.NewARC(blockLimit)
|
||||
return &Fetcher{
|
||||
notify: make(chan *announce),
|
||||
|
|
@ -171,7 +174,8 @@ func New(getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBloc
|
|||
verifyHeader: verifyHeader,
|
||||
broadcastBlock: broadcastBlock,
|
||||
chainHeight: chainHeight,
|
||||
insertChain: insertChain,
|
||||
insertBlock: insertBlock,
|
||||
prepareBlock: prepareBlock,
|
||||
dropPeer: dropPeer,
|
||||
}
|
||||
}
|
||||
|
|
@ -605,7 +609,7 @@ func (f *Fetcher) rescheduleComplete(complete *time.Timer) {
|
|||
func (f *Fetcher) enqueue(peer string, block *types.Block) {
|
||||
hash := block.Hash()
|
||||
if f.knowns.Contains(hash) {
|
||||
log.Debug("Discarded propagated block, known block", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)
|
||||
log.Trace("Discarded propagated block, known block", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)
|
||||
return
|
||||
}
|
||||
// Ensure the peer isn't DOSing us
|
||||
|
|
@ -657,40 +661,56 @@ func (f *Fetcher) insert(peer string, block *types.Block) {
|
|||
log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
|
||||
return
|
||||
}
|
||||
fastBroadCast := true
|
||||
again:
|
||||
err := f.verifyHeader(block.Header())
|
||||
// Quickly validate the header and propagate the block if it passes
|
||||
switch err := f.verifyHeader(block.Header()); err {
|
||||
switch err {
|
||||
case nil:
|
||||
// All ok, quickly propagate to our peers
|
||||
propBroadcastOutTimer.UpdateSince(block.ReceivedAt)
|
||||
go f.broadcastBlock(block, true)
|
||||
if fastBroadCast {
|
||||
go f.broadcastBlock(block, true)
|
||||
}
|
||||
case consensus.ErrFutureBlock:
|
||||
delay := time.Unix(block.Time().Int64(), 0).Sub(time.Now()) // nolint: gosimple
|
||||
time.Sleep(delay)
|
||||
log.Info("Receive future block", "number", block.NumberU64(), "hash", block.Hash().Hex(), "delay", delay)
|
||||
time.Sleep(delay)
|
||||
goto again
|
||||
case consensus.ErrNoValidatorSignature:
|
||||
newBlock := block
|
||||
var errM2 error
|
||||
isM2 := false
|
||||
if f.appendM2HeaderHook != nil {
|
||||
if newBlock, err = f.appendM2HeaderHook(block); err != nil {
|
||||
log.Error("Append m2 to block header fail", "err", err)
|
||||
if newBlock, isM2, errM2 = f.appendM2HeaderHook(block); errM2 != nil {
|
||||
log.Error("Append m2 to block header fail", "err", errM2)
|
||||
return
|
||||
}
|
||||
}
|
||||
if newBlock.Hash() == block.Hash() {
|
||||
if !isM2 {
|
||||
go f.broadcastBlock(block, true)
|
||||
if err := f.prepareBlock(block); err != nil {
|
||||
log.Debug("Propagated block prepare failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
log.Debug("Append M2 to header block", "numer", block.NumberU64(), "hahs", block.Hash())
|
||||
if err := f.prepareBlock(block); err != nil {
|
||||
log.Debug("Propagated block prepare failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
|
||||
return
|
||||
}
|
||||
block = newBlock
|
||||
fastBroadCast = false
|
||||
goto again
|
||||
default:
|
||||
// Something went very wrong, drop the peer
|
||||
log.Debug("Propagated block verification failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
|
||||
f.dropPeer(peer)
|
||||
return
|
||||
}
|
||||
|
||||
// Run the actual import and log any issues
|
||||
if _, err := f.insertChain(types.Blocks{block}); err != nil {
|
||||
if err := f.insertBlock(block); err != nil {
|
||||
log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err)
|
||||
return
|
||||
}
|
||||
|
|
@ -703,9 +723,10 @@ func (f *Fetcher) insert(peer string, block *types.Block) {
|
|||
}
|
||||
// If import succeeded, broadcast the block
|
||||
propAnnounceOutTimer.UpdateSince(block.ReceivedAt)
|
||||
go f.broadcastBlock(block, true)
|
||||
//go f.broadcastBlock(block, false)
|
||||
}()
|
||||
if !fastBroadCast {
|
||||
go f.broadcastBlock(block, true)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// forgetHash removes all traces of a block announcement from the fetcher's
|
||||
|
|
@ -768,6 +789,6 @@ func (f *Fetcher) SetSignHook(signHook func(*types.Block) error) {
|
|||
}
|
||||
|
||||
// Bind append m2 to block header hook when imported into chain.
|
||||
func (f *Fetcher) SetAppendM2HeaderHook(appendM2HeaderHook func(*types.Block) (*types.Block, error)) {
|
||||
func (f *Fetcher) SetAppendM2HeaderHook(appendM2HeaderHook func(*types.Block) (*types.Block, bool, error)) {
|
||||
f.appendM2HeaderHook = appendM2HeaderHook
|
||||
}
|
||||
|
|
@ -92,7 +92,7 @@ func newTester() *fetcherTester {
|
|||
blocks: map[common.Hash]*types.Block{genesis.Hash(): genesis},
|
||||
drops: make(map[string]bool),
|
||||
}
|
||||
tester.fetcher = New(tester.getBlock, tester.verifyHeader, tester.broadcastBlock, tester.chainHeight, tester.insertChain, tester.dropPeer)
|
||||
tester.fetcher = New(tester.getBlock, tester.verifyHeader, tester.broadcastBlock, tester.chainHeight, tester.insertBlock, tester.prepareBlock, tester.dropPeer)
|
||||
tester.fetcher.Start()
|
||||
|
||||
return tester
|
||||
|
|
@ -123,7 +123,7 @@ func (f *fetcherTester) chainHeight() uint64 {
|
|||
return f.blocks[f.hashes[len(f.hashes)-1]].NumberU64()
|
||||
}
|
||||
|
||||
// insertChain injects a new blocks into the simulated chain.
|
||||
// insertBlock injects a new blocks into the simulated chain.
|
||||
func (f *fetcherTester) insertChain(blocks types.Blocks) (int, error) {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
|
|
@ -144,6 +144,31 @@ func (f *fetcherTester) insertChain(blocks types.Blocks) (int, error) {
|
|||
return 0, nil
|
||||
}
|
||||
|
||||
// insertBlock injects a new blocks into the simulated chain.
|
||||
func (f *fetcherTester) insertBlock(block *types.Block) error {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
|
||||
// Make sure the parent in known
|
||||
if _, ok := f.blocks[block.ParentHash()]; !ok {
|
||||
return errors.New("unknown parent")
|
||||
}
|
||||
// Discard any new blocks if the same height already exists
|
||||
if block.NumberU64() <= f.blocks[f.hashes[len(f.hashes)-1]].NumberU64() {
|
||||
return nil
|
||||
}
|
||||
// Otherwise build our current chain
|
||||
f.hashes = append(f.hashes, block.Hash())
|
||||
f.blocks[block.Hash()] = block
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// insertBlock injects a new blocks into the simulated chain.
|
||||
func (f *fetcherTester) prepareBlock(block *types.Block) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// dropPeer is an emulator for the peer removal, simply accumulating the various
|
||||
// peers dropped by the fetcher.
|
||||
func (f *fetcherTester) dropPeer(peer string) {
|
||||
|
|
@ -288,7 +313,7 @@ func testSequentialAnnouncements(t *testing.T, protocol int) {
|
|||
|
||||
// Iteratively announce blocks until all are imported
|
||||
imported := make(chan *types.Block)
|
||||
tester.fetcher.importedHook = func(block *types.Block) error {
|
||||
tester.fetcher.signHook = func(block *types.Block) error {
|
||||
imported <- block
|
||||
return nil
|
||||
}
|
||||
|
|
@ -329,7 +354,7 @@ func testConcurrentAnnouncements(t *testing.T, protocol int) {
|
|||
}
|
||||
// Iteratively announce blocks until all are imported
|
||||
imported := make(chan *types.Block)
|
||||
tester.fetcher.importedHook = func(block *types.Block) error {
|
||||
tester.fetcher.signHook = func(block *types.Block) error {
|
||||
imported <- block
|
||||
return nil
|
||||
}
|
||||
|
|
@ -369,7 +394,7 @@ func testOverlappingAnnouncements(t *testing.T, protocol int) {
|
|||
for i := 0; i < overlap; i++ {
|
||||
imported <- nil
|
||||
}
|
||||
tester.fetcher.importedHook = func(block *types.Block) error {
|
||||
tester.fetcher.signHook = func(block *types.Block) error {
|
||||
imported <- block
|
||||
return nil
|
||||
}
|
||||
|
|
@ -446,7 +471,7 @@ func testRandomArrivalImport(t *testing.T, protocol int) {
|
|||
|
||||
// Iteratively announce blocks, skipping one entry
|
||||
imported := make(chan *types.Block, len(hashes)-1)
|
||||
tester.fetcher.importedHook = func(block *types.Block) error {
|
||||
tester.fetcher.signHook = func(block *types.Block) error {
|
||||
imported <- block
|
||||
return nil
|
||||
}
|
||||
|
|
@ -480,7 +505,7 @@ func testQueueGapFill(t *testing.T, protocol int) {
|
|||
|
||||
// Iteratively announce blocks, skipping one entry
|
||||
imported := make(chan *types.Block, len(hashes)-1)
|
||||
tester.fetcher.importedHook = func(block *types.Block) error {
|
||||
tester.fetcher.signHook = func(block *types.Block) error {
|
||||
imported <- block
|
||||
return nil
|
||||
}
|
||||
|
|
@ -512,15 +537,15 @@ func testImportDeduplication(t *testing.T, protocol int) {
|
|||
bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0)
|
||||
|
||||
counter := uint32(0)
|
||||
tester.fetcher.insertChain = func(blocks types.Blocks) (int, error) {
|
||||
atomic.AddUint32(&counter, uint32(len(blocks)))
|
||||
return tester.insertChain(blocks)
|
||||
tester.fetcher.insertBlock = func(block *types.Block) error {
|
||||
atomic.AddUint32(&counter, uint32(1))
|
||||
return tester.insertBlock(block)
|
||||
}
|
||||
// Instrument the fetching and imported events
|
||||
fetching := make(chan []common.Hash)
|
||||
imported := make(chan *types.Block, len(hashes)-1)
|
||||
tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- hashes }
|
||||
tester.fetcher.importedHook = func(block *types.Block) error {
|
||||
tester.fetcher.signHook = func(block *types.Block) error {
|
||||
imported <- block
|
||||
return nil
|
||||
}
|
||||
|
|
@ -632,7 +657,7 @@ func testInvalidNumberAnnouncement(t *testing.T, protocol int) {
|
|||
badBodyFetcher := tester.makeBodyFetcher("bad", blocks, 0)
|
||||
|
||||
imported := make(chan *types.Block)
|
||||
tester.fetcher.importedHook = func(block *types.Block) error {
|
||||
tester.fetcher.signHook = func(block *types.Block) error {
|
||||
imported <- block
|
||||
return nil
|
||||
}
|
||||
|
|
@ -687,7 +712,7 @@ func testEmptyBlockShortCircuit(t *testing.T, protocol int) {
|
|||
tester.fetcher.completingHook = func(hashes []common.Hash) { completing <- hashes }
|
||||
|
||||
imported := make(chan *types.Block)
|
||||
tester.fetcher.importedHook = func(block *types.Block) error {
|
||||
tester.fetcher.signHook = func(block *types.Block) error {
|
||||
imported <- block
|
||||
return nil
|
||||
}
|
||||
|
|
@ -720,7 +745,7 @@ func testHashMemoryExhaustionAttack(t *testing.T, protocol int) {
|
|||
tester := newTester()
|
||||
|
||||
imported, announces := make(chan *types.Block), int32(0)
|
||||
tester.fetcher.importedHook = func(block *types.Block) error {
|
||||
tester.fetcher.signHook = func(block *types.Block) error {
|
||||
imported <- block
|
||||
return nil
|
||||
}
|
||||
|
|
@ -770,7 +795,7 @@ func TestBlockMemoryExhaustionAttack(t *testing.T) {
|
|||
tester := newTester()
|
||||
|
||||
imported, enqueued := make(chan *types.Block), int32(0)
|
||||
tester.fetcher.importedHook = func(block *types.Block) error {
|
||||
tester.fetcher.signHook = func(block *types.Block) error {
|
||||
imported <- block
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -170,16 +170,26 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne
|
|||
heighter := func() uint64 {
|
||||
return blockchain.CurrentBlock().NumberU64()
|
||||
}
|
||||
inserter := func(blocks types.Blocks) (int, error) {
|
||||
inserter := func(block *types.Block) error {
|
||||
// If fast sync is running, deny importing weird blocks
|
||||
if atomic.LoadUint32(&manager.fastSync) == 1 {
|
||||
log.Warn("Discarded bad propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())
|
||||
return 0, nil
|
||||
log.Warn("Discarded bad propagated block", "number", block.Number(), "hash", block.Hash())
|
||||
return nil
|
||||
}
|
||||
atomic.StoreUint32(&manager.acceptTxs, 1) // Mark initial sync done on any fetcher import
|
||||
return manager.blockchain.InsertChain(blocks)
|
||||
return manager.blockchain.InsertBlock(block)
|
||||
}
|
||||
manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)
|
||||
|
||||
prepare := func(block *types.Block) error {
|
||||
// If fast sync is running, deny importing weird blocks
|
||||
if atomic.LoadUint32(&manager.fastSync) == 1 {
|
||||
log.Warn("Discarded bad propagated block", "number", block.Number(), "hash", block.Hash())
|
||||
return nil
|
||||
}
|
||||
atomic.StoreUint32(&manager.acceptTxs, 1) // Mark initial sync done on any fetcher import
|
||||
return manager.blockchain.PrepareBlock(block)
|
||||
}
|
||||
manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, prepare, manager.removePeer)
|
||||
|
||||
return manager, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -169,9 +169,7 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
|
|||
// Make sure the peer's TD is higher than our own
|
||||
currentBlock := pm.blockchain.CurrentBlock()
|
||||
td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
|
||||
|
||||
pHead, pTd := peer.Head()
|
||||
log.Debug("ProtocolManager synchronise ", "p", peer, "pTd", pTd, "currentTd", td)
|
||||
if pTd.Cmp(td) <= 0 {
|
||||
return
|
||||
}
|
||||
|
|
|
|||
|
|
@ -310,7 +310,6 @@ func (self *worker) update() {
|
|||
self.commitNewWork()
|
||||
}
|
||||
}
|
||||
// System stopped
|
||||
case <-self.chainHeadSub.Err():
|
||||
return
|
||||
case <-self.chainSideSub.Err():
|
||||
|
|
@ -545,7 +544,6 @@ func (self *worker) commitNewWork() {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
tstamp := tstart.Unix()
|
||||
if parent.Time().Cmp(new(big.Int).SetInt64(tstamp)) >= 0 {
|
||||
tstamp = parent.Time().Int64() + 1
|
||||
|
|
|
|||
Loading…
Reference in a new issue