diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index 841bfb446e..a795125df8 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -33,6 +33,7 @@ import ( "github.com/ethereum/go-ethereum/common/math" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/state/partial" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" @@ -445,6 +446,11 @@ type Syncer struct { db ethdb.KeyValueStore // Database to store the trie nodes into (and dedup) scheme string // Node scheme used in node database + // Partial state filter (nil = sync everything, i.e., full node) + // When set, only accounts in the filter have their storage/bytecode synced. + // ALL accounts are always synced - only storage and bytecode are filtered. + filter partial.ContractFilter + root common.Hash // Current state trie root being synced tasks []*accountTask // Current account task set being synced snapped bool // Flag to signal that snap phase is done @@ -512,11 +518,14 @@ type Syncer struct { } // NewSyncer creates a new snapshot syncer to download the Ethereum state over the -// snap protocol. -func NewSyncer(db ethdb.KeyValueStore, scheme string) *Syncer { +// snap protocol. The optional filter parameter enables partial statefulness mode +// where only configured contracts have their storage and bytecode synced. +// Pass nil for full node behavior (sync everything). +func NewSyncer(db ethdb.KeyValueStore, scheme string, filter partial.ContractFilter) *Syncer { return &Syncer{ db: db, scheme: scheme, + filter: filter, peers: make(map[string]SyncPeer), peerJoin: new(event.Feed), @@ -609,8 +618,27 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error { // any peers and initialize the syncer if it was not yet run s.lock.Lock() s.root = root + + // Create the state sync scheduler. For partial sync, use the filtered version + // that skips storage/code healing for non-tracked contracts. + var scheduler *trie.Sync + if s.isPartialSync() { + // Create filter callbacks that check skip markers in the database + shouldSyncStorage := func(accountHash common.Hash) bool { + return !isStorageSkipped(s.db, accountHash) + } + shouldSyncCode := func(accountHash common.Hash) bool { + // For now, use the same logic as storage (skip if storage is skipped) + // This could be refined to have separate skip markers for code + return !isStorageSkipped(s.db, accountHash) + } + scheduler = state.NewPartialStateSync(root, s.db, s.onHealState, s.scheme, shouldSyncStorage, shouldSyncCode) + } else { + scheduler = state.NewStateSync(root, s.db, s.onHealState, s.scheme) + } + s.healer = &healTask{ - scheduler: state.NewStateSync(root, s.db, s.onHealState, s.scheme), + scheduler: scheduler, trieTasks: make(map[string]common.Hash), codeTasks: make(map[common.Hash]struct{}), } @@ -1938,28 +1966,46 @@ func (s *Syncer) processAccountResponse(res *accountResponse) { res.task.pend = 0 for i, account := range res.accounts { + accountHash := res.hashes[i] + // Check if the account is a contract with an unknown code if !bytes.Equal(account.CodeHash, types.EmptyCodeHash.Bytes()) { if !rawdb.HasCodeWithPrefix(s.db, common.BytesToHash(account.CodeHash)) { - res.task.codeTasks[common.BytesToHash(account.CodeHash)] = struct{}{} - res.task.needCode[i] = true - res.task.pend++ + // Partial sync: check if we should sync this contract's bytecode + if s.shouldSyncCode(accountHash) { + res.task.codeTasks[common.BytesToHash(account.CodeHash)] = struct{}{} + res.task.needCode[i] = true + res.task.pend++ + } else { + // Skip bytecode for non-tracked contracts + bytecodeSkippedMeter.Mark(1) + } } } // Check if the account is a contract with an unknown storage trie if account.Root != types.EmptyRootHash { + // Partial sync: check if we should sync this contract's storage + if !s.shouldSyncStorage(accountHash) { + // Skip storage for non-tracked contracts + // Mark as skipped so healing phase knows not to try healing this storage + markStorageSkipped(s.db, accountHash, account.Root) + res.task.stateCompleted[accountHash] = struct{}{} + storageSkippedMeter.Mark(1) + continue + } + // If the storage was already retrieved in the last cycle, there's no need // to resync it again, regardless of whether the storage root is consistent // or not. - if _, exist := res.task.stateCompleted[res.hashes[i]]; exist { + if _, exist := res.task.stateCompleted[accountHash]; exist { // The leftover storage tasks are not expected, unless system is // very wrong. - if _, ok := res.task.SubTasks[res.hashes[i]]; ok { - panic(fmt.Errorf("unexpected leftover storage tasks, owner: %x", res.hashes[i])) + if _, ok := res.task.SubTasks[accountHash]; ok { + panic(fmt.Errorf("unexpected leftover storage tasks, owner: %x", accountHash)) } // Mark the healing tag if storage root node is inconsistent, or // it's non-existent due to storage chunking. - if !rawdb.HasTrieNode(s.db, res.hashes[i], nil, account.Root, s.scheme) { + if !rawdb.HasTrieNode(s.db, accountHash, nil, account.Root, s.scheme) { res.task.needHeal[i] = true } } else { @@ -1967,20 +2013,20 @@ func (s *Syncer) processAccountResponse(res *accountResponse) { // don't restart it from scratch. This happens if a sync cycle // is interrupted and resumed later. However, *do* update the // previous root hash. - if subtasks, ok := res.task.SubTasks[res.hashes[i]]; ok { - log.Debug("Resuming large storage retrieval", "account", res.hashes[i], "root", account.Root) + if subtasks, ok := res.task.SubTasks[accountHash]; ok { + log.Debug("Resuming large storage retrieval", "account", accountHash, "root", account.Root) for _, subtask := range subtasks { subtask.root = account.Root } res.task.needHeal[i] = true - resumed[res.hashes[i]] = struct{}{} + resumed[accountHash] = struct{}{} largeStorageResumedGauge.Inc(1) } else { // It's possible that in the hash scheme, the storage, along // with the trie nodes of the given root, is already present // in the database. Schedule the storage task anyway to simplify // the logic here. - res.task.stateTasks[res.hashes[i]] = account.Root + res.task.stateTasks[accountHash] = account.Root } res.task.needState[i] = true res.task.pend++ @@ -3090,6 +3136,7 @@ func (s *Syncer) onHealByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) e // Note it's not concurrent safe, please handle the concurrent issue outside. func (s *Syncer) onHealState(paths [][]byte, value []byte) error { if len(paths) == 1 { + // Account trie leaf - ALWAYS process (never skip accounts) var account types.StateAccount if err := rlp.DecodeBytes(value, &account); err != nil { return nil // Returning the error here would drop the remote peer @@ -3100,7 +3147,16 @@ func (s *Syncer) onHealState(paths [][]byte, value []byte) error { s.accountHealedBytes += common.StorageSize(1 + common.HashLength + len(blob)) } if len(paths) == 2 { - rawdb.WriteStorageSnapshot(s.stateWriter, common.BytesToHash(paths[0]), common.BytesToHash(paths[1]), value) + // Storage trie leaf + accountHash := common.BytesToHash(paths[0]) + + // Partial sync: skip storage healing for non-tracked contracts + // (accounts themselves are always synced/healed) + if isStorageSkipped(s.db, accountHash) { + return nil // Don't heal storage we intentionally skipped + } + + rawdb.WriteStorageSnapshot(s.stateWriter, accountHash, common.BytesToHash(paths[1]), value) s.storageHealed += 1 s.storageHealedBytes += common.StorageSize(1 + 2*common.HashLength + len(value)) } diff --git a/eth/protocols/snap/sync_partial.go b/eth/protocols/snap/sync_partial.go new file mode 100644 index 0000000000..90abc59768 --- /dev/null +++ b/eth/protocols/snap/sync_partial.go @@ -0,0 +1,83 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package snap + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/metrics" +) + +// Database key prefix for tracking intentionally skipped storage during partial sync. +// These markers allow the healing phase to know which accounts had storage intentionally +// skipped (vs. accounts that need storage healing due to sync interruption). +var skippedStoragePrefix = []byte("SnapSkipped") + +// Metrics for partial sync progress tracking +var ( + storageSkippedMeter = metrics.NewRegisteredMeter("snap/sync/storage/skipped", nil) + bytecodeSkippedMeter = metrics.NewRegisteredMeter("snap/sync/bytecode/skipped", nil) +) + +// skippedStorageKey returns the database key for a skipped storage marker. +// The key format is: skippedStoragePrefix + accountHash (32 bytes) +func skippedStorageKey(accountHash common.Hash) []byte { + return append(skippedStoragePrefix, accountHash.Bytes()...) +} + +// markStorageSkipped records that storage was intentionally skipped for an account. +// This is used during partial sync to skip storage for contracts not in the configured list. +// The storageRoot is stored so we can verify consistency if needed. +func markStorageSkipped(db ethdb.KeyValueWriter, accountHash common.Hash, storageRoot common.Hash) { + db.Put(skippedStorageKey(accountHash), storageRoot.Bytes()) +} + +// isStorageSkipped checks if storage was intentionally skipped for an account. +// Returns true if this account's storage was skipped during partial sync. +func isStorageSkipped(db ethdb.KeyValueReader, accountHash common.Hash) bool { + has, _ := db.Has(skippedStorageKey(accountHash)) + return has +} + +// deleteStorageSkipped removes the skip marker for an account. +// Used during cleanup or when re-syncing with different configuration. +func deleteStorageSkipped(db ethdb.KeyValueWriter, accountHash common.Hash) { + db.Delete(skippedStorageKey(accountHash)) +} + +// shouldSyncStorage returns true if storage should be synced for this account hash. +// If no filter is configured (filter == nil), all storage is synced (full node behavior). +func (s *Syncer) shouldSyncStorage(accountHash common.Hash) bool { + if s.filter == nil { + return true // No filter = sync everything (full node) + } + return s.filter.ShouldSyncStorageByHash(accountHash) +} + +// shouldSyncCode returns true if bytecode should be synced for this account hash. +// If no filter is configured (filter == nil), all bytecode is synced (full node behavior). +func (s *Syncer) shouldSyncCode(accountHash common.Hash) bool { + if s.filter == nil { + return true // No filter = sync everything (full node) + } + return s.filter.ShouldSyncCodeByHash(accountHash) +} + +// isPartialSync returns true if partial sync mode is active. +func (s *Syncer) isPartialSync() bool { + return s.filter != nil +} diff --git a/eth/protocols/snap/sync_partial_test.go b/eth/protocols/snap/sync_partial_test.go new file mode 100644 index 0000000000..95c8e1eda8 --- /dev/null +++ b/eth/protocols/snap/sync_partial_test.go @@ -0,0 +1,211 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package snap + +import ( + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/state/partial" + "github.com/ethereum/go-ethereum/crypto" +) + +func TestPartialSyncFilterStorage(t *testing.T) { + // Create filter with specific contracts + tracked := []common.Address{ + common.HexToAddress("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"), // WETH + common.HexToAddress("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"), // USDC + } + filter := partial.NewConfiguredFilter(tracked) + + // Verify tracked contracts pass filter by address + for _, addr := range tracked { + if !filter.ShouldSyncStorage(addr) { + t.Errorf("Tracked contract %s should pass storage filter", addr.Hex()) + } + if !filter.ShouldSyncCode(addr) { + t.Errorf("Tracked contract %s should pass code filter", addr.Hex()) + } + if !filter.IsTracked(addr) { + t.Errorf("Tracked contract %s should be marked as tracked", addr.Hex()) + } + } + + // Verify untracked contracts are filtered + untracked := common.HexToAddress("0x1234567890123456789012345678901234567890") + if filter.ShouldSyncStorage(untracked) { + t.Error("Untracked contract should be filtered for storage") + } + if filter.ShouldSyncCode(untracked) { + t.Error("Untracked contract should be filtered for code") + } + if filter.IsTracked(untracked) { + t.Error("Untracked contract should not be marked as tracked") + } + + // Verify hash-based filter works + for _, addr := range tracked { + trackedHash := crypto.Keccak256Hash(addr.Bytes()) + if !filter.ShouldSyncStorageByHash(trackedHash) { + t.Errorf("Tracked contract hash %s should pass storage filter", trackedHash.Hex()) + } + if !filter.ShouldSyncCodeByHash(trackedHash) { + t.Errorf("Tracked contract hash %s should pass code filter", trackedHash.Hex()) + } + } + + // Verify untracked hash is filtered + untrackedHash := crypto.Keccak256Hash(untracked.Bytes()) + if filter.ShouldSyncStorageByHash(untrackedHash) { + t.Error("Untracked contract hash should be filtered for storage") + } + if filter.ShouldSyncCodeByHash(untrackedHash) { + t.Error("Untracked contract hash should be filtered for code") + } +} + +func TestAllowAllFilter(t *testing.T) { + filter := &partial.AllowAllFilter{} + + // Any address should pass + testAddresses := []common.Address{ + common.HexToAddress("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"), + common.HexToAddress("0x1234567890123456789012345678901234567890"), + common.HexToAddress("0x0000000000000000000000000000000000000000"), + } + + for _, addr := range testAddresses { + if !filter.ShouldSyncStorage(addr) { + t.Errorf("AllowAllFilter should allow storage for %s", addr.Hex()) + } + if !filter.ShouldSyncCode(addr) { + t.Errorf("AllowAllFilter should allow code for %s", addr.Hex()) + } + if !filter.IsTracked(addr) { + t.Errorf("AllowAllFilter should mark %s as tracked", addr.Hex()) + } + + hash := crypto.Keccak256Hash(addr.Bytes()) + if !filter.ShouldSyncStorageByHash(hash) { + t.Errorf("AllowAllFilter should allow storage by hash for %s", hash.Hex()) + } + if !filter.ShouldSyncCodeByHash(hash) { + t.Errorf("AllowAllFilter should allow code by hash for %s", hash.Hex()) + } + } +} + +func TestSkipMarkerPersistence(t *testing.T) { + db := rawdb.NewMemoryDatabase() + accountHash := common.HexToHash("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef") + storageRoot := common.HexToHash("0xabcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890") + + // Initially not skipped + if isStorageSkipped(db, accountHash) { + t.Error("Account should not be marked as skipped initially") + } + + // Mark as skipped + markStorageSkipped(db, accountHash, storageRoot) + + // Verify marker persists + if !isStorageSkipped(db, accountHash) { + t.Error("Skip marker should persist after write") + } + + // Delete and verify + deleteStorageSkipped(db, accountHash) + if isStorageSkipped(db, accountHash) { + t.Error("Skip marker should be removed after delete") + } +} + +func TestSyncerFilterMethods(t *testing.T) { + db := rawdb.NewMemoryDatabase() + + // Test with nil filter (full node mode) + syncer := NewSyncer(db, rawdb.HashScheme, nil) + anyHash := common.HexToHash("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef") + + if !syncer.shouldSyncStorage(anyHash) { + t.Error("Nil filter should sync all storage") + } + if !syncer.shouldSyncCode(anyHash) { + t.Error("Nil filter should sync all code") + } + if syncer.isPartialSync() { + t.Error("Nil filter means not in partial sync mode") + } + + // Test with configured filter (partial mode) + tracked := []common.Address{ + common.HexToAddress("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"), + } + filter := partial.NewConfiguredFilter(tracked) + partialSyncer := NewSyncer(db, rawdb.HashScheme, filter) + + if !partialSyncer.isPartialSync() { + t.Error("Configured filter should indicate partial sync mode") + } + + // Tracked contract should pass + trackedHash := crypto.Keccak256Hash(tracked[0].Bytes()) + if !partialSyncer.shouldSyncStorage(trackedHash) { + t.Error("Tracked contract should pass storage filter") + } + if !partialSyncer.shouldSyncCode(trackedHash) { + t.Error("Tracked contract should pass code filter") + } + + // Untracked contract should be filtered + untrackedHash := crypto.Keccak256Hash(common.HexToAddress("0x1234").Bytes()) + if partialSyncer.shouldSyncStorage(untrackedHash) { + t.Error("Untracked contract should be filtered for storage") + } + if partialSyncer.shouldSyncCode(untrackedHash) { + t.Error("Untracked contract should be filtered for code") + } +} + +func TestConfiguredFilterContracts(t *testing.T) { + tracked := []common.Address{ + common.HexToAddress("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"), + common.HexToAddress("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"), + } + filter := partial.NewConfiguredFilter(tracked) + + // Verify Contracts() returns all tracked addresses + contracts := filter.Contracts() + if len(contracts) != len(tracked) { + t.Errorf("Expected %d contracts, got %d", len(tracked), len(contracts)) + } + + // Check all tracked are in result (order may differ) + for _, addr := range tracked { + found := false + for _, c := range contracts { + if c == addr { + found = true + break + } + } + if !found { + t.Errorf("Contract %s not found in Contracts() result", addr.Hex()) + } + } +} diff --git a/eth/protocols/snap/sync_test.go b/eth/protocols/snap/sync_test.go index b11ad4e78a..a6635f5b8d 100644 --- a/eth/protocols/snap/sync_test.go +++ b/eth/protocols/snap/sync_test.go @@ -624,7 +624,7 @@ func testSyncBloatedProof(t *testing.T, scheme string) { func setupSyncer(scheme string, peers ...*testPeer) *Syncer { stateDb := rawdb.NewMemoryDatabase() - syncer := NewSyncer(stateDb, scheme) + syncer := NewSyncer(stateDb, scheme, nil) for _, peer := range peers { syncer.Register(peer) peer.remote = syncer