eth/protocols/snap: implement partial sync mode with skip markers

Adds partial sync mode to the snap syncer that filters which contracts
have their storage and bytecode synced based on the configured filter.

Key changes:
- Syncer accepts optional ContractFilter for partial mode
- Skip markers (SnapSkipped prefix) track accounts whose storage was intentionally skipped
- processAccountResponse checks filter before requesting storage/code
- Healing phase uses NewPartialStateSync to respect skip markers
- Helper functions for skip marker persistence (mark/check/delete)

When partial sync is active, only tracked contracts have their storage
and bytecode synced, reducing sync size from ~1TB+ to ~30-40GB while
maintaining a complete account trie for balance queries.

Part of partial statefulness Phase 2.
This commit is contained in:
CPerezz 2026-02-02 13:47:48 +01:00
parent 413374b99f
commit b82f9fea07
No known key found for this signature in database
GPG key ID: 62045F34B97177DD
4 changed files with 366 additions and 16 deletions

View file

@ -33,6 +33,7 @@ import (
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/state/partial"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
@ -445,6 +446,11 @@ type Syncer struct {
db ethdb.KeyValueStore // Database to store the trie nodes into (and dedup)
scheme string // Node scheme used in node database
// Partial state filter (nil = sync everything, i.e., full node)
// When set, only accounts in the filter have their storage/bytecode synced.
// ALL accounts are always synced - only storage and bytecode are filtered.
filter partial.ContractFilter
root common.Hash // Current state trie root being synced
tasks []*accountTask // Current account task set being synced
snapped bool // Flag to signal that snap phase is done
@ -512,11 +518,14 @@ type Syncer struct {
}
// NewSyncer creates a new snapshot syncer to download the Ethereum state over the
// snap protocol.
func NewSyncer(db ethdb.KeyValueStore, scheme string) *Syncer {
// snap protocol. The optional filter parameter enables partial statefulness mode
// where only configured contracts have their storage and bytecode synced.
// Pass nil for full node behavior (sync everything).
func NewSyncer(db ethdb.KeyValueStore, scheme string, filter partial.ContractFilter) *Syncer {
return &Syncer{
db: db,
scheme: scheme,
filter: filter,
peers: make(map[string]SyncPeer),
peerJoin: new(event.Feed),
@ -609,8 +618,27 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
// any peers and initialize the syncer if it was not yet run
s.lock.Lock()
s.root = root
// Create the state sync scheduler. For partial sync, use the filtered version
// that skips storage/code healing for non-tracked contracts.
var scheduler *trie.Sync
if s.isPartialSync() {
// Create filter callbacks that check skip markers in the database
shouldSyncStorage := func(accountHash common.Hash) bool {
return !isStorageSkipped(s.db, accountHash)
}
shouldSyncCode := func(accountHash common.Hash) bool {
// For now, use the same logic as storage (skip if storage is skipped)
// This could be refined to have separate skip markers for code
return !isStorageSkipped(s.db, accountHash)
}
scheduler = state.NewPartialStateSync(root, s.db, s.onHealState, s.scheme, shouldSyncStorage, shouldSyncCode)
} else {
scheduler = state.NewStateSync(root, s.db, s.onHealState, s.scheme)
}
s.healer = &healTask{
scheduler: state.NewStateSync(root, s.db, s.onHealState, s.scheme),
scheduler: scheduler,
trieTasks: make(map[string]common.Hash),
codeTasks: make(map[common.Hash]struct{}),
}
@ -1938,28 +1966,46 @@ func (s *Syncer) processAccountResponse(res *accountResponse) {
res.task.pend = 0
for i, account := range res.accounts {
accountHash := res.hashes[i]
// Check if the account is a contract with an unknown code
if !bytes.Equal(account.CodeHash, types.EmptyCodeHash.Bytes()) {
if !rawdb.HasCodeWithPrefix(s.db, common.BytesToHash(account.CodeHash)) {
res.task.codeTasks[common.BytesToHash(account.CodeHash)] = struct{}{}
res.task.needCode[i] = true
res.task.pend++
// Partial sync: check if we should sync this contract's bytecode
if s.shouldSyncCode(accountHash) {
res.task.codeTasks[common.BytesToHash(account.CodeHash)] = struct{}{}
res.task.needCode[i] = true
res.task.pend++
} else {
// Skip bytecode for non-tracked contracts
bytecodeSkippedMeter.Mark(1)
}
}
}
// Check if the account is a contract with an unknown storage trie
if account.Root != types.EmptyRootHash {
// Partial sync: check if we should sync this contract's storage
if !s.shouldSyncStorage(accountHash) {
// Skip storage for non-tracked contracts
// Mark as skipped so healing phase knows not to try healing this storage
markStorageSkipped(s.db, accountHash, account.Root)
res.task.stateCompleted[accountHash] = struct{}{}
storageSkippedMeter.Mark(1)
continue
}
// If the storage was already retrieved in the last cycle, there's no need
// to resync it again, regardless of whether the storage root is consistent
// or not.
if _, exist := res.task.stateCompleted[res.hashes[i]]; exist {
if _, exist := res.task.stateCompleted[accountHash]; exist {
// The leftover storage tasks are not expected, unless system is
// very wrong.
if _, ok := res.task.SubTasks[res.hashes[i]]; ok {
panic(fmt.Errorf("unexpected leftover storage tasks, owner: %x", res.hashes[i]))
if _, ok := res.task.SubTasks[accountHash]; ok {
panic(fmt.Errorf("unexpected leftover storage tasks, owner: %x", accountHash))
}
// Mark the healing tag if storage root node is inconsistent, or
// it's non-existent due to storage chunking.
if !rawdb.HasTrieNode(s.db, res.hashes[i], nil, account.Root, s.scheme) {
if !rawdb.HasTrieNode(s.db, accountHash, nil, account.Root, s.scheme) {
res.task.needHeal[i] = true
}
} else {
@ -1967,20 +2013,20 @@ func (s *Syncer) processAccountResponse(res *accountResponse) {
// don't restart it from scratch. This happens if a sync cycle
// is interrupted and resumed later. However, *do* update the
// previous root hash.
if subtasks, ok := res.task.SubTasks[res.hashes[i]]; ok {
log.Debug("Resuming large storage retrieval", "account", res.hashes[i], "root", account.Root)
if subtasks, ok := res.task.SubTasks[accountHash]; ok {
log.Debug("Resuming large storage retrieval", "account", accountHash, "root", account.Root)
for _, subtask := range subtasks {
subtask.root = account.Root
}
res.task.needHeal[i] = true
resumed[res.hashes[i]] = struct{}{}
resumed[accountHash] = struct{}{}
largeStorageResumedGauge.Inc(1)
} else {
// It's possible that in the hash scheme, the storage, along
// with the trie nodes of the given root, is already present
// in the database. Schedule the storage task anyway to simplify
// the logic here.
res.task.stateTasks[res.hashes[i]] = account.Root
res.task.stateTasks[accountHash] = account.Root
}
res.task.needState[i] = true
res.task.pend++
@ -3090,6 +3136,7 @@ func (s *Syncer) onHealByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) e
// Note it's not concurrent safe, please handle the concurrent issue outside.
func (s *Syncer) onHealState(paths [][]byte, value []byte) error {
if len(paths) == 1 {
// Account trie leaf - ALWAYS process (never skip accounts)
var account types.StateAccount
if err := rlp.DecodeBytes(value, &account); err != nil {
return nil // Returning the error here would drop the remote peer
@ -3100,7 +3147,16 @@ func (s *Syncer) onHealState(paths [][]byte, value []byte) error {
s.accountHealedBytes += common.StorageSize(1 + common.HashLength + len(blob))
}
if len(paths) == 2 {
rawdb.WriteStorageSnapshot(s.stateWriter, common.BytesToHash(paths[0]), common.BytesToHash(paths[1]), value)
// Storage trie leaf
accountHash := common.BytesToHash(paths[0])
// Partial sync: skip storage healing for non-tracked contracts
// (accounts themselves are always synced/healed)
if isStorageSkipped(s.db, accountHash) {
return nil // Don't heal storage we intentionally skipped
}
rawdb.WriteStorageSnapshot(s.stateWriter, accountHash, common.BytesToHash(paths[1]), value)
s.storageHealed += 1
s.storageHealedBytes += common.StorageSize(1 + 2*common.HashLength + len(value))
}

View file

@ -0,0 +1,83 @@
// Copyright 2025 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package snap
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/metrics"
)
// skippedStoragePrefix is the database key prefix for tracking intentionally
// skipped storage during partial sync.
//
// These markers allow the healing phase to know which accounts had storage
// intentionally skipped (vs. accounts that need storage healing due to sync
// interruption). A marker's full key is this prefix followed by the account
// hash (see skippedStorageKey); its value is the account's storage root at the
// time it was skipped (see markStorageSkipped).
var skippedStoragePrefix = []byte("SnapSkipped")
// Metrics for partial sync progress tracking.
//
// storageSkippedMeter counts contracts whose storage trie was deliberately not
// scheduled for download; bytecodeSkippedMeter counts contracts whose missing
// bytecode was deliberately not requested.
var (
storageSkippedMeter = metrics.NewRegisteredMeter("snap/sync/storage/skipped", nil)
bytecodeSkippedMeter = metrics.NewRegisteredMeter("snap/sync/bytecode/skipped", nil)
)
// skippedStorageKey returns the database key for a skipped storage marker.
//
// The key format is: skippedStoragePrefix + accountHash (32 bytes).
//
// The key is assembled in a freshly allocated, exactly-sized buffer instead of
// appending onto skippedStoragePrefix itself. Appending directly to the shared
// package-level slice would write into its backing array whenever that array
// has spare capacity, which would make returned keys alias each other and race
// under concurrent callers. Building into a private buffer rules that out
// regardless of the prefix's capacity.
func skippedStorageKey(accountHash common.Hash) []byte {
	key := make([]byte, 0, len(skippedStoragePrefix)+common.HashLength)
	key = append(key, skippedStoragePrefix...)
	return append(key, accountHash.Bytes()...)
}
// markStorageSkipped persists a skip marker for the given account, recording
// that its storage was deliberately left out during partial sync because the
// contract is not in the configured list.
//
// The marker's value is storageRoot, kept so consistency can be verified later
// if needed. The function has no error return: any error reported by the
// underlying Put is discarded.
func markStorageSkipped(db ethdb.KeyValueWriter, accountHash common.Hash, storageRoot common.Hash) {
	key, value := skippedStorageKey(accountHash), storageRoot.Bytes()
	_ = db.Put(key, value)
}
// isStorageSkipped reports whether a skip marker exists for the account, i.e.
// whether its storage was intentionally skipped during partial sync.
//
// Any error returned by the database lookup is discarded; only the boolean
// result of Has is used.
func isStorageSkipped(db ethdb.KeyValueReader, accountHash common.Hash) bool {
	key := skippedStorageKey(accountHash)
	if found, _ := db.Has(key); found {
		return true
	}
	return false
}
// deleteStorageSkipped drops the skip marker of the given account. It is meant
// for cleanup, or for re-syncing with a different configuration.
//
// The function has no error return: any error reported by the underlying
// Delete is discarded.
func deleteStorageSkipped(db ethdb.KeyValueWriter, accountHash common.Hash) {
	key := skippedStorageKey(accountHash)
	_ = db.Delete(key)
}
// shouldSyncStorage reports whether the storage of the account identified by
// accountHash should be synced.
//
// With a filter configured, the decision is delegated to the filter's
// hash-based storage check. Without one (filter == nil) every account's
// storage is synced, which is the full node behavior.
func (s *Syncer) shouldSyncStorage(accountHash common.Hash) bool {
	if s.filter != nil {
		return s.filter.ShouldSyncStorageByHash(accountHash)
	}
	// No filter = sync everything (full node)
	return true
}
// shouldSyncCode reports whether the bytecode of the account identified by
// accountHash should be synced.
//
// With a filter configured, the decision is delegated to the filter's
// hash-based code check. Without one (filter == nil) all bytecode is synced,
// which is the full node behavior.
func (s *Syncer) shouldSyncCode(accountHash common.Hash) bool {
	if s.filter != nil {
		return s.filter.ShouldSyncCodeByHash(accountHash)
	}
	// No filter = sync everything (full node)
	return true
}
// isPartialSync reports whether partial sync mode is active, which is the case
// exactly when a contract filter has been configured on the syncer.
func (s *Syncer) isPartialSync() bool {
	hasFilter := s.filter != nil
	return hasFilter
}

View file

@ -0,0 +1,211 @@
// Copyright 2025 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package snap
import (
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state/partial"
"github.com/ethereum/go-ethereum/crypto"
)
// TestPartialSyncFilterStorage checks that a configured filter admits exactly
// the tracked contracts, both when queried by address and when queried by
// account hash (Keccak256 of the address).
func TestPartialSyncFilterStorage(t *testing.T) {
	var (
		weth     = common.HexToAddress("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2") // WETH
		usdc     = common.HexToAddress("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48") // USDC
		tracked  = []common.Address{weth, usdc}
		filter   = partial.NewConfiguredFilter(tracked)
		stranger = common.HexToAddress("0x1234567890123456789012345678901234567890")
	)

	// Address-based queries: every tracked contract must be admitted.
	for i := range tracked {
		addr := tracked[i]
		if ok := filter.ShouldSyncStorage(addr); !ok {
			t.Errorf("Tracked contract %s should pass storage filter", addr.Hex())
		}
		if ok := filter.ShouldSyncCode(addr); !ok {
			t.Errorf("Tracked contract %s should pass code filter", addr.Hex())
		}
		if ok := filter.IsTracked(addr); !ok {
			t.Errorf("Tracked contract %s should be marked as tracked", addr.Hex())
		}
	}

	// Address-based queries: an unknown contract must be rejected.
	if ok := filter.ShouldSyncStorage(stranger); ok {
		t.Error("Untracked contract should be filtered for storage")
	}
	if ok := filter.ShouldSyncCode(stranger); ok {
		t.Error("Untracked contract should be filtered for code")
	}
	if ok := filter.IsTracked(stranger); ok {
		t.Error("Untracked contract should not be marked as tracked")
	}

	// Hash-based queries: every tracked contract must be admitted.
	for i := range tracked {
		hash := crypto.Keccak256Hash(tracked[i].Bytes())
		if ok := filter.ShouldSyncStorageByHash(hash); !ok {
			t.Errorf("Tracked contract hash %s should pass storage filter", hash.Hex())
		}
		if ok := filter.ShouldSyncCodeByHash(hash); !ok {
			t.Errorf("Tracked contract hash %s should pass code filter", hash.Hex())
		}
	}

	// Hash-based queries: an unknown contract must be rejected.
	strangerHash := crypto.Keccak256Hash(stranger.Bytes())
	if ok := filter.ShouldSyncStorageByHash(strangerHash); ok {
		t.Error("Untracked contract hash should be filtered for storage")
	}
	if ok := filter.ShouldSyncCodeByHash(strangerHash); ok {
		t.Error("Untracked contract hash should be filtered for code")
	}
}
// TestAllowAllFilter checks that AllowAllFilter admits every address, both via
// the address-based and the hash-based query methods.
func TestAllowAllFilter(t *testing.T) {
	filter := &partial.AllowAllFilter{}

	// Address-based checks, in the order they are asserted for each address.
	addrChecks := []struct {
		allow  func(common.Address) bool
		format string
	}{
		{filter.ShouldSyncStorage, "AllowAllFilter should allow storage for %s"},
		{filter.ShouldSyncCode, "AllowAllFilter should allow code for %s"},
		{filter.IsTracked, "AllowAllFilter should mark %s as tracked"},
	}
	// Hash-based checks, asserted after the address-based ones.
	hashChecks := []struct {
		allow  func(common.Hash) bool
		format string
	}{
		{filter.ShouldSyncStorageByHash, "AllowAllFilter should allow storage by hash for %s"},
		{filter.ShouldSyncCodeByHash, "AllowAllFilter should allow code by hash for %s"},
	}

	// Any address should pass
	for _, raw := range []string{
		"0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2",
		"0x1234567890123456789012345678901234567890",
		"0x0000000000000000000000000000000000000000",
	} {
		addr := common.HexToAddress(raw)
		for _, c := range addrChecks {
			if !c.allow(addr) {
				t.Errorf(c.format, addr.Hex())
			}
		}
		hash := crypto.Keccak256Hash(addr.Bytes())
		for _, c := range hashChecks {
			if !c.allow(hash) {
				t.Errorf(c.format, hash.Hex())
			}
		}
	}
}
// TestSkipMarkerPersistence exercises the skip marker lifecycle against an
// in-memory database: a marker is absent initially, present after being
// written, carries the storage root as its value, does not affect other
// accounts, and is gone again after deletion.
func TestSkipMarkerPersistence(t *testing.T) {
	db := rawdb.NewMemoryDatabase()

	accountHash := common.HexToHash("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef")
	otherHash := common.HexToHash("0xfedcba0987654321fedcba0987654321fedcba0987654321fedcba0987654321")
	storageRoot := common.HexToHash("0xabcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890")

	// Initially not skipped
	if isStorageSkipped(db, accountHash) {
		t.Error("Account should not be marked as skipped initially")
	}

	// Mark as skipped
	markStorageSkipped(db, accountHash, storageRoot)

	// Verify marker persists
	if !isStorageSkipped(db, accountHash) {
		t.Error("Skip marker should persist after write")
	}

	// Verify the marker's payload is the storage root that was passed in
	blob, err := db.Get(skippedStorageKey(accountHash))
	if err != nil {
		t.Fatalf("Failed to read skip marker: %v", err)
	}
	if len(blob) != common.HashLength {
		t.Errorf("Skip marker value has wrong length: have %d, want %d", len(blob), common.HashLength)
	}
	if have := common.BytesToHash(blob); have != storageRoot {
		t.Errorf("Skip marker root mismatch: have %x, want %x", have, storageRoot)
	}

	// Verify the marker is scoped to the marked account only
	if isStorageSkipped(db, otherHash) {
		t.Error("Skip marker should not apply to other accounts")
	}

	// Delete and verify
	deleteStorageSkipped(db, accountHash)
	if isStorageSkipped(db, accountHash) {
		t.Error("Skip marker should be removed after delete")
	}
}
// TestSyncerFilterMethods checks the Syncer's filter helpers in both modes:
// with a nil filter (full node, everything is synced) and with a configured
// filter (partial mode, only tracked contracts are synced).
func TestSyncerFilterMethods(t *testing.T) {
	db := rawdb.NewMemoryDatabase()

	// Full node mode: nil filter admits any account hash.
	fullSyncer := NewSyncer(db, rawdb.HashScheme, nil)
	probe := common.HexToHash("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef")

	if ok := fullSyncer.shouldSyncStorage(probe); !ok {
		t.Error("Nil filter should sync all storage")
	}
	if ok := fullSyncer.shouldSyncCode(probe); !ok {
		t.Error("Nil filter should sync all code")
	}
	if partialMode := fullSyncer.isPartialSync(); partialMode {
		t.Error("Nil filter means not in partial sync mode")
	}

	// Partial mode: a configured filter tracking a single contract.
	trackedAddr := common.HexToAddress("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2")
	filter := partial.NewConfiguredFilter([]common.Address{trackedAddr})
	partialSyncer := NewSyncer(db, rawdb.HashScheme, filter)

	if partialMode := partialSyncer.isPartialSync(); !partialMode {
		t.Error("Configured filter should indicate partial sync mode")
	}

	// Tracked contract should pass
	trackedHash := crypto.Keccak256Hash(trackedAddr.Bytes())
	if ok := partialSyncer.shouldSyncStorage(trackedHash); !ok {
		t.Error("Tracked contract should pass storage filter")
	}
	if ok := partialSyncer.shouldSyncCode(trackedHash); !ok {
		t.Error("Tracked contract should pass code filter")
	}

	// Untracked contract should be filtered
	untrackedAddr := common.HexToAddress("0x1234")
	untrackedHash := crypto.Keccak256Hash(untrackedAddr.Bytes())
	if ok := partialSyncer.shouldSyncStorage(untrackedHash); ok {
		t.Error("Untracked contract should be filtered for storage")
	}
	if ok := partialSyncer.shouldSyncCode(untrackedHash); ok {
		t.Error("Untracked contract should be filtered for code")
	}
}
// TestConfiguredFilterContracts checks that Contracts() on a configured filter
// returns every tracked address, irrespective of ordering.
func TestConfiguredFilterContracts(t *testing.T) {
	tracked := []common.Address{
		common.HexToAddress("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"),
		common.HexToAddress("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"),
	}
	contracts := partial.NewConfiguredFilter(tracked).Contracts()

	// Verify Contracts() returns all tracked addresses
	if want, have := len(tracked), len(contracts); have != want {
		t.Errorf("Expected %d contracts, got %d", want, have)
	}

	// Index the result so membership does not depend on ordering.
	returned := make(map[common.Address]struct{}, len(contracts))
	for _, c := range contracts {
		returned[c] = struct{}{}
	}
	for _, addr := range tracked {
		if _, ok := returned[addr]; !ok {
			t.Errorf("Contract %s not found in Contracts() result", addr.Hex())
		}
	}
}

View file

@ -624,7 +624,7 @@ func testSyncBloatedProof(t *testing.T, scheme string) {
func setupSyncer(scheme string, peers ...*testPeer) *Syncer {
stateDb := rawdb.NewMemoryDatabase()
syncer := NewSyncer(stateDb, scheme)
syncer := NewSyncer(stateDb, scheme, nil)
for _, peer := range peers {
syncer.Register(peer)
peer.remote = syncer