eth/protocols/snap: refactor partial filter and fix sync bugs

Refactor partial state filter from DB skip markers to direct filter
checks via shouldSyncStorage()/shouldSyncCode(), avoiding stale marker
issues across sync cycles.

Additional fixes:
- Skip WriteAccountSnapshot/WriteStorageSnapshot in partial mode
  (forwardAccountTask, processStorageResponse, onHealState)
- Guard against negative ETA in reportSyncProgress when sync restarts
  with persisted progress counters
- Add break after forwardAccountTask in cleanStorageTasks to prevent
  nil pointer when task.res is cleared
- Add diagnostic log in assignAccountTasks when no idle peers available
This commit is contained in:
CPerezz 2026-02-08 00:48:54 +01:00
parent e48ede038d
commit 2a1747c07e
No known key found for this signature in database
GPG key ID: 62045F34B97177DD
2 changed files with 60 additions and 39 deletions

View file

@ -626,14 +626,13 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
// that skips storage/code healing for non-tracked contracts. // that skips storage/code healing for non-tracked contracts.
var scheduler *trie.Sync var scheduler *trie.Sync
if s.isPartialSync() { if s.isPartialSync() {
// Create filter callbacks that check skip markers in the database // Create filter callbacks that use the filter directly (not DB markers).
// This avoids stale marker issues across sync cycles.
shouldSyncStorage := func(accountHash common.Hash) bool { shouldSyncStorage := func(accountHash common.Hash) bool {
return !isStorageSkipped(s.db, accountHash) return s.shouldSyncStorage(accountHash)
} }
shouldSyncCode := func(accountHash common.Hash) bool { shouldSyncCode := func(accountHash common.Hash) bool {
// For now, use the same logic as storage (skip if storage is skipped) return s.shouldSyncCode(accountHash)
// This could be refined to have separate skip markers for code
return !isStorageSkipped(s.db, accountHash)
} }
scheduler = state.NewPartialStateSync(root, s.db, s.onHealState, s.scheme, shouldSyncStorage, shouldSyncCode) scheduler = state.NewPartialStateSync(root, s.db, s.onHealState, s.scheme, shouldSyncStorage, shouldSyncCode)
log.Info("Starting partial state snap sync", "root", root) log.Info("Starting partial state snap sync", "root", root)
@ -1043,6 +1042,7 @@ func (s *Syncer) cleanStorageTasks() {
// If this was the last pending task, forward the account task // If this was the last pending task, forward the account task
if task.pend == 0 { if task.pend == 0 {
s.forwardAccountTask(task) s.forwardAccountTask(task)
break // task.res is now nil, remaining SubTasks handled next cycle
} }
} }
} }
@ -1068,6 +1068,7 @@ func (s *Syncer) assignAccountTasks(success chan *accountResponse, fail chan *ac
idlers.caps = append(idlers.caps, s.rates.Capacity(id, AccountRangeMsg, targetTTL)) idlers.caps = append(idlers.caps, s.rates.Capacity(id, AccountRangeMsg, targetTTL))
} }
if len(idlers.ids) == 0 { if len(idlers.ids) == 0 {
log.Debug("No idle peers for account sync", "registered", len(s.peers), "idlers", len(s.accountIdlers), "stateless", len(s.statelessPeers), "tasks", len(s.tasks), "accountReqs", len(s.accountReqs))
return return
} }
sort.Sort(sort.Reverse(idlers)) sort.Sort(sort.Reverse(idlers))
@ -1992,9 +1993,8 @@ func (s *Syncer) processAccountResponse(res *accountResponse) {
if account.Root != types.EmptyRootHash { if account.Root != types.EmptyRootHash {
// Partial sync: check if we should sync this contract's storage // Partial sync: check if we should sync this contract's storage
if !s.shouldSyncStorage(accountHash) { if !s.shouldSyncStorage(accountHash) {
// Skip storage for non-tracked contracts // Skip storage for non-tracked contracts. The healing phase uses
// Mark as skipped so healing phase knows not to try healing this storage // the same filter check, so no DB markers needed.
markStorageSkipped(s.db, accountHash, account.Root)
res.task.stateCompleted[accountHash] = struct{}{} res.task.stateCompleted[accountHash] = struct{}{}
storageSkippedMeter.Mark(1) storageSkippedMeter.Mark(1)
s.storageSkipped++ s.storageSkipped++
@ -2304,8 +2304,9 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
// outdated during the sync, but it can be fixed later during the // outdated during the sync, but it can be fixed later during the
// snapshot generation. // snapshot generation.
for j := 0; j < len(res.hashes[i]); j++ { for j := 0; j < len(res.hashes[i]); j++ {
rawdb.WriteStorageSnapshot(batch, account, res.hashes[i][j], res.slots[i][j]) if !s.isPartialSync() {
rawdb.WriteStorageSnapshot(batch, account, res.hashes[i][j], res.slots[i][j])
}
// If we're storing large contracts, generate the trie nodes // If we're storing large contracts, generate the trie nodes
// on the fly to not trash the gluing points // on the fly to not trash the gluing points
if i == len(res.hashes)-1 && res.subTask != nil { if i == len(res.hashes)-1 && res.subTask != nil {
@ -2508,7 +2509,9 @@ func (s *Syncer) forwardAccountTask(task *accountTask) {
break break
} }
slim := types.SlimAccountRLP(*res.accounts[i]) slim := types.SlimAccountRLP(*res.accounts[i])
rawdb.WriteAccountSnapshot(batch, hash, slim) if !s.isPartialSync() {
rawdb.WriteAccountSnapshot(batch, hash, slim)
}
if !task.needHeal[i] { if !task.needHeal[i] {
// If the storage task is complete, drop it into the stack trie // If the storage task is complete, drop it into the stack trie
@ -3149,7 +3152,9 @@ func (s *Syncer) onHealState(paths [][]byte, value []byte) error {
return nil // Returning the error here would drop the remote peer return nil // Returning the error here would drop the remote peer
} }
blob := types.SlimAccountRLP(account) blob := types.SlimAccountRLP(account)
rawdb.WriteAccountSnapshot(s.stateWriter, common.BytesToHash(paths[0]), blob) if !s.isPartialSync() {
rawdb.WriteAccountSnapshot(s.stateWriter, common.BytesToHash(paths[0]), blob)
}
s.accountHealed += 1 s.accountHealed += 1
s.accountHealedBytes += common.StorageSize(1 + common.HashLength + len(blob)) s.accountHealedBytes += common.StorageSize(1 + common.HashLength + len(blob))
} }
@ -3159,11 +3164,13 @@ func (s *Syncer) onHealState(paths [][]byte, value []byte) error {
// Partial sync: skip storage healing for non-tracked contracts // Partial sync: skip storage healing for non-tracked contracts
// (accounts themselves are always synced/healed) // (accounts themselves are always synced/healed)
if isStorageSkipped(s.db, accountHash) { if !s.shouldSyncStorage(accountHash) {
return nil // Don't heal storage we intentionally skipped return nil // Don't heal storage for non-tracked contracts
} }
rawdb.WriteStorageSnapshot(s.stateWriter, accountHash, common.BytesToHash(paths[1]), value) if !s.isPartialSync() {
rawdb.WriteStorageSnapshot(s.stateWriter, accountHash, common.BytesToHash(paths[1]), value)
}
s.storageHealed += 1 s.storageHealed += 1
s.storageHealedBytes += common.StorageSize(1 + 2*common.HashLength + len(value)) s.storageHealedBytes += common.StorageSize(1 + 2*common.HashLength + len(value))
} }
@ -3228,15 +3235,21 @@ func (s *Syncer) reportSyncProgress(force bool) {
storage = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(s.storageSynced), s.storageBytes.TerminalString()) storage = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(s.storageSynced), s.storageBytes.TerminalString())
bytecode = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(s.bytecodeSynced), s.bytecodeBytes.TerminalString()) bytecode = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(s.bytecodeSynced), s.bytecodeBytes.TerminalString())
) )
// Guard against negative ETA (can happen when sync restarts with persisted
// progress, making the estimated total smaller than elapsed time).
eta := estTime - elapsed
if eta < 0 {
eta = 0
}
if s.isPartialSync() { if s.isPartialSync() {
log.Info("Syncing: partial state download in progress", "synced", progress, "state", synced, log.Info("Syncing: partial state download in progress", "synced", progress, "state", synced,
"accounts", accounts, "accounts", accounts,
"slots", storage, "slotsSkipped", s.storageSkipped, "slots", storage, "slotsSkipped", s.storageSkipped,
"codes", bytecode, "codesSkipped", s.bytecodeSkipped, "codes", bytecode, "codesSkipped", s.bytecodeSkipped,
"eta", common.PrettyDuration(estTime-elapsed)) "eta", common.PrettyDuration(eta))
} else { } else {
log.Info("Syncing: state download in progress", "synced", progress, "state", synced, log.Info("Syncing: state download in progress", "synced", progress, "state", synced,
"accounts", accounts, "slots", storage, "codes", bytecode, "eta", common.PrettyDuration(estTime-elapsed)) "accounts", accounts, "slots", storage, "codes", bytecode, "eta", common.PrettyDuration(eta))
} }
} }

