From ff435e081f1d63401ff6e96ca99f8b43f5ea2b09 Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Thu, 17 Apr 2025 14:58:35 +0800 Subject: [PATCH] core: rework blockchain import #17973 --- core/blockchain.go | 368 +++++++++++++++++++++++--------------- core/blockchain_insert.go | 143 +++++++++++++++ 2 files changed, 365 insertions(+), 146 deletions(-) create mode 100644 core/blockchain_insert.go diff --git a/core/blockchain.go b/core/blockchain.go index 2e1456c50b..629589757a 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -741,16 +741,21 @@ func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error { } log.Info("Exporting batch of blocks", "count", last-first+1) + start, reported := time.Now(), time.Now() for nr := first; nr <= last; nr++ { block := bc.GetBlockByNumber(nr) if block == nil { return fmt.Errorf("export failed on #%d: not found", nr) } - if err := block.EncodeRLP(w); err != nil { return err } + if time.Since(reported) >= statsReportLimit { + log.Info("Exporting blocks", "exported", block.NumberU64()-first, "elapsed", common.PrettyDuration(time.Since(start))) + reported = time.Now() + } } + return nil } @@ -865,6 +870,12 @@ func (bc *BlockChain) HasFastBlock(hash common.Hash, number uint64) bool { return rawdb.HasReceipts(bc.db, hash, number) } +// HasState checks if state trie is fully present in the database or not. +func (bc *BlockChain) HasState(hash common.Hash) bool { + _, err := bc.stateCache.OpenTrie(hash) + return err == nil +} + // HasFullState checks if state trie is fully present in the database or not. func (bc *BlockChain) HasFullState(block *types.Block) bool { _, err := bc.stateCache.OpenTrie(block.Root()) @@ -1565,6 +1576,18 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. return status, nil } +// addFutureBlock checks if the block is within the max allowed window to get +// accepted for future processing, and returns an error if the block is too far +// ahead and was not added. +func (bc *BlockChain) addFutureBlock(block *types.Block) error { + max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks) + if block.Time().Cmp(max) > 0 { + return fmt.Errorf("future block timestamp %v > allowed %v", block.Time(), max) + } + bc.futureBlocks.Add(block.Hash(), block) + return nil +} + // InsertChain attempts to insert the given batch of blocks in to the canonical // chain or, otherwise, create a fork. If an error is returned it will return // the index number of the failing block as well an error describing what went @@ -1580,7 +1603,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { // Do a sanity check that the provided chain is actually ordered and linked for i := 1; i < len(chain); i++ { block, prev := chain[i], chain[i-1] - if block.NumberU64() != chain[i-1].NumberU64()+1 || block.ParentHash() != chain[i-1].Hash() { + if block.NumberU64() != prev.NumberU64()+1 || block.ParentHash() != prev.Hash() { // Chain broke ancestry, log a messge (programming error) and skip insertion log.Error("Non contiguous block insert", "number", block.Number(), @@ -1618,6 +1641,9 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] return 0, nil, nil, nil } + // Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss) + SenderCacher.RecoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain) + // A queued approach to delivering events. This is generally // faster than direct delivery and requires much less mutex // acquiring. @@ -1639,11 +1665,52 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] abort, results := bc.engine.VerifyHeaders(bc, headers, seals) defer close(abort) - // Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss) - SenderCacher.RecoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain) + // Peek the error for the first block to decide the directing import logic + it := newInsertIterator(chain, results, bc.Validator()) - // Iterate over the blocks and insert when the verifier permits - for i, block := range chain { + block, err := it.next() + switch { + // First block is pruned, insert as sidechain and reorg only if TD grows enough + case err == consensus.ErrPrunedAncestor: + return bc.insertSidechain(it) + + // First block is future, shove it (and all children) to the future queue (unknown ancestor) + case err == consensus.ErrFutureBlock || (err == consensus.ErrUnknownAncestor && bc.futureBlocks.Contains(it.first().ParentHash())): + for block != nil && (it.index == 0 || err == consensus.ErrUnknownAncestor) { + if err := bc.addFutureBlock(block); err != nil { + return it.index, events, coalescedLogs, err + } + block, err = it.next() + } + stats.queued += it.processed() + stats.ignored += it.remaining() + + // If there are any still remaining, mark as ignored + return it.index, events, coalescedLogs, err + + // First block (and state) is known + // 1. We did a roll-back, and should now do a re-import + // 2. The block is stored as a sidechain, and is lying about it's stateroot, and passes a stateroot + // from the canonical chain, which has not been verified. + case err == ErrKnownBlock: + // Skip all known blocks that behind us + current := bc.CurrentBlock().NumberU64() + + for block != nil && err == ErrKnownBlock && current >= block.NumberU64() { + stats.ignored++ + block, err = it.next() + } + // Falls through to the block import + + // Some other error occurred, abort + case err != nil: + stats.ignored += len(it.chain) + bc.reportBlock(block, nil, err) + return it.index, events, coalescedLogs, err + } + + // No validation errors for the first block (or chain prefix skipped) + for ; block != nil && err == nil; block, err = it.next() { // If the chain is terminating, stop processing blocks if atomic.LoadInt32(&bc.procInterrupt) == 1 { log.Debug("Premature abort during blocks processing") @@ -1652,87 +1719,19 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] // If the header is a banned one, straight out abort if BadHashes[block.Hash()] { bc.reportBlock(block, nil, ErrBlacklistedHash) - return i, events, coalescedLogs, ErrBlacklistedHash + return it.index, events, coalescedLogs, ErrBlacklistedHash } - // Wait for the block's verification to complete - bstart := time.Now() + // Retrieve the parent block and it's state to execute on top + start := time.Now() - err := <-results - if err == nil { - err = bc.Validator().ValidateBody(block) - } - switch { - case err == ErrKnownBlock: - // Block and state both already known. However if the current block is below - // this number we did a rollback and we should reimport it nonetheless. - if bc.CurrentBlock().NumberU64() >= block.NumberU64() { - stats.ignored++ - continue - } - - case err == consensus.ErrFutureBlock: - // Allow up to MaxFuture second in the future blocks. If this limit is exceeded - // the chain is discarded and processed at a later time if given. - max := big.NewInt(time.Now().Unix() + maxTimeFutureBlocks) - if block.Time().Cmp(max) > 0 { - return i, events, coalescedLogs, fmt.Errorf("future block: %v > %v", block.Time(), max) - } - bc.futureBlocks.Add(block.Hash(), block) - stats.queued++ - continue - - case err == consensus.ErrUnknownAncestor && bc.futureBlocks.Contains(block.ParentHash()): - bc.futureBlocks.Add(block.Hash(), block) - stats.queued++ - continue - - case err == consensus.ErrPrunedAncestor: - // Block competing with the canonical chain, store in the db, but don't process - // until the competitor TD goes above the canonical TD - currentBlock := bc.CurrentBlock() - localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64()) - externTd := new(big.Int).Add(bc.GetTd(block.ParentHash(), block.NumberU64()-1), block.Difficulty()) - if localTd.Cmp(externTd) > 0 { - if err = bc.writeBlockWithoutState(block, externTd); err != nil { - return i, events, coalescedLogs, err - } - continue - } - // Competitor chain beat canonical, gather all blocks from the common ancestor - var winner []*types.Block - - parent := bc.GetBlock(block.ParentHash(), block.NumberU64()-1) - for !bc.HasFullState(parent) { - winner = append(winner, parent) - parent = bc.GetBlock(parent.ParentHash(), parent.NumberU64()-1) - } - for j := 0; j < len(winner)/2; j++ { - winner[j], winner[len(winner)-1-j] = winner[len(winner)-1-j], winner[j] - } - log.Debug("Number block need calculated again", "number", block.NumberU64(), "hash", block.Hash().Hex(), "winners", len(winner)) - // Import all the pruned blocks to make the state available - // During reorg, we use verifySeals=false - _, evs, logs, err := bc.insertChain(winner, false) - events, coalescedLogs = evs, logs - - if err != nil { - return i, events, coalescedLogs, err - } - - case err != nil: - bc.reportBlock(block, nil, err) - return i, events, coalescedLogs, err - } - var parent *types.Block - if i == 0 { + parent := it.previous() + if parent == nil { parent = bc.GetBlock(block.ParentHash(), block.NumberU64()-1) - } else { - parent = chain[i-1] } // Create a new statedb using the parent block and report an error if it fails. statedb, err := state.New(parent.Root(), bc.stateCache) if err != nil { - return i, events, coalescedLogs, err + return it.index, events, coalescedLogs, err } // Process block using the parent state as reference point. t0 := time.Now() @@ -1740,29 +1739,29 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] tradingState, lendingState, err := bc.processTradingAndLendingStates(isTIPXDCXReceiver, block, parent, statedb) if err != nil { bc.reportBlock(block, nil, err) - return i, events, coalescedLogs, err + return it.index, events, coalescedLogs, err } feeCapacity := state.GetTRC21FeeCapacityFromStateWithCache(parent.Root(), statedb) receipts, logs, usedGas, err := bc.processor.Process(block, statedb, tradingState, bc.vmConfig, feeCapacity) t1 := time.Now() if err != nil { bc.reportBlock(block, receipts, err) - return i, events, coalescedLogs, err + return it.index, events, coalescedLogs, err } // Validate the state using the default validator err = bc.Validator().ValidateState(block, parent, statedb, receipts, usedGas) if err != nil { bc.reportBlock(block, receipts, err) - return i, events, coalescedLogs, err + return it.index, events, coalescedLogs, err } t2 := time.Now() - proctime := time.Since(bstart) + proctime := time.Since(start) // Write the block to the chain and get the status. status, err := bc.writeBlockWithState(block, receipts, statedb, tradingState, lendingState) t3 := time.Now() if err != nil { - return i, events, coalescedLogs, err + return it.index, events, coalescedLogs, err } // Update the metrics subsystem with all the measurements @@ -1779,7 +1778,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] trieAccess := statedb.AccountReads + statedb.AccountHashes + statedb.AccountUpdates + statedb.AccountCommits trieAccess += statedb.StorageReads + statedb.StorageHashes + statedb.StorageUpdates + statedb.StorageCommits - blockInsertTimer.UpdateSince(bstart) + blockInsertTimer.UpdateSince(start) blockExecutionTimer.Update(t1.Sub(t0) - trieAccess) blockValidationTimer.Update(t2.Sub(t1)) blockWriteTimer.Update(t3.Sub(t2)) @@ -1787,7 +1786,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] switch status { case CanonStatTy: log.Debug("Inserted new block from downloader", "number", block.Number(), "hash", block.Hash(), "uncles", len(block.Uncles()), - "txs", len(block.Transactions()), "gas", block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(bstart))) + "txs", len(block.Transactions()), "gas", block.GasUsed(), "elapsed", common.PrettyDuration(time.Since(start))) coalescedLogs = append(coalescedLogs, logs...) events = append(events, ChainEvent{block, block.Hash(), logs}) @@ -1802,17 +1801,19 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] } case SideStatTy: log.Debug("Inserted forked block from downloader", "number", block.Number(), "hash", block.Hash(), "diff", block.Difficulty(), "elapsed", - common.PrettyDuration(time.Since(bstart)), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles())) + common.PrettyDuration(time.Since(start)), "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles())) events = append(events, ChainSideEvent{block}) bc.UpdateBlocksHashCache(block) } + blockInsertTimer.UpdateSince(start) stats.processed++ stats.usedGas += usedGas + dirty, _ := bc.stateCache.TrieDB().Size() - stats.report(chain, i, dirty) + stats.report(chain, it.index, dirty) if bc.chainConfig.XDPoS != nil { engine, _ := bc.Engine().(*XDPoS.XDPoS) - isEpochSwithBlock, _, err := engine.IsEpochSwitch(chain[i].Header()) // epoch block + isEpochSwithBlock, _, err := engine.IsEpochSwitch(block.Header()) // epoch block if err != nil { log.Error("[insertChain] Error while checking and notifying channel CheckpointCh if the incoming block is epoch switch block", "Hash", block.Hash(), "Number", block.Number()) bc.reportBlock(block, nil, err) @@ -1822,12 +1823,142 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] } } } + + // Any blocks remaining here? The only ones we care about are the future ones + if block != nil && err == consensus.ErrFutureBlock { + if err := bc.addFutureBlock(block); err != nil { + return it.index, events, coalescedLogs, err + } + block, err = it.next() + + for ; block != nil && err == consensus.ErrUnknownAncestor; block, err = it.next() { + if err := bc.addFutureBlock(block); err != nil { + return it.index, events, coalescedLogs, err + } + stats.queued++ + } + } + stats.ignored += it.remaining() + // Append a single chain head event if we've progressed the chain if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() { log.Debug("New ChainHeadEvent ", "number", lastCanon.NumberU64(), "hash", lastCanon.Hash()) events = append(events, ChainHeadEvent{lastCanon}) } - return 0, events, coalescedLogs, nil + return it.index, events, coalescedLogs, nil +} + +// insertSidechain is called when an import batch hits upon a pruned ancestor +// error, which happens when a sidechain with a sufficiently old fork-block is +// found. +// +// The method writes all (header-and-body-valid) blocks to disk, then tries to +// switch over to the new chain if the TD exceeded the current chain. +func (bc *BlockChain) insertSidechain(it *insertIterator) (int, []interface{}, []*types.Log, error) { + var ( + externTd *big.Int + current = bc.CurrentBlock().NumberU64() + ) + // The first sidechain block error is already verified to be ErrPrunedAncestor. + // Since we don't import them here, we expect ErrUnknownAncestor for the remaining + // ones. Any other errors means that the block is invalid, and should not be written + // to disk. + block, err := it.current(), consensus.ErrPrunedAncestor + for ; block != nil && (err == consensus.ErrPrunedAncestor); block, err = it.next() { + // Check the canonical state root for that number + if number := block.NumberU64(); current >= number { + if canonical := bc.GetBlockByNumber(number); canonical != nil && canonical.Root() == block.Root() { + // This is most likely a shadow-state attack. When a fork is imported into the + // database, and it eventually reaches a block height which is not pruned, we + // just found that the state already exist! This means that the sidechain block + // refers to a state which already exists in our canon chain. + // + // If left unchecked, we would now proceed importing the blocks, without actually + // having verified the state of the previous blocks. + log.Warn("Sidechain ghost-state attack detected", "number", block.NumberU64(), "sideroot", block.Root(), "canonroot", canonical.Root()) + + // If someone legitimately side-mines blocks, they would still be imported as usual. However, + // we cannot risk writing unverified blocks to disk when they obviously target the pruning + // mechanism. + return it.index, nil, nil, errors.New("sidechain ghost-state attack") + } + } + if externTd == nil { + externTd = bc.GetTd(block.ParentHash(), block.NumberU64()-1) + } + externTd = new(big.Int).Add(externTd, block.Difficulty()) + + if !bc.HasBlock(block.Hash(), block.NumberU64()) { + start := time.Now() + if err := bc.writeBlockWithoutState(block, externTd); err != nil { + return it.index, nil, nil, err + } + log.Debug("Inserted sidechain block", "number", block.Number(), "hash", block.Hash(), + "diff", block.Difficulty(), "elapsed", common.PrettyDuration(time.Since(start)), + "txs", len(block.Transactions()), "gas", block.GasUsed(), "uncles", len(block.Uncles()), + "root", block.Root()) + } + } + // At this point, we've written all sidechain blocks to database. Loop ended + // either on some other error or all were processed. If there was some other + // error, we can ignore the rest of those blocks. + // + // If the externTd was larger than our local TD, we now need to reimport the previous + // blocks to regenerate the required state + localTd := bc.GetTd(bc.CurrentBlock().Hash(), current) + if localTd.Cmp(externTd) > 0 { + log.Info("Sidechain written to disk", "start", it.first().NumberU64(), "end", it.previous().NumberU64(), "sidetd", externTd, "localtd", localTd) + return it.index, nil, nil, err + } + // Gather all the sidechain hashes (full blocks may be memory heavy) + var ( + hashes []common.Hash + numbers []uint64 + ) + parent := bc.GetHeader(it.previous().Hash(), it.previous().NumberU64()) + for parent != nil && !bc.HasState(parent.Root) { + hashes = append(hashes, parent.Hash()) + numbers = append(numbers, parent.Number.Uint64()) + + parent = bc.GetHeader(parent.ParentHash, parent.Number.Uint64()-1) + } + if parent == nil { + return it.index, nil, nil, errors.New("missing parent") + } + // Import all the pruned blocks to make the state available + var ( + blocks []*types.Block + memory common.StorageSize + ) + for i := len(hashes) - 1; i >= 0; i-- { + // Append the next block to our batch + block := bc.GetBlock(hashes[i], numbers[i]) + + blocks = append(blocks, block) + memory += block.Size() + + // If memory use grew too large, import and continue. Sadly we need to discard + // all raised events and logs from notifications since we're too heavy on the + // memory here. + if len(blocks) >= 2048 || memory > 64*1024*1024 { + log.Info("Importing heavy sidechain segment", "blocks", len(blocks), "start", blocks[0].NumberU64(), "end", block.NumberU64()) + if _, _, _, err := bc.insertChain(blocks, false); err != nil { + return 0, nil, nil, err + } + blocks, memory = blocks[:0], 0 + + // If the chain is terminating, stop processing blocks + if atomic.LoadInt32(&bc.procInterrupt) == 1 { + log.Debug("Premature abort during blocks processing") + return 0, nil, nil, nil + } + } + } + if len(blocks) > 0 { + log.Info("Importing sidechain segment", "start", blocks[0].NumberU64(), "end", blocks[len(blocks)-1].NumberU64()) + return bc.insertChain(blocks, false) + } + return 0, nil, nil, nil } func (bc *BlockChain) InsertBlock(block *types.Block) error { @@ -2060,61 +2191,6 @@ func (bc *BlockChain) insertBlock(block *types.Block) ([]interface{}, []*types.L return events, coalescedLogs, nil } -// insertStats tracks and reports on block insertion. -type insertStats struct { - queued, processed, ignored int - usedGas uint64 - lastIndex int - startTime mclock.AbsTime -} - -// statsReportLimit is the time limit during import after which we always print -// out progress. This avoids the user wondering what's going on. -const statsReportLimit = 8 * time.Second - -// report prints statistics if some number of blocks have been processed -// or more than a few seconds have passed since the last message. -func (st *insertStats) report(chain []*types.Block, index int, dirty common.StorageSize) { - // Fetch the timings for the batch - var ( - now = mclock.Now() - elapsed = time.Duration(now) - time.Duration(st.startTime) - ) - // If we're at the last block of the batch or report period reached, log - if index == len(chain)-1 || elapsed >= statsReportLimit { - var ( - end = chain[index] - txs = countTransactions(chain[st.lastIndex : index+1]) - ) - context := []interface{}{ - "blocks", st.processed, "txs", txs, "mgas", float64(st.usedGas) / 1000000, - "elapsed", common.PrettyDuration(elapsed), "mgasps", float64(st.usedGas) * 1000 / float64(elapsed), - "number", end.Number(), "hash", end.Hash(), - } - if timestamp := time.Unix(end.Time().Int64(), 0); time.Since(timestamp) > time.Minute { - context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...) - } - context = append(context, []interface{}{"cache", dirty}...) - - if st.queued > 0 { - context = append(context, []interface{}{"queued", st.queued}...) - } - if st.ignored > 0 { - context = append(context, []interface{}{"ignored", st.ignored}...) - } - log.Info("Imported new chain segment", context...) - - *st = insertStats{startTime: now, lastIndex: index + 1} - } -} - -func countTransactions(chain []*types.Block) (c int) { - for _, b := range chain { - c += len(b.Transactions()) - } - return c -} - // collectLogs collects the logs that were generated or removed during // the processing of a block. These logs are later announced as deleted or reborn. func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log { diff --git a/core/blockchain_insert.go b/core/blockchain_insert.go new file mode 100644 index 0000000000..f857c0c082 --- /dev/null +++ b/core/blockchain_insert.go @@ -0,0 +1,143 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package core + +import ( + "time" + + "github.com/XinFinOrg/XDPoSChain/common" + "github.com/XinFinOrg/XDPoSChain/common/mclock" + "github.com/XinFinOrg/XDPoSChain/core/types" + "github.com/XinFinOrg/XDPoSChain/log" +) + +// insertStats tracks and reports on block insertion. +type insertStats struct { + queued, processed, ignored int + usedGas uint64 + lastIndex int + startTime mclock.AbsTime +} + +// statsReportLimit is the time limit during import and export after which we +// always print out progress. This avoids the user wondering what's going on. +const statsReportLimit = 8 * time.Second + +// report prints statistics if some number of blocks have been processed +// or more than a few seconds have passed since the last message. +func (st *insertStats) report(chain []*types.Block, index int, cache common.StorageSize) { + // Fetch the timings for the batch + var ( + now = mclock.Now() + elapsed = time.Duration(now) - time.Duration(st.startTime) + ) + // If we're at the last block of the batch or report period reached, log + if index == len(chain)-1 || elapsed >= statsReportLimit { + // Count the number of transactions in this segment + var txs int + for _, block := range chain[st.lastIndex : index+1] { + txs += len(block.Transactions()) + } + end := chain[index] + + // Assemble the log context and send it to the logger + context := []interface{}{ + "blocks", st.processed, "txs", txs, "mgas", float64(st.usedGas) / 1000000, + "elapsed", common.PrettyDuration(elapsed), "mgasps", float64(st.usedGas) * 1000 / float64(elapsed), + "number", end.Number(), "hash", end.Hash(), + } + if timestamp := time.Unix(end.Time().Int64(), 0); time.Since(timestamp) > time.Minute { + context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...) + } + context = append(context, []interface{}{"cache", cache}...) + + if st.queued > 0 { + context = append(context, []interface{}{"queued", st.queued}...) + } + if st.ignored > 0 { + context = append(context, []interface{}{"ignored", st.ignored}...) + } + log.Info("Imported new chain segment", context...) + + // Bump the stats reported to the next section + *st = insertStats{startTime: now, lastIndex: index + 1} + } +} + +// insertIterator is a helper to assist during chain import. +type insertIterator struct { + chain types.Blocks + results <-chan error + index int + validator Validator +} + +// newInsertIterator creates a new iterator based on the given blocks, which are +// assumed to be a contiguous chain. +func newInsertIterator(chain types.Blocks, results <-chan error, validator Validator) *insertIterator { + return &insertIterator{ + chain: chain, + results: results, + index: -1, + validator: validator, + } +} + +// next returns the next block in the iterator, along with any potential validation +// error for that block. When the end is reached, it will return (nil, nil). +func (it *insertIterator) next() (*types.Block, error) { + if it.index+1 >= len(it.chain) { + it.index = len(it.chain) + return nil, nil + } + it.index++ + if err := <-it.results; err != nil { + return it.chain[it.index], err + } + return it.chain[it.index], it.validator.ValidateBody(it.chain[it.index]) +} + +// current returns the current block that's being processed. +func (it *insertIterator) current() *types.Block { + if it.index < 0 || it.index+1 >= len(it.chain) { + return nil + } + return it.chain[it.index] +} + +// previous returns the previous block was being processed, or nil +func (it *insertIterator) previous() *types.Block { + if it.index < 1 { + return nil + } + return it.chain[it.index-1] +} + +// first returns the first block in the it. +func (it *insertIterator) first() *types.Block { + return it.chain[0] +} + +// remaining returns the number of remaining blocks. +func (it *insertIterator) remaining() int { + return len(it.chain) - it.index +} + +// processed returns the number of processed blocks. +func (it *insertIterator) processed() int { + return it.index + 1 +}