eth/protocols/snap: remove uncovered states before resuming (#35159)
Some checks are pending
/ Linux Build (push) Waiting to run
/ Linux Build (arm) (push) Waiting to run
/ Keeper Build (push) Waiting to run
/ Windows Build (push) Waiting to run
/ Docker Image (push) Waiting to run

This PR fixes an issue where flat states are continuously persisted
during downloadState, while the sync journal is only persisted at the
end of Sync.

As a result, an unclean shutdown can leave the on-disk flat state ahead
of the journal markers. Some persisted entries may be stale (storage
slots that should have been deleted), and these dangling entries are not
detected or fixed by subsequent state downloads.

To address this, this PR introduces a cleanup step before state
downloading begins. It removes all state entries that are not covered by
the persisted journal markers.
This commit is contained in:
rjl493456442 2026-06-17 13:44:12 +08:00 committed by GitHub
parent 0e810e4984
commit 7122ecc3eb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 300 additions and 24 deletions

View file

@ -325,10 +325,9 @@ func TestAccessListApplicationSkipsUnfetched(t *testing.T) {
// is below Next so isFetched returns true; the unfetched hash equals Next
// so isFetched returns false.
syncer.tasks = []*accountTaskV2{{
Next: unfetchedHash,
Last: common.MaxHash,
SubTasks: make(map[common.Hash][]*storageTaskV2),
stateCompleted: make(map[common.Hash]struct{}),
Next: unfetchedHash,
Last: common.MaxHash,
SubTasks: make(map[common.Hash][]*storageTaskV2),
}}
cb := bal.NewConstructionBlockAccessList()
@ -366,10 +365,9 @@ func TestAccessListApplicationSkipsUnfetchedStorage(t *testing.T) {
}
syncer.tasks = []*accountTaskV2{{
Next: unfetchedHash,
Last: common.MaxHash,
SubTasks: make(map[common.Hash][]*storageTaskV2),
stateCompleted: make(map[common.Hash]struct{}),
Next: unfetchedHash,
Last: common.MaxHash,
SubTasks: make(map[common.Hash][]*storageTaskV2),
}}
// BAL touches an unfetched account with a storage write AND an empty
@ -425,7 +423,6 @@ func TestAccessListApplicationPartialStorage(t *testing.T) {
Last: common.MaxHash,
}},
},
stateCompleted: make(map[common.Hash]struct{}),
}}
cb := bal.NewConstructionBlockAccessList()

View file

