eth/protocols/snap: redo the snap sync if the bal is unavailable (#35181)
Some checks are pending
/ Linux Build (push) Waiting to run
/ Linux Build (arm) (push) Waiting to run
/ Keeper Build (push) Waiting to run
/ Windows Build (push) Waiting to run
/ Docker Image (push) Waiting to run

This PR introduces a new condition that if the local node falls behind
too much and the required BAL for catching up is very likely to be
unavailable, the entire snap sync will be restarting from scratch.

As the defined BAL retention window is weak-subjective-period which is
calculated dynamically. A more conservative threshold is used (90K
blocks) for robustness.

Apart from that, the BAL catchup will be divided into several spans and
apply one by one. It's essential to prevent the potential out-of-memory
panic of placing the entire BAL set in memory.
This commit is contained in:
rjl493456442 2026-06-17 09:57:08 +08:00 committed by GitHub
parent ad68ce261b
commit 1be5da2330
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 234 additions and 54 deletions

View file

@ -38,6 +38,7 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/msgrate"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
"github.com/ethereum/go-ethereum/trie/trienode"
@ -55,6 +56,22 @@ const (
// the assumption that the gas limit is 60M.
maxAccessListRequestCount = 28
// maxCatchUpBlocks is the maximum gap (in blocks) that BAL catch-up is
// allowed to span. BALs are only retained by peers for a limited window
// (roughly two weeks, ~100k blocks at 12s block time). If the pivot has
// moved further than this conservative bound, the BALs needed to roll the
// flat state forward are likely no longer available, so we discard the
// stale progress and restart the sync from scratch rather than attempting
// a catch-up that is bound to fail partway through.
maxCatchUpBlocks = params.FullImmutabilityThreshold
// catchUpWindow is the number of blocks BAL catch-up fetches and applies at
// a time. The whole gap can span up to maxCatchUpBlocks, so fetching it in
// one shot would buffer every block's BAL (~100 KiB each) in memory before
// applying any. Processing the gap in bounded windows caps peak memory to a
// single window's worth of BALs.
catchUpWindow = 512
// syncProgressVersion is the version byte prepended to the JSON-encoded
// syncProgressV2 when persisted. On load, a mismatching version byte causes
// the persisted progress to be discarded and sync to start fresh.
@ -387,6 +404,8 @@ type syncerV2 struct {
startTime time.Time // Time instance when snapshot sync started
logTime time.Time // Time instance when status was last reported
catchUpWindow uint64 // Number of blocks fetched/applied per BAL catch-up window (overridable in tests)
pend sync.WaitGroup // Tracks network request goroutines for graceful shutdown
lock sync.RWMutex // Protects fields that can change outside of sync (peers, reqs, pivot)
}
@ -415,7 +434,8 @@ func newSyncerV2(db ethdb.Database, scheme string) *syncerV2 {
bytecodeReqs: make(map[uint64]*bytecodeRequestV2),
accessListReqs: make(map[uint64]*accessListRequest),
extProgress: new(syncProgressV2),
extProgress: new(syncProgressV2),
catchUpWindow: catchUpWindow,
}
if raw := rawdb.ReadSnapshotSyncStatus(db); len(raw) > 0 && raw[0] == syncProgressVersion {
var progress syncProgressV2
@ -559,10 +579,18 @@ func (s *syncerV2) Sync(target *types.Header, cancel chan struct{}) error {
// progress is still usable. If yes, roll forward via BAL catch-up. If not,
// wipe everything and restart fresh.
if isPivotChanged {
if isPivotReorged(s.db, prevPivot, target) {
switch {
case isPivotReorged(s.db, prevPivot, target):
log.Warn("Restarting snap sync from scratch", "oldnumber", prevPivot.Number, "oldHash", prevPivot.Hash())
s.resetSyncState()
} else {
case catchUpExceedsRetention(prevPivot, target):
// The pivot moved further than the BAL retention window. The access
// lists required for catch-up are almost certainly unavailable from
// peers, so discard the stale progress and resync from scratch
// instead of starting a catch-up doomed to stall.
log.Warn("Catch-up gap exceeds BAL retention, restarting snap sync from scratch", "oldnumber", prevPivot.Number, "newnumber", target.Number, "gap", new(big.Int).Sub(target.Number, prevPivot.Number), "limit", maxCatchUpBlocks)
s.resetSyncState()
default:
// A canonical pivot move past a frozen pivot should be impossible:
// the downloader both refuses moves (FrozenPivot) and resumes new
// cycles against the frozen header itself. Reaching this branch
@ -728,6 +756,15 @@ func isPivotReorged(db ethdb.Database, prev, curr *types.Header) bool {
return canonical != prev.Hash()
}
// catchUpExceedsRetention reports whether rolling the flat state forward from
// prev to curr would span more blocks than peers are expected to retain BALs
// for. Beyond this bound the access lists needed for catch-up are likely gone,
// so the caller should wipe and resync from scratch instead.
func catchUpExceedsRetention(prev, curr *types.Header) bool {
gap := new(big.Int).Sub(curr.Number, prev.Number)
return gap.Cmp(big.NewInt(maxCatchUpBlocks)) > 0
}
// catchUp runs the BAL catch-up. When the pivot has moved, it fetches BALs
// for the gap blocks, verifies them against block headers, and applies the
// diffs to roll flat state forward.
@ -738,69 +775,77 @@ func (s *syncerV2) catchUp(target *types.Header, cancel chan struct{}) error {
s.lock.RUnlock()
log.Info("Starting BAL catch-up", "from", from, "to", to, "blocks", to-from+1)
// Collect block hashes and headers for the gap range.
var (
hashes = make([]common.Hash, 0, to-from+1)
headers = make(map[common.Hash]*types.Header, to-from+1)
)
for num := from; num <= to; num++ {
hash := rawdb.ReadCanonicalHash(s.db, num)
if hash == (common.Hash{}) {
return fmt.Errorf("missing canonical hash for block %d during catch-up", num)
}
header := rawdb.ReadHeader(s.db, hash, num)
if header == nil {
return fmt.Errorf("missing header for block %d (hash %v) during catch-up", num, hash)
}
hashes = append(hashes, hash)
headers[hash] = header
}
// Fetch BALs from peers
rawBALs, err := s.fetchAccessLists(hashes, headers, cancel)
if err != nil {
return err
}
// Apply each BAL in block order. BALs are already verified by fetchAccessLists.
for i, raw := range rawBALs {
for start := from; start <= to; start += s.catchUpWindow {
select {
case <-cancel:
return ErrCancelled
default:
}
num := from + uint64(i)
hash := hashes[i]
// Decode the raw RLP into a BAL.
end := start + s.catchUpWindow - 1
if end > to {
end = to
}
// Collect block hashes and headers for this window.
var (
b bal.BlockAccessList
batch = s.db.NewBatch()
hashes = make([]common.Hash, 0, end-start+1)
headers = make(map[common.Hash]*types.Header, end-start+1)
)
if err := rlp.DecodeBytes(raw, &b); err != nil {
return fmt.Errorf("failed to decode BAL for block %d: %v", num, err)
for num := start; num <= end; num++ {
hash := rawdb.ReadCanonicalHash(s.db, num)
if hash == (common.Hash{}) {
return fmt.Errorf("missing canonical hash for block %d during catch-up", num)
}
header := rawdb.ReadHeader(s.db, hash, num)
if header == nil {
return fmt.Errorf("missing header for block %d (hash %v) during catch-up", num, hash)
}
hashes = append(hashes, hash)
headers[hash] = header
}
// applyAccessList failures are persistent. If a block's apply fails
// here, the next Sync will resume from this block and hit the same
// failure. Auto-recovery isn't implemented yet.
if err := s.applyAccessList(&b, batch); err != nil {
return fmt.Errorf("BAL application failed for block %d: %v", num, err)
}
// Persist incremental progress so a crash mid-catchUp can resume
// from the next unapplied block.
s.lock.Lock()
s.pivot = headers[hash]
s.lock.Unlock()
s.saveSyncStatusWithDB(batch)
// Commit the state transition alongside the sync progress atomically.
if err := batch.Write(); err != nil {
// Fetch this window's BALs from peers.
rawBALs, err := s.fetchAccessLists(hashes, headers, cancel)
if err != nil {
return err
}
// Apply each BAL in block order. BALs are already verified by fetchAccessLists.
for i, raw := range rawBALs {
select {
case <-cancel:
return ErrCancelled
default:
}
num := start + uint64(i)
hash := hashes[i]
// Decode the raw RLP into a BAL.
var (
b bal.BlockAccessList
batch = s.db.NewBatch()
)
if err := rlp.DecodeBytes(raw, &b); err != nil {
return fmt.Errorf("failed to decode BAL for block %d: %v", num, err)
}
if err := s.applyAccessList(&b, batch); err != nil {
return fmt.Errorf("BAL application failed for block %d: %v", num, err)
}
// Persist incremental progress so a crash mid-catchUp can resume
// from the next unapplied block.
s.lock.Lock()
s.pivot = headers[hash]
s.lock.Unlock()
s.saveSyncStatusWithDB(batch)
// Commit the state transition alongside the sync progress atomically.
if err := batch.Write(); err != nil {
return err
}
}
log.Info("BAL catch-up progress", "applied", end, "target", to, "remaining", to-end)
}
log.Info("BAL catch-up complete", "blocks", len(rawBALs))
log.Info("BAL catch-up complete", "from", from, "to", to)
return nil
}

View file

@ -1895,6 +1895,141 @@ func testCatchUpPersistsIncrementally(t *testing.T, scheme string) {
}
}
// TestCatchUpWindowed verifies that catch-up correctly rolls the flat state
// forward when the gap spans several windows. With catchUpWindow shrunk to 2,
// a 5-block gap is processed as three windows ([A+1,A+2], [A+3,A+4], [A+5]),
// exercising the window boundaries. Every block's BAL must be fetched and
// applied, and the final pivot must reach the target.
func TestCatchUpWindowed(t *testing.T) {
t.Parallel()
testCatchUpWindowed(t, rawdb.HashScheme)
testCatchUpWindowed(t, rawdb.PathScheme)
}
func testCatchUpWindowed(t *testing.T, scheme string) {
nodeScheme, sourceAccountTrie, elems, addrs := makeAccountTrieWithAddresses(100, scheme)
rootA := sourceAccountTrie.Hash()
numA := uint64(100)
targetAddr := addrs[0]
targetHash := crypto.Keccak256Hash(targetAddr[:])
db := rawdb.NewMemoryDatabase()
emptyHash := common.Hash{}
zero := uint64(0)
// Persist header + canonical hash for the base pivot A so reorg detection
// in Sync passes and catchUp (not reset) runs.
pivotA := &types.Header{
Number: new(big.Int).SetUint64(numA), Root: rootA, Difficulty: common.Big0,
BaseFee: common.Big0, WithdrawalsHash: &emptyHash,
BlobGasUsed: &zero, ExcessBlobGas: &zero,
ParentBeaconRoot: &emptyHash, RequestsHash: &emptyHash,
}
rawdb.WriteHeader(db, pivotA)
rawdb.WriteCanonicalHash(db, pivotA.Hash(), numA)
// Build a 5-block gap, each block bumping targetAddr's balance. The last
// block's balance is the expected final state.
const gap = 5
var (
lastHeader *types.Header
lastBalance *uint256.Int
balsByHash = make(map[common.Hash]rlp.RawValue, gap)
)
for i := 0; i < gap; i++ {
blockNum := numA + uint64(i) + 1
balance := uint256.NewInt(uint64(1000 * (i + 1)))
cb := bal.NewConstructionBlockAccessList()
cb.BalanceChange(0, targetAddr, balance)
var buf bytes.Buffer
if err := cb.EncodeRLP(&buf); err != nil {
t.Fatal(err)
}
var b bal.BlockAccessList
if err := rlp.DecodeBytes(buf.Bytes(), &b); err != nil {
t.Fatal(err)
}
balHash := b.Hash()
header := &types.Header{
Number: new(big.Int).SetUint64(blockNum), Difficulty: common.Big0,
BaseFee: common.Big0, WithdrawalsHash: &emptyHash,
BlobGasUsed: &zero, ExcessBlobGas: &zero,
ParentBeaconRoot: &emptyHash, RequestsHash: &emptyHash,
BlockAccessListHash: &balHash,
}
rawdb.WriteHeader(db, header)
rawdb.WriteCanonicalHash(db, header.Hash(), blockNum)
balsByHash[header.Hash()] = buf.Bytes()
lastHeader, lastBalance = header, balance
}
// Seed sync to A: persisted state ends with pivot=A and full flat state.
{
var (
once sync.Once
cancel = make(chan struct{})
term = func() { once.Do(func() { close(cancel) }) }
)
syncer := newSyncerV2(db, nodeScheme)
src := newTestPeerV2("seed", t, term)
src.accountTrie = sourceAccountTrie.Copy()
src.accountValues = elems
syncer.Register(src)
src.remote = syncer
if err := syncer.Sync(pivotA, cancel); err != nil {
t.Fatalf("seed sync failed: %v", err)
}
}
// Run catch-up to A+5 directly with a window of 2, forcing three windows
// ([A+1,A+2], [A+3,A+4], [A+5]). Calling catchUp in isolation keeps the
// focus on the windowing logic without the surrounding download/trie-gen
// phases (which would need a real target state root).
var (
once sync.Once
cancel = make(chan struct{})
term = func() { once.Do(func() { close(cancel) }) }
)
syncer := newSyncerV2(db, nodeScheme)
syncer.loadSyncStatus() // restores pivot=A from the seed sync
if syncer.pivot == nil || syncer.pivot.Number.Uint64() != numA {
t.Fatalf("expected restored pivot at block %d, got %v", numA, syncer.pivot)
}
syncer.catchUpWindow = 2
src := newTestPeerV2("catchup", t, term)
src.accountTrie = sourceAccountTrie.Copy()
src.accountValues = elems
src.accessLists = balsByHash
syncer.Register(src)
src.remote = syncer
if err := syncer.catchUp(lastHeader, cancel); err != nil {
t.Fatalf("windowed catch-up failed: %v", err)
}
// The account must reflect the final block's balance, proving every window
// (including the trailing partial one) was fetched and applied in order.
data := rawdb.ReadAccountSnapshot(db, targetHash)
if len(data) == 0 {
t.Fatal("target account missing after windowed catch-up")
}
account, err := types.FullAccount(data)
if err != nil {
t.Fatalf("failed to decode account: %v", err)
}
if account.Balance.Cmp(lastBalance) != 0 {
t.Errorf("balance after windowed catch-up: got %v, want %v", account.Balance, lastBalance)
}
// The persisted pivot must have advanced all the way to the target.
loader := newSyncerV2(db, nodeScheme)
loader.loadSyncStatus()
if loader.pivot == nil || loader.pivot.Hash() != lastHeader.Hash() {
t.Errorf("persisted pivot did not reach target after windowed catch-up")
}
}
// TestSyncStatusMarkedCompleteAfterCompletion verifies that after a full sync
// completes, the persisted sync status reaches the complete phase. This lets a
// subsequent Sync call distinguish "already done" from "fresh node" and skip.