diff --git a/eth/protocols/snap/syncv2.go b/eth/protocols/snap/syncv2.go index bad52de184..161e5d2058 100644 --- a/eth/protocols/snap/syncv2.go +++ b/eth/protocols/snap/syncv2.go @@ -38,6 +38,7 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/msgrate" + "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie/trienode" @@ -55,6 +56,22 @@ const ( // the assumption that the gas limit is 60M. maxAccessListRequestCount = 28 + // maxCatchUpBlocks is the maximum gap (in blocks) that BAL catch-up is + // allowed to span. BALs are only retained by peers for a limited window + // (roughly two weeks, ~100k blocks at 12s block time). If the pivot has + // moved further than this conservative bound, the BALs needed to roll the + // flat state forward are likely no longer available, so we discard the + // stale progress and restart the sync from scratch rather than attempting + // a catch-up that is bound to fail partway through. + maxCatchUpBlocks = params.FullImmutabilityThreshold + + // catchUpWindow is the number of blocks BAL catch-up fetches and applies at + // a time. The whole gap can span up to maxCatchUpBlocks, so fetching it in + // one shot would buffer every block's BAL (~100 KiB each) in memory before + // applying any. Processing the gap in bounded windows caps peak memory to a + // single window's worth of BALs. + catchUpWindow = 512 + // syncProgressVersion is the version byte prepended to the JSON-encoded // syncProgressV2 when persisted. On load, a mismatching version byte causes // the persisted progress to be discarded and sync to start fresh. @@ -387,6 +404,8 @@ type syncerV2 struct { startTime time.Time // Time instance when snapshot sync started logTime time.Time // Time instance when status was last reported + catchUpWindow uint64 // Number of blocks fetched/applied per BAL catch-up window (overridable in tests) + pend sync.WaitGroup // Tracks network request goroutines for graceful shutdown lock sync.RWMutex // Protects fields that can change outside of sync (peers, reqs, pivot) } @@ -415,7 +434,8 @@ func newSyncerV2(db ethdb.Database, scheme string) *syncerV2 { bytecodeReqs: make(map[uint64]*bytecodeRequestV2), accessListReqs: make(map[uint64]*accessListRequest), - extProgress: new(syncProgressV2), + extProgress: new(syncProgressV2), + catchUpWindow: catchUpWindow, } if raw := rawdb.ReadSnapshotSyncStatus(db); len(raw) > 0 && raw[0] == syncProgressVersion { var progress syncProgressV2 @@ -559,10 +579,18 @@ func (s *syncerV2) Sync(target *types.Header, cancel chan struct{}) error { // progress is still usable. If yes, roll forward via BAL catch-up. If not, // wipe everything and restart fresh. if isPivotChanged { - if isPivotReorged(s.db, prevPivot, target) { + switch { + case isPivotReorged(s.db, prevPivot, target): log.Warn("Restarting snap sync from scratch", "oldnumber", prevPivot.Number, "oldHash", prevPivot.Hash()) s.resetSyncState() - } else { + case catchUpExceedsRetention(prevPivot, target): + // The pivot moved further than the BAL retention window. The access + // lists required for catch-up are almost certainly unavailable from + // peers, so discard the stale progress and resync from scratch + // instead of starting a catch-up doomed to stall. + log.Warn("Catch-up gap exceeds BAL retention, restarting snap sync from scratch", "oldnumber", prevPivot.Number, "newnumber", target.Number, "gap", new(big.Int).Sub(target.Number, prevPivot.Number), "limit", maxCatchUpBlocks) + s.resetSyncState() + default: // A canonical pivot move past a frozen pivot should be impossible: // the downloader both refuses moves (FrozenPivot) and resumes new // cycles against the frozen header itself. Reaching this branch @@ -728,6 +756,15 @@ func isPivotReorged(db ethdb.Database, prev, curr *types.Header) bool { return canonical != prev.Hash() } +// catchUpExceedsRetention reports whether rolling the flat state forward from +// prev to curr would span more blocks than peers are expected to retain BALs +// for. Beyond this bound the access lists needed for catch-up are likely gone, +// so the caller should wipe and resync from scratch instead. +func catchUpExceedsRetention(prev, curr *types.Header) bool { + gap := new(big.Int).Sub(curr.Number, prev.Number) + return gap.Cmp(big.NewInt(maxCatchUpBlocks)) > 0 +} + // catchUp runs the BAL catch-up. When the pivot has moved, it fetches BALs // for the gap blocks, verifies them against block headers, and applies the // diffs to roll flat state forward. @@ -738,69 +775,77 @@ func (s *syncerV2) catchUp(target *types.Header, cancel chan struct{}) error { s.lock.RUnlock() log.Info("Starting BAL catch-up", "from", from, "to", to, "blocks", to-from+1) - // Collect block hashes and headers for the gap range. - var ( - hashes = make([]common.Hash, 0, to-from+1) - headers = make(map[common.Hash]*types.Header, to-from+1) - ) - for num := from; num <= to; num++ { - hash := rawdb.ReadCanonicalHash(s.db, num) - if hash == (common.Hash{}) { - return fmt.Errorf("missing canonical hash for block %d during catch-up", num) - } - header := rawdb.ReadHeader(s.db, hash, num) - if header == nil { - return fmt.Errorf("missing header for block %d (hash %v) during catch-up", num, hash) - } - hashes = append(hashes, hash) - headers[hash] = header - } - - // Fetch BALs from peers - rawBALs, err := s.fetchAccessLists(hashes, headers, cancel) - if err != nil { - return err - } - - // Apply each BAL in block order. BALs are already verified by fetchAccessLists. - for i, raw := range rawBALs { + for start := from; start <= to; start += s.catchUpWindow { select { case <-cancel: return ErrCancelled default: } - num := from + uint64(i) - hash := hashes[i] - - // Decode the raw RLP into a BAL. + end := start + s.catchUpWindow - 1 + if end > to { + end = to + } + // Collect block hashes and headers for this window. var ( - b bal.BlockAccessList - batch = s.db.NewBatch() + hashes = make([]common.Hash, 0, end-start+1) + headers = make(map[common.Hash]*types.Header, end-start+1) ) - if err := rlp.DecodeBytes(raw, &b); err != nil { - return fmt.Errorf("failed to decode BAL for block %d: %v", num, err) + for num := start; num <= end; num++ { + hash := rawdb.ReadCanonicalHash(s.db, num) + if hash == (common.Hash{}) { + return fmt.Errorf("missing canonical hash for block %d during catch-up", num) + } + header := rawdb.ReadHeader(s.db, hash, num) + if header == nil { + return fmt.Errorf("missing header for block %d (hash %v) during catch-up", num, hash) + } + hashes = append(hashes, hash) + headers[hash] = header } - // applyAccessList failures are persistent. If a block's apply fails - // here, the next Sync will resume from this block and hit the same - // failure. Auto-recovery isn't implemented yet. - if err := s.applyAccessList(&b, batch); err != nil { - return fmt.Errorf("BAL application failed for block %d: %v", num, err) - } - - // Persist incremental progress so a crash mid-catchUp can resume - // from the next unapplied block. - s.lock.Lock() - s.pivot = headers[hash] - s.lock.Unlock() - s.saveSyncStatusWithDB(batch) - - // Commit the state transition alongside the sync progress atomically. - if err := batch.Write(); err != nil { + // Fetch this window's BALs from peers. + rawBALs, err := s.fetchAccessLists(hashes, headers, cancel) + if err != nil { return err } + + // Apply each BAL in block order. BALs are already verified by fetchAccessLists. + for i, raw := range rawBALs { + select { + case <-cancel: + return ErrCancelled + default: + } + num := start + uint64(i) + hash := hashes[i] + + // Decode the raw RLP into a BAL. + var ( + b bal.BlockAccessList + batch = s.db.NewBatch() + ) + if err := rlp.DecodeBytes(raw, &b); err != nil { + return fmt.Errorf("failed to decode BAL for block %d: %v", num, err) + } + if err := s.applyAccessList(&b, batch); err != nil { + return fmt.Errorf("BAL application failed for block %d: %v", num, err) + } + + // Persist incremental progress so a crash mid-catchUp can resume + // from the next unapplied block. + s.lock.Lock() + s.pivot = headers[hash] + s.lock.Unlock() + s.saveSyncStatusWithDB(batch) + + // Commit the state transition alongside the sync progress atomically. + if err := batch.Write(); err != nil { + return err + } + } + log.Info("BAL catch-up progress", "applied", end, "target", to, "remaining", to-end) } - log.Info("BAL catch-up complete", "blocks", len(rawBALs)) + log.Info("BAL catch-up complete", "from", from, "to", to) return nil } diff --git a/eth/protocols/snap/syncv2_test.go b/eth/protocols/snap/syncv2_test.go index aed7aac5f2..bad4123560 100644 --- a/eth/protocols/snap/syncv2_test.go +++ b/eth/protocols/snap/syncv2_test.go @@ -1895,6 +1895,141 @@ func testCatchUpPersistsIncrementally(t *testing.T, scheme string) { } } +// TestCatchUpWindowed verifies that catch-up correctly rolls the flat state +// forward when the gap spans several windows. With catchUpWindow shrunk to 2, +// a 5-block gap is processed as three windows ([A+1,A+2], [A+3,A+4], [A+5]), +// exercising the window boundaries. Every block's BAL must be fetched and +// applied, and the final pivot must reach the target. +func TestCatchUpWindowed(t *testing.T) { + t.Parallel() + testCatchUpWindowed(t, rawdb.HashScheme) + testCatchUpWindowed(t, rawdb.PathScheme) +} + +func testCatchUpWindowed(t *testing.T, scheme string) { + nodeScheme, sourceAccountTrie, elems, addrs := makeAccountTrieWithAddresses(100, scheme) + rootA := sourceAccountTrie.Hash() + numA := uint64(100) + + targetAddr := addrs[0] + targetHash := crypto.Keccak256Hash(targetAddr[:]) + + db := rawdb.NewMemoryDatabase() + emptyHash := common.Hash{} + zero := uint64(0) + + // Persist header + canonical hash for the base pivot A so reorg detection + // in Sync passes and catchUp (not reset) runs. + pivotA := &types.Header{ + Number: new(big.Int).SetUint64(numA), Root: rootA, Difficulty: common.Big0, + BaseFee: common.Big0, WithdrawalsHash: &emptyHash, + BlobGasUsed: &zero, ExcessBlobGas: &zero, + ParentBeaconRoot: &emptyHash, RequestsHash: &emptyHash, + } + rawdb.WriteHeader(db, pivotA) + rawdb.WriteCanonicalHash(db, pivotA.Hash(), numA) + + // Build a 5-block gap, each block bumping targetAddr's balance. The last + // block's balance is the expected final state. + const gap = 5 + var ( + lastHeader *types.Header + lastBalance *uint256.Int + balsByHash = make(map[common.Hash]rlp.RawValue, gap) + ) + for i := 0; i < gap; i++ { + blockNum := numA + uint64(i) + 1 + balance := uint256.NewInt(uint64(1000 * (i + 1))) + + cb := bal.NewConstructionBlockAccessList() + cb.BalanceChange(0, targetAddr, balance) + var buf bytes.Buffer + if err := cb.EncodeRLP(&buf); err != nil { + t.Fatal(err) + } + var b bal.BlockAccessList + if err := rlp.DecodeBytes(buf.Bytes(), &b); err != nil { + t.Fatal(err) + } + balHash := b.Hash() + header := &types.Header{ + Number: new(big.Int).SetUint64(blockNum), Difficulty: common.Big0, + BaseFee: common.Big0, WithdrawalsHash: &emptyHash, + BlobGasUsed: &zero, ExcessBlobGas: &zero, + ParentBeaconRoot: &emptyHash, RequestsHash: &emptyHash, + BlockAccessListHash: &balHash, + } + rawdb.WriteHeader(db, header) + rawdb.WriteCanonicalHash(db, header.Hash(), blockNum) + balsByHash[header.Hash()] = buf.Bytes() + lastHeader, lastBalance = header, balance + } + + // Seed sync to A: persisted state ends with pivot=A and full flat state. + { + var ( + once sync.Once + cancel = make(chan struct{}) + term = func() { once.Do(func() { close(cancel) }) } + ) + syncer := newSyncerV2(db, nodeScheme) + src := newTestPeerV2("seed", t, term) + src.accountTrie = sourceAccountTrie.Copy() + src.accountValues = elems + syncer.Register(src) + src.remote = syncer + if err := syncer.Sync(pivotA, cancel); err != nil { + t.Fatalf("seed sync failed: %v", err) + } + } + + // Run catch-up to A+5 directly with a window of 2, forcing three windows + // ([A+1,A+2], [A+3,A+4], [A+5]). Calling catchUp in isolation keeps the + // focus on the windowing logic without the surrounding download/trie-gen + // phases (which would need a real target state root). + var ( + once sync.Once + cancel = make(chan struct{}) + term = func() { once.Do(func() { close(cancel) }) } + ) + syncer := newSyncerV2(db, nodeScheme) + syncer.loadSyncStatus() // restores pivot=A from the seed sync + if syncer.pivot == nil || syncer.pivot.Number.Uint64() != numA { + t.Fatalf("expected restored pivot at block %d, got %v", numA, syncer.pivot) + } + syncer.catchUpWindow = 2 + src := newTestPeerV2("catchup", t, term) + src.accountTrie = sourceAccountTrie.Copy() + src.accountValues = elems + src.accessLists = balsByHash + syncer.Register(src) + src.remote = syncer + if err := syncer.catchUp(lastHeader, cancel); err != nil { + t.Fatalf("windowed catch-up failed: %v", err) + } + + // The account must reflect the final block's balance, proving every window + // (including the trailing partial one) was fetched and applied in order. + data := rawdb.ReadAccountSnapshot(db, targetHash) + if len(data) == 0 { + t.Fatal("target account missing after windowed catch-up") + } + account, err := types.FullAccount(data) + if err != nil { + t.Fatalf("failed to decode account: %v", err) + } + if account.Balance.Cmp(lastBalance) != 0 { + t.Errorf("balance after windowed catch-up: got %v, want %v", account.Balance, lastBalance) + } + + // The persisted pivot must have advanced all the way to the target. + loader := newSyncerV2(db, nodeScheme) + loader.loadSyncStatus() + if loader.pivot == nil || loader.pivot.Hash() != lastHeader.Hash() { + t.Errorf("persisted pivot did not reach target after windowed catch-up") + } +} + // TestSyncStatusMarkedCompleteAfterCompletion verifies that after a full sync // completes, the persisted sync status reaches the complete phase. This lets a // subsequent Sync call distinguish "already done" from "fresh node" and skip.