diff --git a/consensus/consensus.go b/consensus/consensus.go index be5e661c12..b02afa63c4 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -58,7 +58,7 @@ type Engine interface { // VerifyHeader checks whether a header conforms to the consensus rules of a // given engine. Verifying the seal may be done optionally here, or explicitly // via the VerifySeal method. - VerifyHeader(chain ChainReader, header *types.Header, seal bool) error + VerifyHeader(chain ChainReader, header *types.Header, fullVerify bool) error // VerifyHeaders is similar to VerifyHeader, but verifies a batch of headers // concurrently. The method returns a quit channel to abort the operations and diff --git a/core/blockchain.go b/core/blockchain.go index e31f933a7e..679d2f3a1d 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -75,6 +75,13 @@ type CacheConfig struct { TrieNodeLimit int // Memory limit (MB) at which to flush the current in-memory trie to disk TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk } +type ResultProcessBlock struct { + logs []*types.Log + receipts []*types.Receipt + state *state.StateDB + proctime time.Duration + usedGas uint64 +} // BlockChain represents the canonical chain given a database with a genesis // block. The Blockchain manages chain imports, reverts, chain reorganisations. @@ -115,14 +122,16 @@ type BlockChain struct { currentBlock atomic.Value // Current head of the block chain currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!) - stateCache state.Database // State database to reuse between imports (contains state cache) - bodyCache *lru.Cache // Cache for the most recent block bodies - bodyRLPCache *lru.Cache // Cache for the most recent block bodies in RLP encoded format - blockCache *lru.Cache // Cache for the most recent entire blocks - futureBlocks *lru.Cache // future blocks are blocks added for later processing - - quit chan struct{} // blockchain quit channel - running int32 // running must be called atomically + stateCache state.Database // State database to reuse between imports (contains state cache) + bodyCache *lru.Cache // Cache for the most recent block bodies + bodyRLPCache *lru.Cache // Cache for the most recent block bodies in RLP encoded format + blockCache *lru.Cache // Cache for the most recent entire blocks + futureBlocks *lru.Cache // future blocks are blocks added for later processing + resultProcess *lru.Cache + calculatingBlock *lru.Cache + downloadingBlock *lru.Cache + quit chan struct{} // blockchain quit channel + running int32 // running must be called atomically // procInterrupt must be atomically called procInterrupt int32 // interrupt signaler for block processing wg sync.WaitGroup // chain processing wait group for shutting down @@ -152,21 +161,26 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par blockCache, _ := lru.New(blockCacheLimit) futureBlocks, _ := lru.New(maxFutureBlocks) badBlocks, _ := lru.New(badBlockLimit) - + resultProcess, _ := lru.New(blockCacheLimit) + preparingBlock, _ := lru.New(blockCacheLimit) + downloadingBlock, _ := lru.New(blockCacheLimit) bc := &BlockChain{ - chainConfig: chainConfig, - cacheConfig: cacheConfig, - db: db, - triegc: prque.New(), - stateCache: state.NewDatabase(db), - quit: make(chan struct{}), - bodyCache: bodyCache, - bodyRLPCache: bodyRLPCache, - blockCache: blockCache, - futureBlocks: futureBlocks, - engine: engine, - vmConfig: vmConfig, - badBlocks: badBlocks, + chainConfig: chainConfig, + cacheConfig: cacheConfig, + db: db, + triegc: prque.New(), + stateCache: state.NewDatabase(db), + quit: make(chan struct{}), + bodyCache: bodyCache, + bodyRLPCache: bodyRLPCache, + blockCache: blockCache, + futureBlocks: futureBlocks, + resultProcess: resultProcess, + calculatingBlock: preparingBlock, + downloadingBlock: downloadingBlock, + engine: engine, + vmConfig: vmConfig, + badBlocks: badBlocks, } bc.SetValidator(NewBlockValidator(chainConfig, bc, engine)) bc.SetProcessor(NewStateProcessor(chainConfig, bc, engine)) @@ -1049,6 +1063,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty for i, block := range chain { headers[i] = block.Header() seals[i] = true + bc.downloadingBlock.Add(block.Hash(), true) } abort, results := bc.engine.VerifyHeaders(bc, headers, seals) defer close(abort) @@ -1168,7 +1183,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty } switch status { case CanonStatTy: - log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()), + log.Debug("Inserted new block from downloader", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()), "txs", len(block.Transactions()), "gas", block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart))) coalescedLogs = append(coalescedLogs, logs...) @@ -1180,7 +1195,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty bc.gcproc += proctime case SideStatTy: - log.Debug("Inserted forked block", "number", block.Number(), "hash", block.Hash(), "diff", block.Difficulty(), "elapsed", + log.Debug("Inserted forked block from downloader", "number", block.Number(), "hash", block.Hash(), "diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(bstart)), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles())) blockInsertTimer.UpdateSince(bstart) @@ -1189,7 +1204,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty stats.processed++ stats.usedGas += usedGas stats.report(chain, i, bc.stateCache.TrieDB().Size()) - if bc.chainConfig.XDPoS != nil { + if status == CanonStatTy && bc.chainConfig.XDPoS != nil { // epoch block if (chain[i].NumberU64() % bc.chainConfig.XDPoS.Epoch) == 0 { CheckpointCh <- 1 @@ -1206,11 +1221,212 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty } // Append a single chain head event if we've progressed the chain if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() { + log.Debug("New ChainHeadEvent ", "number", lastCanon.NumberU64(), "hash", lastCanon.Hash()) events = append(events, ChainHeadEvent{lastCanon}) } return 0, events, coalescedLogs, nil } +func (bc *BlockChain) InsertBlock(block *types.Block) error { + events, logs, err := bc.insertBlock(block) + bc.PostChainEvents(events, logs) + return err +} + +func (bc *BlockChain) PrepareBlock(block *types.Block) (err error) { + defer log.Debug("Done prepare block ", "number", block.NumberU64(), "hash", block.Hash(), "validator", block.Header().Validator, "err", err) + if _, check := bc.resultProcess.Get(block.Hash()); check { + log.Debug("Stop prepare a block because the result cached", "number", block.NumberU64(), "hash", block.Hash(), "validator", block.Header().Validator) + return nil + } + if _, check := bc.calculatingBlock.Get(block.Hash()); check { + log.Debug("Stop prepare a block because inserting", "number", block.NumberU64(), "hash", block.Hash(), "validator", block.Header().Validator) + return nil + } + err = bc.engine.VerifyHeader(bc, block.Header(), false) + if err != nil { + return err + } + result, err := bc.getResultBlock(block, false) + if err == nil { + bc.resultProcess.Add(block.Hash(), result) + return nil + } else if err == ErrKnownBlock { + return nil + } else if err == ErrStopPreparingBlock { + log.Debug("Stop prepare a block because calculating", "number", block.NumberU64(), "hash", block.Hash(), "validator", block.Header().Validator) + return nil + } + return err +} + +func (bc *BlockChain) getResultBlock(block *types.Block, verifiedM2 bool) (*ResultProcessBlock, error) { + var calculatedBlock *CalculatedBlock + if verifiedM2 { + if result, check := bc.resultProcess.Get(block.HashNoValidator()); check { + log.Debug("Get result block from cache ", "number", block.NumberU64(), "hash", block.Hash(), "hash no validator", block.HashNoValidator()) + return result.(*ResultProcessBlock), nil + } + log.Debug("Not found cache prepare block ", "number", block.NumberU64(), "hash", block.Hash(), "validator", block.HashNoValidator()) + if calculatedBlock, _ := bc.calculatingBlock.Get(block.HashNoValidator()); calculatedBlock != nil { + calculatedBlock.(*CalculatedBlock).stop = true + } + } + calculatedBlock = &CalculatedBlock{block, false} + bc.calculatingBlock.Add(block.HashNoValidator(), calculatedBlock) + // Start the parallel header verifier + // If the chain is terminating, stop processing blocks + if atomic.LoadInt32(&bc.procInterrupt) == 1 { + log.Debug("Premature abort during blocks processing") + return nil, ErrBlacklistedHash + } + // If the header is a banned one, straight out abort + if BadHashes[block.Hash()] { + bc.reportBlock(block, nil, ErrBlacklistedHash) + return nil, ErrBlacklistedHash + } + // Wait for the block's verification to complete + bstart := time.Now() + err := bc.Validator().ValidateBody(block) + switch { + case err == ErrKnownBlock: + // Block and state both already known. However if the current block is below + // this number we did a rollback and we should reimport it nonetheless. + if bc.CurrentBlock().NumberU64() >= block.NumberU64() { + return nil, ErrKnownBlock + } + case err == consensus.ErrPrunedAncestor: + // Block competing with the canonical chain, store in the db, but don't process + // until the competitor TD goes above the canonical TD + currentBlock := bc.CurrentBlock() + localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64()) + externTd := new(big.Int).Add(bc.GetTd(block.ParentHash(), block.NumberU64()-1), block.Difficulty()) + if localTd.Cmp(externTd) > 0 { + return nil, err + } + // Competitor chain beat canonical, gather all blocks from the common ancestor + var winner []*types.Block + + parent := bc.GetBlock(block.ParentHash(), block.NumberU64()-1) + for !bc.HasState(parent.Root()) { + winner = append(winner, parent) + parent = bc.GetBlock(parent.ParentHash(), parent.NumberU64()-1) + } + for j := 0; j < len(winner)/2; j++ { + winner[j], winner[len(winner)-1-j] = winner[len(winner)-1-j], winner[j] + } + log.Debug("Number block need calculated again", "number", block.NumberU64(), "hash", block.Hash().Hex(), "winners", len(winner)) + // Import all the pruned blocks to make the state available + _, _, _, err := bc.insertChain(winner) + if err != nil { + return nil, err + } + case err != nil: + bc.reportBlock(block, nil, err) + return nil, err + } + // Create a new statedb using the parent block and report an + // error if it fails. + var parent = bc.GetBlock(block.ParentHash(), block.NumberU64()-1) + state, err := state.New(parent.Root(), bc.stateCache) + if err != nil { + return nil, err + } + // Process block using the parent state as reference point. + receipts, logs, usedGas, err := bc.processor.ProcessBlockNoValidator(calculatedBlock, state, bc.vmConfig) + process := time.Since(bstart) + if err != nil { + if err != ErrStopPreparingBlock { + bc.reportBlock(block, receipts, err) + } + return nil, err + } + // Validate the state using the default validator + err = bc.Validator().ValidateState(block, parent, state, receipts, usedGas) + if err != nil { + bc.reportBlock(block, receipts, err) + return nil, err + } + proctime := time.Since(bstart) + log.Debug("Caculate new block", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()), + "txs", len(block.Transactions()), "gas", block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart)), "process", process) + return &ResultProcessBlock{receipts: receipts, logs: logs, state: state, proctime: proctime, usedGas: usedGas}, nil +} + +// insertChain will execute the actual chain insertion and event aggregation. The +// only reason this method exists as a separate one is to make locking cleaner +// with deferred statements. +func (bc *BlockChain) insertBlock(block *types.Block) ([]interface{}, []*types.Log, error) { + var ( + stats = insertStats{startTime: mclock.Now()} + events = make([]interface{}, 0, 1) + coalescedLogs []*types.Log + ) + if _, check := bc.downloadingBlock.Get(block.Hash()); check { + log.Debug("Stop fetcher a block because downloading", "number", block.NumberU64(), "hash", block.Hash()) + return events, coalescedLogs, nil + } + result, err := bc.getResultBlock(block, true) + if err != nil { + return events, coalescedLogs, err + } + defer bc.resultProcess.Remove(block.HashNoValidator()) + bc.wg.Add(1) + defer bc.wg.Done() + // Write the block to the chain and get the status. + bc.chainmu.Lock() + defer bc.chainmu.Unlock() + if bc.HasBlockAndState(block.Hash(), block.NumberU64()) { + return events, coalescedLogs, nil + } + status, err := bc.WriteBlockWithState(block, result.receipts, result.state) + + if err != nil { + return events, coalescedLogs, err + } + switch status { + case CanonStatTy: + log.Debug("Inserted new block from fetcher", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()), + "txs", len(block.Transactions()), "gas", block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(block.ReceivedAt))) + + coalescedLogs = append(coalescedLogs, result.logs...) + events = append(events, ChainEvent{block, block.Hash(), result.logs}) + + // Only count canonical blocks for GC processing time + bc.gcproc += result.proctime + + case SideStatTy: + log.Debug("Inserted forked block from fetcher", "number", block.Number(), "hash", block.Hash(), "diff", block.Difficulty(), "elapsed", + common.PrettyDuration(time.Since(block.ReceivedAt)), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles())) + + blockInsertTimer.Update(result.proctime) + events = append(events, ChainSideEvent{block}) + } + stats.processed++ + stats.usedGas += result.usedGas + stats.report(types.Blocks{block}, 0, bc.stateCache.TrieDB().Size()) + if status == CanonStatTy && bc.chainConfig.XDPoS != nil { + // epoch block + if (block.NumberU64() % bc.chainConfig.XDPoS.Epoch) == 0 { + CheckpointCh <- 1 + } + // prepare set of masternodes for the next epoch + if (block.NumberU64() % bc.chainConfig.XDPoS.Epoch) == (bc.chainConfig.XDPoS.Epoch - bc.chainConfig.XDPoS.Gap) { + err := bc.UpdateM1() + if err != nil { + log.Error("Error when update masternodes set. Stopping node", "err", err) + os.Exit(1) + } + } + } + // Append a single chain head event if we've progressed the chain + if status == CanonStatTy && bc.CurrentBlock().Hash() == block.Hash() { + events = append(events, ChainHeadEvent{block}) + log.Debug("New ChainHeadEvent from fetcher ", "number", block.NumberU64(), "hash", block.Hash()) + } + return events, coalescedLogs, nil +} + // insertStats tracks and reports on block insertion. type insertStats struct { queued, processed, ignored int diff --git a/core/error.go b/core/error.go index d091aeedd4..6ba184851d 100644 --- a/core/error.go +++ b/core/error.go @@ -36,4 +36,6 @@ var ( ErrNotXDPoS = errors.New("XDPoS not found in config") ErrNotFoundM1 = errors.New("list M1 not found ") + + ErrStopPreparingBlock = errors.New("stop calculate a block not vrified M2") ) diff --git a/core/state_processor.go b/core/state_processor.go index 010a3a153c..840497b22d 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -1,3 +1,4 @@ + // Copyright 2015 The go-ethereum Authors // This file is part of the go-ethereum library. // @@ -40,6 +41,10 @@ type StateProcessor struct { bc *BlockChain // Canonical block chain engine consensus.Engine // Consensus engine used for block rewards } +type CalculatedBlock struct { + block *types.Block + stop bool +} // NewStateProcessor initialises a new StateProcessor. func NewStateProcessor(config *params.ChainConfig, bc *BlockChain, engine consensus.Engine) *StateProcessor { @@ -69,9 +74,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg if p.config.DAOForkSupport && p.config.DAOForkBlock != nil && p.config.DAOForkBlock.Cmp(block.Number()) == 0 { misc.ApplyDAOHardFork(statedb) } - InitSignerInTransactions(p.config, header, block.Transactions()) - // Iterate over and process the individual transactions for i, tx := range block.Transactions() { statedb.Prepare(tx.Hash(), block.Hash(), i) receipt, _, err := ApplyTransaction(p.config, p.bc, nil, gp, statedb, header, tx, usedGas, cfg) @@ -83,7 +86,44 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg } // Finalize the block, applying any consensus engine specific extras (e.g. block rewards) p.engine.Finalize(p.bc, header, statedb, block.Transactions(), block.Uncles(), receipts) + return receipts, allLogs, *usedGas, nil +} +func (p *StateProcessor) ProcessBlockNoValidator(cBlock *CalculatedBlock, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, error) { + block := cBlock.block + var ( + receipts types.Receipts + usedGas = new(uint64) + header = block.Header() + allLogs []*types.Log + gp = new(GasPool).AddGas(block.GasLimit()) + ) + // Mutate the the block and state according to any hard-fork specs + if p.config.DAOForkSupport && p.config.DAOForkBlock != nil && p.config.DAOForkBlock.Cmp(block.Number()) == 0 { + misc.ApplyDAOHardFork(statedb) + } + if cBlock.stop { + return nil, nil, 0, ErrStopPreparingBlock + } + InitSignerInTransactions(p.config, header, block.Transactions()) + if cBlock.stop { + return nil, nil, 0, ErrStopPreparingBlock + } + // Iterate over and process the individual transactions + receipts = make([]*types.Receipt, block.Transactions().Len()) + for i, tx := range block.Transactions() { + statedb.Prepare(tx.Hash(), block.Hash(), i) + receipt, _, err := ApplyTransaction(p.config, p.bc, nil, gp, statedb, header, tx, usedGas, cfg) + if err != nil { + return nil, nil, 0, err + } + if cBlock.stop { + return nil, nil, 0, ErrStopPreparingBlock + } + receipts[i] = receipt + } + // Finalize the block, applying any consensus engine specific extras (e.g. block rewards) + p.engine.Finalize(p.bc, header, statedb, block.Transactions(), block.Uncles(), receipts) return receipts, allLogs, *usedGas, nil } diff --git a/core/tx_pool.go b/core/tx_pool.go index aee9e51975..09335e87ef 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -652,6 +652,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { } // If the transaction pool is full, discard underpriced transactions if uint64(len(pool.all)) >= pool.config.GlobalSlots+pool.config.GlobalQueue { + log.Debug("Add transaction to pool full", "hash", hash, "nonce", tx.Nonce()) // If the new transaction is underpriced, don't accept it if pool.priced.Underpriced(tx, pool.locals) { log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice()) @@ -879,9 +880,6 @@ func (pool *TxPool) addTx(tx *types.Transaction, local bool) error { // addTxs attempts to queue a batch of transactions if they are valid. func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) []error { - for _, tx := range txs { - types.CacheSigner(pool.signer, tx) - } pool.mu.Lock() defer pool.mu.Unlock() diff --git a/core/types.go b/core/types.go index d0bbaf0aa7..8b686e566b 100644 --- a/core/types.go +++ b/core/types.go @@ -42,5 +42,5 @@ type Validator interface { // of gas used in the process and return an error if any of the internal rules // failed. type Processor interface { - Process(block *types.Block, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, error) + ProcessBlockNoValidator(block *CalculatedBlock, statedb *state.StateDB, cfg vm.Config) (types.Receipts, []*types.Log, uint64, error) } diff --git a/core/types/block.go b/core/types/block.go index 25865d4b25..b0924ecca8 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -125,6 +125,30 @@ func (h *Header) HashNoNonce() common.Hash { }) } +// HashNoNonce returns the hash which is used as input for the proof-of-work search. +func (h *Header) HashNoValidator() common.Hash { + return rlpHash([]interface{}{ + h.ParentHash, + h.UncleHash, + h.Coinbase, + h.Root, + h.TxHash, + h.ReceiptHash, + h.Bloom, + h.Difficulty, + h.Number, + h.GasLimit, + h.GasUsed, + h.Time, + h.Extra, + h.MixDigest, + h.Nonce, + h.Validators, + []byte{}, + h.Penalties, + }) +} + // Size returns the approximate memory used by all internal contents. It is used // to approximate and limit the memory consumption of various caches. func (h *Header) Size() common.StorageSize { @@ -337,6 +361,9 @@ func (b *Block) Body() *Body { return &Body{b.transactions, b.uncles} } func (b *Block) HashNoNonce() common.Hash { return b.header.HashNoNonce() } +func (b *Block) HashNoValidator() common.Hash { + return b.header.HashNoValidator() +} // Size returns the true RLP encoded storage size of the block, either by encoding // and returning it, or returning a previsouly cached value. @@ -455,4 +482,4 @@ func (self blockSorter) Swap(i, j int) { } func (self blockSorter) Less(i, j int) bool { return self.by(self.blocks[i], self.blocks[j]) } -func Number(b1, b2 *Block) bool { return b1.header.Number.Cmp(b2.header.Number) < 0 } +func Number(b1, b2 *Block) bool { return b1.header.Number.Cmp(b2.header.Number) < 0 } \ No newline at end of file diff --git a/eth/backend.go b/eth/backend.go index e8a5e30739..3f0bd0d5c0 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -1,3 +1,4 @@ + // Copyright 2014 The go-ethereum Authors // This file is part of the go-ethereum library. // @@ -203,29 +204,28 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { return nil } - appendM2HeaderHook := func(block *types.Block) (*types.Block, error) { + appendM2HeaderHook := func(block *types.Block) (*types.Block, bool, error) { eb, err := eth.Etherbase() if err != nil { log.Error("Cannot get etherbase for append m2 header", "err", err) - return block, fmt.Errorf("etherbase missing: %v", err) + return block, false, fmt.Errorf("etherbase missing: %v", err) } m1, err := c.RecoverSigner(block.Header()) if err != nil { - return block, fmt.Errorf("can't get block creator: %v", err) + return block, false, fmt.Errorf("can't get block creator: %v", err) } m2, err := c.GetValidator(m1, eth.blockchain, block.Header()) if err != nil { - return block, fmt.Errorf("can't get block validator: %v", err) + return block, false, fmt.Errorf("can't get block validator: %v", err) } if m2 == eb { wallet, _ := eth.accountManager.Find(accounts.Account{Address: eb}) header := block.Header() sighash, _ := wallet.SignHash(accounts.Account{Address: eb}, XDPoS.SigHash(header).Bytes()) header.Validator = sighash - block = types.NewBlockWithHeader(header).WithBody(block.Transactions(), block.Uncles()) + return types.NewBlockWithHeader(header).WithBody(block.Transactions(), block.Uncles()), true, nil } - - return block, nil + return block, false, nil } eth.protocolManager.fetcher.SetSignHook(signHook) @@ -301,8 +301,8 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { if foudationWalletAddr == (common.Address{}) { log.Error("Foundation Wallet Address is empty", "error", foudationWalletAddr) } - start := time.Now() if number > 0 && number-rCheckpoint > 0 && foudationWalletAddr != (common.Address{}) { + start := time.Now() // Get signers in blockSigner smartcontract. addr := common.HexToAddress(common.BlockSigners) // Get reward inflation. @@ -334,8 +334,8 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { } } } + log.Debug("Time Calculated HookReward ", "block", header.Number.Uint64(), "time", common.PrettyDuration(time.Since(start))) } - log.Debug("Time Calculated HookReward ", "block", header.Number.Uint64(), "time", common.PrettyDuration(time.Since(start))) return nil } @@ -354,8 +354,8 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { } } return nil - } - eth.txPool.IsMasterNode = func(address common.Address) bool { + } + eth.txPool.IsMasterNode = func(address common.Address) bool { currentHeader := eth.blockchain.CurrentHeader() snap, err := c.GetSnapshot(eth.blockchain, currentHeader) if err != nil { diff --git a/eth/fetcher/fetcher.go b/eth/fetcher/fetcher.go index 6055f8f23b..2bc5573bb3 100644 --- a/eth/fetcher/fetcher.go +++ b/eth/fetcher/fetcher.go @@ -62,8 +62,10 @@ type blockBroadcasterFn func(block *types.Block, propagate bool) // chainHeightFn is a callback type to retrieve the current chain height. type chainHeightFn func() uint64 -// chainInsertFn is a callback type to insert a batch of blocks into the local chain. -type chainInsertFn func(blocks types.Blocks) (int, error) +// blockInsertFn is a callback type to insert a batch of blocks into the local chain. +type blockInsertFn func(block *types.Block) error + +type blockPrepareFn func(block *types.Block) error // peerDropFn is a callback type for dropping a peer detected as malicious. type peerDropFn func(id string) @@ -135,8 +137,9 @@ type Fetcher struct { verifyHeader headerVerifierFn // Checks if a block's headers have a valid proof of work broadcastBlock blockBroadcasterFn // Broadcasts a block to connected peers chainHeight chainHeightFn // Retrieves the current chain's height - insertChain chainInsertFn // Injects a batch of blocks into the chain - dropPeer peerDropFn // Drops a peer for misbehaving + insertBlock blockInsertFn // Injects a batch of blocks into the chain + prepareBlock blockPrepareFn + dropPeer peerDropFn // Drops a peer for misbehaving // Testing hooks announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list @@ -144,11 +147,11 @@ type Fetcher struct { fetchingHook func([]common.Hash) // Method to call upon starting a block (eth/61) or header (eth/62) fetch completingHook func([]common.Hash) // Method to call upon starting a block body fetch (eth/62) signHook func(*types.Block) error - appendM2HeaderHook func(*types.Block) (*types.Block, error) + appendM2HeaderHook func(*types.Block) (*types.Block, bool, error) } // New creates a block fetcher to retrieve blocks based on hash announcements. -func New(getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertChain chainInsertFn, dropPeer peerDropFn) *Fetcher { +func New(getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertBlock blockInsertFn, prepareBlock blockPrepareFn, dropPeer peerDropFn) *Fetcher { knownBlocks, _ := lru.NewARC(blockLimit) return &Fetcher{ notify: make(chan *announce), @@ -171,7 +174,8 @@ func New(getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBloc verifyHeader: verifyHeader, broadcastBlock: broadcastBlock, chainHeight: chainHeight, - insertChain: insertChain, + insertBlock: insertBlock, + prepareBlock: prepareBlock, dropPeer: dropPeer, } } @@ -605,7 +609,7 @@ func (f *Fetcher) rescheduleComplete(complete *time.Timer) { func (f *Fetcher) enqueue(peer string, block *types.Block) { hash := block.Hash() if f.knowns.Contains(hash) { - log.Debug("Discarded propagated block, known block", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit) + log.Trace("Discarded propagated block, known block", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit) return } // Ensure the peer isn't DOSing us @@ -657,40 +661,56 @@ func (f *Fetcher) insert(peer string, block *types.Block) { log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash()) return } + fastBroadCast := true again: + err := f.verifyHeader(block.Header()) // Quickly validate the header and propagate the block if it passes - switch err := f.verifyHeader(block.Header()); err { + switch err { case nil: // All ok, quickly propagate to our peers propBroadcastOutTimer.UpdateSince(block.ReceivedAt) - go f.broadcastBlock(block, true) + if fastBroadCast { + go f.broadcastBlock(block, true) + } case consensus.ErrFutureBlock: delay := time.Unix(block.Time().Int64(), 0).Sub(time.Now()) // nolint: gosimple - time.Sleep(delay) log.Info("Receive future block", "number", block.NumberU64(), "hash", block.Hash().Hex(), "delay", delay) + time.Sleep(delay) goto again case consensus.ErrNoValidatorSignature: newBlock := block + var errM2 error + isM2 := false if f.appendM2HeaderHook != nil { - if newBlock, err = f.appendM2HeaderHook(block); err != nil { - log.Error("Append m2 to block header fail", "err", err) + if newBlock, isM2, errM2 = f.appendM2HeaderHook(block); errM2 != nil { + log.Error("Append m2 to block header fail", "err", errM2) return } } - if newBlock.Hash() == block.Hash() { + if !isM2 { go f.broadcastBlock(block, true) + if err := f.prepareBlock(block); err != nil { + log.Debug("Propagated block prepare failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err) + return + } + return + } + log.Debug("Append M2 to header block", "numer", block.NumberU64(), "hahs", block.Hash()) + if err := f.prepareBlock(block); err != nil { + log.Debug("Propagated block prepare failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err) return } block = newBlock + fastBroadCast = false + goto again default: // Something went very wrong, drop the peer log.Debug("Propagated block verification failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err) f.dropPeer(peer) return } - // Run the actual import and log any issues - if _, err := f.insertChain(types.Blocks{block}); err != nil { + if err := f.insertBlock(block); err != nil { log.Debug("Propagated block import failed", "peer", peer, "number", block.Number(), "hash", hash, "err", err) return } @@ -703,9 +723,10 @@ func (f *Fetcher) insert(peer string, block *types.Block) { } // If import succeeded, broadcast the block propAnnounceOutTimer.UpdateSince(block.ReceivedAt) - go f.broadcastBlock(block, true) - //go f.broadcastBlock(block, false) - }() + if !fastBroadCast { + go f.broadcastBlock(block, true) + } + }() } // forgetHash removes all traces of a block announcement from the fetcher's @@ -768,6 +789,6 @@ func (f *Fetcher) SetSignHook(signHook func(*types.Block) error) { } // Bind append m2 to block header hook when imported into chain. -func (f *Fetcher) SetAppendM2HeaderHook(appendM2HeaderHook func(*types.Block) (*types.Block, error)) { +func (f *Fetcher) SetAppendM2HeaderHook(appendM2HeaderHook func(*types.Block) (*types.Block, bool, error)) { f.appendM2HeaderHook = appendM2HeaderHook } \ No newline at end of file diff --git a/eth/fetcher/fetcher_test.go b/eth/fetcher/fetcher_test.go index ec84ae03f5..a6eafd32e2 100644 --- a/eth/fetcher/fetcher_test.go +++ b/eth/fetcher/fetcher_test.go @@ -92,7 +92,7 @@ func newTester() *fetcherTester { blocks: map[common.Hash]*types.Block{genesis.Hash(): genesis}, drops: make(map[string]bool), } - tester.fetcher = New(tester.getBlock, tester.verifyHeader, tester.broadcastBlock, tester.chainHeight, tester.insertChain, tester.dropPeer) + tester.fetcher = New(tester.getBlock, tester.verifyHeader, tester.broadcastBlock, tester.chainHeight, tester.insertBlock, tester.prepareBlock, tester.dropPeer) tester.fetcher.Start() return tester @@ -123,7 +123,7 @@ func (f *fetcherTester) chainHeight() uint64 { return f.blocks[f.hashes[len(f.hashes)-1]].NumberU64() } -// insertChain injects a new blocks into the simulated chain. +// insertBlock injects a new blocks into the simulated chain. func (f *fetcherTester) insertChain(blocks types.Blocks) (int, error) { f.lock.Lock() defer f.lock.Unlock() @@ -144,6 +144,31 @@ func (f *fetcherTester) insertChain(blocks types.Blocks) (int, error) { return 0, nil } +// insertBlock injects a new blocks into the simulated chain. +func (f *fetcherTester) insertBlock(block *types.Block) error { + f.lock.Lock() + defer f.lock.Unlock() + + // Make sure the parent in known + if _, ok := f.blocks[block.ParentHash()]; !ok { + return errors.New("unknown parent") + } + // Discard any new blocks if the same height already exists + if block.NumberU64() <= f.blocks[f.hashes[len(f.hashes)-1]].NumberU64() { + return nil + } + // Otherwise build our current chain + f.hashes = append(f.hashes, block.Hash()) + f.blocks[block.Hash()] = block + + return nil +} + +// insertBlock injects a new blocks into the simulated chain. +func (f *fetcherTester) prepareBlock(block *types.Block) error { + return nil +} + // dropPeer is an emulator for the peer removal, simply accumulating the various // peers dropped by the fetcher. func (f *fetcherTester) dropPeer(peer string) { @@ -288,7 +313,7 @@ func testSequentialAnnouncements(t *testing.T, protocol int) { // Iteratively announce blocks until all are imported imported := make(chan *types.Block) - tester.fetcher.importedHook = func(block *types.Block) error { + tester.fetcher.signHook = func(block *types.Block) error { imported <- block return nil } @@ -329,7 +354,7 @@ func testConcurrentAnnouncements(t *testing.T, protocol int) { } // Iteratively announce blocks until all are imported imported := make(chan *types.Block) - tester.fetcher.importedHook = func(block *types.Block) error { + tester.fetcher.signHook = func(block *types.Block) error { imported <- block return nil } @@ -369,7 +394,7 @@ func testOverlappingAnnouncements(t *testing.T, protocol int) { for i := 0; i < overlap; i++ { imported <- nil } - tester.fetcher.importedHook = func(block *types.Block) error { + tester.fetcher.signHook = func(block *types.Block) error { imported <- block return nil } @@ -446,7 +471,7 @@ func testRandomArrivalImport(t *testing.T, protocol int) { // Iteratively announce blocks, skipping one entry imported := make(chan *types.Block, len(hashes)-1) - tester.fetcher.importedHook = func(block *types.Block) error { + tester.fetcher.signHook = func(block *types.Block) error { imported <- block return nil } @@ -480,7 +505,7 @@ func testQueueGapFill(t *testing.T, protocol int) { // Iteratively announce blocks, skipping one entry imported := make(chan *types.Block, len(hashes)-1) - tester.fetcher.importedHook = func(block *types.Block) error { + tester.fetcher.signHook = func(block *types.Block) error { imported <- block return nil } @@ -512,15 +537,15 @@ func testImportDeduplication(t *testing.T, protocol int) { bodyFetcher := tester.makeBodyFetcher("valid", blocks, 0) counter := uint32(0) - tester.fetcher.insertChain = func(blocks types.Blocks) (int, error) { - atomic.AddUint32(&counter, uint32(len(blocks))) - return tester.insertChain(blocks) + tester.fetcher.insertBlock = func(block *types.Block) error { + atomic.AddUint32(&counter, uint32(1)) + return tester.insertBlock(block) } // Instrument the fetching and imported events fetching := make(chan []common.Hash) imported := make(chan *types.Block, len(hashes)-1) tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- hashes } - tester.fetcher.importedHook = func(block *types.Block) error { + tester.fetcher.signHook = func(block *types.Block) error { imported <- block return nil } @@ -632,7 +657,7 @@ func testInvalidNumberAnnouncement(t *testing.T, protocol int) { badBodyFetcher := tester.makeBodyFetcher("bad", blocks, 0) imported := make(chan *types.Block) - tester.fetcher.importedHook = func(block *types.Block) error { + tester.fetcher.signHook = func(block *types.Block) error { imported <- block return nil } @@ -687,7 +712,7 @@ func testEmptyBlockShortCircuit(t *testing.T, protocol int) { tester.fetcher.completingHook = func(hashes []common.Hash) { completing <- hashes } imported := make(chan *types.Block) - tester.fetcher.importedHook = func(block *types.Block) error { + tester.fetcher.signHook = func(block *types.Block) error { imported <- block return nil } @@ -720,7 +745,7 @@ func testHashMemoryExhaustionAttack(t *testing.T, protocol int) { tester := newTester() imported, announces := make(chan *types.Block), int32(0) - tester.fetcher.importedHook = func(block *types.Block) error { + tester.fetcher.signHook = func(block *types.Block) error { imported <- block return nil } @@ -770,7 +795,7 @@ func TestBlockMemoryExhaustionAttack(t *testing.T) { tester := newTester() imported, enqueued := make(chan *types.Block), int32(0) - tester.fetcher.importedHook = func(block *types.Block) error { + tester.fetcher.signHook = func(block *types.Block) error { imported <- block return nil } diff --git a/eth/handler.go b/eth/handler.go index 5244ef318f..cdbcab8aaf 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -170,16 +170,26 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne heighter := func() uint64 { return blockchain.CurrentBlock().NumberU64() } - inserter := func(blocks types.Blocks) (int, error) { + inserter := func(block *types.Block) error { // If fast sync is running, deny importing weird blocks if atomic.LoadUint32(&manager.fastSync) == 1 { - log.Warn("Discarded bad propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash()) - return 0, nil + log.Warn("Discarded bad propagated block", "number", block.Number(), "hash", block.Hash()) + return nil } atomic.StoreUint32(&manager.acceptTxs, 1) // Mark initial sync done on any fetcher import - return manager.blockchain.InsertChain(blocks) + return manager.blockchain.InsertBlock(block) } - manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer) + + prepare := func(block *types.Block) error { + // If fast sync is running, deny importing weird blocks + if atomic.LoadUint32(&manager.fastSync) == 1 { + log.Warn("Discarded bad propagated block", "number", block.Number(), "hash", block.Hash()) + return nil + } + atomic.StoreUint32(&manager.acceptTxs, 1) // Mark initial sync done on any fetcher import + return manager.blockchain.PrepareBlock(block) + } + manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, prepare, manager.removePeer) return manager, nil } diff --git a/eth/sync.go b/eth/sync.go index 882c94878f..770d6db277 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -169,9 +169,7 @@ func (pm *ProtocolManager) synchronise(peer *peer) { // Make sure the peer's TD is higher than our own currentBlock := pm.blockchain.CurrentBlock() td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64()) - pHead, pTd := peer.Head() - log.Debug("ProtocolManager synchronise ", "p", peer, "pTd", pTd, "currentTd", td) if pTd.Cmp(td) <= 0 { return } diff --git a/miner/worker.go b/miner/worker.go index a7d924305a..990b554f44 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -310,7 +310,6 @@ func (self *worker) update() { self.commitNewWork() } } - // System stopped case <-self.chainHeadSub.Err(): return case <-self.chainSideSub.Err(): @@ -545,7 +544,6 @@ func (self *worker) commitNewWork() { } } } - tstamp := tstart.Unix() if parent.Time().Cmp(new(big.Int).SetInt64(tstamp)) >= 0 { tstamp = parent.Time().Int64() + 1