From 2a1747c07ece9a2f17bdd99bc9201e3df316bbda Mon Sep 17 00:00:00 2001 From: CPerezz Date: Sun, 8 Feb 2026 00:48:54 +0100 Subject: [PATCH] eth/protocols/snap: refactor partial filter and fix sync bugs Refactor partial state filter from DB skip markers to direct filter checks via shouldSyncStorage()/shouldSyncCode(), avoiding stale marker issues across sync cycles. Additional fixes: - Skip WriteAccountSnapshot/WriteStorageSnapshot in partial mode (forwardAccountTask, processStorageResponse, onHealState) - Guard against negative ETA in reportSyncProgress when sync restarts with persisted progress counters - Add break after forwardAccountTask in cleanStorageTasks to prevent nil pointer when task.res is cleared - Add diagnostic log in assignAccountTasks when no idle peers available --- eth/protocols/snap/sync.go | 47 +++++++++++------ .../snap/sync_partial_integration_test.go | 52 +++++++++++-------- 2 files changed, 60 insertions(+), 39 deletions(-) diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index 2ad75d1343..8b4e4de074 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -626,14 +626,13 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error { // that skips storage/code healing for non-tracked contracts. var scheduler *trie.Sync if s.isPartialSync() { - // Create filter callbacks that check skip markers in the database + // Create filter callbacks that use the filter directly (not DB markers). + // This avoids stale marker issues across sync cycles. shouldSyncStorage := func(accountHash common.Hash) bool { - return !isStorageSkipped(s.db, accountHash) + return s.shouldSyncStorage(accountHash) } shouldSyncCode := func(accountHash common.Hash) bool { - // For now, use the same logic as storage (skip if storage is skipped) - // This could be refined to have separate skip markers for code - return !isStorageSkipped(s.db, accountHash) + return s.shouldSyncCode(accountHash) } scheduler = state.NewPartialStateSync(root, s.db, s.onHealState, s.scheme, shouldSyncStorage, shouldSyncCode) log.Info("Starting partial state snap sync", "root", root) @@ -1043,6 +1042,7 @@ func (s *Syncer) cleanStorageTasks() { // If this was the last pending task, forward the account task if task.pend == 0 { s.forwardAccountTask(task) + break // task.res is now nil, remaining SubTasks handled next cycle } } } @@ -1068,6 +1068,7 @@ func (s *Syncer) assignAccountTasks(success chan *accountResponse, fail chan *ac idlers.caps = append(idlers.caps, s.rates.Capacity(id, AccountRangeMsg, targetTTL)) } if len(idlers.ids) == 0 { + log.Debug("No idle peers for account sync", "registered", len(s.peers), "idlers", len(s.accountIdlers), "stateless", len(s.statelessPeers), "tasks", len(s.tasks), "accountReqs", len(s.accountReqs)) return } sort.Sort(sort.Reverse(idlers)) @@ -1992,9 +1993,8 @@ func (s *Syncer) processAccountResponse(res *accountResponse) { if account.Root != types.EmptyRootHash { // Partial sync: check if we should sync this contract's storage if !s.shouldSyncStorage(accountHash) { - // Skip storage for non-tracked contracts - // Mark as skipped so healing phase knows not to try healing this storage - markStorageSkipped(s.db, accountHash, account.Root) + // Skip storage for non-tracked contracts. The healing phase uses + // the same filter check, so no DB markers needed. res.task.stateCompleted[accountHash] = struct{}{} storageSkippedMeter.Mark(1) s.storageSkipped++ @@ -2304,8 +2304,9 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { // outdated during the sync, but it can be fixed later during the // snapshot generation. for j := 0; j < len(res.hashes[i]); j++ { - rawdb.WriteStorageSnapshot(batch, account, res.hashes[i][j], res.slots[i][j]) - + if !s.isPartialSync() { + rawdb.WriteStorageSnapshot(batch, account, res.hashes[i][j], res.slots[i][j]) + } // If we're storing large contracts, generate the trie nodes // on the fly to not trash the gluing points if i == len(res.hashes)-1 && res.subTask != nil { @@ -2508,7 +2509,9 @@ func (s *Syncer) forwardAccountTask(task *accountTask) { break } slim := types.SlimAccountRLP(*res.accounts[i]) - rawdb.WriteAccountSnapshot(batch, hash, slim) + if !s.isPartialSync() { + rawdb.WriteAccountSnapshot(batch, hash, slim) + } if !task.needHeal[i] { // If the storage task is complete, drop it into the stack trie @@ -3149,7 +3152,9 @@ func (s *Syncer) onHealState(paths [][]byte, value []byte) error { return nil // Returning the error here would drop the remote peer } blob := types.SlimAccountRLP(account) - rawdb.WriteAccountSnapshot(s.stateWriter, common.BytesToHash(paths[0]), blob) + if !s.isPartialSync() { + rawdb.WriteAccountSnapshot(s.stateWriter, common.BytesToHash(paths[0]), blob) + } s.accountHealed += 1 s.accountHealedBytes += common.StorageSize(1 + common.HashLength + len(blob)) } @@ -3159,11 +3164,13 @@ func (s *Syncer) onHealState(paths [][]byte, value []byte) error { // Partial sync: skip storage healing for non-tracked contracts // (accounts themselves are always synced/healed) - if isStorageSkipped(s.db, accountHash) { - return nil // Don't heal storage we intentionally skipped + if !s.shouldSyncStorage(accountHash) { + return nil // Don't heal storage for non-tracked contracts } - rawdb.WriteStorageSnapshot(s.stateWriter, accountHash, common.BytesToHash(paths[1]), value) + if !s.isPartialSync() { + rawdb.WriteStorageSnapshot(s.stateWriter, accountHash, common.BytesToHash(paths[1]), value) + } s.storageHealed += 1 s.storageHealedBytes += common.StorageSize(1 + 2*common.HashLength + len(value)) } @@ -3228,15 +3235,21 @@ func (s *Syncer) reportSyncProgress(force bool) { storage = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(s.storageSynced), s.storageBytes.TerminalString()) bytecode = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(s.bytecodeSynced), s.bytecodeBytes.TerminalString()) ) + // Guard against negative ETA (can happen when sync restarts with persisted + // progress, making the estimated total smaller than elapsed time). + eta := estTime - elapsed + if eta < 0 { + eta = 0 + } if s.isPartialSync() { log.Info("Syncing: partial state download in progress", "synced", progress, "state", synced, "accounts", accounts, "slots", storage, "slotsSkipped", s.storageSkipped, "codes", bytecode, "codesSkipped", s.bytecodeSkipped, - "eta", common.PrettyDuration(estTime-elapsed)) + "eta", common.PrettyDuration(eta)) } else { log.Info("Syncing: state download in progress", "synced", progress, "state", synced, - "accounts", accounts, "slots", storage, "codes", bytecode, "eta", common.PrettyDuration(estTime-elapsed)) + "accounts", accounts, "slots", storage, "codes", bytecode, "eta", common.PrettyDuration(eta)) } } diff --git a/eth/protocols/snap/sync_partial_integration_test.go b/eth/protocols/snap/sync_partial_integration_test.go index a01e63b436..3457030028 100644 --- a/eth/protocols/snap/sync_partial_integration_test.go +++ b/eth/protocols/snap/sync_partial_integration_test.go @@ -172,16 +172,17 @@ func testPartialSyncAllAccounts(t *testing.T, scheme string) { } } -// TestPartialSyncSkipMarkers verifies that skip markers are correctly written -// for accounts whose storage was intentionally skipped. -func TestPartialSyncSkipMarkers(t *testing.T) { +// TestPartialSyncFilterBehavior verifies that the filter correctly identifies +// tracked vs untracked accounts and that storage is only synced for tracked ones. +// Note: Skip markers are no longer used - the filter is checked directly during healing. +func TestPartialSyncFilterBehavior(t *testing.T) { t.Parallel() - testPartialSyncSkipMarkers(t, rawdb.HashScheme) - testPartialSyncSkipMarkers(t, rawdb.PathScheme) + testPartialSyncFilterBehavior(t, rawdb.HashScheme) + testPartialSyncFilterBehavior(t, rawdb.PathScheme) } -func testPartialSyncSkipMarkers(t *testing.T, scheme string) { +func testPartialSyncFilterBehavior(t *testing.T, scheme string) { var ( once sync.Once cancel = make(chan struct{}) @@ -219,25 +220,35 @@ func testPartialSyncSkipMarkers(t *testing.T, scheme string) { } close(done) - // Count skip markers - skippedCount := 0 + // Verify filter correctly identifies tracked vs untracked accounts + trackedSet := make(map[common.Hash]struct{}) + for _, h := range trackedHashes { + trackedSet[h] = struct{}{} + } + trackedCount := 0 + untrackedCount := 0 for _, elem := range elems { accountHash := common.BytesToHash(elem.k) - if isStorageSkipped(stateDb, accountHash) { - skippedCount++ - } else { + if syncer.shouldSyncStorage(accountHash) { trackedCount++ + if _, ok := trackedSet[accountHash]; !ok { + t.Errorf("Filter says sync storage for %s but it's not in tracked set", accountHash.Hex()[:10]) + } + } else { + untrackedCount++ + if _, ok := trackedSet[accountHash]; ok { + t.Errorf("Filter says skip storage for %s but it's in tracked set", accountHash.Hex()[:10]) + } } } - // We tracked 3, so 7 should have skip markers - expectedSkipped := numAccounts - len(trackedHashes) - if skippedCount != expectedSkipped { - t.Errorf("Expected %d skip markers, got %d", expectedSkipped, skippedCount) - } if trackedCount != len(trackedHashes) { - t.Errorf("Expected %d tracked (no skip marker), got %d", len(trackedHashes), trackedCount) + t.Errorf("Expected filter to identify %d tracked, got %d", len(trackedHashes), trackedCount) + } + expectedUntracked := numAccounts - len(trackedHashes) + if untrackedCount != expectedUntracked { + t.Errorf("Expected filter to identify %d untracked, got %d", expectedUntracked, untrackedCount) } } @@ -697,11 +708,8 @@ func verifyPartialSync(t *testing.T, scheme string, db ethdb.KeyValueStore, root } } } else { - // Untracked should have skip marker - if !isStorageSkipped(db, accountHash) { - t.Errorf("Untracked account %s should have skip marker", accountHash.Hex()[:10]) - } - // And should not have storage + // Untracked should not have storage (skip markers are no longer used, + // the filter is checked directly during healing) if err == nil { storeIt := trie.NewIterator(storageTrie.MustNodeIterator(nil)) slots := 0