core, eth/downloader: implement pruning mode sync (#31414)

This pull request introduces new sync logic for pruning mode. The downloader will now skip
insertion of block bodies and receipts before the configured history cutoff point.

Originally, in snap sync, the header chain and other components (bodies and receipts) were
inserted separately. However, in Proof-of-Stake, this separation is unnecessary since the
sync target is already verified by the CL.

To simplify the process, this pull request modifies `InsertReceiptChain` to insert headers
together with block bodies and receipts. In addition, `InsertReceiptChain` has no notion
of reorg: the common ancestor is always found before the sync starts, and any extra side
chain segment is truncated at the beginning if it falls in the ancient store. Stale
canonical chain flags are always rewritten by the new chain, so explicit reorg logic is
no longer required in `InsertReceiptChain`.
This commit is contained in:
rjl493456442 2025-04-03 21:16:35 +08:00 committed by GitHub
parent 22c0605b68
commit 90d44e715d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 598 additions and 542 deletions

View file

@ -457,7 +457,7 @@ func importHistory(ctx *cli.Context) error {
network = networks[0]
}
if err := utils.ImportHistory(chain, db, dir, network); err != nil {
if err := utils.ImportHistory(chain, dir, network); err != nil {
return err
}
fmt.Printf("Import done in %v\n", time.Since(start))

View file

@ -246,8 +246,9 @@ func readList(filename string) ([]string, error) {
}
// ImportHistory imports Era1 files containing historical block information,
// starting from genesis.
func ImportHistory(chain *core.BlockChain, db ethdb.Database, dir string, network string) error {
// starting from genesis. The assumption is held that the provided chain
// segment in Era1 file should all be canonical and verified.
func ImportHistory(chain *core.BlockChain, dir string, network string) error {
if chain.CurrentSnapBlock().Number.BitLen() != 0 {
return errors.New("history import only supported when starting from genesis")
}
@ -308,11 +309,6 @@ func ImportHistory(chain *core.BlockChain, db ethdb.Database, dir string, networ
if err != nil {
return fmt.Errorf("error reading receipts %d: %w", it.Number(), err)
}
if status, err := chain.HeaderChain().InsertHeaderChain([]*types.Header{block.Header()}, start); err != nil {
return fmt.Errorf("error inserting header %d: %w", it.Number(), err)
} else if status != core.CanonStatTy {
return fmt.Errorf("error inserting header %d, not canon: %v", it.Number(), status)
}
if _, err := chain.InsertReceiptChain([]*types.Block{block}, []types.Receipts{receipts}, 2^64-1); err != nil {
return fmt.Errorf("error inserting body %d: %w", it.Number(), err)
}

View file

@ -171,7 +171,7 @@ func TestHistoryImportAndExport(t *testing.T) {
if err != nil {
t.Fatalf("unable to initialize chain: %v", err)
}
if err := ImportHistory(imported, db2, dir, "mainnet"); err != nil {
if err := ImportHistory(imported, dir, "mainnet"); err != nil {
t.Fatalf("failed to import chain: %v", err)
}
if have, want := imported.CurrentHeader(), chain.CurrentHeader(); have.Hash() != want.Hash() {

View file

@ -24,6 +24,7 @@ import (
"math/big"
"runtime"
"slices"
"sort"
"strings"
"sync"
"sync/atomic"
@ -157,7 +158,8 @@ type CacheConfig struct {
// This defines the cutoff block for history expiry.
// Blocks before this number may be unavailable in the chain database.
HistoryPruningCutoff uint64
HistoryPruningCutoffNumber uint64
HistoryPruningCutoffHash common.Hash
}
// triedbConfig derives the configures for trie database.
@ -262,7 +264,6 @@ type BlockChain struct {
txLookupLock sync.RWMutex
txLookupCache *lru.Cache[common.Hash, txLookup]
wg sync.WaitGroup
quit chan struct{} // shutdown signal, closed in Stop.
stopping atomic.Bool // false if chain is running, true when stopped
procInterrupt atomic.Bool // interrupt signaler for block processing
@ -333,10 +334,10 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
bc.processor = NewStateProcessor(chainConfig, bc.hc)
genesisHeader := bc.GetHeaderByNumber(0)
bc.genesisBlock = types.NewBlockWithHeader(genesisHeader)
if bc.genesisBlock == nil {
if genesisHeader == nil {
return nil, ErrNoGenesis
}
bc.genesisBlock = types.NewBlockWithHeader(genesisHeader)
bc.currentBlock.Store(nil)
bc.currentSnapBlock.Store(nil)
@ -1110,7 +1111,6 @@ func (bc *BlockChain) stopWithoutSaving() {
// the mutex should become available quickly. It cannot be taken again after Close has
// returned.
bc.chainmu.Close()
bc.wg.Wait()
}
// Stop stops the blockchain service. If any imports are currently in progress
@ -1197,79 +1197,64 @@ const (
SideStatTy
)
// InsertReceiptChain attempts to complete an already existing header chain with
// transaction and receipt data.
// InsertReceiptChain inserts a batch of blocks along with their receipts into
// the database. Unlike InsertChain, this function does not verify the state root
// in the blocks. It is used exclusively for snap sync. All the inserted blocks
// will be regarded as canonical, chain reorg is not supported.
//
// The optional ancientLimit can also be specified and chain segment before that
// will be directly stored in the ancient, getting rid of the chain migration.
func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts, ancientLimit uint64) (int, error) {
// We don't require the chainMu here since we want to maximize the
// concurrency of header insertion and receipt insertion.
bc.wg.Add(1)
defer bc.wg.Done()
// Verify the supplied headers before insertion without lock
var headers []*types.Header
for _, block := range blockChain {
headers = append(headers, block.Header())
var (
ancientBlocks, liveBlocks types.Blocks
ancientReceipts, liveReceipts []types.Receipts
)
// Do a sanity check that the provided chain is actually ordered and linked
for i, block := range blockChain {
if i != 0 {
prev := blockChain[i-1]
if block.NumberU64() != prev.NumberU64()+1 || block.ParentHash() != prev.Hash() {
log.Error("Non contiguous receipt insert",
"number", block.Number(), "hash", block.Hash(), "parent", block.ParentHash(),
"prevnumber", prev.Number(), "prevhash", prev.Hash())
return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x..], item %d is #%d [%x..] (parent [%x..])",
i-1, prev.NumberU64(), prev.Hash().Bytes()[:4],
i, block.NumberU64(), block.Hash().Bytes()[:4], block.ParentHash().Bytes()[:4])
}
}
if block.NumberU64() <= ancientLimit {
ancientBlocks, ancientReceipts = append(ancientBlocks, block), append(ancientReceipts, receiptChain[i])
} else {
liveBlocks, liveReceipts = append(liveBlocks, block), append(liveReceipts, receiptChain[i])
}
// Here we also validate that blob transactions in the block do not contain a sidecar.
// While the sidecar does not affect the block hash / tx hash, sending blobs within a block is not allowed.
// Here we also validate that blob transactions in the block do not
// contain a sidecar. While the sidecar does not affect the block hash
// or tx hash, sending blobs within a block is not allowed.
for txIndex, tx := range block.Transactions() {
if tx.Type() == types.BlobTxType && tx.BlobTxSidecar() != nil {
return 0, fmt.Errorf("block #%d contains unexpected blob sidecar in tx at index %d", block.NumberU64(), txIndex)
}
}
}
if n, err := bc.hc.ValidateHeaderChain(headers); err != nil {
return n, err
}
// Hold the mutation lock
if !bc.chainmu.TryLock() {
return 0, errChainStopped
}
defer bc.chainmu.Unlock()
var (
stats = struct{ processed, ignored int32 }{}
start = time.Now()
size = int64(0)
)
// updateHead updates the head snap sync block if the inserted blocks are better
// and returns an indicator whether the inserted blocks are canonical.
updateHead := func(head *types.Block) bool {
if !bc.chainmu.TryLock() {
return false
// updateHead updates the head header and head snap block flags.
updateHead := func(header *types.Header) error {
batch := bc.db.NewBatch()
hash := header.Hash()
rawdb.WriteHeadHeaderHash(batch, hash)
rawdb.WriteHeadFastBlockHash(batch, hash)
if err := batch.Write(); err != nil {
return err
}
defer bc.chainmu.Unlock()
// Rewind may have occurred, skip in that case.
if bc.CurrentHeader().Number.Cmp(head.Number()) >= 0 {
rawdb.WriteHeadFastBlockHash(bc.db, head.Hash())
bc.currentSnapBlock.Store(head.Header())
headFastBlockGauge.Update(int64(head.NumberU64()))
return true
}
return false
bc.hc.currentHeader.Store(header)
bc.currentSnapBlock.Store(header)
headHeaderGauge.Update(header.Number.Int64())
headFastBlockGauge.Update(header.Number.Int64())
return nil
}
// writeAncient writes blockchain and corresponding receipt chain into ancient store.
//
// this function only accepts canonical chain data. All side chain will be reverted
// eventually.
writeAncient := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
first := blockChain[0]
last := blockChain[len(blockChain)-1]
// Ensure genesis is in ancients.
if first.NumberU64() == 1 {
// Ensure genesis is in the ancient store
if blockChain[0].NumberU64() == 1 {
if frozen, _ := bc.db.Ancients(); frozen == 0 {
writeSize, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{bc.genesisBlock}, []types.Receipts{nil})
if err != nil {
@ -1280,12 +1265,6 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
log.Info("Wrote genesis to ancients")
}
}
// Before writing the blocks to the ancients, we need to ensure that
// they correspond to the what the headerchain 'expects'.
// We only check the last block/header, since it's a contiguous chain.
if !bc.HasHeader(last.Hash(), last.NumberU64()) {
return 0, fmt.Errorf("containing header #%d [%x..] unknown", last.Number(), last.Hash().Bytes()[:4])
}
// Write all chain data to ancients.
writeSize, err := rawdb.WriteAncientBlocks(bc.db, blockChain, receiptChain)
if err != nil {
@ -1298,44 +1277,28 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
if err := bc.db.Sync(); err != nil {
return 0, err
}
// Update the current snap block because all block data is now present in DB.
previousSnapBlock := bc.CurrentSnapBlock().Number.Uint64()
if !updateHead(blockChain[len(blockChain)-1]) {
// We end up here if the header chain has reorg'ed, and the blocks/receipts
// don't match the canonical chain.
if _, err := bc.db.TruncateHead(previousSnapBlock + 1); err != nil {
log.Error("Can't truncate ancient store after failed insert", "err", err)
}
return 0, errSideChainReceipts
}
// Delete block data from the main database.
var (
batch = bc.db.NewBatch()
canonHashes = make(map[common.Hash]struct{}, len(blockChain))
)
// Write hash to number mappings
batch := bc.db.NewBatch()
for _, block := range blockChain {
canonHashes[block.Hash()] = struct{}{}
if block.NumberU64() == 0 {
continue
}
rawdb.DeleteCanonicalHash(batch, block.NumberU64())
rawdb.DeleteBlockWithoutNumber(batch, block.Hash(), block.NumberU64())
}
// Delete side chain hash-to-number mappings.
for _, nh := range rawdb.ReadAllHashesInRange(bc.db, first.NumberU64(), last.NumberU64()) {
if _, canon := canonHashes[nh.Hash]; !canon {
rawdb.DeleteHeader(batch, nh.Hash, nh.Number)
}
rawdb.WriteHeaderNumber(batch, block.Hash(), block.NumberU64())
}
if err := batch.Write(); err != nil {
return 0, err
}
// Update the current snap block because all block data is now present in DB.
if err := updateHead(blockChain[len(blockChain)-1].Header()); err != nil {
return 0, err
}
stats.processed += int32(len(blockChain))
return 0, nil
}
// writeLive writes blockchain and corresponding receipt chain into active store.
// writeLive writes the blockchain and corresponding receipt chain to the active store.
//
// Notably, in different snap sync cycles, the supplied chain may partially reorganize
// existing local chain segments (reorg around the chain tip). The reorganized part
// will be included in the provided chain segment, and stale canonical markers will be
// silently rewritten. Therefore, no explicit reorg logic is needed.
writeLive := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
var (
skipPresenceCheck = false
@ -1346,10 +1309,6 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
if bc.insertStopped() {
return 0, errInsertionInterrupted
}
// Short circuit if the owner header is unknown
if !bc.HasHeader(block.Hash(), block.NumberU64()) {
return i, fmt.Errorf("containing header #%d [%x..] unknown", block.Number(), block.Hash().Bytes()[:4])
}
if !skipPresenceCheck {
// Ignore if the entire data is already known
if bc.HasBlock(block.Hash(), block.NumberU64()) {
@ -1363,7 +1322,8 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
}
}
// Write all the data out into the database
rawdb.WriteBody(batch, block.Hash(), block.NumberU64(), block.Body())
rawdb.WriteCanonicalHash(batch, block.Hash(), block.NumberU64())
rawdb.WriteBlock(batch, block)
rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i])
// Write everything belongs to the blocks into the database. So that
@ -1387,21 +1347,27 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
return 0, err
}
}
updateHead(blockChain[len(blockChain)-1])
if err := updateHead(blockChain[len(blockChain)-1].Header()); err != nil {
return 0, err
}
return 0, nil
}
// Write downloaded chain data and corresponding receipt chain data
if len(ancientBlocks) > 0 {
if n, err := writeAncient(ancientBlocks, ancientReceipts); err != nil {
// Split the supplied blocks into two groups, according to the
// given ancient limit.
index := sort.Search(len(blockChain), func(i int) bool {
return blockChain[i].NumberU64() >= ancientLimit
})
if index > 0 {
if n, err := writeAncient(blockChain[:index], receiptChain[:index]); err != nil {
if err == errInsertionInterrupted {
return 0, nil
}
return n, err
}
}
if len(liveBlocks) > 0 {
if n, err := writeLive(liveBlocks, liveReceipts); err != nil {
if index != len(blockChain) {
if n, err := writeLive(blockChain[index:], receiptChain[index:]); err != nil {
if err == errInsertionInterrupted {
return 0, nil
}
@ -1420,7 +1386,6 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
context = append(context, []interface{}{"ignored", stats.ignored}...)
}
log.Debug("Imported new block receipts", context...)
return 0, nil
}
@ -2505,15 +2470,83 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header) (int, error) {
if i, err := bc.hc.ValidateHeaderChain(chain); err != nil {
return i, err
}
if !bc.chainmu.TryLock() {
return 0, errChainStopped
}
defer bc.chainmu.Unlock()
_, err := bc.hc.InsertHeaderChain(chain, start)
return 0, err
}
// InsertHeadersBeforeCutoff inserts the given headers into the ancient store
// as they are claimed older than the configured chain cutoff point. All the
// inserted headers are regarded as canonical and chain reorg is not supported.
//
// The headers must directly extend the ancient store: either the first header
// is block #1 and the store reports zero items (in which case the genesis block
// is written first), or the first header's number equals the item count
// reported by bc.db.Ancients(). Otherwise an error is returned and nothing is
// written.
//
// On success the head header and head snap block markers, both on disk and in
// memory, point at the last supplied header, and TruncateTail has been called
// with the block number following it.
//
// The returned int is the index reported by ValidateHeaderChain when header
// validation fails and zero in every other case. An empty input is a no-op.
func (bc *BlockChain) InsertHeadersBeforeCutoff(headers []*types.Header) (int, error) {
if len(headers) == 0 {
return 0, nil
}
// TODO(rjl493456442): Headers before the configured cutoff have already
// been verified by the hash of cutoff header. Theoretically, header validation
// could be skipped here.
if n, err := bc.hc.ValidateHeaderChain(headers); err != nil {
return n, err
}
// Take the chain mutex without blocking; failure to acquire it is reported
// as errChainStopped.
if !bc.chainmu.TryLock() {
return 0, errChainStopped
}
defer bc.chainmu.Unlock()
// Initialize the ancient store with genesis block if it's empty.
//
// NOTE(review): the error returned by Ancients is discarded here, so a
// failing call is indistinguishable from whatever count it returns alongside
// the error (presumably zero) — confirm this is intended.
var (
frozen, _ = bc.db.Ancients()
first = headers[0].Number.Uint64()
)
if first == 1 && frozen == 0 {
_, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{bc.genesisBlock}, []types.Receipts{nil})
if err != nil {
log.Error("Error writing genesis to ancients", "err", err)
return 0, err
}
log.Info("Wrote genesis to ancient store")
} else if frozen != first {
return 0, fmt.Errorf("headers are gapped with the ancient store, first: %d, ancient: %d", first, frozen)
}
// Write headers to the ancient store, with block bodies and receipts set to nil
// to ensure consistency across tables in the freezer.
_, err := rawdb.WriteAncientHeaderChain(bc.db, headers)
if err != nil {
return 0, err
}
// Sync the database before the key-value markers below are written.
if err := bc.db.Sync(); err != nil {
return 0, err
}
// Write hash to number mappings
batch := bc.db.NewBatch()
for _, header := range headers {
rawdb.WriteHeaderNumber(batch, header.Hash(), header.Number.Uint64())
}
// Write head header and head snap block flags
last := headers[len(headers)-1]
rawdb.WriteHeadHeaderHash(batch, last.Hash())
rawdb.WriteHeadFastBlockHash(batch, last.Hash())
if err := batch.Write(); err != nil {
return 0, err
}
// Truncate the useless chain segment (zero bodies and receipts) in the
// ancient store.
if _, err := bc.db.TruncateTail(last.Number.Uint64() + 1); err != nil {
return 0, err
}
// Last step update all in-memory markers
bc.hc.currentHeader.Store(last)
bc.currentSnapBlock.Store(last)
headHeaderGauge.Update(last.Number.Int64())
headFastBlockGauge.Update(last.Number.Int64())
return 0, nil
}
// SetBlockValidatorAndProcessorForTesting sets the current validator and processor.
// This method can be used to force an invalid blockchain to be verified for tests.
// This method is unsafe and should only be used before block import starts.
@ -2533,9 +2566,3 @@ func (bc *BlockChain) SetTrieFlushInterval(interval time.Duration) {
func (bc *BlockChain) GetTrieFlushInterval() time.Duration {
return time.Duration(bc.flushInterval.Load())
}
// HistoryPruningCutoff returns the configured history pruning point.
// Blocks before this might not be available in the database.
func (bc *BlockChain) HistoryPruningCutoff() uint64 {
return bc.cacheConfig.HistoryPruningCutoff
}

View file

@ -407,6 +407,12 @@ func (bc *BlockChain) TxIndexProgress() (TxIndexProgress, error) {
return bc.txIndexer.txIndexProgress()
}
// HistoryPruningCutoff returns the configured history pruning point as a pair:
// the cutoff block number and the cutoff block hash, both read from the cache
// configuration. Blocks before this might not be available in the database.
func (bc *BlockChain) HistoryPruningCutoff() (uint64, common.Hash) {
return bc.cacheConfig.HistoryPruningCutoffNumber, bc.cacheConfig.HistoryPruningCutoffHash
}
// TrieDB retrieves the low level trie database used for data storage.
func (bc *BlockChain) TrieDB() *triedb.Database {
return bc.triedb

View file

@ -733,13 +733,6 @@ func testFastVsFullChains(t *testing.T, scheme string) {
fast, _ := NewBlockChain(fastDb, DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil)
defer fast.Stop()
headers := make([]*types.Header, len(blocks))
for i, block := range blocks {
headers[i] = block.Header()
}
if n, err := fast.InsertHeaderChain(headers); err != nil {
t.Fatalf("failed to insert header %d: %v", n, err)
}
if n, err := fast.InsertReceiptChain(blocks, receipts, 0); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err)
}
@ -753,9 +746,6 @@ func testFastVsFullChains(t *testing.T, scheme string) {
ancient, _ := NewBlockChain(ancientDb, DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil)
defer ancient.Stop()
if n, err := ancient.InsertHeaderChain(headers); err != nil {
t.Fatalf("failed to insert header %d: %v", n, err)
}
if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(len(blocks)/2)); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err)
}
@ -880,13 +870,6 @@ func testLightVsFastVsFullChainHeads(t *testing.T, scheme string) {
fast, _ := NewBlockChain(fastDb, DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil)
defer fast.Stop()
headers := make([]*types.Header, len(blocks))
for i, block := range blocks {
headers[i] = block.Header()
}
if n, err := fast.InsertHeaderChain(headers); err != nil {
t.Fatalf("failed to insert header %d: %v", n, err)
}
if n, err := fast.InsertReceiptChain(blocks, receipts, 0); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err)
}
@ -900,9 +883,6 @@ func testLightVsFastVsFullChainHeads(t *testing.T, scheme string) {
ancient, _ := NewBlockChain(ancientDb, DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil)
defer ancient.Stop()
if n, err := ancient.InsertHeaderChain(headers); err != nil {
t.Fatalf("failed to insert header %d: %v", n, err)
}
if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err)
}
@ -916,6 +896,11 @@ func testLightVsFastVsFullChainHeads(t *testing.T, scheme string) {
// Import the chain as a light node and ensure all pointers are updated
lightDb := makeDb()
defer lightDb.Close()
headers := make([]*types.Header, len(blocks))
for i, block := range blocks {
headers[i] = block.Header()
}
light, _ := NewBlockChain(lightDb, DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil)
if n, err := light.InsertHeaderChain(headers); err != nil {
t.Fatalf("failed to insert header %d: %v", n, err)
@ -1710,13 +1695,6 @@ func testBlockchainRecovery(t *testing.T, scheme string) {
defer ancientDb.Close()
ancient, _ := NewBlockChain(ancientDb, DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil)
headers := make([]*types.Header, len(blocks))
for i, block := range blocks {
headers[i] = block.Header()
}
if n, err := ancient.InsertHeaderChain(headers); err != nil {
t.Fatalf("failed to insert header %d: %v", n, err)
}
if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err)
}
@ -1741,82 +1719,6 @@ func testBlockchainRecovery(t *testing.T, scheme string) {
}
}
// This test checks that InsertReceiptChain will roll back correctly when attempting to insert a side chain.
func TestInsertReceiptChainRollback(t *testing.T) {
testInsertReceiptChainRollback(t, rawdb.HashScheme)
testInsertReceiptChainRollback(t, rawdb.PathScheme)
}
func testInsertReceiptChainRollback(t *testing.T, scheme string) {
// Generate forked chain. The returned BlockChain object is used to process the side chain blocks.
tmpChain, sideblocks, canonblocks, gspec, err := getLongAndShortChains(scheme)
if err != nil {
t.Fatal(err)
}
defer tmpChain.Stop()
// Get the side chain receipts.
if _, err := tmpChain.InsertChain(sideblocks); err != nil {
t.Fatal("processing side chain failed:", err)
}
t.Log("sidechain head:", tmpChain.CurrentBlock().Number, tmpChain.CurrentBlock().Hash())
sidechainReceipts := make([]types.Receipts, len(sideblocks))
for i, block := range sideblocks {
sidechainReceipts[i] = tmpChain.GetReceiptsByHash(block.Hash())
}
// Get the canon chain receipts.
if _, err := tmpChain.InsertChain(canonblocks); err != nil {
t.Fatal("processing canon chain failed:", err)
}
t.Log("canon head:", tmpChain.CurrentBlock().Number, tmpChain.CurrentBlock().Hash())
canonReceipts := make([]types.Receipts, len(canonblocks))
for i, block := range canonblocks {
canonReceipts[i] = tmpChain.GetReceiptsByHash(block.Hash())
}
// Set up a BlockChain that uses the ancient store.
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), "", "", false)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
defer ancientDb.Close()
ancientChain, _ := NewBlockChain(ancientDb, DefaultCacheConfigWithScheme(scheme), gspec, nil, ethash.NewFaker(), vm.Config{}, nil)
defer ancientChain.Stop()
// Import the canonical header chain.
canonHeaders := make([]*types.Header, len(canonblocks))
for i, block := range canonblocks {
canonHeaders[i] = block.Header()
}
if _, err = ancientChain.InsertHeaderChain(canonHeaders); err != nil {
t.Fatal("can't import canon headers:", err)
}
// Try to insert blocks/receipts of the side chain.
_, err = ancientChain.InsertReceiptChain(sideblocks, sidechainReceipts, uint64(len(sideblocks)))
if err == nil {
t.Fatal("expected error from InsertReceiptChain.")
}
if ancientChain.CurrentSnapBlock().Number.Uint64() != 0 {
t.Fatalf("failed to rollback ancient data, want %d, have %d", 0, ancientChain.CurrentSnapBlock().Number)
}
if frozen, err := ancientChain.db.Ancients(); err != nil || frozen != 1 {
t.Fatalf("failed to truncate ancient data, frozen index is %d", frozen)
}
// Insert blocks/receipts of the canonical chain.
_, err = ancientChain.InsertReceiptChain(canonblocks, canonReceipts, uint64(len(canonblocks)))
if err != nil {
t.Fatalf("can't import canon chain receipts: %v", err)
}
if ancientChain.CurrentSnapBlock().Number.Uint64() != canonblocks[len(canonblocks)-1].NumberU64() {
t.Fatalf("failed to insert ancient recept chain after rollback")
}
if frozen, _ := ancientChain.db.Ancients(); frozen != uint64(len(canonblocks))+1 {
t.Fatalf("wrong ancients count %d", frozen)
}
}
// Tests that importing a very large side fork, which is larger than the canon chain,
// but where the difficulty per block is kept low: this means that it will not
// overtake the 'canon' chain until after it's passed canon by about 200 blocks.
@ -2088,14 +1990,6 @@ func testInsertKnownChainData(t *testing.T, typ string, scheme string) {
}
} else if typ == "receipts" {
inserter = func(blocks []*types.Block, receipts []types.Receipts) error {
headers := make([]*types.Header, 0, len(blocks))
for _, block := range blocks {
headers = append(headers, block.Header())
}
_, err := chain.InsertHeaderChain(headers)
if err != nil {
return err
}
_, err = chain.InsertReceiptChain(blocks, receipts, 0)
return err
}
@ -2262,14 +2156,6 @@ func testInsertKnownChainDataWithMerging(t *testing.T, typ string, mergeHeight i
}
} else if typ == "receipts" {
inserter = func(blocks []*types.Block, receipts []types.Receipts) error {
headers := make([]*types.Header, 0, len(blocks))
for _, block := range blocks {
headers = append(headers, block.Header())
}
i, err := chain.InsertHeaderChain(headers)
if err != nil {
return fmt.Errorf("index %d: %w", i, err)
}
_, err = chain.InsertReceiptChain(blocks, receipts, 0)
return err
}
@ -4265,3 +4151,220 @@ func TestEIP7702(t *testing.T) {
t.Fatalf("addr2 storage wrong: expected %d, got %d", fortyTwo, actual)
}
}
// TestChainReorgSnapSync covers a snap sync whose target changes because of a
// chain reorg at the tip. After the replacement segment has been inserted, the
// abandoned segment must no longer carry canonical flags. The scenario is run
// with ancient limits of 0, 32 and the maximum uint64 value.
func TestChainReorgSnapSync(t *testing.T) {
	for _, ancientLimit := range []uint64{0, 32, gomath.MaxUint64} {
		testChainReorgSnapSync(t, ancientLimit)
	}
}
// testChainReorgSnapSync inserts a 32-block base chain and a 16-block fork A
// through InsertReceiptChain, then inserts a 20-block fork B built on the same
// parent as fork A. It verifies that the head snap block is the tip of fork B,
// that every base-chain and fork-B block is canonical, and that no fork-A block
// is still reported as canonical.
//
// ancientLimit is forwarded unchanged to every InsertReceiptChain call. When
// the common ancestor lies below it, the chain head is rewound to the ancestor
// before fork B is inserted.
func testChainReorgSnapSync(t *testing.T, ancientLimit uint64) {
	// log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelDebug, true)))

	// Configure and generate a sample block chain
	var (
		key, _  = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
		address = crypto.PubkeyToAddress(key.PublicKey)
		funds   = big.NewInt(1000000000000000)
		gspec   = &Genesis{
			Config:  params.TestChainConfig,
			Alloc:   types.GenesisAlloc{address: {Balance: funds}},
			BaseFee: big.NewInt(params.InitialBaseFee),
		}
		signer = types.LatestSigner(gspec.Config)
		engine = beacon.New(ethash.NewFaker())
	)
	genDb, blocks, receipts := GenerateChainWithGenesis(gspec, engine, 32, func(i int, block *BlockGen) {
		block.SetCoinbase(common.Address{0x00})

		// If the block number is multiple of 3, send a few bonus transactions to the miner
		if i%3 == 2 {
			for j := 0; j < i%4+1; j++ {
				tx, err := types.SignTx(types.NewTransaction(block.TxNonce(address), common.Address{0x00}, big.NewInt(1000), params.TxGas, block.header.BaseFee, nil), signer, key)
				if err != nil {
					panic(err)
				}
				block.AddTx(tx)
			}
		}
	})
	// Two competing forks on top of the base chain, distinguished by coinbase.
	chainA, receiptsA := GenerateChain(gspec.Config, blocks[len(blocks)-1], engine, genDb, 16, func(i int, gen *BlockGen) {
		gen.SetCoinbase(common.Address{0: byte(0xa), 19: byte(i)})
	})
	chainB, receiptsB := GenerateChain(gspec.Config, blocks[len(blocks)-1], engine, genDb, 20, func(i int, gen *BlockGen) {
		gen.SetCoinbase(common.Address{0: byte(0xb), 19: byte(i)})
	})

	// Fail fast on setup errors instead of continuing with a nil database or
	// chain and panicking further down.
	db, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), "", "", false)
	if err != nil {
		t.Fatalf("failed to create temp freezer db: %v", err)
	}
	defer db.Close()

	chain, err := NewBlockChain(db, DefaultCacheConfigWithScheme(rawdb.PathScheme), gspec, nil, beacon.New(ethash.NewFaker()), vm.Config{}, nil)
	if err != nil {
		t.Fatalf("unable to initialize chain: %v", err)
	}
	defer chain.Stop()

	if n, err := chain.InsertReceiptChain(blocks, receipts, ancientLimit); err != nil {
		t.Fatalf("failed to insert receipt %d: %v", n, err)
	}
	if n, err := chain.InsertReceiptChain(chainA, receiptsA, ancientLimit); err != nil {
		t.Fatalf("failed to insert receipt %d: %v", n, err)
	}
	// If the common ancestor is below the ancient limit, rewind the chain head.
	// It's aligned with the behavior in the snap sync
	ancestor := blocks[len(blocks)-1].NumberU64()
	if ancestor < ancientLimit {
		rawdb.WriteLastPivotNumber(db, ancestor)
		chain.SetHead(ancestor)
	}
	if n, err := chain.InsertReceiptChain(chainB, receiptsB, ancientLimit); err != nil {
		t.Fatalf("failed to insert receipt %d: %v", n, err)
	}
	head := chain.CurrentSnapBlock()
	if head.Hash() != chainB[len(chainB)-1].Hash() {
		t.Errorf("head snap block #%d: header mismatch: want: %v, got: %v", head.Number, chainB[len(chainB)-1].Hash(), head.Hash())
	}

	// Iterate over all chain data components, and cross reference
	for i := 0; i < len(blocks); i++ {
		num, hash := blocks[i].NumberU64(), blocks[i].Hash()
		header := chain.GetHeaderByNumber(num)
		if header == nil {
			// Report the gap as a failure rather than dereferencing nil.
			t.Errorf("block #%d: canonical header missing, want: %v", num, hash)
			continue
		}
		if header.Hash() != hash {
			t.Errorf("block #%d: header mismatch: want: %v, got: %v", num, hash, header.Hash())
		}
	}
	for i := 0; i < len(chainA); i++ {
		num, hash := chainA[i].NumberU64(), chainA[i].Hash()
		header := chain.GetHeaderByNumber(num)
		if header == nil {
			// No canonical header at this height, so fork A cannot be canonical.
			continue
		}
		if header.Hash() == hash {
			t.Errorf("block #%d: unexpected canonical header: %v", num, hash)
		}
	}
	for i := 0; i < len(chainB); i++ {
		num, hash := chainB[i].NumberU64(), chainB[i].Hash()
		header := chain.GetHeaderByNumber(num)
		if header == nil {
			// Report the gap as a failure rather than dereferencing nil.
			t.Errorf("block #%d: canonical header missing, want: %v", num, hash)
			continue
		}
		if header.Hash() != hash {
			t.Errorf("block #%d: header mismatch: want: %v, got: %v", num, hash, header.Hash())
		}
	}
}
// TestInsertChainWithCutoff covers chain insertion when a history cutoff point
// is configured. The chain segment before the cutoff should be persisted
// without receipts and bodies, while the segment after it should be persisted
// normally. The cutoff is fixed at 32 and the ancient limit varies per run.
func TestInsertChainWithCutoff(t *testing.T) {
	const cutoff = 32

	for _, ancientLimit := range []uint64{32, 64, 65} {
		testInsertChainWithCutoff(t, cutoff, ancientLimit)
	}
}
// testInsertChainWithCutoff generates a chain of 2*cutoff blocks (one transfer
// transaction per block), configures the block at index cutoff-1 as the history
// pruning cutoff and imports the chain in two steps: headers before the cutoff
// via InsertHeadersBeforeCutoff, the remaining blocks together with their
// receipts via InsertReceiptChain using the supplied ancientLimit.
//
// Afterwards it verifies that:
//   - the snap head and the head header point to the last generated block,
//     while the head block is still the genesis;
//   - the database tail equals the cutoff block number;
//   - every canonical header is retrievable;
//   - bodies and receipts are absent before the cutoff and present (with one
//     transaction / one receipt) from the cutoff onwards.
//
// cutoff must be non-zero, as blocks[cutoff-1] is used as the cutoff block.
func testInsertChainWithCutoff(t *testing.T, cutoff uint64, ancientLimit uint64) {
	// log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelDebug, true)))

	// Configure and generate a sample block chain
	var (
		key, _  = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
		address = crypto.PubkeyToAddress(key.PublicKey)
		funds   = big.NewInt(1000000000000000)
		gspec   = &Genesis{
			Config:  params.TestChainConfig,
			Alloc:   types.GenesisAlloc{address: {Balance: funds}},
			BaseFee: big.NewInt(params.InitialBaseFee),
		}
		signer = types.LatestSigner(gspec.Config)
		engine = beacon.New(ethash.NewFaker())
	)
	_, blocks, receipts := GenerateChainWithGenesis(gspec, engine, int(2*cutoff), func(i int, block *BlockGen) {
		block.SetCoinbase(common.Address{0x00})
		tx, err := types.SignTx(types.NewTransaction(block.TxNonce(address), common.Address{0x00}, big.NewInt(1000), params.TxGas, block.header.BaseFee, nil), signer, key)
		if err != nil {
			panic(err)
		}
		block.AddTx(tx)
	})
	db, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), "", "", false)
	if err != nil {
		t.Fatalf("failed to create database with ancient backend: %v", err)
	}
	defer db.Close()

	// Configure the chain cutoff. The config has to be the one handed over to
	// the blockchain, otherwise the chain runs without any cutoff configured.
	cutoffBlock := blocks[cutoff-1]
	config := DefaultCacheConfigWithScheme(rawdb.PathScheme)
	config.HistoryPruningCutoffNumber = cutoffBlock.NumberU64()
	config.HistoryPruningCutoffHash = cutoffBlock.Hash()

	chain, err := NewBlockChain(db, config, gspec, nil, beacon.New(ethash.NewFaker()), vm.Config{}, nil)
	if err != nil {
		t.Fatalf("failed to create blockchain: %v", err)
	}
	defer chain.Stop()

	// Split the generated chain around the cutoff: only headers are kept for
	// the blocks before it, full blocks and receipts for the rest.
	var (
		headersBefore []*types.Header
		blocksAfter   []*types.Block
		receiptsAfter []types.Receipts
	)
	for i, b := range blocks {
		if b.NumberU64() < cutoffBlock.NumberU64() {
			headersBefore = append(headersBefore, b.Header())
		} else {
			blocksAfter = append(blocksAfter, b)
			receiptsAfter = append(receiptsAfter, receipts[i])
		}
	}
	if n, err := chain.InsertHeadersBeforeCutoff(headersBefore); err != nil {
		t.Fatalf("failed to insert headers before cutoff %d: %v", n, err)
	}
	if n, err := chain.InsertReceiptChain(blocksAfter, receiptsAfter, ancientLimit); err != nil {
		t.Fatalf("failed to insert receipt %d: %v", n, err)
	}

	// Verify the chain head markers
	last := blocks[len(blocks)-1]
	headSnap := chain.CurrentSnapBlock()
	if headSnap.Hash() != last.Hash() {
		t.Errorf("head snap block #%d: header mismatch: want: %v, got: %v", headSnap.Number, last.Hash(), headSnap.Hash())
	}
	headHeader := chain.CurrentHeader()
	if headHeader.Hash() != last.Hash() {
		t.Errorf("head header #%d: header mismatch: want: %v, got: %v", headHeader.Number, last.Hash(), headHeader.Hash())
	}
	headBlock := chain.CurrentBlock()
	if headBlock.Hash() != gspec.ToBlock().Hash() {
		t.Errorf("head block #%d: header mismatch: want: %v, got: %v", headBlock.Number, gspec.ToBlock().Hash(), headBlock.Hash())
	}

	// The chain tail does not depend on the inspected block, verify it once
	// instead of on every iteration below.
	tail, err := db.Tail()
	if err != nil {
		t.Fatalf("Failed to get chain tail, %v", err)
	}
	if tail != cutoffBlock.NumberU64() {
		t.Fatalf("Unexpected chain tail, want: %d, got: %d", cutoffBlock.NumberU64(), tail)
	}

	// Iterate over all chain data components, and cross reference
	for _, block := range blocks {
		num, hash := block.NumberU64(), block.Hash()

		// Canonical headers should be visible regardless of cutoff
		header := chain.GetHeaderByNumber(num)
		if header == nil {
			t.Fatalf("block #%d: canonical header missing", num)
		}
		if header.Hash() != hash {
			t.Errorf("block #%d: header mismatch: want: %v, got: %v", num, hash, header.Hash())
		}
		if num < cutoffBlock.NumberU64() {
			// Block bodies and receipts before the cutoff should be non-existent
			if body := chain.GetBody(hash); body != nil {
				t.Fatalf("Unexpected block body: %d, cutoff: %d", num, cutoffBlock.NumberU64())
			}
			if stored := chain.GetReceiptsByHash(hash); stored != nil {
				t.Fatalf("Unexpected block receipts: %d, cutoff: %d", num, cutoffBlock.NumberU64())
			}
			continue
		}
		// Block bodies and receipts from the cutoff onwards should be available
		if body := chain.GetBody(hash); body == nil || len(body.Transactions) != 1 {
			t.Fatalf("Missed block body: %d, cutoff: %d", num, cutoffBlock.NumberU64())
		}
		if stored := chain.GetReceiptsByHash(hash); len(stored) != 1 {
			t.Fatalf("Missed block receipts: %d, cutoff: %d", num, cutoffBlock.NumberU64())
		}
	}
}

View file

@ -28,8 +28,6 @@ var (
// ErrNoGenesis is returned when there is no Genesis Block.
ErrNoGenesis = errors.New("genesis not found in chain")
errSideChainReceipts = errors.New("side blocks can't be accepted as ancient chain data")
)
// List of evm-call-message pre-checking errors. All state transition messages will

View file

@ -237,7 +237,8 @@ func (hc *HeaderChain) WriteHeaders(headers []*types.Header) (int, error) {
}
// writeHeadersAndSetHead writes a batch of block headers and applies the last
// header as the chain head if the fork choicer says it's ok to update the chain.
// header as the chain head.
//
// Note: This method is not concurrent-safe with inserting blocks simultaneously
// into the chain, as side effects caused by reorganisations cannot be emulated
// without the real blocks. Hence, writing headers directly should only be done
@ -272,12 +273,14 @@ func (hc *HeaderChain) writeHeadersAndSetHead(headers []*types.Header) (*headerW
return result, nil
}
// ValidateHeaderChain verifies that the supplied header chain is contiguous
// and conforms to consensus rules.
func (hc *HeaderChain) ValidateHeaderChain(chain []*types.Header) (int, error) {
// Do a sanity check that the provided chain is actually ordered and linked
for i := 1; i < len(chain); i++ {
if chain[i].Number.Uint64() != chain[i-1].Number.Uint64()+1 {
hash := chain[i].Hash()
parentHash := chain[i-1].Hash()
hash, parentHash := chain[i].Hash(), chain[i-1].Hash()
// Chain broke ancestry, log a message (programming error) and skip insertion
log.Error("Non contiguous header insert", "number", chain[i].Number, "hash", hash,
"parent", chain[i].ParentHash, "prevnumber", chain[i-1].Number, "prevhash", parentHash)
@ -302,7 +305,6 @@ func (hc *HeaderChain) ValidateHeaderChain(chain []*types.Header) (int, error) {
return i, err
}
}
return 0, nil
}

View file

@ -737,6 +737,30 @@ func writeAncientBlock(op ethdb.AncientWriteOp, block *types.Block, header *type
return nil
}
// WriteAncientHeaderChain appends the given headers to the ancient store. For
// each header the block hash and the header itself are written, while the body
// and receipt entries are filled with nil placeholders. It is meant to be used
// for persisting the chain segment located before the chain cutoff.
//
// The result of the underlying ModifyAncients call is returned as is.
func WriteAncientHeaderChain(db ethdb.AncientWriter, headers []*types.Header) (int64, error) {
	appendAll := func(op ethdb.AncientWriteOp) error {
		for i := 0; i < len(headers); i++ {
			var (
				h      = headers[i]
				number = h.Number.Uint64()
			)
			// The four chain tables are extended in lockstep for every block.
			if err := op.AppendRaw(ChainFreezerHashTable, number, h.Hash().Bytes()); err != nil {
				return fmt.Errorf("can't add block %d hash: %v", number, err)
			}
			if err := op.Append(ChainFreezerHeaderTable, number, h); err != nil {
				return fmt.Errorf("can't append block header %d: %v", number, err)
			}
			if err := op.AppendRaw(ChainFreezerBodiesTable, number, nil); err != nil {
				return fmt.Errorf("can't append block body %d: %v", number, err)
			}
			if err := op.AppendRaw(ChainFreezerReceiptTable, number, nil); err != nil {
				return fmt.Errorf("can't append block %d receipts: %v", number, err)
			}
		}
		return nil
	}
	return db.ModifyAncients(appendAll)
}
// DeleteBlock removes all block data associated with a hash.
func DeleteBlock(db ethdb.KeyValueWriter, hash common.Hash, number uint64) {
DeleteReceipts(db, hash, number)

View file

@ -464,6 +464,48 @@ func TestAncientStorage(t *testing.T) {
}
}
// TestWriteAncientHeaderChain writes two headers through WriteAncientHeaderChain
// and checks that, for each of them, the header and the canonical hash can be
// read back while no body and no receipts are returned.
func TestWriteAncientHeaderChain(t *testing.T) {
	db, err := NewDatabaseWithFreezer(NewMemoryDatabase(), t.TempDir(), "", false)
	if err != nil {
		t.Fatalf("failed to create database with ancient backend: %v", err)
	}
	defer db.Close()

	// Create the test headers
	headers := []*types.Header{
		{
			Number:      big.NewInt(0),
			Extra:       []byte("test block"),
			UncleHash:   types.EmptyUncleHash,
			TxHash:      types.EmptyTxsHash,
			ReceiptHash: types.EmptyReceiptsHash,
		},
		{
			Number:      big.NewInt(1),
			Extra:       []byte("test block"),
			UncleHash:   types.EmptyUncleHash,
			TxHash:      types.EmptyTxsHash,
			ReceiptHash: types.EmptyReceiptsHash,
		},
	}
	// Write the headers, a failed write must not be mistaken for missing data
	if _, err := WriteAncientHeaderChain(db, headers); err != nil {
		t.Fatalf("failed to write ancient header chain: %v", err)
	}
	// Verify the headers in the database
	for _, header := range headers {
		hash, number := header.Hash(), header.Number.Uint64()

		if blob := ReadHeaderRLP(db, hash, number); len(blob) == 0 {
			t.Fatalf("no header returned")
		}
		if h := ReadCanonicalHash(db, number); h != hash {
			t.Fatalf("no canonical hash returned")
		}
		if blob := ReadBodyRLP(db, hash, number); len(blob) != 0 {
			t.Fatalf("unexpected body returned")
		}
		if blob := ReadReceiptsRLP(db, hash, number); len(blob) != 0 {
			t.Fatalf("unexpected receipts returned")
		}
	}
}
func TestCanonicalHashIteration(t *testing.T) {
var cases = []struct {
from, to uint64

View file

@ -58,9 +58,10 @@ type txIndexer struct {
// newTxIndexer initializes the transaction indexer.
func newTxIndexer(limit uint64, chain *BlockChain) *txIndexer {
cutoff, _ := chain.HistoryPruningCutoff()
indexer := &txIndexer{
limit: limit,
cutoff: chain.HistoryPruningCutoff(),
cutoff: cutoff,
db: chain.db,
progress: make(chan chan TxIndexProgress),
term: make(chan chan struct{}),

View file

@ -154,13 +154,18 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
}
// Validate history pruning configuration.
var historyPruningCutoff uint64
var (
cutoffNumber uint64
cutoffHash common.Hash
)
if config.HistoryMode == ethconfig.PostMergeHistory {
prunecfg, ok := ethconfig.HistoryPrunePoints[genesisHash]
if !ok {
return nil, fmt.Errorf("no history pruning point is defined for genesis %x", genesisHash)
}
historyPruningCutoff = prunecfg.BlockNumber
cutoffNumber = prunecfg.BlockNumber
cutoffHash = prunecfg.BlockHash
log.Info("Chain cutoff configured", "number", cutoffNumber, "hash", cutoffHash)
}
// Set networkID to chainID by default.
@ -213,7 +218,8 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
Preimages: config.Preimages,
StateHistory: config.StateHistory,
StateScheme: scheme,
HistoryPruningCutoff: historyPruningCutoff,
HistoryPruningCutoffNumber: cutoffNumber,
HistoryPruningCutoffHash: cutoffHash,
}
)
if config.VMTrace != "" {
@ -246,7 +252,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
HashScheme: scheme == rawdb.HashScheme,
}
chainView := eth.newChainView(eth.blockchain.CurrentBlock())
historyCutoff := eth.blockchain.HistoryPruningCutoff()
historyCutoff, _ := eth.blockchain.HistoryPruningCutoff()
var finalBlock uint64
if fb := eth.blockchain.CurrentFinalBlock(); fb != nil {
finalBlock = fb.Number.Uint64()
@ -443,7 +449,7 @@ func (s *Ethereum) updateFilterMapsHeads() {
if head == nil || newHead.Hash() != head.Hash() {
head = newHead
chainView := s.newChainView(head)
historyCutoff := s.blockchain.HistoryPruningCutoff()
historyCutoff, _ := s.blockchain.HistoryPruningCutoff()
var finalBlock uint64
if fb := s.blockchain.CurrentFinalBlock(); fb != nil {
finalBlock = fb.Number.Uint64()

View file

@ -273,8 +273,7 @@ func (d *Downloader) findBeaconAncestor() (uint64, error) {
// fetchHeaders feeds skeleton headers to the downloader queue for scheduling
// until sync errors or is finished.
func (d *Downloader) fetchHeaders(from uint64) error {
var head *types.Header
_, tail, _, err := d.skeleton.Bounds()
head, tail, _, err := d.skeleton.Bounds()
if err != nil {
return err
}
@ -294,6 +293,27 @@ func (d *Downloader) fetchHeaders(from uint64) error {
fsHeaderContCheckTimer := time.NewTimer(fsHeaderContCheck)
defer fsHeaderContCheckTimer.Stop()
// Verify the header at configured chain cutoff, ensuring it's matched with
// the configured hash. Skip the check if the configured cutoff is even higher
// than the sync target, which is definitely not a common case.
if d.chainCutoffNumber != 0 && d.chainCutoffNumber >= from && d.chainCutoffNumber <= head.Number.Uint64() {
h := d.skeleton.Header(d.chainCutoffNumber)
if h == nil {
if d.chainCutoffNumber < tail.Number.Uint64() {
dist := tail.Number.Uint64() - d.chainCutoffNumber
if len(localHeaders) >= int(dist) {
h = localHeaders[dist-1]
}
}
}
if h == nil {
return fmt.Errorf("header at chain cutoff is not available, cutoff: %d", d.chainCutoffNumber)
}
if h.Hash() != d.chainCutoffHash {
return fmt.Errorf("header at chain cutoff mismatched, want: %v, got: %v", d.chainCutoffHash, h.Hash())
}
}
for {
// Some beacon headers might have appeared since the last cycle, make
// sure we're always syncing to all available ones

View file

@ -20,6 +20,7 @@ package downloader
import (
"errors"
"fmt"
"sort"
"sync"
"sync/atomic"
"time"
@ -120,6 +121,12 @@ type Downloader struct {
committed atomic.Bool
ancientLimit uint64 // The maximum block number which can be regarded as ancient data.
// The cutoff block number and hash before which chain segments (bodies
// and receipts) are skipped during synchronization. 0 means the entire
// chain segment is aimed for synchronization.
chainCutoffNumber uint64
chainCutoffHash common.Hash
// Channels
headerProcCh chan *headerTask // Channel to feed the header processor new tasks
@ -163,9 +170,6 @@ type BlockChain interface {
// CurrentHeader retrieves the head header from the local chain.
CurrentHeader() *types.Header
// InsertHeaderChain inserts a batch of headers into the local chain.
InsertHeaderChain([]*types.Header) (int, error)
// SetHead rewinds the local chain to a new head.
SetHead(uint64) error
@ -187,10 +191,17 @@ type BlockChain interface {
// SnapSyncCommitHead directly commits the head block to a certain entity.
SnapSyncCommitHead(common.Hash) error
// InsertHeadersBeforeCutoff inserts a batch of headers before the configured
// chain cutoff into the ancient store.
InsertHeadersBeforeCutoff([]*types.Header) (int, error)
// InsertChain inserts a batch of blocks into the local chain.
InsertChain(types.Blocks) (int, error)
// InsertReceiptChain inserts a batch of receipts into the local chain.
// InsertReceiptChain inserts a batch of blocks along with their receipts
// into the local chain. Blocks older than the specified `ancientLimit`
// are stored directly in the ancient store, while newer blocks are stored
// in the live key-value store.
InsertReceiptChain(types.Blocks, []types.Receipts, uint64) (int, error)
// Snapshots returns the blockchain snapshot tree to pause it during sync.
@ -199,16 +210,23 @@ type BlockChain interface {
// TrieDB retrieves the low level trie database used for interacting
// with trie nodes.
TrieDB() *triedb.Database
// HistoryPruningCutoff returns the configured history pruning point (block
// number and hash). Block bodies and receipts before this point are skipped
// during synchronization.
HistoryPruningCutoff() (uint64, common.Hash)
}
// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, dropPeer peerDropFn, success func()) *Downloader {
cutoffNumber, cutoffHash := chain.HistoryPruningCutoff()
dl := &Downloader{
stateDB: stateDb,
mux: mux,
queue: newQueue(blockCacheMaxItems, blockCacheInitialItems),
peers: newPeerSet(),
blockchain: chain,
chainCutoffNumber: cutoffNumber,
chainCutoffHash: cutoffHash,
dropPeer: dropPeer,
headerProcCh: make(chan *headerTask, 1),
quitCh: make(chan struct{}),
@ -503,6 +521,12 @@ func (d *Downloader) syncToHead() (err error) {
} else {
d.ancientLimit = 0
}
// Extend the ancient chain segment range if the ancient limit is even
// below the pre-configured chain cutoff.
if d.chainCutoffNumber != 0 && d.chainCutoffNumber > d.ancientLimit {
d.ancientLimit = d.chainCutoffNumber
log.Info("Extend the ancient range with configured cutoff", "cutoff", d.chainCutoffNumber)
}
frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here.
// If a part of blockchain data has already been written into active store,
@ -521,14 +545,23 @@ func (d *Downloader) syncToHead() (err error) {
log.Info("Truncated excess ancient chain segment", "oldhead", frozen-1, "newhead", origin)
}
}
// Skip ancient chain segments if Geth is running with a configured chain cutoff.
// These segments are not guaranteed to be available in the network.
chainOffset := origin + 1
if mode == ethconfig.SnapSync && d.chainCutoffNumber != 0 {
if chainOffset < d.chainCutoffNumber {
chainOffset = d.chainCutoffNumber
log.Info("Skip chain segment before cutoff", "origin", origin, "cutoff", d.chainCutoffNumber)
}
}
// Initiate the sync using a concurrent header and content retrieval algorithm
d.queue.Prepare(origin+1, mode)
d.queue.Prepare(chainOffset, mode)
// In beacon mode, headers are served by the skeleton syncer
fetchers := []func() error{
func() error { return d.fetchHeaders(origin + 1) }, // Headers are always retrieved
func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and snap sync
func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during snap sync
func() error { return d.fetchBodies(chainOffset) }, // Bodies are retrieved during normal and snap sync
func() error { return d.fetchReceipts(chainOffset) }, // Receipts are retrieved during snap sync
func() error { return d.processHeaders(origin + 1) },
}
if mode == ethconfig.SnapSync {
@ -666,7 +699,7 @@ func (d *Downloader) processHeaders(origin uint64) error {
return nil
}
// Otherwise split the chunk of headers into batches and process them
headers, hashes := task.headers, task.hashes
headers, hashes, scheduled := task.headers, task.hashes, false
for len(headers) > 0 {
// Terminate if something failed in between processing chunks
@ -683,17 +716,21 @@ func (d *Downloader) processHeaders(origin uint64) error {
chunkHeaders := headers[:limit]
chunkHashes := hashes[:limit]
// In case of header only syncing, validate the chunk immediately
if mode == ethconfig.SnapSync {
// Although the received headers might be all valid, a legacy
// PoW/PoA sync must not accept post-merge headers. Make sure
// that any transition is rejected at this point.
if len(chunkHeaders) > 0 {
if n, err := d.blockchain.InsertHeaderChain(chunkHeaders); err != nil {
log.Warn("Invalid header encountered", "number", chunkHeaders[n].Number, "hash", chunkHashes[n], "parent", chunkHeaders[n].ParentHash, "err", err)
// Split the headers around the chain cutoff
var cutoff int
if mode == ethconfig.SnapSync && d.chainCutoffNumber != 0 {
cutoff = sort.Search(len(chunkHeaders), func(i int) bool {
return chunkHeaders[i].Number.Uint64() >= d.chainCutoffNumber
})
}
// Insert the header chain into the ancient store (with block bodies and
// receipts set to nil) if they fall before the cutoff.
if mode == ethconfig.SnapSync && cutoff != 0 {
if n, err := d.blockchain.InsertHeadersBeforeCutoff(chunkHeaders[:cutoff]); err != nil {
log.Warn("Failed to insert ancient header chain", "number", chunkHeaders[n].Number, "hash", chunkHashes[n], "parent", chunkHeaders[n].ParentHash, "err", err)
return fmt.Errorf("%w: %v", errInvalidChain, err)
}
}
log.Debug("Inserted headers before cutoff", "number", chunkHeaders[cutoff-1].Number, "hash", chunkHashes[cutoff-1])
}
// If we've reached the allowed number of pending headers, stall a bit
for d.queue.PendingBodies() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
@ -704,12 +741,21 @@ func (d *Downloader) processHeaders(origin uint64) error {
case <-timer.C:
}
}
// Otherwise insert the headers for content retrieval
inserts := d.queue.Schedule(chunkHeaders, chunkHashes, origin)
if len(inserts) != len(chunkHeaders) {
// Otherwise, schedule the headers for content retrieval (block bodies and
// potentially receipts in snap sync).
//
// Skip the bodies/receipts retrieval scheduling before the cutoff in snap
// sync if chain pruning is configured.
if mode == ethconfig.SnapSync && cutoff != 0 {
chunkHeaders = chunkHeaders[cutoff:]
chunkHashes = chunkHashes[cutoff:]
}
if len(chunkHeaders) > 0 {
scheduled = true
if d.queue.Schedule(chunkHeaders, chunkHashes, origin+uint64(cutoff)) != len(chunkHeaders) {
return fmt.Errorf("%w: stale headers", errBadPeer)
}
}
headers = headers[limit:]
hashes = hashes[limit:]
origin += uint64(limit)
@ -721,7 +767,8 @@ func (d *Downloader) processHeaders(origin uint64) error {
}
d.syncStatsLock.Unlock()
// Signal the content downloaders of the availability of new tasks
// Signal the downloader of the availability of new tasks
if scheduled {
for _, ch := range []chan bool{d.queue.blockWakeCh, d.queue.receiptWakeCh} {
select {
case ch <- true:
@ -730,6 +777,7 @@ func (d *Downloader) processHeaders(origin uint64) error {
}
}
}
}
}
// processFullSyncContent takes fetch results from the queue and imports them into the chain.
@ -1085,10 +1133,20 @@ func (d *Downloader) reportSnapSyncProgress(force bool) {
header = d.blockchain.CurrentHeader()
block = d.blockchain.CurrentSnapBlock()
)
syncedBlocks := block.Number.Uint64() - d.syncStartBlock
if syncedBlocks == 0 {
// Prevent reporting if nothing has been synchronized yet
if block.Number.Uint64() <= d.syncStartBlock {
return
}
// Prevent reporting noise if the actual chain synchronization (headers
// and bodies) hasn't started yet. Inserting the ancient header chain is
// fast enough and would introduce significant bias if included in the count.
if d.chainCutoffNumber != 0 && block.Number.Uint64() <= d.chainCutoffNumber {
return
}
fetchedBlocks := block.Number.Uint64() - d.syncStartBlock
if d.chainCutoffNumber != 0 && d.chainCutoffNumber > d.syncStartBlock {
fetchedBlocks = block.Number.Uint64() - d.chainCutoffNumber
}
// Retrieve the current chain head and calculate the ETA
latest, _, _, err := d.skeleton.Bounds()
if err != nil {
@ -1103,7 +1161,7 @@ func (d *Downloader) reportSnapSyncProgress(force bool) {
}
var (
left = latest.Number.Uint64() - block.Number.Uint64()
eta = time.Since(d.syncStartTime) / time.Duration(syncedBlocks) * time.Duration(left)
eta = time.Since(d.syncStartTime) / time.Duration(fetchedBlocks) * time.Duration(left)
progress = fmt.Sprintf("%.2f%%", float64(block.Number.Uint64())*100/float64(latest.Number.Uint64()))
headers = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(header.Number.Uint64()), common.StorageSize(headerBytes).TerminalString())

View file

@ -25,7 +25,6 @@ import (
var (
headerInMeter = metrics.NewRegisteredMeter("eth/downloader/headers/in", nil)
headerReqTimer = metrics.NewRegisteredTimer("eth/downloader/headers/req", nil)
headerDropMeter = metrics.NewRegisteredMeter("eth/downloader/headers/drop", nil)
headerTimeoutMeter = metrics.NewRegisteredMeter("eth/downloader/headers/timeout", nil)
bodyInMeter = metrics.NewRegisteredMeter("eth/downloader/bodies/in", nil)

View file

@ -73,7 +73,7 @@ type fetchResult struct {
Withdrawals types.Withdrawals
}
func newFetchResult(header *types.Header, fastSync bool) *fetchResult {
func newFetchResult(header *types.Header, snapSync bool) *fetchResult {
item := &fetchResult{
Header: header,
}
@ -82,7 +82,7 @@ func newFetchResult(header *types.Header, fastSync bool) *fetchResult {
} else if header.WithdrawalsHash != nil {
item.Withdrawals = make(types.Withdrawals, 0)
}
if fastSync && !header.EmptyReceipts() {
if snapSync && !header.EmptyReceipts() {
item.pending.Store(item.pending.Load() | (1 << receiptType))
}
return item
@ -125,18 +125,7 @@ func (f *fetchResult) Done(kind uint) bool {
// queue represents hashes that either need fetching or are being fetched
type queue struct {
mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching
// Headers are "special", they download in batches, supported by a skeleton chain
headerHead common.Hash // Hash of the last queued header to verify order
headerTaskPool map[uint64]*types.Header // Pending header retrieval tasks, mapping starting indexes to skeleton headers
headerTaskQueue *prque.Prque[int64, uint64] // Priority queue of the skeleton indexes to fetch the filling headers for
headerPeerMiss map[string]map[uint64]struct{} // Set of per-peer header batches known to be unavailable
headerPendPool map[string]*fetchRequest // Currently pending header retrieval operations
headerResults []*types.Header // Result cache accumulating the completed headers
headerHashes []common.Hash // Result cache accumulating the completed header hashes
headerProced int // Number of headers already processed from the results
headerOffset uint64 // Number of the first header in the result cache
headerContCh chan bool // Channel to notify when header download finishes
// All data retrievals below are based on an already assembled header chain
blockTaskPool map[common.Hash]*types.Header // Pending block (body) retrieval tasks, mapping hashes to headers
@ -163,7 +152,6 @@ type queue struct {
func newQueue(blockCacheLimit int, thresholdInitialSize int) *queue {
lock := new(sync.RWMutex)
q := &queue{
headerContCh: make(chan bool, 1),
blockTaskQueue: prque.New[int64, *types.Header](nil),
blockWakeCh: make(chan bool, 1),
receiptTaskQueue: prque.New[int64, *types.Header](nil),
@ -182,9 +170,7 @@ func (q *queue) Reset(blockCacheLimit int, thresholdInitialSize int) {
q.closed = false
q.mode = ethconfig.FullSync
q.headerHead = common.Hash{}
q.headerPendPool = make(map[string]*fetchRequest)
q.blockTaskPool = make(map[common.Hash]*types.Header)
q.blockTaskQueue.Reset()
@ -207,14 +193,6 @@ func (q *queue) Close() {
q.lock.Unlock()
}
// PendingHeaders reports how many entries are currently held in the header
// task queue, i.e. header requests still waiting to be retrieved.
func (q *queue) PendingHeaders() int {
	q.lock.Lock()
	defer q.lock.Unlock()

	pending := q.headerTaskQueue.Size()
	return pending
}
// PendingBodies retrieves the number of block body requests pending for retrieval.
func (q *queue) PendingBodies() int {
q.lock.Lock()
@ -260,54 +238,14 @@ func (q *queue) Idle() bool {
return (queued + pending) == 0
}
// ScheduleSkeleton registers one header retrieval task per skeleton header, so
// that an already retrieved header skeleton can be filled up. The result caches
// are sized to hold MaxHeaderFetch headers for every skeleton entry and the
// first cached header is numbered from.
//
// It panics if the results of a previous skeleton assembly are still present.
func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
	q.lock.Lock()
	defer q.lock.Unlock()

	// A skeleton retrieval must never be in progress at this point, fail hard
	// otherwise (huge implementation bug)
	if q.headerResults != nil {
		panic("skeleton assembly already in progress")
	}
	// Reset all the header scheduling state for the new skeleton assembly
	total := len(skeleton) * MaxHeaderFetch

	q.headerTaskPool = make(map[uint64]*types.Header)
	q.headerTaskQueue = prque.New[int64, uint64](nil)
	q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains
	q.headerResults = make([]*types.Header, total)
	q.headerHashes = make([]common.Hash, total)
	q.headerProced = 0
	q.headerOffset = from
	q.headerContCh = make(chan bool, 1)

	// Queue up one task per skeleton header, lower indexes get higher priority
	for i := 0; i < len(skeleton); i++ {
		start := from + uint64(i*MaxHeaderFetch)
		q.headerTaskPool[start] = skeleton[i]
		q.headerTaskQueue.Push(start, -int64(start))
	}
}
// RetrieveHeaders hands out the header chain assembled from the scheduled
// skeleton: the cached headers, their hashes and the number of headers already
// processed. The internal result caches are cleared in the same step.
func (q *queue) RetrieveHeaders() ([]*types.Header, []common.Hash, int) {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Snapshot the current results before wiping them
	var (
		results   = q.headerResults
		hashes    = q.headerHashes
		processed = q.headerProced
	)
	q.headerResults = nil
	q.headerHashes = nil
	q.headerProced = 0

	return results, hashes, processed
}
// Schedule adds a set of headers for the download queue for scheduling, returning
// the new headers encountered.
func (q *queue) Schedule(headers []*types.Header, hashes []common.Hash, from uint64) []*types.Header {
func (q *queue) Schedule(headers []*types.Header, hashes []common.Hash, from uint64) int {
q.lock.Lock()
defer q.lock.Unlock()
// Insert all the headers prioritised by the contained block number
inserts := make([]*types.Header, 0, len(headers))
var inserts int
for i, header := range headers {
// Make sure chain order is honoured and preserved throughout
hash := hashes[i]
@ -337,7 +275,7 @@ func (q *queue) Schedule(headers []*types.Header, hashes []common.Hash, from uin
q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64()))
}
}
inserts = append(inserts, header)
inserts++
q.headerHead = hash
from++
}
@ -390,7 +328,7 @@ func (q *queue) Results(block bool) []*fetchResult {
q.resultSize = common.StorageSize(blockCacheSizeWeight)*size +
(1-common.StorageSize(blockCacheSizeWeight))*q.resultSize
}
// Using the newly calibrated resultsize, figure out the new throttle limit
// Using the newly calibrated result size, figure out the new throttle limit
// on the result cache
throttleThreshold := uint64((common.StorageSize(blockCacheMemory) + q.resultSize - 1) / q.resultSize)
throttleThreshold = q.resultCache.SetThrottleThreshold(throttleThreshold)
@ -428,46 +366,6 @@ func (q *queue) stats() []interface{} {
}
}
// ReserveHeaders picks the next header batch for the given peer, passing over
// batches that are recorded as missing for that peer. Passed-over batches are
// pushed back into the task queue afterwards.
//
// It returns nil if the peer already has a pending header request or if no
// suitable batch was found. Otherwise the new request is recorded as pending
// for the peer and returned.
func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Short circuit if the peer's already downloading something (sanity check to
	// not corrupt state)
	if _, busy := q.headerPendPool[p.id]; busy {
		return nil
	}
	// Retrieve a batch of hashes, skipping previously failed ones. Looking up
	// a nil miss-set is fine, it simply never reports a batch as failed.
	var (
		send    uint64
		skipped []uint64
		missed  = q.headerPeerMiss[p.id]
	)
	for send == 0 && !q.headerTaskQueue.Empty() {
		from, _ := q.headerTaskQueue.Pop()
		if _, failed := missed[from]; failed {
			skipped = append(skipped, from)
			continue
		}
		send = from
	}
	// Merge all the skipped batches back
	for _, from := range skipped {
		q.headerTaskQueue.Push(from, -int64(from))
	}
	// Assemble and return the block download request
	if send == 0 {
		return nil
	}
	req := &fetchRequest{
		Peer: p,
		From: send,
		Time: time.Now(),
	}
	q.headerPendPool[p.id] = req
	return req
}
// ReserveBodies reserves a set of body fetches for the given peer, skipping any
// previously failed downloads. Beside the next batch of needed fetches, it also
// returns a flag whether empty blocks were queued requiring processing.
@ -594,10 +492,6 @@ func (q *queue) Revoke(peerID string) {
q.lock.Lock()
defer q.lock.Unlock()
if request, ok := q.headerPendPool[peerID]; ok {
q.headerTaskQueue.Push(request.From, -int64(request.From))
delete(q.headerPendPool, peerID)
}
if request, ok := q.blockPendPool[peerID]; ok {
for _, header := range request.Headers {
q.blockTaskQueue.Push(header, -int64(header.Number.Uint64()))
@ -612,16 +506,6 @@ func (q *queue) Revoke(peerID string) {
}
}
// ExpireHeaders handles a timed out header request of the given peer: the
// header timeout meter is bumped by one and the pending request is expired
// against the header task queue. The value reported by expire is returned.
func (q *queue) ExpireHeaders(peer string) int {
	q.lock.Lock()
	defer q.lock.Unlock()

	headerTimeoutMeter.Mark(1)

	expired := q.expire(peer, q.headerPendPool, q.headerTaskQueue)
	return expired
}
// ExpireBodies checks for in flight block body requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalisation.
func (q *queue) ExpireBodies(peer string) int {
@ -670,116 +554,6 @@ func (q *queue) expire(peer string, pendPool map[string]*fetchRequest, taskQueue
return len(req.Headers)
}
// DeliverHeaders injects a header retrieval response into the header results
// cache. This method either accepts all headers it received, or none of them
// if they do not map correctly to the skeleton.
//
// If the headers are accepted, the method makes an attempt to deliver the set
// of ready headers to the processor to keep the pipeline full. However, it will
// not block to prevent stalling other pending deliveries.
//
// id identifies the delivering peer, headers and hashes are the delivered
// headers with their hashes (indexed in parallel), headerProcCh is the channel
// ready headers are pushed to.
//
// It returns the number of accepted headers. errNoFetchesPending is returned
// if the peer has no pending header request, and a "delivery not accepted"
// error if the batch was rejected, in which case the task is queued again.
func (q *queue) DeliverHeaders(id string, headers []*types.Header, hashes []common.Hash, headerProcCh chan *headerTask) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
var logger log.Logger
if len(id) < 16 {
// Tests use short IDs, don't choke on them
logger = log.New("peer", id)
} else {
logger = log.New("peer", id[:16])
}
// Short circuit if the data was never requested
request := q.headerPendPool[id]
if request == nil {
headerDropMeter.Mark(int64(len(headers)))
return 0, errNoFetchesPending
}
// The request is consumed by this delivery, whether it is accepted or not
delete(q.headerPendPool, id)
headerReqTimer.UpdateSince(request.Time)
headerInMeter.Mark(int64(len(headers)))
// Ensure headers can be mapped onto the skeleton chain
// NOTE(review): assumes a task pool entry exists for request.From - confirm
target := q.headerTaskPool[request.From].Hash()
// Only a batch of exactly MaxHeaderFetch headers can be accepted
accepted := len(headers) == MaxHeaderFetch
if accepted {
if headers[0].Number.Uint64() != request.From {
logger.Trace("First header broke chain ordering", "number", headers[0].Number, "hash", hashes[0], "expected", request.From)
accepted = false
} else if hashes[len(headers)-1] != target {
logger.Trace("Last header broke skeleton structure ", "number", headers[len(headers)-1].Number, "hash", hashes[len(headers)-1], "expected", target)
accepted = false
}
}
// Verify that the batch is contiguous: consecutive numbers, each header
// referencing the hash of its predecessor
if accepted {
parentHash := hashes[0]
for i, header := range headers[1:] {
hash := hashes[i+1]
if want := request.From + 1 + uint64(i); header.Number.Uint64() != want {
logger.Warn("Header broke chain ordering", "number", header.Number, "hash", hash, "expected", want)
accepted = false
break
}
if parentHash != header.ParentHash {
logger.Warn("Header broke chain ancestry", "number", header.Number, "hash", hash)
accepted = false
break
}
// Set-up parent hash for next round
parentHash = hash
}
}
// If the batch of headers wasn't accepted, mark as unavailable
if !accepted {
logger.Trace("Skeleton filling not accepted", "from", request.From)
headerDropMeter.Mark(int64(len(headers)))
miss := q.headerPeerMiss[id]
if miss == nil {
q.headerPeerMiss[id] = make(map[uint64]struct{})
miss = q.headerPeerMiss[id]
}
miss[request.From] = struct{}{}
// Put the task back into the queue so it can be reserved again
q.headerTaskQueue.Push(request.From, -int64(request.From))
return 0, errors.New("delivery not accepted")
}
// Clean up a successful fetch and try to deliver any sub-results
copy(q.headerResults[request.From-q.headerOffset:], headers)
copy(q.headerHashes[request.From-q.headerOffset:], hashes)
delete(q.headerTaskPool, request.From)
// Count the headers ready past the processed offset, advancing one batch
// (MaxHeaderFetch) at a time and probing only the first slot of each batch
ready := 0
for q.headerProced+ready < len(q.headerResults) && q.headerResults[q.headerProced+ready] != nil {
ready += MaxHeaderFetch
}
if ready > 0 {
// Headers are ready for delivery, gather them and push forward (non blocking)
processHeaders := make([]*types.Header, ready)
copy(processHeaders, q.headerResults[q.headerProced:q.headerProced+ready])
processHashes := make([]common.Hash, ready)
copy(processHashes, q.headerHashes[q.headerProced:q.headerProced+ready])
select {
case headerProcCh <- &headerTask{
headers: processHeaders,
hashes: processHashes,
}:
logger.Trace("Pre-scheduled new headers", "count", len(processHeaders), "from", processHeaders[0].Number)
q.headerProced += len(processHeaders)
default:
// The processor did not take the task, headerProced is left untouched so
// the same headers are picked up by a later delivery
}
}
// Check for termination and return
// NOTE(review): this send happens while q.lock is held and is not guarded
// by a select - presumably it relies on the channel buffer being free; confirm
if len(q.headerTaskPool) == 0 {
q.headerContCh <- false
}
return len(headers), nil
}
// DeliverBodies injects a block body retrieval response into the results queue.
// The method returns the number of block bodies accepted from the delivery and
// also wakes any threads waiting for data delivery.

View file

@ -76,7 +76,7 @@ func (r *resultStore) SetThrottleThreshold(threshold uint64) uint64 {
// throttled - if true, the store is at capacity, this particular header is not prio now
// item - the result to store data into
// err - any error that occurred
func (r *resultStore) AddFetch(header *types.Header, fastSync bool) (stale, throttled bool, item *fetchResult, err error) {
func (r *resultStore) AddFetch(header *types.Header, snapSync bool) (stale, throttled bool, item *fetchResult, err error) {
r.lock.Lock()
defer r.lock.Unlock()
@ -86,7 +86,7 @@ func (r *resultStore) AddFetch(header *types.Header, fastSync bool) (stale, thro
return stale, throttled, item, err
}
if item == nil {
item = newFetchResult(header, fastSync)
item = newFetchResult(header, snapSync)
r.items[index] = item
}
return stale, throttled, item, err