View file

@ -172,16 +172,17 @@ func testPartialSyncAllAccounts(t *testing.T, scheme string) {
} }
} }
// TestPartialSyncSkipMarkers verifies that skip markers are correctly written // TestPartialSyncFilterBehavior verifies that the filter correctly identifies
// for accounts whose storage was intentionally skipped. // tracked vs untracked accounts and that storage is only synced for tracked ones.
func TestPartialSyncSkipMarkers(t *testing.T) { // Note: Skip markers are no longer used - the filter is checked directly during healing.
func TestPartialSyncFilterBehavior(t *testing.T) {
t.Parallel() t.Parallel()
testPartialSyncSkipMarkers(t, rawdb.HashScheme) testPartialSyncFilterBehavior(t, rawdb.HashScheme)
testPartialSyncSkipMarkers(t, rawdb.PathScheme) testPartialSyncFilterBehavior(t, rawdb.PathScheme)
} }
func testPartialSyncSkipMarkers(t *testing.T, scheme string) { func testPartialSyncFilterBehavior(t *testing.T, scheme string) {
var ( var (
once sync.Once once sync.Once
cancel = make(chan struct{}) cancel = make(chan struct{})
@ -219,25 +220,35 @@ func testPartialSyncSkipMarkers(t *testing.T, scheme string) {
} }
close(done) close(done)
// Count skip markers // Verify filter correctly identifies tracked vs untracked accounts
skippedCount := 0 trackedSet := make(map[common.Hash]struct{})
for _, h := range trackedHashes {
trackedSet[h] = struct{}{}
}
trackedCount := 0 trackedCount := 0
untrackedCount := 0
for _, elem := range elems { for _, elem := range elems {
accountHash := common.BytesToHash(elem.k) accountHash := common.BytesToHash(elem.k)
if isStorageSkipped(stateDb, accountHash) { if syncer.shouldSyncStorage(accountHash) {
skippedCount++
} else {
trackedCount++ trackedCount++
if _, ok := trackedSet[accountHash]; !ok {
t.Errorf("Filter says sync storage for %s but it's not in tracked set", accountHash.Hex()[:10])
}
} else {
untrackedCount++
if _, ok := trackedSet[accountHash]; ok {
t.Errorf("Filter says skip storage for %s but it's in tracked set", accountHash.Hex()[:10])
}
} }
} }
// We tracked 3, so 7 should have skip markers
expectedSkipped := numAccounts - len(trackedHashes)
if skippedCount != expectedSkipped {
t.Errorf("Expected %d skip markers, got %d", expectedSkipped, skippedCount)
}
if trackedCount != len(trackedHashes) { if trackedCount != len(trackedHashes) {
t.Errorf("Expected %d tracked (no skip marker), got %d", len(trackedHashes), trackedCount) t.Errorf("Expected filter to identify %d tracked, got %d", len(trackedHashes), trackedCount)
}
expectedUntracked := numAccounts - len(trackedHashes)
if untrackedCount != expectedUntracked {
t.Errorf("Expected filter to identify %d untracked, got %d", expectedUntracked, untrackedCount)
} }
} }
@ -697,11 +708,8 @@ func verifyPartialSync(t *testing.T, scheme string, db ethdb.KeyValueStore, root
} }
} }
} else { } else {
// Untracked should have skip marker // Untracked should not have storage (skip markers are no longer used,
if !isStorageSkipped(db, accountHash) { // the filter is checked directly during healing)
t.Errorf("Untracked account %s should have skip marker", accountHash.Hex()[:10])
}
// And should not have storage
if err == nil { if err == nil {
storeIt := trie.NewIterator(storageTrie.MustNodeIterator(nil)) storeIt := trie.NewIterator(storageTrie.MustNodeIterator(nil))
slots := 0 slots := 0