diff --git a/eth/protocols/snap/bal_apply_test.go b/eth/protocols/snap/bal_apply_test.go index e5c9188b1b..0db26331ad 100644 --- a/eth/protocols/snap/bal_apply_test.go +++ b/eth/protocols/snap/bal_apply_test.go @@ -325,10 +325,9 @@ func TestAccessListApplicationSkipsUnfetched(t *testing.T) { // is below Next so isFetched returns true; the unfetched hash equals Next // so isFetched returns false. syncer.tasks = []*accountTaskV2{{ - Next: unfetchedHash, - Last: common.MaxHash, - SubTasks: make(map[common.Hash][]*storageTaskV2), - stateCompleted: make(map[common.Hash]struct{}), + Next: unfetchedHash, + Last: common.MaxHash, + SubTasks: make(map[common.Hash][]*storageTaskV2), }} cb := bal.NewConstructionBlockAccessList() @@ -366,10 +365,9 @@ func TestAccessListApplicationSkipsUnfetchedStorage(t *testing.T) { } syncer.tasks = []*accountTaskV2{{ - Next: unfetchedHash, - Last: common.MaxHash, - SubTasks: make(map[common.Hash][]*storageTaskV2), - stateCompleted: make(map[common.Hash]struct{}), + Next: unfetchedHash, + Last: common.MaxHash, + SubTasks: make(map[common.Hash][]*storageTaskV2), }} // BAL touches an unfetched account with a storage write AND an empty @@ -425,7 +423,6 @@ func TestAccessListApplicationPartialStorage(t *testing.T) { Last: common.MaxHash, }}, }, - stateCompleted: make(map[common.Hash]struct{}), }} cb := bal.NewConstructionBlockAccessList() diff --git a/eth/protocols/snap/syncv2.go b/eth/protocols/snap/syncv2.go index fbbc10c60f..37da0b12b6 100644 --- a/eth/protocols/snap/syncv2.go +++ b/eth/protocols/snap/syncv2.go @@ -583,10 +583,19 @@ func (s *syncerV2) Sync(target *types.Header, cancel chan struct{}) error { log.Debug("Starting snapshot sync cycle", "root", root) - // If we resumed against a different pivot, decide whether the persisted - // progress is still usable. If yes, roll forward via BAL catch-up. If not, - // wipe everything and restart fresh. - if isPivotChanged { + if !isPivotChanged { + if prevPivot != nil { + // Resumed against the same pivot. An unclean shutdown may have left + // flushed snapshot data the journal doesn't cover. + if err := s.pruneStaleState(); err != nil { + log.Warn("Persisted progress unusable, restarting snap sync from scratch", "err", err) + s.resetSyncState() + } + } + } else { + // If we resumed against a different pivot, decide whether the persisted + // progress is still usable. If yes, roll forward via BAL catch-up. If not, + // wipe everything and restart fresh. switch { case isPivotReorged(s.db, prevPivot, target): log.Warn("Restarting snap sync from scratch", "oldnumber", prevPivot.Number, "oldHash", prevPivot.Hash()) @@ -599,6 +608,13 @@ func (s *syncerV2) Sync(target *types.Header, cancel chan struct{}) error { log.Warn("Catch-up gap exceeds BAL retention, restarting snap sync from scratch", "oldnumber", prevPivot.Number, "newnumber", target.Number, "gap", new(big.Int).Sub(target.Number, prevPivot.Number), "limit", maxCatchUpBlocks) s.resetSyncState() default: + // An unclean shutdown may have left flushed snapshot data the journal + // doesn't cover. + if err := s.pruneStaleState(); err != nil { + log.Warn("Persisted progress unusable, restarting snap sync from scratch", "err", err) + s.resetSyncState() + break + } // A canonical pivot move past a frozen pivot should be impossible: // the downloader both refuses moves (FrozenPivot) and resumes new // cycles against the frozen header itself. Reaching this branch @@ -683,6 +699,7 @@ func (s *syncerV2) downloadState(cancel chan struct{}) error { accountResps = make(chan *accountResponseV2) storageResps = make(chan *storageResponseV2) bytecodeResps = make(chan *bytecodeResponseV2) + lastJournal = time.Now() ) for { // Remove all completed tasks and terminate if everything's done @@ -691,7 +708,15 @@ func (s *syncerV2) downloadState(cancel chan struct{}) error { if len(s.tasks) == 0 { return nil } - + // Periodically persist the progress journal. The flat state batches + // are flushed continuously, while the journal otherwise only hits + // disk on graceful teardown; everything written beyond the journaled + // markers is sacrificed on an unclean shutdown, so the save interval + // bounds the loss. + if time.Since(lastJournal) > time.Minute { + lastJournal = time.Now() + s.saveSyncStatus() + } // Assign all the data retrieval tasks to any free peers s.assignAccountTasks(accountResps, accountReqFails, cancel) s.assignBytecodeTasks(bytecodeResps, bytecodeReqFails, cancel) @@ -1136,13 +1161,20 @@ func increaseKey(key []byte) []byte { return nil } -// DeleteHistoryByRange completely removes all database entries with the specific -// prefix. Note, this method assumes the space with the given prefix is exclusively -// occupied! +// deleteRange completely removes all database entries with the specific +// prefix. Note, this method assumes the space with the given prefix is +// exclusively occupied! func deleteRange(batch ethdb.Batch, prefix []byte) { - start := prefix - limit := increaseKey(bytes.Clone(prefix)) + deleteKeyRange(batch, prefix, increaseKey(bytes.Clone(prefix))) +} +// deleteKeyRange removes all database entries within the key range [start, +// limit). Note, this method assumes the range is exclusively occupied by +// sync data! +func deleteKeyRange(batch ethdb.Batch, start, limit []byte) { + if limit != nil && bytes.Compare(start, limit) >= 0 { + return + } // Try to remove the data in the range by a loop, as the leveldb // doesn't support the native range deletion. for { @@ -1154,7 +1186,9 @@ func deleteRange(batch ethdb.Batch, prefix []byte) { // therefore inconsistent. This is a tradeoff of the current LevelDB-based // approach. if errors.Is(err, ethdb.ErrTooManyKeys) { - batch.Write() + if err := batch.Write(); err != nil { + log.Crit("Failed to flush state deletions", "err", err) + } batch.Reset() continue } @@ -1172,6 +1206,72 @@ func (s *syncerV2) resetTrienodes(batch ethdb.Batch) { } } +// pruneStaleState removes any flat state the persisted journal does not cover. +func (s *syncerV2) pruneStaleState() error { + var ( + batch = s.db.NewBatch() + accountKey = func(h common.Hash) []byte { + return append(bytes.Clone(rawdb.SnapshotAccountPrefix), h.Bytes()...) + } + storageKey = func(hashes ...common.Hash) []byte { + key := bytes.Clone(rawdb.SnapshotStoragePrefix) + for _, h := range hashes { + key = append(key, h.Bytes()...) + } + return key + } + ) + keyRangeLimit := func(base []byte, last common.Hash) []byte { + if last != common.MaxHash { + return append(base, incHash(last).Bytes()...) + } + return increaseKey(base) + } + for _, task := range s.tasks { + deleteKeyRange(batch, accountKey(task.Next), keyRangeLimit(bytes.Clone(rawdb.SnapshotAccountPrefix), task.Last)) + + protected := make([]common.Hash, 0, len(task.stateCompleted)+len(task.SubTasks)) + for hash := range task.stateCompleted { + if bytes.Compare(hash[:], task.Next[:]) < 0 { + return errors.New("unexpected storage marker before the range") + } + if bytes.Compare(hash[:], task.Last[:]) > 0 { + return errors.New("unexpected storage marker after the range") + } + protected = append(protected, hash) + } + for hash := range task.SubTasks { + if _, ok := task.stateCompleted[hash]; ok { + return errors.New("unexpected duplicated storage marker") + } + if bytes.Compare(hash[:], task.Next[:]) < 0 { + return errors.New("unexpected storage marker before the range") + } + if bytes.Compare(hash[:], task.Last[:]) > 0 { + return errors.New("unexpected storage marker after the range") + } + protected = append(protected, hash) + } + sort.Slice(protected, func(i, j int) bool { + return bytes.Compare(protected[i][:], protected[j][:]) < 0 + }) + + start := storageKey(task.Next) + for _, hash := range protected { + deleteKeyRange(batch, start, storageKey(hash)) + start = increaseKey(storageKey(hash)) + } + deleteKeyRange(batch, start, keyRangeLimit(bytes.Clone(rawdb.SnapshotStoragePrefix), task.Last)) + + for account, subtasks := range task.SubTasks { + for _, sub := range subtasks { + deleteKeyRange(batch, storageKey(account, sub.Next), keyRangeLimit(storageKey(account), sub.Last)) + } + } + } + return batch.Write() +} + // resetSyncState wipes all persisted snap-sync data (sync status, account // and storage snapshots) and re-initializes in-memory state with a fresh // chunking of the account hash range. diff --git a/eth/protocols/snap/syncv2_test.go b/eth/protocols/snap/syncv2_test.go index bad4123560..9e366b108c 100644 --- a/eth/protocols/snap/syncv2_test.go +++ b/eth/protocols/snap/syncv2_test.go @@ -1411,10 +1411,9 @@ func TestSyncDetectsPivotReorged(t *testing.T) { seed.pivot = orphanPivot seed.accountSynced = 42 seed.tasks = []*accountTaskV2{{ - Next: common.HexToHash("0x80"), - Last: common.MaxHash, - SubTasks: make(map[common.Hash][]*storageTaskV2), - stateCompleted: make(map[common.Hash]struct{}), + Next: common.HexToHash("0x80"), + Last: common.MaxHash, + SubTasks: make(map[common.Hash][]*storageTaskV2), }} seed.saveSyncStatus() @@ -2182,6 +2181,186 @@ func TestInterruptedGenerationRecovery(t *testing.T) { } } +// TestPruneStaleState verifies that resuming a sync wipes exactly the flat +// state the journal does not cover: account and storage entries beyond the +// task cursors are removed, while everything below the cursors — and the +// journaled storage of completed-storage accounts and chunked large +// contracts inside the window — survives. +func TestPruneStaleState(t *testing.T) { + t.Parallel() + + var ( + db = rawdb.NewMemoryDatabase() + syncer = newSyncerV2(db, rawdb.HashScheme) + + fetched = common.Hash{0x20} // below the cursor, journal-covered + next = common.Hash{0x40} // task cursor + stale = common.Hash{0x50} // beyond the cursor, not covered + completed = common.Hash{0x60} // beyond the cursor, storage journaled complete + chunked = common.Hash{0x80} // beyond the cursor, large contract, partially journaled + staleToo = common.Hash{0xa0} // beyond the cursor, not covered + + subNext = common.Hash{0x50} // slot cursor of the chunked contract + slotLo = common.Hash{0x11} // below the slot cursor, journal-covered + slotHi = common.Hash{0x77} // beyond the slot cursor, not covered + + val = []byte{0xde, 0xad} + ) + syncer.tasks = []*accountTaskV2{{ + Next: next, + Last: common.MaxHash, + SubTasks: map[common.Hash][]*storageTaskV2{ + chunked: {{Next: subNext, Last: common.MaxHash}}, + }, + stateCompleted: map[common.Hash]struct{}{completed: {}}, + }} + + // Journal-covered state: account below the cursor with a storage slot, + // the completed account's storage, the chunked contract's low slots. + rawdb.WriteAccountSnapshot(db, fetched, val) + rawdb.WriteStorageSnapshot(db, fetched, slotLo, val) + rawdb.WriteStorageSnapshot(db, completed, slotLo, val) + rawdb.WriteStorageSnapshot(db, chunked, slotLo, val) + + // Uncovered state, flushed after the last journal save: accounts beyond + // the cursor (including the completed-storage one, whose account entry + // is only legitimate below the cursor), their storage, and the chunked + // contract's slots beyond its slot cursor. + rawdb.WriteAccountSnapshot(db, stale, val) + rawdb.WriteAccountSnapshot(db, completed, val) + rawdb.WriteAccountSnapshot(db, staleToo, val) + rawdb.WriteStorageSnapshot(db, stale, slotLo, val) + rawdb.WriteStorageSnapshot(db, staleToo, slotHi, val) + rawdb.WriteStorageSnapshot(db, chunked, slotHi, val) + + // Plant bare keys right at the start of the neighboring keyspaces. The + // task ranges end at the maximum hash, so a limit computed by bumping + // the full key would roll the carry over into these keyspaces and + // swallow such keys. + neighborOfAccounts := bytes.Clone(rawdb.SnapshotAccountPrefix) + neighborOfAccounts[len(neighborOfAccounts)-1]++ + db.Put(neighborOfAccounts, val) + + neighborOfStorage := bytes.Clone(rawdb.SnapshotStoragePrefix) + neighborOfStorage[len(neighborOfStorage)-1]++ + db.Put(neighborOfStorage, val) + + if err := syncer.pruneStaleState(); err != nil { + t.Fatalf("prune failed: %v", err) + } + for _, key := range [][]byte{neighborOfAccounts, neighborOfStorage} { + if has, _ := db.Has(key); !has { + t.Errorf("neighboring keyspace entry %x swallowed by the wipe", key) + } + } + + for _, tc := range []struct { + account common.Hash + slot *common.Hash + want bool + desc string + }{ + {fetched, nil, true, "account below cursor"}, + {fetched, &slotLo, true, "storage below cursor"}, + {completed, &slotLo, true, "storage of completed account"}, + {chunked, &slotLo, true, "chunked contract slot below slot cursor"}, + {stale, nil, false, "account beyond cursor"}, + {completed, nil, false, "account entry of completed account"}, + {staleToo, nil, false, "account beyond cursor (second)"}, + {stale, &slotLo, false, "storage of unprotected account"}, + {staleToo, &slotHi, false, "storage of unprotected account (second)"}, + {chunked, &slotHi, false, "chunked contract slot beyond slot cursor"}, + } { + var have bool + if tc.slot == nil { + have = len(rawdb.ReadAccountSnapshot(db, tc.account)) > 0 + } else { + have = len(rawdb.ReadStorageSnapshot(db, tc.account, *tc.slot)) > 0 + } + if have != tc.want { + t.Errorf("%s: present = %v, want %v", tc.desc, have, tc.want) + } + } +} + +// TestResumeWipesUncoveredState simulates an unclean shutdown mid-download: +// flat state flushed beyond the journaled cursor (which the journal therefore +// considers unfetched) must be wiped when the sync resumes — otherwise it +// would survive the re-download untouched and corrupt the generated trie. +func TestResumeWipesUncoveredState(t *testing.T) { + t.Parallel() + testResumeWipesUncoveredState(t, rawdb.HashScheme) + testResumeWipesUncoveredState(t, rawdb.PathScheme) +} + +func testResumeWipesUncoveredState(t *testing.T, scheme string) { + nodeScheme, sourceAccountTrie, elems := makeAccountTrieNoStorage(100, scheme) + root := sourceAccountTrie.Hash() + db := rawdb.NewMemoryDatabase() + pivot := mkPivot(0, root) + + // First run: interrupt the download after a couple of responses, leaving + // a journal behind (saved on the graceful teardown). + var ( + once1 sync.Once + cancel1 = make(chan struct{}) + term1 = func() { once1.Do(func() { close(cancel1) }) } + responses atomic.Int32 + ) + syncer1 := newSyncerV2(db, nodeScheme) + src1 := newTestPeerV2("source1", t, term1) + src1.accountTrie = sourceAccountTrie.Copy() + src1.accountValues = elems + src1.accountRequestV2Handler = func(tp *testPeerV2, id uint64, root common.Hash, origin common.Hash, limit common.Hash, cap int) error { + if responses.Add(1) > 2 { + term1() + return nil + } + return defaultAccountRequestHandlerV2(tp, id, root, origin, limit, cap) + } + syncer1.Register(src1) + src1.remote = syncer1 + syncer1.Sync(pivot, cancel1) + + // Simulate the unclean shutdown: plant a bogus account that the journal + // does not cover (right beyond an unfinished task's cursor) — as if it + // was flushed after the journal was last saved. The account is not part + // of the source trie, so the re-download would never overwrite it and + // trie generation would fail the root check if it survived. + loader := newSyncerV2(db, nodeScheme) + loader.loadSyncStatus() + if len(loader.tasks) == 0 { + t.Fatal("expected unfinished tasks in the journal") + } + task := loader.tasks[len(loader.tasks)-1] + bogus := incHash(task.Next) + rawdb.WriteAccountSnapshot(db, bogus, types.SlimAccountRLP(types.StateAccount{ + Nonce: 1, Balance: uint256.NewInt(1), + Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash[:], + })) + + // Second run: the resume must prune the bogus entry and complete cleanly. + var ( + once2 sync.Once + cancel2 = make(chan struct{}) + term2 = func() { once2.Do(func() { close(cancel2) }) } + ) + syncer2 := newSyncerV2(db, nodeScheme) + src2 := newTestPeerV2("source2", t, term2) + src2.accountTrie = sourceAccountTrie.Copy() + src2.accountValues = elems + syncer2.Register(src2) + src2.remote = syncer2 + + if err := syncer2.Sync(pivot, cancel2); err != nil { + t.Fatalf("resumed sync failed: %v", err) + } + if len(rawdb.ReadAccountSnapshot(db, bogus)) != 0 { + t.Fatal("uncovered account entry should have been pruned on resume") + } + verifyTrie(scheme, db, root, t) +} + // TestFetchAccessListsMultiplePeers verifies that fetch distributes work // across multiple idle peers. func TestFetchAccessListsMultiplePeers(t *testing.T) {