From b82f9fea0752d0e9ca73f86d1d78894c5371a3a4 Mon Sep 17 00:00:00 2001
From: CPerezz
Date: Mon, 2 Feb 2026 13:47:48 +0100
Subject: [PATCH] eth/protocols/snap: implement partial sync mode with skip markers

Adds partial sync mode to the snap syncer that filters which contracts
have their storage and bytecode synced based on the configured filter.

Key changes:
- Syncer accepts optional ContractFilter for partial mode
- Skip markers (SnapSkipped prefix) track accounts whose storage was
  intentionally skipped (the accounts themselves are always synced)
- processAccountResponse checks filter before requesting storage/code
- Healing phase uses NewPartialStateSync: storage honours the skip
  markers, bytecode honours the same filter as the snap phase
- Helper functions for skip marker persistence (mark/check/delete);
  database errors are reported instead of being dropped

When partial sync is active, only tracked contracts have their storage
synced, reducing sync size from ~1TB+ to ~30-40GB while maintaining
a complete account trie for balance queries.

Part of partial statefulness Phase 2.
---
 eth/protocols/snap/sync.go              |  86 +++++++--
 eth/protocols/snap/sync_partial.go      | 103 +++++++++++
 eth/protocols/snap/sync_partial_test.go | 220 ++++++++++++++++++++++++
 eth/protocols/snap/sync_test.go         |   2 +-
 4 files changed, 395 insertions(+), 16 deletions(-)
 create mode 100644 eth/protocols/snap/sync_partial.go
 create mode 100644 eth/protocols/snap/sync_partial_test.go

diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go
index 841bfb446e..a795125df8 100644
--- a/eth/protocols/snap/sync.go
+++ b/eth/protocols/snap/sync.go
@@ -33,6 +33,7 @@ import (
 	"github.com/ethereum/go-ethereum/common/math"
 	"github.com/ethereum/go-ethereum/core/rawdb"
 	"github.com/ethereum/go-ethereum/core/state"
+	"github.com/ethereum/go-ethereum/core/state/partial"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/ethdb"
@@ -445,6 +446,11 @@ type Syncer struct {
 	db     ethdb.KeyValueStore // Database to store the trie nodes into (and dedup)
 	scheme string              // Node scheme used in node database
 
+	// Partial state filter (nil = sync everything, i.e., full node)
+	// When set, only accounts in the filter have their storage/bytecode synced.
+	// ALL accounts are always synced - only storage and bytecode are filtered.
+	filter partial.ContractFilter
+
 	root    common.Hash    // Current state trie root being synced
 	tasks   []*accountTask // Current account task set being synced
 	snapped bool           // Flag to signal that snap phase is done
@@ -512,11 +518,14 @@ type Syncer struct {
 }
 
 // NewSyncer creates a new snapshot syncer to download the Ethereum state over the
-// snap protocol.
-func NewSyncer(db ethdb.KeyValueStore, scheme string) *Syncer {
+// snap protocol. The optional filter parameter enables partial statefulness mode
+// where only configured contracts have their storage and bytecode synced.
+// Pass nil for full node behavior (sync everything).
+func NewSyncer(db ethdb.KeyValueStore, scheme string, filter partial.ContractFilter) *Syncer {
 	return &Syncer{
 		db:     db,
 		scheme: scheme,
+		filter: filter,
 
 		peers:    make(map[string]SyncPeer),
 		peerJoin: new(event.Feed),
@@ -609,8 +618,27 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
 	// any peers and initialize the syncer if it was not yet run
 	s.lock.Lock()
 	s.root = root
+
+	// Create the state sync scheduler. For partial sync, use the filtered version
+	// that skips storage/code healing for non-tracked contracts.
+	var scheduler *trie.Sync
+	if s.isPartialSync() {
+		// Storage is filtered via the skip markers persisted during the snap phase
+		shouldSyncStorage := func(accountHash common.Hash) bool {
+			return !isStorageSkipped(s.db, accountHash)
+		}
+		// Bytecode is decided by the very same filter predicate the snap phase
+		// uses in processAccountResponse. Skip markers are only written for
+		// contracts with a non-empty storage trie, so they cannot stand in
+		// for the bytecode decision.
+		shouldSyncCode := s.shouldSyncCode
+		scheduler = state.NewPartialStateSync(root, s.db, s.onHealState, s.scheme, shouldSyncStorage, shouldSyncCode)
+	} else {
+		scheduler = state.NewStateSync(root, s.db, s.onHealState, s.scheme)
+	}
+
 	s.healer = &healTask{
-		scheduler: state.NewStateSync(root, s.db, s.onHealState, s.scheme),
+		scheduler: scheduler,
 		trieTasks: make(map[string]common.Hash),
 		codeTasks: make(map[common.Hash]struct{}),
 	}
@@ -1938,28 +1966,46 @@ func (s *Syncer) processAccountResponse(res *accountResponse) {
 
 	res.task.pend = 0
 	for i, account := range res.accounts {
+		accountHash := res.hashes[i]
+
 		// Check if the account is a contract with an unknown code
 		if !bytes.Equal(account.CodeHash, types.EmptyCodeHash.Bytes()) {
 			if !rawdb.HasCodeWithPrefix(s.db, common.BytesToHash(account.CodeHash)) {
-				res.task.codeTasks[common.BytesToHash(account.CodeHash)] = struct{}{}
-				res.task.needCode[i] = true
-				res.task.pend++
+				// Partial sync: check if we should sync this contract's bytecode
+				if s.shouldSyncCode(accountHash) {
+					res.task.codeTasks[common.BytesToHash(account.CodeHash)] = struct{}{}
+					res.task.needCode[i] = true
+					res.task.pend++
+				} else {
+					// Skip bytecode for non-tracked contracts
+					bytecodeSkippedMeter.Mark(1)
+				}
 			}
 		}
 		// Check if the account is a contract with an unknown storage trie
 		if account.Root != types.EmptyRootHash {
+			// Partial sync: check if we should sync this contract's storage
+			if !s.shouldSyncStorage(accountHash) {
+				// Skip storage for non-tracked contracts
+				// Mark as skipped so healing phase knows not to try healing this storage
+				markStorageSkipped(s.db, accountHash, account.Root)
+				res.task.stateCompleted[accountHash] = struct{}{}
+				storageSkippedMeter.Mark(1)
+				continue
+			}
+
 			// If the storage was already retrieved in the last cycle, there's no need
 			// to resync it again, regardless of whether the storage root is consistent
 			// or not.
-			if _, exist := res.task.stateCompleted[res.hashes[i]]; exist {
+			if _, exist := res.task.stateCompleted[accountHash]; exist {
 				// The leftover storage tasks are not expected, unless system is
 				// very wrong.
-				if _, ok := res.task.SubTasks[res.hashes[i]]; ok {
-					panic(fmt.Errorf("unexpected leftover storage tasks, owner: %x", res.hashes[i]))
+				if _, ok := res.task.SubTasks[accountHash]; ok {
+					panic(fmt.Errorf("unexpected leftover storage tasks, owner: %x", accountHash))
 				}
 				// Mark the healing tag if storage root node is inconsistent, or
 				// it's non-existent due to storage chunking.
-				if !rawdb.HasTrieNode(s.db, res.hashes[i], nil, account.Root, s.scheme) {
+				if !rawdb.HasTrieNode(s.db, accountHash, nil, account.Root, s.scheme) {
 					res.task.needHeal[i] = true
 				}
 			} else {
@@ -1967,20 +2013,20 @@ func (s *Syncer) processAccountResponse(res *accountResponse) {
 				// don't restart it from scratch. This happens if a sync cycle
 				// is interrupted and resumed later. However, *do* update the
 				// previous root hash.
-				if subtasks, ok := res.task.SubTasks[res.hashes[i]]; ok {
-					log.Debug("Resuming large storage retrieval", "account", res.hashes[i], "root", account.Root)
+				if subtasks, ok := res.task.SubTasks[accountHash]; ok {
+					log.Debug("Resuming large storage retrieval", "account", accountHash, "root", account.Root)
 					for _, subtask := range subtasks {
 						subtask.root = account.Root
 					}
 					res.task.needHeal[i] = true
-					resumed[res.hashes[i]] = struct{}{}
+					resumed[accountHash] = struct{}{}
 					largeStorageResumedGauge.Inc(1)
 				} else {
 					// It's possible that in the hash scheme, the storage, along
 					// with the trie nodes of the given root, is already present
 					// in the database. Schedule the storage task anyway to simplify
 					// the logic here.
-					res.task.stateTasks[res.hashes[i]] = account.Root
+					res.task.stateTasks[accountHash] = account.Root
 				}
 				res.task.needState[i] = true
 				res.task.pend++
@@ -3090,6 +3136,7 @@ func (s *Syncer) onHealByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) e
 // Note it's not concurrent safe, please handle the concurrent issue outside.
 func (s *Syncer) onHealState(paths [][]byte, value []byte) error {
 	if len(paths) == 1 {
+		// Account trie leaf - ALWAYS process (never skip accounts)
 		var account types.StateAccount
 		if err := rlp.DecodeBytes(value, &account); err != nil {
 			return nil // Returning the error here would drop the remote peer
@@ -3100,7 +3147,16 @@ func (s *Syncer) onHealState(paths [][]byte, value []byte) error {
 		s.accountHealedBytes += common.StorageSize(1 + common.HashLength + len(blob))
 	}
 	if len(paths) == 2 {
-		rawdb.WriteStorageSnapshot(s.stateWriter, common.BytesToHash(paths[0]), common.BytesToHash(paths[1]), value)
+		// Storage trie leaf
+		accountHash := common.BytesToHash(paths[0])
+
+		// Partial sync: skip storage healing for non-tracked contracts
+		// (accounts themselves are always synced/healed)
+		if isStorageSkipped(s.db, accountHash) {
+			return nil // Don't heal storage we intentionally skipped
+		}
+
+		rawdb.WriteStorageSnapshot(s.stateWriter, accountHash, common.BytesToHash(paths[1]), value)
 		s.storageHealed += 1
 		s.storageHealedBytes += common.StorageSize(1 + 2*common.HashLength + len(value))
 	}
diff --git a/eth/protocols/snap/sync_partial.go b/eth/protocols/snap/sync_partial.go
new file mode 100644
index 0000000000..90abc59768
--- /dev/null
+++ b/eth/protocols/snap/sync_partial.go
@@ -0,0 +1,103 @@
+// Copyright 2025 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package snap
+
+import (
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/ethdb"
+	"github.com/ethereum/go-ethereum/log"
+	"github.com/ethereum/go-ethereum/metrics"
+)
+
+// Database key prefix for tracking intentionally skipped storage during partial sync.
+// These markers allow the healing phase to know which accounts had storage intentionally
+// skipped (vs. accounts that need storage healing due to sync interruption).
+var skippedStoragePrefix = []byte("SnapSkipped")
+
+// Metrics for partial sync progress tracking
+var (
+	storageSkippedMeter  = metrics.NewRegisteredMeter("snap/sync/storage/skipped", nil)
+	bytecodeSkippedMeter = metrics.NewRegisteredMeter("snap/sync/bytecode/skipped", nil)
+)
+
+// skippedStorageKey returns the database key for a skipped storage marker.
+// The key format is: skippedStoragePrefix + accountHash (32 bytes)
+//
+// The key is built in a freshly allocated buffer, so the shared package-level
+// prefix slice is never used as the destination of an append.
+func skippedStorageKey(accountHash common.Hash) []byte {
+	key := make([]byte, 0, len(skippedStoragePrefix)+common.HashLength)
+	key = append(key, skippedStoragePrefix...)
+	return append(key, accountHash.Bytes()...)
+}
+
+// markStorageSkipped records that storage was intentionally skipped for an account.
+// This is used during partial sync to skip storage for contracts not in the configured list.
+// The storageRoot is stored so we can verify consistency if needed.
+//
+// A failed write is treated as fatal: the healing scheduler only leaves storage
+// alone when it finds this marker.
+func markStorageSkipped(db ethdb.KeyValueWriter, accountHash common.Hash, storageRoot common.Hash) {
+	if err := db.Put(skippedStorageKey(accountHash), storageRoot.Bytes()); err != nil {
+		log.Crit("Failed to store skipped storage marker", "account", accountHash, "err", err)
+	}
+}
+
+// isStorageSkipped checks if storage was intentionally skipped for an account.
+// Returns true if this account's storage was skipped during partial sync.
+//
+// A failed lookup is logged and reported as "not skipped", which errs on the
+// side of syncing the storage rather than silently dropping it.
+func isStorageSkipped(db ethdb.KeyValueReader, accountHash common.Hash) bool {
+	has, err := db.Has(skippedStorageKey(accountHash))
+	if err != nil {
+		log.Error("Failed to check skipped storage marker", "account", accountHash, "err", err)
+		return false
+	}
+	return has
+}
+
+// deleteStorageSkipped removes the skip marker for an account.
+// Used during cleanup or when re-syncing with different configuration.
+func deleteStorageSkipped(db ethdb.KeyValueWriter, accountHash common.Hash) {
+	if err := db.Delete(skippedStorageKey(accountHash)); err != nil {
+		log.Crit("Failed to delete skipped storage marker", "account", accountHash, "err", err)
+	}
+}
+
+// shouldSyncStorage returns true if storage should be synced for this account hash.
+// If no filter is configured (filter == nil), all storage is synced (full node behavior).
+func (s *Syncer) shouldSyncStorage(accountHash common.Hash) bool {
+	if s.filter == nil {
+		return true // No filter = sync everything (full node)
+	}
+	return s.filter.ShouldSyncStorageByHash(accountHash)
+}
+
+// shouldSyncCode returns true if bytecode should be synced for this account hash.
+// If no filter is configured (filter == nil), all bytecode is synced (full node behavior).
+func (s *Syncer) shouldSyncCode(accountHash common.Hash) bool {
+	if s.filter == nil {
+		return true // No filter = sync everything (full node)
+	}
+	return s.filter.ShouldSyncCodeByHash(accountHash)
+}
+
+// isPartialSync returns true if partial sync mode is active.
+func (s *Syncer) isPartialSync() bool {
+	return s.filter != nil
+}
diff --git a/eth/protocols/snap/sync_partial_test.go b/eth/protocols/snap/sync_partial_test.go
new file mode 100644
index 0000000000..95c8e1eda8
--- /dev/null
+++ b/eth/protocols/snap/sync_partial_test.go
@@ -0,0 +1,220 @@
+// Copyright 2025 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package snap
+
+import (
+	"testing"
+
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/core/rawdb"
+	"github.com/ethereum/go-ethereum/core/state/partial"
+	"github.com/ethereum/go-ethereum/crypto"
+)
+
+// TestPartialSyncFilterStorage checks that a configured filter accepts tracked
+// contracts and rejects untracked ones, both by address and by account hash.
+func TestPartialSyncFilterStorage(t *testing.T) {
+	// Create filter with specific contracts
+	tracked := []common.Address{
+		common.HexToAddress("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"), // WETH
+		common.HexToAddress("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"), // USDC
+	}
+	filter := partial.NewConfiguredFilter(tracked)
+
+	// Verify tracked contracts pass filter by address
+	for _, addr := range tracked {
+		if !filter.ShouldSyncStorage(addr) {
+			t.Errorf("Tracked contract %s should pass storage filter", addr.Hex())
+		}
+		if !filter.ShouldSyncCode(addr) {
+			t.Errorf("Tracked contract %s should pass code filter", addr.Hex())
+		}
+		if !filter.IsTracked(addr) {
+			t.Errorf("Tracked contract %s should be marked as tracked", addr.Hex())
+		}
+	}
+
+	// Verify untracked contracts are filtered
+	untracked := common.HexToAddress("0x1234567890123456789012345678901234567890")
+	if filter.ShouldSyncStorage(untracked) {
+		t.Error("Untracked contract should be filtered for storage")
+	}
+	if filter.ShouldSyncCode(untracked) {
+		t.Error("Untracked contract should be filtered for code")
+	}
+	if filter.IsTracked(untracked) {
+		t.Error("Untracked contract should not be marked as tracked")
+	}
+
+	// Verify hash-based filter works
+	for _, addr := range tracked {
+		trackedHash := crypto.Keccak256Hash(addr.Bytes())
+		if !filter.ShouldSyncStorageByHash(trackedHash) {
+			t.Errorf("Tracked contract hash %s should pass storage filter", trackedHash.Hex())
+		}
+		if !filter.ShouldSyncCodeByHash(trackedHash) {
+			t.Errorf("Tracked contract hash %s should pass code filter", trackedHash.Hex())
+		}
+	}
+
+	// Verify untracked hash is filtered
+	untrackedHash := crypto.Keccak256Hash(untracked.Bytes())
+	if filter.ShouldSyncStorageByHash(untrackedHash) {
+		t.Error("Untracked contract hash should be filtered for storage")
+	}
+	if filter.ShouldSyncCodeByHash(untrackedHash) {
+		t.Error("Untracked contract hash should be filtered for code")
+	}
+}
+
+// TestAllowAllFilter checks that AllowAllFilter accepts every address and hash.
+func TestAllowAllFilter(t *testing.T) {
+	filter := &partial.AllowAllFilter{}
+
+	// Any address should pass
+	testAddresses := []common.Address{
+		common.HexToAddress("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"),
+		common.HexToAddress("0x1234567890123456789012345678901234567890"),
+		common.HexToAddress("0x0000000000000000000000000000000000000000"),
+	}
+
+	for _, addr := range testAddresses {
+		if !filter.ShouldSyncStorage(addr) {
+			t.Errorf("AllowAllFilter should allow storage for %s", addr.Hex())
+		}
+		if !filter.ShouldSyncCode(addr) {
+			t.Errorf("AllowAllFilter should allow code for %s", addr.Hex())
+		}
+		if !filter.IsTracked(addr) {
+			t.Errorf("AllowAllFilter should mark %s as tracked", addr.Hex())
+		}
+
+		hash := crypto.Keccak256Hash(addr.Bytes())
+		if !filter.ShouldSyncStorageByHash(hash) {
+			t.Errorf("AllowAllFilter should allow storage by hash for %s", hash.Hex())
+		}
+		if !filter.ShouldSyncCodeByHash(hash) {
+			t.Errorf("AllowAllFilter should allow code by hash for %s", hash.Hex())
+		}
+	}
+}
+
+// TestSkipMarkerPersistence checks that a skip marker can be written, read back
+// and deleted again.
+func TestSkipMarkerPersistence(t *testing.T) {
+	db := rawdb.NewMemoryDatabase()
+	accountHash := common.HexToHash("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef")
+	storageRoot := common.HexToHash("0xabcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890")
+
+	// Initially not skipped
+	if isStorageSkipped(db, accountHash) {
+		t.Error("Account should not be marked as skipped initially")
+	}
+
+	// Mark as skipped
+	markStorageSkipped(db, accountHash, storageRoot)
+
+	// Verify marker persists
+	if !isStorageSkipped(db, accountHash) {
+		t.Error("Skip marker should persist after write")
+	}
+
+	// Delete and verify
+	deleteStorageSkipped(db, accountHash)
+	if isStorageSkipped(db, accountHash) {
+		t.Error("Skip marker should be removed after delete")
+	}
+}
+
+// TestSyncerFilterMethods checks the Syncer filter helpers with a nil filter
+// (full sync) and with a configured filter (partial sync).
+func TestSyncerFilterMethods(t *testing.T) {
+	db := rawdb.NewMemoryDatabase()
+
+	// Test with nil filter (full node mode)
+	syncer := NewSyncer(db, rawdb.HashScheme, nil)
+	anyHash := common.HexToHash("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef")
+
+	if !syncer.shouldSyncStorage(anyHash) {
+		t.Error("Nil filter should sync all storage")
+	}
+	if !syncer.shouldSyncCode(anyHash) {
+		t.Error("Nil filter should sync all code")
+	}
+	if syncer.isPartialSync() {
+		t.Error("Nil filter means not in partial sync mode")
+	}
+
+	// Test with configured filter (partial mode)
+	tracked := []common.Address{
+		common.HexToAddress("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"),
+	}
+	filter := partial.NewConfiguredFilter(tracked)
+	partialSyncer := NewSyncer(db, rawdb.HashScheme, filter)
+
+	if !partialSyncer.isPartialSync() {
+		t.Error("Configured filter should indicate partial sync mode")
+	}
+
+	// Tracked contract should pass
+	trackedHash := crypto.Keccak256Hash(tracked[0].Bytes())
+	if !partialSyncer.shouldSyncStorage(trackedHash) {
+		t.Error("Tracked contract should pass storage filter")
+	}
+	if !partialSyncer.shouldSyncCode(trackedHash) {
+		t.Error("Tracked contract should pass code filter")
+	}
+
+	// Untracked contract should be filtered
+	untrackedHash := crypto.Keccak256Hash(common.HexToAddress("0x1234").Bytes())
+	if partialSyncer.shouldSyncStorage(untrackedHash) {
+		t.Error("Untracked contract should be filtered for storage")
+	}
+	if partialSyncer.shouldSyncCode(untrackedHash) {
+		t.Error("Untracked contract should be filtered for code")
+	}
+}
+
+// TestConfiguredFilterContracts checks that Contracts() returns every tracked
+// address.
+func TestConfiguredFilterContracts(t *testing.T) {
+	tracked := []common.Address{
+		common.HexToAddress("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"),
+		common.HexToAddress("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"),
+	}
+	filter := partial.NewConfiguredFilter(tracked)
+
+	// Verify Contracts() returns all tracked addresses
+	contracts := filter.Contracts()
+	if len(contracts) != len(tracked) {
+		t.Errorf("Expected %d contracts, got %d", len(tracked), len(contracts))
+	}
+
+	// Check all tracked are in result (order may differ)
+	for _, addr := range tracked {
+		found := false
+		for _, c := range contracts {
+			if c == addr {
+				found = true
+				break
+			}
+		}
+		if !found {
+			t.Errorf("Contract %s not found in Contracts() result", addr.Hex())
+		}
+	}
+}
diff --git a/eth/protocols/snap/sync_test.go b/eth/protocols/snap/sync_test.go
index b11ad4e78a..a6635f5b8d 100644
--- a/eth/protocols/snap/sync_test.go
+++ b/eth/protocols/snap/sync_test.go
@@ -624,7 +624,7 @@ func testSyncBloatedProof(t *testing.T, scheme string) {
 
 func setupSyncer(scheme string, peers ...*testPeer) *Syncer {
 	stateDb := rawdb.NewMemoryDatabase()
-	syncer := NewSyncer(stateDb, scheme)
+	syncer := NewSyncer(stateDb, scheme, nil)
 	for _, peer := range peers {
 		syncer.Register(peer)
 		peer.remote = syncer