@ -583,10 +583,19 @@ func (s *syncerV2) Sync(target *types.Header, cancel chan struct{}) error {
log.Debug("Starting snapshot sync cycle", "root", root)
// If we resumed against a different pivot, decide whether the persisted
// progress is still usable. If yes, roll forward via BAL catch-up. If not,
// wipe everything and restart fresh.
if isPivotChanged {
if !isPivotChanged {
if prevPivot != nil {
// Resumed against the same pivot. An unclean shutdown may have left
// flushed snapshot data the journal doesn't cover.
if err := s.pruneStaleState(); err != nil {
log.Warn("Persisted progress unusable, restarting snap sync from scratch", "err", err)
s.resetSyncState()
}
}
} else {
// If we resumed against a different pivot, decide whether the persisted
// progress is still usable. If yes, roll forward via BAL catch-up. If not,
// wipe everything and restart fresh.
switch {
case isPivotReorged(s.db, prevPivot, target):
log.Warn("Restarting snap sync from scratch", "oldnumber", prevPivot.Number, "oldHash", prevPivot.Hash())
@ -599,6 +608,13 @@ func (s *syncerV2) Sync(target *types.Header, cancel chan struct{}) error {
log.Warn("Catch-up gap exceeds BAL retention, restarting snap sync from scratch", "oldnumber", prevPivot.Number, "newnumber", target.Number, "gap", new(big.Int).Sub(target.Number, prevPivot.Number), "limit", maxCatchUpBlocks)
s.resetSyncState()
default:
// An unclean shutdown may have left flushed snapshot data the journal
// doesn't cover.
if err := s.pruneStaleState(); err != nil {
log.Warn("Persisted progress unusable, restarting snap sync from scratch", "err", err)
s.resetSyncState()
break
}
// A canonical pivot move past a frozen pivot should be impossible:
// the downloader both refuses moves (FrozenPivot) and resumes new
// cycles against the frozen header itself. Reaching this branch
@ -683,6 +699,7 @@ func (s *syncerV2) downloadState(cancel chan struct{}) error {
accountResps = make(chan *accountResponseV2)
storageResps = make(chan *storageResponseV2)
bytecodeResps = make(chan *bytecodeResponseV2)
lastJournal = time.Now()
)
for {
// Remove all completed tasks and terminate if everything's done
@ -691,7 +708,15 @@ func (s *syncerV2) downloadState(cancel chan struct{}) error {
if len(s.tasks) == 0 {
return nil
}
// Periodically persist the progress journal. The flat state batches
// are flushed continuously, while the journal otherwise only hits
// disk on graceful teardown; everything written beyond the journaled
// markers is sacrificed on an unclean shutdown, so the save interval
// bounds the loss.
if time.Since(lastJournal) > time.Minute {
lastJournal = time.Now()
s.saveSyncStatus()
}
// Assign all the data retrieval tasks to any free peers
s.assignAccountTasks(accountResps, accountReqFails, cancel)
s.assignBytecodeTasks(bytecodeResps, bytecodeReqFails, cancel)
@ -1136,13 +1161,20 @@ func increaseKey(key []byte) []byte {
return nil
}
// DeleteHistoryByRange completely removes all database entries with the specific
// prefix. Note, this method assumes the space with the given prefix is exclusively
// occupied!
// deleteRange completely removes all database entries with the specific
// prefix. Note, this method assumes the space with the given prefix is
// exclusively occupied!
func deleteRange(batch ethdb.Batch, prefix []byte) {
start := prefix
limit := increaseKey(bytes.Clone(prefix))
deleteKeyRange(batch, prefix, increaseKey(bytes.Clone(prefix)))
}
// deleteKeyRange removes all database entries within the key range [start,
// limit). Note, this method assumes the range is exclusively occupied by
// sync data!
func deleteKeyRange(batch ethdb.Batch, start, limit []byte) {
if limit != nil && bytes.Compare(start, limit) >= 0 {
return
}
// Try to remove the data in the range by a loop, as the leveldb
// doesn't support the native range deletion.
for {
@ -1154,7 +1186,9 @@ func deleteRange(batch ethdb.Batch, prefix []byte) {
// therefore inconsistent. This is a tradeoff of the current LevelDB-based
// approach.
if errors.Is(err, ethdb.ErrTooManyKeys) {
batch.Write()
if err := batch.Write(); err != nil {
log.Crit("Failed to flush state deletions", "err", err)
}
batch.Reset()
continue
}
@ -1172,6 +1206,72 @@ func (s *syncerV2) resetTrienodes(batch ethdb.Batch) {
}
}
// pruneStaleState removes any flat state the persisted journal does not cover.
func (s *syncerV2) pruneStaleState() error {
var (
batch = s.db.NewBatch()
accountKey = func(h common.Hash) []byte {
return append(bytes.Clone(rawdb.SnapshotAccountPrefix), h.Bytes()...)
}
storageKey = func(hashes ...common.Hash) []byte {
key := bytes.Clone(rawdb.SnapshotStoragePrefix)
for _, h := range hashes {
key = append(key, h.Bytes()...)
}
return key
}
)
keyRangeLimit := func(base []byte, last common.Hash) []byte {
if last != common.MaxHash {
return append(base, incHash(last).Bytes()...)
}
return increaseKey(base)
}
for _, task := range s.tasks {
deleteKeyRange(batch, accountKey(task.Next), keyRangeLimit(bytes.Clone(rawdb.SnapshotAccountPrefix), task.Last))
protected := make([]common.Hash, 0, len(task.stateCompleted)+len(task.SubTasks))
for hash := range task.stateCompleted {
if bytes.Compare(hash[:], task.Next[:]) < 0 {
return errors.New("unexpected storage marker before the range")
}
if bytes.Compare(hash[:], task.Last[:]) > 0 {
return errors.New("unexpected storage marker after the range")
}
protected = append(protected, hash)
}
for hash := range task.SubTasks {
if _, ok := task.stateCompleted[hash]; ok {
return errors.New("unexpected duplicated storage marker")
}
if bytes.Compare(hash[:], task.Next[:]) < 0 {
return errors.New("unexpected storage marker before the range")
}
if bytes.Compare(hash[:], task.Last[:]) > 0 {
return errors.New("unexpected storage marker after the range")
}
protected = append(protected, hash)
}
sort.Slice(protected, func(i, j int) bool {
return bytes.Compare(protected[i][:], protected[j][:]) < 0
})
start := storageKey(task.Next)
for _, hash := range protected {
deleteKeyRange(batch, start, storageKey(hash))
start = increaseKey(storageKey(hash))
}
deleteKeyRange(batch, start, keyRangeLimit(bytes.Clone(rawdb.SnapshotStoragePrefix), task.Last))
for account, subtasks := range task.SubTasks {
for _, sub := range subtasks {
deleteKeyRange(batch, storageKey(account, sub.Next), keyRangeLimit(storageKey(account), sub.Last))
}
}
}
return batch.Write()
}
// resetSyncState wipes all persisted snap-sync data (sync status, account
// and storage snapshots) and re-initializes in-memory state with a fresh
// chunking of the account hash range.

View file

@ -1411,10 +1411,9 @@ func TestSyncDetectsPivotReorged(t *testing.T) {
seed.pivot = orphanPivot
seed.accountSynced = 42
seed.tasks = []*accountTaskV2{{
Next: common.HexToHash("0x80"),
Last: common.MaxHash,
SubTasks: make(map[common.Hash][]*storageTaskV2),
stateCompleted: make(map[common.Hash]struct{}),
Next: common.HexToHash("0x80"),
Last: common.MaxHash,
SubTasks: make(map[common.Hash][]*storageTaskV2),
}}
seed.saveSyncStatus()
@ -2182,6 +2181,186 @@ func TestInterruptedGenerationRecovery(t *testing.T) {
}
}
// TestPruneStaleState verifies that resuming a sync wipes exactly the flat
// state the journal does not cover: account and storage entries beyond the
// task cursors are removed, while everything below the cursors — and the
// journaled storage of completed-storage accounts and chunked large
// contracts inside the window — survives.
func TestPruneStaleState(t *testing.T) {
t.Parallel()
var (
db = rawdb.NewMemoryDatabase()
syncer = newSyncerV2(db, rawdb.HashScheme)
fetched = common.Hash{0x20} // below the cursor, journal-covered
next = common.Hash{0x40} // task cursor
stale = common.Hash{0x50} // beyond the cursor, not covered
completed = common.Hash{0x60} // beyond the cursor, storage journaled complete
chunked = common.Hash{0x80} // beyond the cursor, large contract, partially journaled
staleToo = common.Hash{0xa0} // beyond the cursor, not covered
subNext = common.Hash{0x50} // slot cursor of the chunked contract
slotLo = common.Hash{0x11} // below the slot cursor, journal-covered
slotHi = common.Hash{0x77} // beyond the slot cursor, not covered
val = []byte{0xde, 0xad}
)
syncer.tasks = []*accountTaskV2{{
Next: next,
Last: common.MaxHash,
SubTasks: map[common.Hash][]*storageTaskV2{
chunked: {{Next: subNext, Last: common.MaxHash}},
},
stateCompleted: map[common.Hash]struct{}{completed: {}},
}}
// Journal-covered state: account below the cursor with a storage slot,
// the completed account's storage, the chunked contract's low slots.
rawdb.WriteAccountSnapshot(db, fetched, val)
rawdb.WriteStorageSnapshot(db, fetched, slotLo, val)
rawdb.WriteStorageSnapshot(db, completed, slotLo, val)
rawdb.WriteStorageSnapshot(db, chunked, slotLo, val)
// Uncovered state, flushed after the last journal save: accounts beyond
// the cursor (including the completed-storage one, whose account entry
// is only legitimate below the cursor), their storage, and the chunked
// contract's slots beyond its slot cursor.
rawdb.WriteAccountSnapshot(db, stale, val)
rawdb.WriteAccountSnapshot(db, completed, val)
rawdb.WriteAccountSnapshot(db, staleToo, val)
rawdb.WriteStorageSnapshot(db, stale, slotLo, val)
rawdb.WriteStorageSnapshot(db, staleToo, slotHi, val)
rawdb.WriteStorageSnapshot(db, chunked, slotHi, val)
// Plant bare keys right at the start of the neighboring keyspaces. The
// task ranges end at the maximum hash, so a limit computed by bumping
// the full key would roll the carry over into these keyspaces and
// swallow such keys.
neighborOfAccounts := bytes.Clone(rawdb.SnapshotAccountPrefix)
neighborOfAccounts[len(neighborOfAccounts)-1]++
db.Put(neighborOfAccounts, val)
neighborOfStorage := bytes.Clone(rawdb.SnapshotStoragePrefix)
neighborOfStorage[len(neighborOfStorage)-1]++
db.Put(neighborOfStorage, val)
if err := syncer.pruneStaleState(); err != nil {
t.Fatalf("prune failed: %v", err)
}
for _, key := range [][]byte{neighborOfAccounts, neighborOfStorage} {
if has, _ := db.Has(key); !has {
t.Errorf("neighboring keyspace entry %x swallowed by the wipe", key)
}
}
for _, tc := range []struct {
account common.Hash
slot *common.Hash
want bool
desc string
}{
{fetched, nil, true, "account below cursor"},
{fetched, &slotLo, true, "storage below cursor"},
{completed, &slotLo, true, "storage of completed account"},
{chunked, &slotLo, true, "chunked contract slot below slot cursor"},
{stale, nil, false, "account beyond cursor"},
{completed, nil, false, "account entry of completed account"},
{staleToo, nil, false, "account beyond cursor (second)"},
{stale, &slotLo, false, "storage of unprotected account"},
{staleToo, &slotHi, false, "storage of unprotected account (second)"},
{chunked, &slotHi, false, "chunked contract slot beyond slot cursor"},
} {
var have bool
if tc.slot == nil {
have = len(rawdb.ReadAccountSnapshot(db, tc.account)) > 0
} else {
have = len(rawdb.ReadStorageSnapshot(db, tc.account, *tc.slot)) > 0
}
if have != tc.want {
t.Errorf("%s: present = %v, want %v", tc.desc, have, tc.want)
}
}
}
// TestResumeWipesUncoveredState simulates an unclean shutdown mid-download:
// flat state flushed beyond the journaled cursor (which the journal therefore
// considers unfetched) must be wiped when the sync resumes — otherwise it
// would survive the re-download untouched and corrupt the generated trie.
func TestResumeWipesUncoveredState(t *testing.T) {
t.Parallel()
testResumeWipesUncoveredState(t, rawdb.HashScheme)
testResumeWipesUncoveredState(t, rawdb.PathScheme)
}
func testResumeWipesUncoveredState(t *testing.T, scheme string) {
nodeScheme, sourceAccountTrie, elems := makeAccountTrieNoStorage(100, scheme)
root := sourceAccountTrie.Hash()
db := rawdb.NewMemoryDatabase()
pivot := mkPivot(0, root)
// First run: interrupt the download after a couple of responses, leaving
// a journal behind (saved on the graceful teardown).
var (
once1 sync.Once
cancel1 = make(chan struct{})
term1 = func() { once1.Do(func() { close(cancel1) }) }
responses atomic.Int32
)
syncer1 := newSyncerV2(db, nodeScheme)
src1 := newTestPeerV2("source1", t, term1)
src1.accountTrie = sourceAccountTrie.Copy()
src1.accountValues = elems
src1.accountRequestV2Handler = func(tp *testPeerV2, id uint64, root common.Hash, origin common.Hash, limit common.Hash, cap int) error {
if responses.Add(1) > 2 {
term1()
return nil
}
return defaultAccountRequestHandlerV2(tp, id, root, origin, limit, cap)
}
syncer1.Register(src1)
src1.remote = syncer1
syncer1.Sync(pivot, cancel1)
// Simulate the unclean shutdown: plant a bogus account that the journal
// does not cover (right beyond an unfinished task's cursor) — as if it
// was flushed after the journal was last saved. The account is not part
// of the source trie, so the re-download would never overwrite it and
// trie generation would fail the root check if it survived.
loader := newSyncerV2(db, nodeScheme)
loader.loadSyncStatus()
if len(loader.tasks) == 0 {
t.Fatal("expected unfinished tasks in the journal")
}
task := loader.tasks[len(loader.tasks)-1]
bogus := incHash(task.Next)
rawdb.WriteAccountSnapshot(db, bogus, types.SlimAccountRLP(types.StateAccount{
Nonce: 1, Balance: uint256.NewInt(1),
Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash[:],
}))
// Second run: the resume must prune the bogus entry and complete cleanly.
var (
once2 sync.Once
cancel2 = make(chan struct{})
term2 = func() { once2.Do(func() { close(cancel2) }) }
)
syncer2 := newSyncerV2(db, nodeScheme)
src2 := newTestPeerV2("source2", t, term2)
src2.accountTrie = sourceAccountTrie.Copy()
src2.accountValues = elems
syncer2.Register(src2)
src2.remote = syncer2
if err := syncer2.Sync(pivot, cancel2); err != nil {
t.Fatalf("resumed sync failed: %v", err)
}
if len(rawdb.ReadAccountSnapshot(db, bogus)) != 0 {
t.Fatal("uncovered account entry should have been pruned on resume")
}
verifyTrie(scheme, db, root, t)
}
// TestFetchAccessListsMultiplePeers verifies that fetch distributes work
// across multiple idle peers.
func TestFetchAccessListsMultiplePeers(t *testing.T) {