diff --git a/eth/protocols/snap/syncv2.go b/eth/protocols/snap/syncv2.go
new file mode 100644
index 0000000000..0bbcd9c35f
--- /dev/null
+++ b/eth/protocols/snap/syncv2.go
@@ -0,0 +1,1959 @@
+// Copyright 2026 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package snap
+
+import (
+ "bytes"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "math/big"
+ "math/rand"
+ "sort"
+ "sync"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/rawdb"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/p2p/msgrate"
+ "github.com/ethereum/go-ethereum/rlp"
+ "github.com/ethereum/go-ethereum/trie"
+ "github.com/ethereum/go-ethereum/trie/trienode"
+)
+
+// accountRequestV2 tracks a pending account range request to ensure responses are
+// to actual requests and to validate any security constraints.
+//
+// Concurrency note: account requests and responses are handled concurrently from
+// the main runloop to allow Merkle proof verifications on the peer's thread and
+// to drop on invalid response. The request struct must contain all the data to
+// construct the response without accessing runloop internals (i.e. task). That
+// is only included to allow the runloop to match a response to the task being
+// synced without having yet another set of maps.
+type accountRequestV2 struct {
+ peer string // Peer to which this request is assigned
+ id uint64 // Request ID of this request
+ time time.Time // Timestamp when the request was sent
+
+ deliver chan *accountResponseV2 // Channel to deliver successful response on
+ revert chan *accountRequestV2 // Channel to deliver request failure on
+ cancel chan struct{} // Channel to track sync cancellation
+ timeout *time.Timer // Timer to track delivery timeout
+ stale chan struct{} // Channel to signal the request was dropped
+
+ origin common.Hash // First account requested to allow continuation checks
+ limit common.Hash // Last account requested to allow non-overlapping chunking
+
+ task *accountTaskV2 // Task which this request is filling (only access fields through the runloop!!)
+}
+
+// accountResponseV2 is an already Merkle-verified remote response to an account
+// range request.
+type accountResponseV2 struct {
+ task *accountTaskV2 // Task which this request is filling
+
+ hashes []common.Hash // Account hashes in the returned range
+ accounts []*types.StateAccount // Expanded accounts in the returned range
+
+ cont bool // Whether the account range has a continuation
+}
+
+// bytecodeRequestV2 tracks a pending bytecode request to ensure responses are to
+// actual requests and to validate any security constraints.
+//
+// Concurrency note: bytecode requests and responses are handled concurrently from
+// the main runloop to allow Keccak256 hash verifications on the peer's thread and
+// to drop on invalid response. The request struct must contain all the data to
+// construct the response without accessing runloop internals (i.e. task). That
+// is only included to allow the runloop to match a response to the task being
+// synced without having yet another set of maps.
+type bytecodeRequestV2 struct {
+ peer string // Peer to which this request is assigned
+ id uint64 // Request ID of this request
+ time time.Time // Timestamp when the request was sent
+
+ deliver chan *bytecodeResponseV2 // Channel to deliver successful response on
+ revert chan *bytecodeRequestV2 // Channel to deliver request failure on
+ cancel chan struct{} // Channel to track sync cancellation
+ timeout *time.Timer // Timer to track delivery timeout
+ stale chan struct{} // Channel to signal the request was dropped
+
+ hashes []common.Hash // Bytecode hashes to validate responses
+ task *accountTaskV2 // Task which this request is filling (only access fields through the runloop!!)
+}
+
+// bytecodeResponseV2 is an already verified remote response to a bytecode request.
+type bytecodeResponseV2 struct {
+ task *accountTaskV2 // Task which this request is filling
+
+ hashes []common.Hash // Hashes of the bytecode to avoid double hashing
+ codes [][]byte // Actual bytecodes to store into the database (nil = missing)
+}
+
+// storageRequestV2 tracks a pending storage ranges request to ensure responses are
+// to actual requests and to validate any security constraints.
+//
+// Concurrency note: storage requests and responses are handled concurrently from
+// the main runloop to allow Merkle proof verifications on the peer's thread and
+// to drop on invalid response. The request struct must contain all the data to
+// construct the response without accessing runloop internals (i.e. tasks). That
+// is only included to allow the runloop to match a response to the task being
+// synced without having yet another set of maps.
+type storageRequestV2 struct {
+ peer string // Peer to which this request is assigned
+ id uint64 // Request ID of this request
+ time time.Time // Timestamp when the request was sent
+
+ deliver chan *storageResponseV2 // Channel to deliver successful response on
+ revert chan *storageRequestV2 // Channel to deliver request failure on
+ cancel chan struct{} // Channel to track sync cancellation
+ timeout *time.Timer // Timer to track delivery timeout
+ stale chan struct{} // Channel to signal the request was dropped
+
+ accounts []common.Hash // Account hashes to validate responses
+ roots []common.Hash // Storage roots to validate responses
+
+ origin common.Hash // First storage slot requested to allow continuation checks
+ limit common.Hash // Last storage slot requested to allow non-overlapping chunking
+
+ mainTask *accountTaskV2 // Task which this response belongs to (only access fields through the runloop!!)
+ subTask *storageTaskV2 // Task which this response is filling (only access fields through the runloop!!)
+}
+
+// storageResponseV2 is an already Merkle-verified remote response to a storage
+// range request.
+type storageResponseV2 struct {
+ mainTask *accountTaskV2 // Task which this response belongs to
+ subTask *storageTaskV2 // Task which this response is filling
+
+ accounts []common.Hash // Account hashes requested, may be only partially filled
+ roots []common.Hash // Storage roots requested, may be only partially filled
+
+ hashes [][]common.Hash // Storage slot hashes in the returned range
+ slots [][][]byte // Storage slot values in the returned range
+
+ cont bool // Whether the last storage range has a continuation
+}
+
+// accountTaskV2 represents the sync task for a chunk of the account snapshot.
+type accountTaskV2 struct {
+ // These fields get serialized to key-value store on shutdown
+ Next common.Hash // Next account to sync in this interval
+ Last common.Hash // Last account to sync in this interval
+ SubTasks map[common.Hash][]*storageTaskV2 // Storage intervals needing fetching for large contracts
+
+ // Pending accounts whose storage has already been fully committed in
+ // this cycle, but which cannot advance Next yet because account commits
+ // must be sequential. Persisting them across cycle switches avoids
+ // refetching their storage.
+ StorageCompleted []common.Hash
+
+ // These fields are internals used during runtime
+ req *accountRequestV2 // Pending request to fill this task
+ res *accountResponseV2 // Validate response filling this task
+ pend int // Number of pending subtasks for this round
+
+ needCode []bool // Flags whether the filling accounts need code retrieval
+ needState []bool // Flags whether the filling accounts need storage retrieval
+
+ codeTasks map[common.Hash]struct{} // Code hashes that need retrieval
+ stateTasks map[common.Hash]common.Hash // Account hashes->roots that need full state retrieval
+ stateCompleted map[common.Hash]struct{} // Account hashes whose storage have been completed
+
+ done bool // Flag whether the task can be removed
+}
+
+// activeSubTasks returns the set of storage tasks covered by the current account
+// range. Normally this would be the entire subTask set, but on a sync interrupt
+// and later resume it can happen that a shorter account range is retrieved. This
+// method ensures that we only start up the subtasks covered by the latest account
+// response.
+//
+// Nil is returned if the account range is empty.
+func (task *accountTaskV2) activeSubTasks() map[common.Hash][]*storageTaskV2 {
+ if len(task.res.hashes) == 0 {
+ return nil
+ }
+ var (
+ tasks = make(map[common.Hash][]*storageTaskV2)
+ last = task.res.hashes[len(task.res.hashes)-1]
+ )
+ for hash, subTasks := range task.SubTasks {
+ if hash.Cmp(last) <= 0 {
+ tasks[hash] = subTasks
+ }
+ }
+ return tasks
+}
+
+// storageTaskV2 represents the sync task for a chunk of the storage snapshot.
+type storageTaskV2 struct {
+ Next common.Hash // Next account to sync in this interval
+ Last common.Hash // Last account to sync in this interval
+
+ // These fields are internals used during runtime
+ root common.Hash // Storage root hash for this instance
+ req *storageRequestV2 // Pending request to fill this task
+ done bool // Flag whether the task can be removed
+}
+
+// SyncProgressV2 is a database entry to allow suspending and resuming a snapshot state
+// sync. Opposed to full and fast sync, there is no way to restart a suspended
+// snap sync without prior knowledge of the suspension point.
+type SyncProgressV2 struct {
+ Tasks []*accountTaskV2 // The suspended account tasks (contract tasks within)
+
+ // Status report during syncing phase
+ AccountSynced uint64 // Number of accounts downloaded
+ AccountBytes common.StorageSize // Number of account trie bytes persisted to disk
+ BytecodeSynced uint64 // Number of bytecodes downloaded
+ BytecodeBytes common.StorageSize // Number of bytecode bytes downloaded
+ StorageSynced uint64 // Number of storage slots downloaded
+ StorageBytes common.StorageSize // Number of storage trie bytes persisted to disk
+}
+
+// SyncPeerV2 abstracts out the methods required for a peer to be synced against
+// with the goal of allowing the construction of mock peers without the full
+// blown networking.
+type SyncPeerV2 interface {
+ // ID retrieves the peer's unique identifier.
+ ID() string
+
+ // RequestAccountRange fetches a batch of accounts rooted in a specific account
+ // trie, starting with the origin.
+ RequestAccountRange(id uint64, root, origin, limit common.Hash, bytes int) error
+
+ // RequestStorageRanges fetches a batch of storage slots belonging to one or
+ // more accounts. If slots from only one account is requested, an origin marker
+ // may also be used to retrieve from there.
+ RequestStorageRanges(id uint64, root common.Hash, accounts []common.Hash, origin, limit []byte, bytes int) error
+
+ // RequestByteCodes fetches a batch of bytecodes by hash.
+ RequestByteCodes(id uint64, hashes []common.Hash, bytes int) error
+
+ // Log retrieves the peer's own contextual logger.
+ Log() log.Logger
+}
+
+// SyncerV2 is an Ethereum account and storage state syncer based on the snap
+// protocol. It's purpose is to download all the accounts and storage slots
+// from remote peers, fixing the state inconsistencies between multiple sync
+// targets with BALs(block level accessList) and ultimately reassemble the state
+// trie (both account trie and storage tries) locally.
+//
+// Every network request has a variety of failure events:
+// - The peer disconnects after task assignment, failing to send the request
+// - The peer disconnects after sending the request, before delivering on it
+// - The peer remains connected, but does not deliver a response in time
+// - The peer delivers a stale response after a previous timeout
+// - The peer delivers a refusal to serve the requested state
+type SyncerV2 struct {
+ db ethdb.KeyValueStore // Database to store the trie nodes into (and dedup)
+ scheme string // Node scheme used in node database
+
+ root common.Hash // Current state trie root being synced
+ tasks []*accountTaskV2 // Current account task set being synced
+ update chan struct{} // Notification channel for possible sync progression
+
+ peers map[string]SyncPeerV2 // Currently active peers to download from
+ peerJoin *event.Feed // Event feed to react to peers joining
+ peerDrop *event.Feed // Event feed to react to peers dropping
+ rates *msgrate.Trackers // Message throughput rates for peers
+
+ // Request tracking during syncing phase
+ statelessPeers map[string]struct{} // Peers that failed to deliver state data
+ accountIdlers map[string]struct{} // Peers that aren't serving account requests
+ bytecodeIdlers map[string]struct{} // Peers that aren't serving bytecode requests
+ storageIdlers map[string]struct{} // Peers that aren't serving storage requests
+
+ accountReqs map[uint64]*accountRequestV2 // Account requests currently running
+ bytecodeReqs map[uint64]*bytecodeRequestV2 // Bytecode requests currently running
+ storageReqs map[uint64]*storageRequestV2 // Storage requests currently running
+
+ accountSynced uint64 // Number of accounts downloaded
+ accountBytes common.StorageSize // Number of account trie bytes persisted to disk
+ bytecodeSynced uint64 // Number of bytecodes downloaded
+ bytecodeBytes common.StorageSize // Number of bytecode bytes downloaded
+ storageSynced uint64 // Number of storage slots downloaded
+ storageBytes common.StorageSize // Number of storage trie bytes persisted to disk
+
+ extProgress *SyncProgressV2 // progress that can be exposed to external caller.
+
+ startTime time.Time // Time instance when snapshot sync started
+ logTime time.Time // Time instance when status was last reported
+
+ pend sync.WaitGroup // Tracks network request goroutines for graceful shutdown
+ lock sync.RWMutex // Protects fields that can change outside of sync (peers, reqs, root)
+}
+
+// NewSyncerV2 creates a new snapshot syncer to download the Ethereum state over the
+// snap protocol.
+func NewSyncerV2(db ethdb.KeyValueStore, scheme string) *SyncerV2 {
+ return &SyncerV2{
+ db: db,
+ scheme: scheme,
+
+ peers: make(map[string]SyncPeerV2),
+ peerJoin: new(event.Feed),
+ peerDrop: new(event.Feed),
+ rates: msgrate.NewTrackers(log.New("proto", "snap")),
+ update: make(chan struct{}, 1),
+
+ accountIdlers: make(map[string]struct{}),
+ storageIdlers: make(map[string]struct{}),
+ bytecodeIdlers: make(map[string]struct{}),
+
+ accountReqs: make(map[uint64]*accountRequestV2),
+ storageReqs: make(map[uint64]*storageRequestV2),
+ bytecodeReqs: make(map[uint64]*bytecodeRequestV2),
+
+ extProgress: new(SyncProgressV2),
+ }
+}
+
+// Register injects a new data source into the syncer's peerset.
+func (s *SyncerV2) Register(peer SyncPeerV2) error {
+ // Make sure the peer is not registered yet
+ id := peer.ID()
+
+ s.lock.Lock()
+ if _, ok := s.peers[id]; ok {
+ log.Error("Snap peer already registered", "id", id)
+
+ s.lock.Unlock()
+ return errors.New("already registered")
+ }
+ s.peers[id] = peer
+ s.rates.Track(id, msgrate.NewTracker(s.rates.MeanCapacities(), s.rates.MedianRoundTrip()))
+
+ // Mark the peer as idle, even if no sync is running
+ s.accountIdlers[id] = struct{}{}
+ s.storageIdlers[id] = struct{}{}
+ s.bytecodeIdlers[id] = struct{}{}
+ s.lock.Unlock()
+
+ // Notify any active syncs that a new peer can be assigned data
+ s.peerJoin.Send(id)
+ return nil
+}
+
+// Unregister injects a new data source into the syncer's peerset.
+func (s *SyncerV2) Unregister(id string) error {
+ // Remove all traces of the peer from the registry
+ s.lock.Lock()
+ if _, ok := s.peers[id]; !ok {
+ log.Error("Snap peer not registered", "id", id)
+
+ s.lock.Unlock()
+ return errors.New("not registered")
+ }
+ delete(s.peers, id)
+ s.rates.Untrack(id)
+
+ // Remove status markers, even if no sync is running
+ delete(s.statelessPeers, id)
+
+ delete(s.accountIdlers, id)
+ delete(s.storageIdlers, id)
+ delete(s.bytecodeIdlers, id)
+ s.lock.Unlock()
+
+ // Notify any active syncs that pending requests need to be reverted
+ s.peerDrop.Send(id)
+ return nil
+}
+
+// Sync starts (or resumes a previous) sync cycle to iterate over a state trie
+// with the given root and reconstruct the nodes based on the snapshot leaves.
+// Previously downloaded segments will not be redownloaded of fixed, rather any
+// errors will be healed after the leaves are fully accumulated.
+func (s *SyncerV2) Sync(root common.Hash, cancel chan struct{}) error {
+ // Move the trie root from any previous value, revert stateless markers for
+ // any peers and initialize the syncer if it was not yet run
+ s.lock.Lock()
+ s.root = root
+ s.statelessPeers = make(map[string]struct{})
+ s.lock.Unlock()
+
+ if s.startTime.IsZero() {
+ s.startTime = time.Now()
+ }
+ // Retrieve the previous sync status from LevelDB and abort if already synced
+ s.loadSyncStatus()
+ if len(s.tasks) == 0 {
+ log.Debug("Snapshot sync already completed")
+ return nil
+ }
+ defer func() { // Persist any progress, independent of failure
+ for _, task := range s.tasks {
+ s.forwardAccountTask(task)
+ }
+ s.cleanAccountTasks()
+ s.saveSyncStatus()
+ }()
+
+ log.Debug("Starting snapshot sync cycle", "root", root)
+ defer s.report(true)
+
+ // Whether sync completed or not, disregard any future packets
+ defer func() {
+ log.Debug("Terminating snapshot sync cycle", "root", root)
+ s.lock.Lock()
+ s.accountReqs = make(map[uint64]*accountRequestV2)
+ s.storageReqs = make(map[uint64]*storageRequestV2)
+ s.bytecodeReqs = make(map[uint64]*bytecodeRequestV2)
+ s.lock.Unlock()
+ }()
+ // Keep scheduling sync tasks
+ peerJoin := make(chan string, 16)
+ peerJoinSub := s.peerJoin.Subscribe(peerJoin)
+ defer peerJoinSub.Unsubscribe()
+
+ peerDrop := make(chan string, 16)
+ peerDropSub := s.peerDrop.Subscribe(peerDrop)
+ defer peerDropSub.Unsubscribe()
+
+ // Create a set of unique channels for this sync cycle. We need these to be
+ // ephemeral so a data race doesn't accidentally deliver something stale on
+ // a persistent channel across syncs (yup, this happened)
+ var (
+ accountReqFails = make(chan *accountRequestV2)
+ storageReqFails = make(chan *storageRequestV2)
+ bytecodeReqFails = make(chan *bytecodeRequestV2)
+ accountResps = make(chan *accountResponseV2)
+ storageResps = make(chan *storageResponseV2)
+ bytecodeResps = make(chan *bytecodeResponseV2)
+ )
+ for {
+ // Remove all completed tasks and terminate sync if everything's done
+ s.cleanStorageTasks()
+ s.cleanAccountTasks()
+ if len(s.tasks) == 0 {
+ return nil
+ }
+ // Assign all the data retrieval tasks to any free peers
+ s.assignAccountTasks(accountResps, accountReqFails, cancel)
+ s.assignBytecodeTasks(bytecodeResps, bytecodeReqFails, cancel)
+ s.assignStorageTasks(storageResps, storageReqFails, cancel)
+
+ // Update sync progress
+ s.lock.Lock()
+ s.extProgress = &SyncProgressV2{
+ AccountSynced: s.accountSynced,
+ AccountBytes: s.accountBytes,
+ BytecodeSynced: s.bytecodeSynced,
+ BytecodeBytes: s.bytecodeBytes,
+ StorageSynced: s.storageSynced,
+ StorageBytes: s.storageBytes,
+ }
+ s.lock.Unlock()
+ // Wait for something to happen
+ select {
+ case <-s.update:
+ // Something happened (new peer, delivery, timeout), recheck tasks
+ case <-peerJoin:
+ // A new peer joined, try to schedule it new tasks
+ case id := <-peerDrop:
+ s.revertRequests(id)
+ case <-cancel:
+ return ErrCancelled
+
+ case req := <-accountReqFails:
+ s.revertAccountRequest(req)
+ case req := <-bytecodeReqFails:
+ s.revertBytecodeRequest(req)
+ case req := <-storageReqFails:
+ s.revertStorageRequest(req)
+
+ case res := <-accountResps:
+ s.processAccountResponse(res)
+ case res := <-bytecodeResps:
+ s.processBytecodeResponse(res)
+ case res := <-storageResps:
+ s.processStorageResponse(res)
+ }
+ // Report stats if something meaningful happened
+ s.report(false)
+ }
+}
+
+// loadSyncStatus retrieves a previously aborted sync status from the database,
+// or generates a fresh one if none is available.
+func (s *SyncerV2) loadSyncStatus() {
+ var progress SyncProgressV2
+
+ if status := rawdb.ReadSnapshotSyncStatus(s.db); status != nil {
+ if err := json.Unmarshal(status, &progress); err != nil {
+ log.Error("Failed to decode snap sync status", "err", err)
+ } else {
+ for _, task := range progress.Tasks {
+ log.Debug("Scheduled account sync task", "from", task.Next, "last", task.Last)
+ }
+ s.tasks = progress.Tasks
+
+ for _, task := range s.tasks {
+ // Restore the completed storages
+ task.stateCompleted = make(map[common.Hash]struct{})
+ for _, hash := range task.StorageCompleted {
+ task.stateCompleted[hash] = struct{}{}
+ }
+ task.StorageCompleted = nil
+ }
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ s.accountSynced = progress.AccountSynced
+ s.accountBytes = progress.AccountBytes
+ s.bytecodeSynced = progress.BytecodeSynced
+ s.bytecodeBytes = progress.BytecodeBytes
+ s.storageSynced = progress.StorageSynced
+ s.storageBytes = progress.StorageBytes
+ return
+ }
+ }
+ // Either we've failed to decode the previous state, or there was none.
+ // Start a fresh sync by chunking up the account range and scheduling
+ // them for retrieval.
+ s.tasks = nil
+ s.accountSynced, s.accountBytes = 0, 0
+ s.bytecodeSynced, s.bytecodeBytes = 0, 0
+ s.storageSynced, s.storageBytes = 0, 0
+
+ var next common.Hash
+ step := new(big.Int).Sub(
+ new(big.Int).Div(
+ new(big.Int).Exp(common.Big2, common.Big256, nil),
+ big.NewInt(int64(accountConcurrency)),
+ ), common.Big1,
+ )
+ for i := 0; i < accountConcurrency; i++ {
+ last := common.BigToHash(new(big.Int).Add(next.Big(), step))
+ if i == accountConcurrency-1 {
+ // Make sure we don't overflow if the step is not a proper divisor
+ last = common.MaxHash
+ }
+ s.tasks = append(s.tasks, &accountTaskV2{
+ Next: next,
+ Last: last,
+ SubTasks: make(map[common.Hash][]*storageTaskV2),
+ stateCompleted: make(map[common.Hash]struct{}),
+ })
+ log.Debug("Created account sync task", "from", next, "last", last)
+ next = common.BigToHash(new(big.Int).Add(last.Big(), common.Big1))
+ }
+}
+
+// saveSyncStatus marshals the remaining sync tasks into leveldb.
+func (s *SyncerV2) saveSyncStatus() {
+ // Serialize any partial progress to disk before spinning down
+ for _, task := range s.tasks {
+ // Save the account hashes of completed storage.
+ task.StorageCompleted = make([]common.Hash, 0, len(task.stateCompleted))
+ for hash := range task.stateCompleted {
+ task.StorageCompleted = append(task.StorageCompleted, hash)
+ }
+ if len(task.StorageCompleted) > 0 {
+ log.Debug("Leftover completed storages", "number", len(task.StorageCompleted), "next", task.Next, "last", task.Last)
+ }
+ }
+ // Store the actual progress markers
+ progress := &SyncProgressV2{
+ Tasks: s.tasks,
+ AccountSynced: s.accountSynced,
+ AccountBytes: s.accountBytes,
+ BytecodeSynced: s.bytecodeSynced,
+ BytecodeBytes: s.bytecodeBytes,
+ StorageSynced: s.storageSynced,
+ StorageBytes: s.storageBytes,
+ }
+ status, err := json.Marshal(progress)
+ if err != nil {
+ panic(err) // This can only fail during implementation
+ }
+ rawdb.WriteSnapshotSyncStatus(s.db, status)
+}
+
+// Progress returns the snap sync status statistics.
+func (s *SyncerV2) Progress() *SyncProgressV2 {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ return s.extProgress
+}
+
+// cleanAccountTasks removes account range retrieval tasks that have already been
+// completed.
+func (s *SyncerV2) cleanAccountTasks() {
+ // If the sync was already done before, don't even bother
+ if len(s.tasks) == 0 {
+ return
+ }
+ // Sync wasn't finished previously, check for any task that can be finalized
+ for i := 0; i < len(s.tasks); i++ {
+ if s.tasks[i].done {
+ s.tasks = append(s.tasks[:i], s.tasks[i+1:]...)
+ i--
+ }
+ }
+ // If everything was just finalized just, generate the account trie and start heal
+ if len(s.tasks) == 0 {
+ // Push the final sync report
+ s.reportSyncProgressV2(true)
+ }
+}
+
+// cleanStorageTasks iterates over all the account tasks and storage sub-tasks
+// within, cleaning any that have been completed.
+func (s *SyncerV2) cleanStorageTasks() {
+ for _, task := range s.tasks {
+ for account, subtasks := range task.SubTasks {
+ // Remove storage range retrieval tasks that completed
+ for j := 0; j < len(subtasks); j++ {
+ if subtasks[j].done {
+ subtasks = append(subtasks[:j], subtasks[j+1:]...)
+ j--
+ }
+ }
+ if len(subtasks) > 0 {
+ task.SubTasks[account] = subtasks
+ continue
+ }
+ // If all storage chunks are done, mark the account as done too
+ for j, hash := range task.res.hashes {
+ if hash == account {
+ task.needState[j] = false
+ }
+ }
+ delete(task.SubTasks, account)
+ task.pend--
+
+ // Mark the state as complete to prevent resyncing
+ task.stateCompleted[account] = struct{}{}
+
+ // If this was the last pending task, forward the account task
+ if task.pend == 0 {
+ s.forwardAccountTask(task)
+ }
+ }
+ }
+}
+
+// assignAccountTasks attempts to match idle peers to pending account range
+// retrievals.
+func (s *SyncerV2) assignAccountTasks(success chan *accountResponseV2, fail chan *accountRequestV2, cancel chan struct{}) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ // Sort the peers by download capacity to use faster ones if many available
+ idlers := &capacitySort{
+ ids: make([]string, 0, len(s.accountIdlers)),
+ caps: make([]int, 0, len(s.accountIdlers)),
+ }
+ targetTTL := s.rates.TargetTimeout()
+ for id := range s.accountIdlers {
+ if _, ok := s.statelessPeers[id]; ok {
+ continue
+ }
+ idlers.ids = append(idlers.ids, id)
+ idlers.caps = append(idlers.caps, s.rates.Capacity(id, AccountRangeMsg, targetTTL))
+ }
+ if len(idlers.ids) == 0 {
+ return
+ }
+ sort.Sort(sort.Reverse(idlers))
+
+ // Iterate over all the tasks and try to find a pending one
+ for _, task := range s.tasks {
+ // Skip any tasks already filling
+ if task.req != nil || task.res != nil {
+ continue
+ }
+ // Task pending retrieval, try to find an idle peer. If no such peer
+ // exists, we probably assigned tasks for all (or they are stateless).
+ // Abort the entire assignment mechanism.
+ if len(idlers.ids) == 0 {
+ return
+ }
+ var (
+ idle = idlers.ids[0]
+ peer = s.peers[idle]
+ cap = idlers.caps[0]
+ )
+ idlers.ids, idlers.caps = idlers.ids[1:], idlers.caps[1:]
+
+ // Matched a pending task to an idle peer, allocate a unique request id
+ var reqid uint64
+ for {
+ reqid = uint64(rand.Int63())
+ if reqid == 0 {
+ continue
+ }
+ if _, ok := s.accountReqs[reqid]; ok {
+ continue
+ }
+ break
+ }
+ // Generate the network query and send it to the peer
+ req := &accountRequestV2{
+ peer: idle,
+ id: reqid,
+ time: time.Now(),
+ deliver: success,
+ revert: fail,
+ cancel: cancel,
+ stale: make(chan struct{}),
+ origin: task.Next,
+ limit: task.Last,
+ task: task,
+ }
+ req.timeout = time.AfterFunc(s.rates.TargetTimeout(), func() {
+ peer.Log().Debug("Account range request timed out", "reqid", reqid)
+ s.rates.Update(idle, AccountRangeMsg, 0, 0)
+ s.scheduleRevertAccountRequest(req)
+ })
+ s.accountReqs[reqid] = req
+ delete(s.accountIdlers, idle)
+
+ s.pend.Add(1)
+ go func(root common.Hash) {
+ defer s.pend.Done()
+
+ // Attempt to send the remote request and revert if it fails
+ if cap > maxRequestSize {
+ cap = maxRequestSize
+ }
+ if cap < minRequestSize { // Don't bother with peers below a bare minimum performance
+ cap = minRequestSize
+ }
+ if err := peer.RequestAccountRange(reqid, root, req.origin, req.limit, cap); err != nil {
+ peer.Log().Debug("Failed to request account range", "err", err)
+ s.scheduleRevertAccountRequest(req)
+ }
+ }(s.root)
+
+ // Inject the request into the task to block further assignments
+ task.req = req
+ }
+}
+
+// assignBytecodeTasks attempts to match idle peers to pending code retrievals.
+func (s *SyncerV2) assignBytecodeTasks(success chan *bytecodeResponseV2, fail chan *bytecodeRequestV2, cancel chan struct{}) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ // Sort the peers by download capacity to use faster ones if many available
+ idlers := &capacitySort{
+ ids: make([]string, 0, len(s.bytecodeIdlers)),
+ caps: make([]int, 0, len(s.bytecodeIdlers)),
+ }
+ targetTTL := s.rates.TargetTimeout()
+ for id := range s.bytecodeIdlers {
+ if _, ok := s.statelessPeers[id]; ok {
+ continue
+ }
+ idlers.ids = append(idlers.ids, id)
+ idlers.caps = append(idlers.caps, s.rates.Capacity(id, ByteCodesMsg, targetTTL))
+ }
+ if len(idlers.ids) == 0 {
+ return
+ }
+ sort.Sort(sort.Reverse(idlers))
+
+ // Iterate over all the tasks and try to find a pending one
+ for _, task := range s.tasks {
+ // Skip any tasks not in the bytecode retrieval phase
+ if task.res == nil {
+ continue
+ }
+ // Skip tasks that are already retrieving (or done with) all codes
+ if len(task.codeTasks) == 0 {
+ continue
+ }
+ // Task pending retrieval, try to find an idle peer. If no such peer
+ // exists, we probably assigned tasks for all (or they are stateless).
+ // Abort the entire assignment mechanism.
+ if len(idlers.ids) == 0 {
+ return
+ }
+ var (
+ idle = idlers.ids[0]
+ peer = s.peers[idle]
+ cap = idlers.caps[0]
+ )
+ idlers.ids, idlers.caps = idlers.ids[1:], idlers.caps[1:]
+
+ // Matched a pending task to an idle peer, allocate a unique request id
+ var reqid uint64
+ for {
+ reqid = uint64(rand.Int63())
+ if reqid == 0 {
+ continue
+ }
+ if _, ok := s.bytecodeReqs[reqid]; ok {
+ continue
+ }
+ break
+ }
+ // Generate the network query and send it to the peer
+ if cap > maxCodeRequestCount {
+ cap = maxCodeRequestCount
+ }
+ hashes := make([]common.Hash, 0, cap)
+ for hash := range task.codeTasks {
+ delete(task.codeTasks, hash)
+ hashes = append(hashes, hash)
+ if len(hashes) >= cap {
+ break
+ }
+ }
+ req := &bytecodeRequestV2{
+ peer: idle,
+ id: reqid,
+ time: time.Now(),
+ deliver: success,
+ revert: fail,
+ cancel: cancel,
+ stale: make(chan struct{}),
+ hashes: hashes,
+ task: task,
+ }
+ req.timeout = time.AfterFunc(s.rates.TargetTimeout(), func() {
+ peer.Log().Debug("Bytecode request timed out", "reqid", reqid)
+ s.rates.Update(idle, ByteCodesMsg, 0, 0)
+ s.scheduleRevertBytecodeRequest(req)
+ })
+ s.bytecodeReqs[reqid] = req
+ delete(s.bytecodeIdlers, idle)
+
+ s.pend.Add(1)
+ go func() {
+ defer s.pend.Done()
+
+ // Attempt to send the remote request and revert if it fails
+ if err := peer.RequestByteCodes(reqid, hashes, maxRequestSize); err != nil {
+ log.Debug("Failed to request bytecodes", "err", err)
+ s.scheduleRevertBytecodeRequest(req)
+ }
+ }()
+ }
+}
+
+// assignStorageTasks attempts to match idle peers to pending storage range
+// retrievals.
+func (s *SyncerV2) assignStorageTasks(success chan *storageResponseV2, fail chan *storageRequestV2, cancel chan struct{}) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ // Sort the peers by download capacity to use faster ones if many available
+ idlers := &capacitySort{
+ ids: make([]string, 0, len(s.storageIdlers)),
+ caps: make([]int, 0, len(s.storageIdlers)),
+ }
+ targetTTL := s.rates.TargetTimeout()
+ for id := range s.storageIdlers {
+ if _, ok := s.statelessPeers[id]; ok {
+ continue
+ }
+ idlers.ids = append(idlers.ids, id)
+ idlers.caps = append(idlers.caps, s.rates.Capacity(id, StorageRangesMsg, targetTTL))
+ }
+ if len(idlers.ids) == 0 {
+ return
+ }
+ sort.Sort(sort.Reverse(idlers))
+
+ // Iterate over all the tasks and try to find a pending one
+ for _, task := range s.tasks {
+ // Skip any tasks not in the storage retrieval phase
+ if task.res == nil {
+ continue
+ }
+ // Skip tasks that are already retrieving (or done with) all small states
+ storageTaskV2s := task.activeSubTasks()
+ if len(storageTaskV2s) == 0 && len(task.stateTasks) == 0 {
+ continue
+ }
+ // Task pending retrieval, try to find an idle peer. If no such peer
+ // exists, we probably assigned tasks for all (or they are stateless).
+ // Abort the entire assignment mechanism.
+ if len(idlers.ids) == 0 {
+ return
+ }
+ var (
+ idle = idlers.ids[0]
+ peer = s.peers[idle]
+ cap = idlers.caps[0]
+ )
+ idlers.ids, idlers.caps = idlers.ids[1:], idlers.caps[1:]
+
+ // Matched a pending task to an idle peer, allocate a unique request id
+ var reqid uint64
+ for {
+ reqid = uint64(rand.Int63())
+ if reqid == 0 {
+ continue
+ }
+ if _, ok := s.storageReqs[reqid]; ok {
+ continue
+ }
+ break
+ }
+ // Generate the network query and send it to the peer. If there are
+ // large contract tasks pending, complete those before diving into
+ // even more new contracts.
+ if cap > maxRequestSize {
+ cap = maxRequestSize
+ }
+ if cap < minRequestSize { // Don't bother with peers below a bare minimum performance
+ cap = minRequestSize
+ }
+ storageSets := cap / 1024
+
+ var (
+ accounts = make([]common.Hash, 0, storageSets)
+ roots = make([]common.Hash, 0, storageSets)
+ subtask *storageTaskV2
+ )
+ for account, subtasks := range storageTaskV2s {
+ for _, st := range subtasks {
+ // Skip any subtasks already filling
+ if st.req != nil {
+ continue
+ }
+ // Found an incomplete storage chunk, schedule it
+ accounts = append(accounts, account)
+ roots = append(roots, st.root)
+ subtask = st
+ break // Large contract chunks are downloaded individually
+ }
+ if subtask != nil {
+ break // Large contract chunks are downloaded individually
+ }
+ }
+ if subtask == nil {
+ // No large contract required retrieval, but small ones available
+ for account, root := range task.stateTasks {
+ delete(task.stateTasks, account)
+
+ accounts = append(accounts, account)
+ roots = append(roots, root)
+
+ if len(accounts) >= storageSets {
+ break
+ }
+ }
+ }
+ // If nothing was found, it means this task is actually already fully
+ // retrieving, but large contracts are hard to detect. Skip to the next.
+ if len(accounts) == 0 {
+ continue
+ }
+ req := &storageRequestV2{
+ peer: idle,
+ id: reqid,
+ time: time.Now(),
+ deliver: success,
+ revert: fail,
+ cancel: cancel,
+ stale: make(chan struct{}),
+ accounts: accounts,
+ roots: roots,
+ mainTask: task,
+ subTask: subtask,
+ }
+ if subtask != nil {
+ req.origin = subtask.Next
+ req.limit = subtask.Last
+ }
+ req.timeout = time.AfterFunc(s.rates.TargetTimeout(), func() {
+ peer.Log().Debug("Storage request timed out", "reqid", reqid)
+ s.rates.Update(idle, StorageRangesMsg, 0, 0)
+ s.scheduleRevertStorageRequest(req)
+ })
+ s.storageReqs[reqid] = req
+ delete(s.storageIdlers, idle)
+
+ s.pend.Add(1)
+ go func(root common.Hash) {
+ defer s.pend.Done()
+
+ // Attempt to send the remote request and revert if it fails
+ var origin, limit []byte
+ if subtask != nil {
+ origin, limit = req.origin[:], req.limit[:]
+ }
+ if err := peer.RequestStorageRanges(reqid, root, accounts, origin, limit, cap); err != nil {
+ log.Debug("Failed to request storage", "err", err)
+ s.scheduleRevertStorageRequest(req)
+ }
+ }(s.root)
+
+ // Inject the request into the subtask to block further assignments
+ if subtask != nil {
+ subtask.req = req
+ }
+ }
+}
+
+// revertRequests locates all the currently pending requests from a particular
+// peer and reverts them, rescheduling for others to fulfill.
+func (s *SyncerV2) revertRequests(peer string) {
+ // Gather the requests first, revertals need the lock too
+ s.lock.Lock()
+ var accountReqs []*accountRequestV2
+ for _, req := range s.accountReqs {
+ if req.peer == peer {
+ accountReqs = append(accountReqs, req)
+ }
+ }
+ var bytecodeReqs []*bytecodeRequestV2
+ for _, req := range s.bytecodeReqs {
+ if req.peer == peer {
+ bytecodeReqs = append(bytecodeReqs, req)
+ }
+ }
+ var storageReqs []*storageRequestV2
+ for _, req := range s.storageReqs {
+ if req.peer == peer {
+ storageReqs = append(storageReqs, req)
+ }
+ }
+ s.lock.Unlock()
+
+ // Revert all the requests matching the peer
+ for _, req := range accountReqs {
+ s.revertAccountRequest(req)
+ }
+ for _, req := range bytecodeReqs {
+ s.revertBytecodeRequest(req)
+ }
+ for _, req := range storageReqs {
+ s.revertStorageRequest(req)
+ }
+}
+
+// scheduleRevertAccountRequest asks the event loop to clean up an account range
+// request and return all failed retrieval tasks to the scheduler for reassignment.
+func (s *SyncerV2) scheduleRevertAccountRequest(req *accountRequestV2) {
+ select {
+ case req.revert <- req:
+ // Sync event loop notified
+ case <-req.cancel:
+ // Sync cycle got cancelled
+ case <-req.stale:
+ // Request already reverted
+ }
+}
+
+// revertAccountRequest cleans up an account range request and returns all failed
+// retrieval tasks to the scheduler for reassignment.
+//
+// Note, this needs to run on the event runloop thread to reschedule to idle peers.
+// On peer threads, use scheduleRevertAccountRequest.
+func (s *SyncerV2) revertAccountRequest(req *accountRequestV2) {
+ log.Debug("Reverting account request", "peer", req.peer, "reqid", req.id)
+ select {
+ case <-req.stale:
+ log.Trace("Account request already reverted", "peer", req.peer, "reqid", req.id)
+ return
+ default:
+ }
+ close(req.stale)
+
+ // Remove the request from the tracked set and restore the peer to the
+ // idle pool so it can be reassigned work (skip if peer already left).
+ s.lock.Lock()
+ delete(s.accountReqs, req.id)
+ if _, ok := s.peers[req.peer]; ok {
+ s.accountIdlers[req.peer] = struct{}{}
+ }
+ s.lock.Unlock()
+
+ // If there's a timeout timer still running, abort it and mark the account
+ // task as not-pending, ready for rescheduling
+ req.timeout.Stop()
+ if req.task.req == req {
+ req.task.req = nil
+ }
+}
+
+// scheduleRevertBytecodeRequest asks the event loop to clean up a bytecode request
+// and return all failed retrieval tasks to the scheduler for reassignment.
+func (s *SyncerV2) scheduleRevertBytecodeRequest(req *bytecodeRequestV2) {
+ select {
+ case req.revert <- req:
+ // Sync event loop notified
+ case <-req.cancel:
+ // Sync cycle got cancelled
+ case <-req.stale:
+ // Request already reverted
+ }
+}
+
+// revertBytecodeRequest cleans up a bytecode request and returns all failed
+// retrieval tasks to the scheduler for reassignment.
+//
+// Note, this needs to run on the event runloop thread to reschedule to idle peers.
+// On peer threads, use scheduleRevertBytecodeRequest.
+func (s *SyncerV2) revertBytecodeRequest(req *bytecodeRequestV2) {
+ log.Debug("Reverting bytecode request", "peer", req.peer)
+ select {
+ case <-req.stale:
+ log.Trace("Bytecode request already reverted", "peer", req.peer, "reqid", req.id)
+ return
+ default:
+ }
+ close(req.stale)
+
+ // Remove the request from the tracked set and restore the peer to the
+ // idle pool so it can be reassigned work (skip if peer already left).
+ s.lock.Lock()
+ delete(s.bytecodeReqs, req.id)
+ if _, ok := s.peers[req.peer]; ok {
+ s.bytecodeIdlers[req.peer] = struct{}{}
+ }
+ s.lock.Unlock()
+
+ // If there's a timeout timer still running, abort it and mark the code
+ // retrievals as not-pending, ready for rescheduling
+ req.timeout.Stop()
+ for _, hash := range req.hashes {
+ req.task.codeTasks[hash] = struct{}{}
+ }
+}
+
+// scheduleRevertStorageRequest asks the event loop to clean up a storage range
+// request and return all failed retrieval tasks to the scheduler for reassignment.
+func (s *SyncerV2) scheduleRevertStorageRequest(req *storageRequestV2) {
+ select {
+ case req.revert <- req:
+ // Sync event loop notified
+ case <-req.cancel:
+ // Sync cycle got cancelled
+ case <-req.stale:
+ // Request already reverted
+ }
+}
+
+// revertStorageRequest cleans up a storage range request and returns all failed
+// retrieval tasks to the scheduler for reassignment.
+//
+// Note, this needs to run on the event runloop thread to reschedule to idle peers.
+// On peer threads, use scheduleRevertStorageRequest.
+func (s *SyncerV2) revertStorageRequest(req *storageRequestV2) {
+ log.Debug("Reverting storage request", "peer", req.peer)
+ select {
+ case <-req.stale:
+ log.Trace("Storage request already reverted", "peer", req.peer, "reqid", req.id)
+ return
+ default:
+ }
+ close(req.stale)
+
+ // Remove the request from the tracked set and restore the peer to the
+ // idle pool so it can be reassigned work (skip if peer already left).
+ s.lock.Lock()
+ delete(s.storageReqs, req.id)
+ if _, ok := s.peers[req.peer]; ok {
+ s.storageIdlers[req.peer] = struct{}{}
+ }
+ s.lock.Unlock()
+
+ // If there's a timeout timer still running, abort it and mark the storage
+ // task as not-pending, ready for rescheduling
+ req.timeout.Stop()
+ if req.subTask != nil {
+ req.subTask.req = nil
+ } else {
+ for i, account := range req.accounts {
+ req.mainTask.stateTasks[account] = req.roots[i]
+ }
+ }
+}
+
+// processAccountResponse integrates an already validated account range response
+// into the account tasks.
+func (s *SyncerV2) processAccountResponse(res *accountResponseV2) {
+ // Switch the task from pending to filling
+ res.task.req = nil
+ res.task.res = res
+
+ // Ensure that the response doesn't overflow into the subsequent task
+ lastBig := res.task.Last.Big()
+ for i, hash := range res.hashes {
+ // Mark the range complete if the last is already included.
+ // Keep iteration to delete the extra states if exists.
+ cmp := hash.Big().Cmp(lastBig)
+ if cmp == 0 {
+ res.cont = false
+ continue
+ }
+ if cmp > 0 {
+ // Chunk overflown, cut off excess
+ res.hashes = res.hashes[:i]
+ res.accounts = res.accounts[:i]
+ res.cont = false // Mark range completed
+ break
+ }
+ }
+ // Iterate over all the accounts and assemble which ones need further sub-
+ // filling before the entire account range can be persisted.
+ res.task.needCode = make([]bool, len(res.accounts))
+ res.task.needState = make([]bool, len(res.accounts))
+ res.task.codeTasks = make(map[common.Hash]struct{})
+ res.task.stateTasks = make(map[common.Hash]common.Hash)
+
+ resumed := make(map[common.Hash]struct{})
+
+ res.task.pend = 0
+ for i, account := range res.accounts {
+ // Check if the account is a contract with an unknown code
+ if !bytes.Equal(account.CodeHash, types.EmptyCodeHash.Bytes()) {
+ if !rawdb.HasCodeWithPrefix(s.db, common.BytesToHash(account.CodeHash)) {
+ res.task.codeTasks[common.BytesToHash(account.CodeHash)] = struct{}{}
+ res.task.needCode[i] = true
+ res.task.pend++
+ }
+ }
+ // Check if the account is a contract with an unknown storage trie
+ if account.Root != types.EmptyRootHash {
+ // If the storage was already retrieved in the last cycle, there's no need
+ // to resync it again, regardless of whether the storage root is consistent
+ // or not.
+ if _, exist := res.task.stateCompleted[res.hashes[i]]; exist {
+ // The leftover storage tasks are not expected, unless system is
+ // very wrong.
+ if _, ok := res.task.SubTasks[res.hashes[i]]; ok {
+ panic(fmt.Errorf("unexpected leftover storage tasks, owner: %x", res.hashes[i]))
+ }
+ } else {
+ // If there was a previous large state retrieval in progress,
+ // don't restart it from scratch. This happens if a sync cycle
+ // is interrupted and resumed later. However, *do* update the
+ // previous root hash.
+ if subtasks, ok := res.task.SubTasks[res.hashes[i]]; ok {
+ log.Debug("Resuming large storage retrieval", "account", res.hashes[i], "root", account.Root)
+ for _, subtask := range subtasks {
+ subtask.root = account.Root
+ }
+ resumed[res.hashes[i]] = struct{}{}
+ largeStorageResumedGauge.Inc(1)
+ } else {
+ // It's possible that in the hash scheme, the storage, along
+ // with the trie nodes of the given root, is already present
+ // in the database. Schedule the storage task anyway to simplify
+ // the logic here.
+ res.task.stateTasks[res.hashes[i]] = account.Root
+ }
+ res.task.needState[i] = true
+ res.task.pend++
+ }
+ }
+ }
+ // Delete any subtasks that have been aborted but not resumed. It's essential
+ // as the corresponding contract might be self-destructed in this cycle(it's
+ // no longer possible in ethereum as self-destruction is disabled in Cancun
+ // Fork, but the condition is still necessary for other networks).
+ //
+ // Keep the leftover storage tasks if they are not covered by the responded
+ // account range which should be picked up in next account wave.
+ if len(res.hashes) > 0 {
+ // The hash of last delivered account in the response
+ last := res.hashes[len(res.hashes)-1]
+ for hash := range res.task.SubTasks {
+ if hash.Cmp(last) > 0 {
+ log.Debug("Keeping suspended storage retrieval", "account", hash)
+ continue
+ }
+ if _, ok := resumed[hash]; !ok {
+ log.Warn("Aborting suspended storage retrieval", "account", hash)
+ delete(res.task.SubTasks, hash)
+ largeStorageDiscardGauge.Inc(1)
+ }
+ }
+ }
+ // If the account range contained no contracts, or all have been fully filled
+ // beforehand, short circuit storage filling and forward to the next task
+ if res.task.pend == 0 {
+ s.forwardAccountTask(res.task)
+ return
+ }
+ // Some accounts are incomplete, leave as is for the storage and contract
+ // task assigners to pick up and fill
+}
+
+// processBytecodeResponse integrates an already validated bytecode response
+// into the account tasks.
+func (s *SyncerV2) processBytecodeResponse(res *bytecodeResponseV2) {
+ batch := s.db.NewBatch()
+
+ var codes uint64
+ for i, hash := range res.hashes {
+ code := res.codes[i]
+
+ // If the bytecode was not delivered, reschedule it
+ if code == nil {
+ res.task.codeTasks[hash] = struct{}{}
+ continue
+ }
+ // Code was delivered, mark it not needed any more
+ for j, account := range res.task.res.accounts {
+ if res.task.needCode[j] && hash == common.BytesToHash(account.CodeHash) {
+ res.task.needCode[j] = false
+ res.task.pend--
+ }
+ }
+ // Push the bytecode into a database batch
+ codes++
+ rawdb.WriteCode(batch, hash, code)
+ }
+ bytes := common.StorageSize(batch.ValueSize())
+ if err := batch.Write(); err != nil {
+ log.Crit("Failed to persist bytecodes", "err", err)
+ }
+ s.bytecodeSynced += codes
+ s.bytecodeBytes += bytes
+
+ log.Debug("Persisted set of bytecodes", "count", codes, "bytes", bytes)
+
+ // If this delivery completed the last pending task, forward the account task
+ // to the next chunk
+ if res.task.pend == 0 {
+ s.forwardAccountTask(res.task)
+ return
+ }
+ // Some accounts are still incomplete, leave as is for the storage and contract
+ // task assigners to pick up and fill.
+}
+
+// processStorageResponse integrates an already validated storage response
+// into the account tasks.
+func (s *SyncerV2) processStorageResponse(res *storageResponseV2) {
+ // Switch the subtask from pending to idle
+ if res.subTask != nil {
+ res.subTask.req = nil
+ }
+ batch := ethdb.HookedBatch{
+ Batch: s.db.NewBatch(),
+ OnPut: func(key []byte, value []byte) {
+ s.storageBytes += common.StorageSize(len(key) + len(value))
+ },
+ }
+ var (
+ slots int
+ oldStorageBytes = s.storageBytes
+ )
+ // Iterate over all the accounts and reconstruct their storage tries from the
+ // delivered slots
+ for i, account := range res.accounts {
+ // If the account was not delivered, reschedule it
+ if i >= len(res.hashes) {
+ res.mainTask.stateTasks[account] = res.roots[i]
+ continue
+ }
+ // State was delivered, if complete mark as not needed any more, otherwise
+ // mark the account as needing healing
+ for j, hash := range res.mainTask.res.hashes {
+ if account != hash {
+ continue
+ }
+ acc := res.mainTask.res.accounts[j]
+
+ // If the packet contains multiple contract storage slots, all
+ // but the last are surely complete. The last contract may be
+ // chunked, so check it's continuation flag.
+ if res.subTask == nil && res.mainTask.needState[j] && (i < len(res.hashes)-1 || !res.cont) {
+ res.mainTask.needState[j] = false
+ res.mainTask.pend--
+ res.mainTask.stateCompleted[account] = struct{}{} // mark it as completed
+ smallStorageGauge.Inc(1)
+ }
+ // If the last contract was chunked, we need to switch to large
+ // contract handling mode
+ if res.subTask == nil && i == len(res.hashes)-1 && res.cont {
+ // If we haven't yet started a large-contract retrieval, create
+ // the subtasks for it within the main account task
+ if tasks, ok := res.mainTask.SubTasks[account]; !ok {
+ var (
+ keys = res.hashes[i]
+ chunks = uint64(storageConcurrency)
+ lastKey common.Hash
+ )
+ if len(keys) > 0 {
+ lastKey = keys[len(keys)-1]
+ }
+ // If the number of slots remaining is low, decrease the
+ // number of chunks. Somewhere on the order of 10-15K slots
+ // fit into a packet of 500KB. A key/slot pair is maximum 64
+ // bytes, so pessimistically maxRequestSize/64 = 8K.
+ //
+ // Chunk so that at least 2 packets are needed to fill a task.
+ if estimate, err := estimateRemainingSlots(len(keys), lastKey); err == nil {
+ if n := estimate / (2 * (maxRequestSize / 64)); n+1 < chunks {
+ chunks = n + 1
+ }
+ log.Debug("Chunked large contract", "initiators", len(keys), "tail", lastKey, "remaining", estimate, "chunks", chunks)
+ } else {
+ log.Debug("Chunked large contract", "initiators", len(keys), "tail", lastKey, "chunks", chunks)
+ }
+ r := newHashRange(lastKey, chunks)
+ if chunks == 1 {
+ smallStorageGauge.Inc(1)
+ } else {
+ largeStorageGauge.Inc(1)
+ }
+ tasks = append(tasks, &storageTaskV2{
+ Next: common.Hash{},
+ Last: r.End(),
+ root: acc.Root,
+ })
+ for r.Next() {
+ tasks = append(tasks, &storageTaskV2{
+ Next: r.Start(),
+ Last: r.End(),
+ root: acc.Root,
+ })
+ }
+ for _, task := range tasks {
+ log.Debug("Created storage sync task", "account", account, "root", acc.Root, "from", task.Next, "last", task.Last)
+ }
+ res.mainTask.SubTasks[account] = tasks
+
+ // Since we've just created the sub-tasks, this response
+ // is surely for the first one (zero origin)
+ res.subTask = tasks[0]
+ }
+ }
+ // If we're in large contract delivery mode, forward the subtask
+ if res.subTask != nil {
+ // Ensure the response doesn't overflow into the subsequent task
+ last := res.subTask.Last.Big()
+ // Find the first overflowing key. While at it, mark res as complete
+ // if we find the range to include or pass the 'last'
+ index := sort.Search(len(res.hashes[i]), func(k int) bool {
+ cmp := res.hashes[i][k].Big().Cmp(last)
+ if cmp >= 0 {
+ res.cont = false
+ }
+ return cmp > 0
+ })
+ if index >= 0 {
+ // cut off excess
+ res.hashes[i] = res.hashes[i][:index]
+ res.slots[i] = res.slots[i][:index]
+ }
+ // Forward the relevant storage chunk (even if created just now)
+ if res.cont {
+ res.subTask.Next = incHash(res.hashes[i][len(res.hashes[i])-1])
+ } else {
+ res.subTask.done = true
+ }
+ }
+ }
+ // Iterate over all the complete contracts, reconstruct the trie nodes and
+ // push them to disk. If the contract is chunked, the trie nodes will be
+ // reconstructed later.
+ slots += len(res.hashes[i])
+
+ // Persist the received storage segments. These flat state may be outdated
+ // during the sync, but it will be fixed by the BAL-healing.
+ for j := 0; j < len(res.hashes[i]); j++ {
+ rawdb.WriteStorageSnapshot(batch, account, res.hashes[i][j], res.slots[i][j])
+ }
+ }
+ // Flush anything written just now and update the stats
+ if err := batch.Write(); err != nil {
+ log.Crit("Failed to persist storage slots", "err", err)
+ }
+ s.storageSynced += uint64(slots)
+
+ log.Debug("Persisted set of storage slots", "accounts", len(res.hashes), "slots", slots, "bytes", s.storageBytes-oldStorageBytes)
+
+ // If this delivery completed the last pending task, forward the account task
+ // to the next chunk
+ if res.mainTask.pend == 0 {
+ s.forwardAccountTask(res.mainTask)
+ return
+ }
+ // Some accounts are still incomplete, leave as is for the storage and contract
+ // task assigners to pick up and fill.
+}
+
+// forwardAccountTask takes a filled account task and persists anything available
+// into the database, after which it forwards the next account marker so that the
+// task's next chunk may be filled.
+func (s *SyncerV2) forwardAccountTask(task *accountTaskV2) {
+ // Remove any pending delivery
+ res := task.res
+ if res == nil {
+ return // nothing to forward
+ }
+ task.res = nil
+
+ // Persist the received account segments. These flat state maybe
+ // outdated during the sync, but it can be fixed later during the
+ // snapshot generation.
+ oldAccountBytes := s.accountBytes
+
+ batch := ethdb.HookedBatch{
+ Batch: s.db.NewBatch(),
+ OnPut: func(key []byte, value []byte) {
+ s.accountBytes += common.StorageSize(len(key) + len(value))
+ },
+ }
+ for i, hash := range res.hashes {
+ if task.needCode[i] || task.needState[i] {
+ break
+ }
+ slim := types.SlimAccountRLP(*res.accounts[i])
+ rawdb.WriteAccountSnapshot(batch, hash, slim)
+ }
+ // Flush anything written just now and update the stats
+ if err := batch.Write(); err != nil {
+ log.Crit("Failed to persist accounts", "err", err)
+ }
+ s.accountSynced += uint64(len(res.accounts))
+
+ // Task filling persisted, push it the chunk marker forward to the first
+ // account still missing data.
+ for i, hash := range res.hashes {
+ if task.needCode[i] || task.needState[i] {
+ return
+ }
+ task.Next = incHash(hash)
+
+ // Remove the completion flag once the account range is pushed
+ // forward. The leftover accounts will be skipped in the next
+ // cycle.
+ delete(task.stateCompleted, hash)
+ }
+ // All accounts marked as complete, track if the entire task is done
+ task.done = !res.cont
+
+ // Error out if there is any leftover completion flag.
+ if task.done && len(task.stateCompleted) != 0 {
+ panic(fmt.Errorf("storage completion flags should be emptied, %d left", len(task.stateCompleted)))
+ }
+ log.Debug("Persisted range of accounts", "accounts", len(res.accounts), "bytes", s.accountBytes-oldAccountBytes)
+}
+
+// OnAccounts is a callback method to invoke when a range of accounts are
+// received from a remote peer.
+func (s *SyncerV2) OnAccounts(peer SyncPeerV2, id uint64, hashes []common.Hash, accounts [][]byte, proof [][]byte) error {
+ size := common.StorageSize(len(hashes) * common.HashLength)
+ for _, account := range accounts {
+ size += common.StorageSize(len(account))
+ }
+ for _, node := range proof {
+ size += common.StorageSize(len(node))
+ }
+ logger := peer.Log().New("reqid", id)
+ logger.Trace("Delivering range of accounts", "hashes", len(hashes), "accounts", len(accounts), "proofs", len(proof), "bytes", size)
+
+ // Whether or not the response is valid, we can mark the peer as idle and
+ // notify the scheduler to assign a new task. If the response is invalid,
+ // we'll drop the peer in a bit.
+ defer func() {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+ if _, ok := s.peers[peer.ID()]; ok {
+ s.accountIdlers[peer.ID()] = struct{}{}
+ }
+ select {
+ case s.update <- struct{}{}:
+ default:
+ }
+ }()
+ s.lock.Lock()
+ // Ensure the response is for a valid request
+ req, ok := s.accountReqs[id]
+ if !ok {
+ // Request stale, perhaps the peer timed out but came through in the end
+ logger.Warn("Unexpected account range packet")
+ s.lock.Unlock()
+ return nil
+ }
+ delete(s.accountReqs, id)
+ s.rates.Update(peer.ID(), AccountRangeMsg, time.Since(req.time), int(size))
+
+ // Clean up the request timeout timer, we'll see how to proceed further based
+ // on the actual delivered content
+ if !req.timeout.Stop() {
+ // The timeout is already triggered, and this request will be reverted+rescheduled
+ s.lock.Unlock()
+ return nil
+ }
+ // Response is valid, but check if peer is signalling that it does not have
+ // the requested data. For account range queries that means the state being
+ // retrieved was either already pruned remotely, or the peer is not yet
+ // synced to our head.
+ if len(hashes) == 0 && len(accounts) == 0 && len(proof) == 0 {
+ logger.Debug("Peer rejected account range request", "root", s.root)
+ s.statelessPeers[peer.ID()] = struct{}{}
+ s.lock.Unlock()
+
+ // Signal this request as failed, and ready for rescheduling
+ s.scheduleRevertAccountRequest(req)
+ return nil
+ }
+ root := s.root
+ s.lock.Unlock()
+
+ // Reconstruct a partial trie from the response and verify it
+ keys := make([][]byte, len(hashes))
+ for i, key := range hashes {
+ keys[i] = common.CopyBytes(key[:])
+ }
+ nodes := make(trienode.ProofList, len(proof))
+ for i, node := range proof {
+ nodes[i] = node
+ }
+ cont, err := trie.VerifyRangeProof(root, req.origin[:], keys, accounts, nodes.Set())
+ if err != nil {
+ logger.Warn("Account range failed proof", "err", err)
+ // Signal this request as failed, and ready for rescheduling
+ s.scheduleRevertAccountRequest(req)
+ return err
+ }
+ accs := make([]*types.StateAccount, len(accounts))
+ for i, account := range accounts {
+ acc := new(types.StateAccount)
+ if err := rlp.DecodeBytes(account, acc); err != nil {
+ panic(err) // We created these blobs, we must be able to decode them
+ }
+ accs[i] = acc
+ }
+ response := &accountResponseV2{
+ task: req.task,
+ hashes: hashes,
+ accounts: accs,
+ cont: cont,
+ }
+ select {
+ case req.deliver <- response:
+ case <-req.cancel:
+ case <-req.stale:
+ }
+ return nil
+}
+
+// OnByteCodes is a callback method to invoke when a batch of contract
+// bytes codes are received from a remote peer in the syncing phase.
+func (s *SyncerV2) OnByteCodes(peer SyncPeerV2, id uint64, bytecodes [][]byte) error {
+ var size common.StorageSize
+ for _, code := range bytecodes {
+ size += common.StorageSize(len(code))
+ }
+ logger := peer.Log().New("reqid", id)
+ logger.Trace("Delivering set of bytecodes", "bytecodes", len(bytecodes), "bytes", size)
+
+ // Whether or not the response is valid, we can mark the peer as idle and
+ // notify the scheduler to assign a new task. If the response is invalid,
+ // we'll drop the peer in a bit.
+ defer func() {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+ if _, ok := s.peers[peer.ID()]; ok {
+ s.bytecodeIdlers[peer.ID()] = struct{}{}
+ }
+ select {
+ case s.update <- struct{}{}:
+ default:
+ }
+ }()
+ s.lock.Lock()
+ // Ensure the response is for a valid request
+ req, ok := s.bytecodeReqs[id]
+ if !ok {
+ // Request stale, perhaps the peer timed out but came through in the end
+ logger.Warn("Unexpected bytecode packet")
+ s.lock.Unlock()
+ return nil
+ }
+ delete(s.bytecodeReqs, id)
+ s.rates.Update(peer.ID(), ByteCodesMsg, time.Since(req.time), len(bytecodes))
+
+ // Clean up the request timeout timer, we'll see how to proceed further based
+ // on the actual delivered content
+ if !req.timeout.Stop() {
+ // The timeout is already triggered, and this request will be reverted+rescheduled
+ s.lock.Unlock()
+ return nil
+ }
+
+ // Response is valid, but check if peer is signalling that it does not have
+ // the requested data. For bytecode range queries that means the peer is not
+ // yet synced.
+ if len(bytecodes) == 0 {
+ logger.Debug("Peer rejected bytecode request")
+ s.statelessPeers[peer.ID()] = struct{}{}
+ s.lock.Unlock()
+
+ // Signal this request as failed, and ready for rescheduling
+ s.scheduleRevertBytecodeRequest(req)
+ return nil
+ }
+ s.lock.Unlock()
+
+ // Cross reference the requested bytecodes with the response to find gaps
+ // that the serving node is missing
+ hasher := crypto.NewKeccakState()
+ hash := make([]byte, 32)
+
+ codes := make([][]byte, len(req.hashes))
+ for i, j := 0, 0; i < len(bytecodes); i++ {
+ // Find the next hash that we've been served, leaving misses with nils
+ hasher.Reset()
+ hasher.Write(bytecodes[i])
+ hasher.Read(hash)
+
+ for j < len(req.hashes) && !bytes.Equal(hash, req.hashes[j][:]) {
+ j++
+ }
+ if j < len(req.hashes) {
+ codes[j] = bytecodes[i]
+ j++
+ continue
+ }
+ // We've either ran out of hashes, or got unrequested data
+ logger.Warn("Unexpected bytecodes", "count", len(bytecodes)-i)
+ // Signal this request as failed, and ready for rescheduling
+ s.scheduleRevertBytecodeRequest(req)
+ return errors.New("unexpected bytecode")
+ }
+ // Response validated, send it to the scheduler for filling
+ response := &bytecodeResponseV2{
+ task: req.task,
+ hashes: req.hashes,
+ codes: codes,
+ }
+ select {
+ case req.deliver <- response:
+ case <-req.cancel:
+ case <-req.stale:
+ }
+ return nil
+}
+
+// OnStorage is a callback method to invoke when ranges of storage slots
+// are received from a remote peer.
+func (s *SyncerV2) OnStorage(peer SyncPeerV2, id uint64, hashes [][]common.Hash, slots [][][]byte, proof [][]byte) error {
+ // Gather some trace stats to aid in debugging issues
+ var (
+ hashCount int
+ slotCount int
+ size common.StorageSize
+ )
+ for _, hashset := range hashes {
+ size += common.StorageSize(common.HashLength * len(hashset))
+ hashCount += len(hashset)
+ }
+ for _, slotset := range slots {
+ for _, slot := range slotset {
+ size += common.StorageSize(len(slot))
+ }
+ slotCount += len(slotset)
+ }
+ for _, node := range proof {
+ size += common.StorageSize(len(node))
+ }
+ logger := peer.Log().New("reqid", id)
+ logger.Trace("Delivering ranges of storage slots", "accounts", len(hashes), "hashes", hashCount, "slots", slotCount, "proofs", len(proof), "size", size)
+
+ // Whether or not the response is valid, we can mark the peer as idle and
+ // notify the scheduler to assign a new task. If the response is invalid,
+ // we'll drop the peer in a bit.
+ defer func() {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+ if _, ok := s.peers[peer.ID()]; ok {
+ s.storageIdlers[peer.ID()] = struct{}{}
+ }
+ select {
+ case s.update <- struct{}{}:
+ default:
+ }
+ }()
+ s.lock.Lock()
+ // Ensure the response is for a valid request
+ req, ok := s.storageReqs[id]
+ if !ok {
+ // Request stale, perhaps the peer timed out but came through in the end
+ logger.Warn("Unexpected storage ranges packet")
+ s.lock.Unlock()
+ return nil
+ }
+ delete(s.storageReqs, id)
+ s.rates.Update(peer.ID(), StorageRangesMsg, time.Since(req.time), int(size))
+
+ // Clean up the request timeout timer, we'll see how to proceed further based
+ // on the actual delivered content
+ if !req.timeout.Stop() {
+ // The timeout is already triggered, and this request will be reverted+rescheduled
+ s.lock.Unlock()
+ return nil
+ }
+
+ // Reject the response if the hash sets and slot sets don't match, or if the
+ // peer sent more data than requested.
+ if len(hashes) != len(slots) {
+ s.lock.Unlock()
+ s.scheduleRevertStorageRequest(req) // reschedule request
+ logger.Warn("Hash and slot set size mismatch", "hashset", len(hashes), "slotset", len(slots))
+ return errors.New("hash and slot set size mismatch")
+ }
+ if len(hashes) > len(req.accounts) {
+ s.lock.Unlock()
+ s.scheduleRevertStorageRequest(req) // reschedule request
+ logger.Warn("Hash set larger than requested", "hashset", len(hashes), "requested", len(req.accounts))
+ return errors.New("hash set larger than requested")
+ }
+ // Response is valid, but check if peer is signalling that it does not have
+ // the requested data. For storage range queries that means the state being
+ // retrieved was either already pruned remotely, or the peer is not yet
+ // synced to our head.
+ if len(hashes) == 0 && len(proof) == 0 {
+ logger.Debug("Peer rejected storage request")
+ s.statelessPeers[peer.ID()] = struct{}{}
+ s.lock.Unlock()
+ s.scheduleRevertStorageRequest(req) // reschedule request
+ return nil
+ }
+ s.lock.Unlock()
+
+ // Reconstruct the partial tries from the response and verify them
+ var cont bool
+
+ // If a proof was attached while the response is empty, it indicates that the
+ // requested range specified with 'origin' is empty. Construct an empty state
+ // response locally to finalize the range.
+ if len(hashes) == 0 && len(proof) > 0 {
+ hashes = append(hashes, []common.Hash{})
+ slots = append(slots, [][]byte{})
+ }
+ for i := 0; i < len(hashes); i++ {
+ // Convert the keys and proofs into an internal format
+ keys := make([][]byte, len(hashes[i]))
+ for j, key := range hashes[i] {
+ keys[j] = common.CopyBytes(key[:])
+ }
+ nodes := make(trienode.ProofList, 0, len(proof))
+ if i == len(hashes)-1 {
+ for _, node := range proof {
+ nodes = append(nodes, node)
+ }
+ }
+ var err error
+ if len(nodes) == 0 {
+ // No proof has been attached, the response must cover the entire key
+ // space and hash to the origin root.
+ _, err = trie.VerifyRangeProof(req.roots[i], nil, keys, slots[i], nil)
+ if err != nil {
+ s.scheduleRevertStorageRequest(req) // reschedule request
+ logger.Warn("Storage slots failed proof", "err", err)
+ return err
+ }
+ } else {
+ // A proof was attached, the response is only partial, check that the
+ // returned data is indeed part of the storage trie
+ proofdb := nodes.Set()
+
+ cont, err = trie.VerifyRangeProof(req.roots[i], req.origin[:], keys, slots[i], proofdb)
+ if err != nil {
+ s.scheduleRevertStorageRequest(req) // reschedule request
+ logger.Warn("Storage range failed proof", "err", err)
+ return err
+ }
+ }
+ }
+ // Partial tries reconstructed, send them to the scheduler for storage filling
+ response := &storageResponseV2{
+ mainTask: req.mainTask,
+ subTask: req.subTask,
+ accounts: req.accounts,
+ roots: req.roots,
+ hashes: hashes,
+ slots: slots,
+ cont: cont,
+ }
+ select {
+ case req.deliver <- response:
+ case <-req.cancel:
+ case <-req.stale:
+ }
+ return nil
+}
+
+// report calculates various status reports and provides it to the user.
+func (s *SyncerV2) report(force bool) {
+ if len(s.tasks) > 0 {
+ s.reportSyncProgressV2(force)
+ return
+ }
+}
+
+// reportSyncProgressV2 calculates various status reports and provides it to the user.
+func (s *SyncerV2) reportSyncProgressV2(force bool) {
+ // Don't report all the events, just occasionally
+ if !force && time.Since(s.logTime) < 8*time.Second {
+ return
+ }
+ // Don't report anything until we have a meaningful progress
+ synced := s.accountBytes + s.bytecodeBytes + s.storageBytes
+ if synced == 0 {
+ return
+ }
+ accountGaps := new(big.Int)
+ for _, task := range s.tasks {
+ accountGaps.Add(accountGaps, new(big.Int).Sub(task.Last.Big(), task.Next.Big()))
+ }
+ accountFills := new(big.Int).Sub(hashSpace, accountGaps)
+ if accountFills.BitLen() == 0 {
+ return
+ }
+ s.logTime = time.Now()
+ estBytes := float64(new(big.Int).Div(
+ new(big.Int).Mul(new(big.Int).SetUint64(uint64(synced)), hashSpace),
+ accountFills,
+ ).Uint64())
+
+ // Don't report anything until we have a meaningful progress
+ if estBytes < 1.0 {
+ return
+ }
+ // Cap the estimated state size using the synced size to avoid negative values
+ if estBytes < float64(synced) {
+ estBytes = float64(synced)
+ }
+ elapsed := time.Since(s.startTime)
+ estTime := elapsed / time.Duration(synced) * time.Duration(estBytes)
+
+ // Create a mega progress report
+ var (
+ progress = fmt.Sprintf("%.2f%%", float64(synced)*100/estBytes)
+ accounts = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(s.accountSynced), s.accountBytes.TerminalString())
+ storage = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(s.storageSynced), s.storageBytes.TerminalString())
+ bytecode = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(s.bytecodeSynced), s.bytecodeBytes.TerminalString())
+ )
+ log.Info("Syncing: state download in progress", "synced", progress, "state", synced,
+ "accounts", accounts, "slots", storage, "codes", bytecode, "eta", common.PrettyDuration(estTime-elapsed))
+}
diff --git a/eth/protocols/snap/syncv2_test.go b/eth/protocols/snap/syncv2_test.go
new file mode 100644
index 0000000000..d303d84c09
--- /dev/null
+++ b/eth/protocols/snap/syncv2_test.go
@@ -0,0 +1,1164 @@
+// Copyright 2026 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package snap
+
+import (
+ "bytes"
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/rawdb"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/rlp"
+ "github.com/ethereum/go-ethereum/trie"
+ "github.com/ethereum/go-ethereum/trie/trienode"
+)
+
+// SyncerV2 (skeleton) only downloads the flat state (accounts, storage slots,
+// bytecodes) and does not perform trie generation or state healing. These tests
+// verify that, in a single uninterrupted sync cycle, the syncer fully downloads
+// all the expected flat state from the source peer(s).
+
+type (
+ accountHandlerFuncV2 func(t *testPeerV2, requestId uint64, root common.Hash, origin common.Hash, limit common.Hash, cap int) error
+ storageHandlerFuncV2 func(t *testPeerV2, requestId uint64, root common.Hash, accounts []common.Hash, origin, limit []byte, max int) error
+ codeHandlerFuncV2 func(t *testPeerV2, id uint64, hashes []common.Hash, max int) error
+)
+
+type testPeerV2 struct {
+ id string
+ test *testing.T
+ remote *SyncerV2
+ logger log.Logger
+ accountTrie *trie.Trie
+ accountValues []*kv
+ storageTries map[common.Hash]*trie.Trie
+ storageValues map[common.Hash][]*kv
+
+ accountRequestHandler accountHandlerFuncV2
+ storageRequestHandler storageHandlerFuncV2
+ codeRequestHandler codeHandlerFuncV2
+ term func()
+
+ // counters
+ nAccountRequests atomic.Int64
+ nStorageRequests atomic.Int64
+ nBytecodeRequests atomic.Int64
+}
+
+func newTestPeerV2(id string, t *testing.T, term func()) *testPeerV2 {
+ return &testPeerV2{
+ id: id,
+ test: t,
+ logger: log.New("id", id),
+ accountRequestHandler: defaultAccountRequestHandlerV2,
+ storageRequestHandler: defaultStorageRequestHandlerV2,
+ codeRequestHandler: defaultCodeRequestHandlerV2,
+ term: term,
+ }
+}
+
+func (t *testPeerV2) setStorageTries(tries map[common.Hash]*trie.Trie) {
+ t.storageTries = make(map[common.Hash]*trie.Trie)
+ for root, trie := range tries {
+ t.storageTries[root] = trie.Copy()
+ }
+}
+
+func (t *testPeerV2) ID() string { return t.id }
+func (t *testPeerV2) Log() log.Logger { return t.logger }
+
+func (t *testPeerV2) Stats() string {
+ return fmt.Sprintf(`Account requests: %d
+Storage requests: %d
+Bytecode requests: %d
+`, t.nAccountRequests.Load(), t.nStorageRequests.Load(), t.nBytecodeRequests.Load())
+}
+
+func (t *testPeerV2) RequestAccountRange(id uint64, root, origin, limit common.Hash, bytes int) error {
+ t.logger.Trace("Fetching range of accounts", "reqid", id, "root", root, "origin", origin, "limit", limit, "bytes", common.StorageSize(bytes))
+ t.nAccountRequests.Add(1)
+ go t.accountRequestHandler(t, id, root, origin, limit, bytes)
+ return nil
+}
+
+func (t *testPeerV2) RequestStorageRanges(id uint64, root common.Hash, accounts []common.Hash, origin, limit []byte, bytes int) error {
+ t.nStorageRequests.Add(1)
+ if len(accounts) == 1 && origin != nil {
+ t.logger.Trace("Fetching range of large storage slots", "reqid", id, "root", root, "account", accounts[0], "origin", common.BytesToHash(origin), "limit", common.BytesToHash(limit), "bytes", common.StorageSize(bytes))
+ } else {
+ t.logger.Trace("Fetching ranges of small storage slots", "reqid", id, "root", root, "accounts", len(accounts), "first", accounts[0], "bytes", common.StorageSize(bytes))
+ }
+ go t.storageRequestHandler(t, id, root, accounts, origin, limit, bytes)
+ return nil
+}
+
+func (t *testPeerV2) RequestByteCodes(id uint64, hashes []common.Hash, bytes int) error {
+ t.nBytecodeRequests.Add(1)
+ t.logger.Trace("Fetching set of byte codes", "reqid", id, "hashes", len(hashes), "bytes", common.StorageSize(bytes))
+ go t.codeRequestHandler(t, id, hashes, bytes)
+ return nil
+}
+
+func createAccountRequestResponseV2(t *testPeerV2, root common.Hash, origin common.Hash, limit common.Hash, cap int) (keys []common.Hash, vals [][]byte, proofs [][]byte) {
+ var size int
+ if limit == (common.Hash{}) {
+ limit = common.MaxHash
+ }
+ for _, entry := range t.accountValues {
+ if size > cap {
+ break
+ }
+ if bytes.Compare(origin[:], entry.k) <= 0 {
+ keys = append(keys, common.BytesToHash(entry.k))
+ vals = append(vals, entry.v)
+ size += 32 + len(entry.v)
+ }
+ if bytes.Compare(entry.k, limit[:]) >= 0 {
+ break
+ }
+ }
+ proof := trienode.NewProofSet()
+ if err := t.accountTrie.Prove(origin[:], proof); err != nil {
+ t.logger.Error("Could not prove inexistence of origin", "origin", origin, "error", err)
+ }
+ if len(keys) > 0 {
+ lastK := (keys[len(keys)-1])[:]
+ if err := t.accountTrie.Prove(lastK, proof); err != nil {
+ t.logger.Error("Could not prove last item", "error", err)
+ }
+ }
+ return keys, vals, proof.List()
+}
+
+func createStorageRequestResponseV2(t *testPeerV2, root common.Hash, accounts []common.Hash, origin, limit []byte, max int) (hashes [][]common.Hash, slots [][][]byte, proofs [][]byte) {
+ var size int
+ for _, account := range accounts {
+ var originHash common.Hash
+ if len(origin) > 0 {
+ originHash = common.BytesToHash(origin)
+ }
+ var limitHash = common.MaxHash
+ if len(limit) > 0 {
+ limitHash = common.BytesToHash(limit)
+ }
+ var (
+ keys []common.Hash
+ vals [][]byte
+ abort bool
+ )
+ for _, entry := range t.storageValues[account] {
+ if size >= max {
+ abort = true
+ break
+ }
+ if bytes.Compare(entry.k, originHash[:]) < 0 {
+ continue
+ }
+ keys = append(keys, common.BytesToHash(entry.k))
+ vals = append(vals, entry.v)
+ size += 32 + len(entry.v)
+ if bytes.Compare(entry.k, limitHash[:]) >= 0 {
+ break
+ }
+ }
+ if len(keys) > 0 {
+ hashes = append(hashes, keys)
+ slots = append(slots, vals)
+ }
+ if originHash != (common.Hash{}) || (abort && len(keys) > 0) {
+ proof := trienode.NewProofSet()
+ stTrie := t.storageTries[account]
+
+ if err := stTrie.Prove(originHash[:], proof); err != nil {
+ t.logger.Error("Could not prove inexistence of origin", "origin", originHash, "error", err)
+ }
+ if len(keys) > 0 {
+ lastK := (keys[len(keys)-1])[:]
+ if err := stTrie.Prove(lastK, proof); err != nil {
+ t.logger.Error("Could not prove last item", "error", err)
+ }
+ }
+ proofs = append(proofs, proof.List()...)
+ break
+ }
+ }
+ return hashes, slots, proofs
+}
+
+func createStorageRequestResponseAlwaysProveV2(t *testPeerV2, root common.Hash, accounts []common.Hash, bOrigin, bLimit []byte, max int) (hashes [][]common.Hash, slots [][][]byte, proofs [][]byte) {
+ var size int
+ max = max * 3 / 4
+
+ var origin common.Hash
+ if len(bOrigin) > 0 {
+ origin = common.BytesToHash(bOrigin)
+ }
+ var exit bool
+ for i, account := range accounts {
+ var keys []common.Hash
+ var vals [][]byte
+ for _, entry := range t.storageValues[account] {
+ if bytes.Compare(entry.k, origin[:]) < 0 {
+ exit = true
+ }
+ keys = append(keys, common.BytesToHash(entry.k))
+ vals = append(vals, entry.v)
+ size += 32 + len(entry.v)
+ if size > max {
+ exit = true
+ }
+ }
+ if i == len(accounts)-1 {
+ exit = true
+ }
+ hashes = append(hashes, keys)
+ slots = append(slots, vals)
+
+ if exit {
+ proof := trienode.NewProofSet()
+ stTrie := t.storageTries[account]
+
+ if err := stTrie.Prove(origin[:], proof); err != nil {
+ t.logger.Error("Could not prove inexistence of origin", "origin", origin, "error", err)
+ }
+ if len(keys) > 0 {
+ lastK := (keys[len(keys)-1])[:]
+ if err := stTrie.Prove(lastK, proof); err != nil {
+ t.logger.Error("Could not prove last item", "error", err)
+ }
+ }
+ proofs = append(proofs, proof.List()...)
+ break
+ }
+ }
+ return hashes, slots, proofs
+}
+
+// defaultAccountRequestHandlerV2 is a well-behaving handler for AccountRangeRequests.
+func defaultAccountRequestHandlerV2(t *testPeerV2, id uint64, root common.Hash, origin common.Hash, limit common.Hash, cap int) error {
+ keys, vals, proofs := createAccountRequestResponseV2(t, root, origin, limit, cap)
+ if err := t.remote.OnAccounts(t, id, keys, vals, proofs); err != nil {
+ t.test.Errorf("Remote side rejected our delivery: %v", err)
+ t.term()
+ return err
+ }
+ return nil
+}
+
+func defaultStorageRequestHandlerV2(t *testPeerV2, requestId uint64, root common.Hash, accounts []common.Hash, bOrigin, bLimit []byte, max int) error {
+ hashes, slots, proofs := createStorageRequestResponseV2(t, root, accounts, bOrigin, bLimit, max)
+ if err := t.remote.OnStorage(t, requestId, hashes, slots, proofs); err != nil {
+ t.test.Errorf("Remote side rejected our delivery: %v", err)
+ t.term()
+ }
+ return nil
+}
+
+func defaultCodeRequestHandlerV2(t *testPeerV2, id uint64, hashes []common.Hash, max int) error {
+ var bytecodes [][]byte
+ for _, h := range hashes {
+ bytecodes = append(bytecodes, getCodeByHash(h))
+ }
+ if err := t.remote.OnByteCodes(t, id, bytecodes); err != nil {
+ t.test.Errorf("Remote side rejected our delivery: %v", err)
+ t.term()
+ }
+ return nil
+}
+
+// Misbehaving handlers.
+
+func emptyRequestAccountRangeFnV2(t *testPeerV2, requestId uint64, root common.Hash, origin common.Hash, limit common.Hash, cap int) error {
+ t.remote.OnAccounts(t, requestId, nil, nil, nil)
+ return nil
+}
+
+func nonResponsiveRequestAccountRangeFnV2(t *testPeerV2, requestId uint64, root common.Hash, origin common.Hash, limit common.Hash, cap int) error {
+ return nil
+}
+
+func emptyStorageRequestHandlerV2(t *testPeerV2, requestId uint64, root common.Hash, accounts []common.Hash, origin, limit []byte, max int) error {
+ t.remote.OnStorage(t, requestId, nil, nil, nil)
+ return nil
+}
+
+func nonResponsiveStorageRequestHandlerV2(t *testPeerV2, requestId uint64, root common.Hash, accounts []common.Hash, origin, limit []byte, max int) error {
+ return nil
+}
+
+func proofHappyStorageRequestHandlerV2(t *testPeerV2, requestId uint64, root common.Hash, accounts []common.Hash, origin, limit []byte, max int) error {
+ hashes, slots, proofs := createStorageRequestResponseAlwaysProveV2(t, root, accounts, origin, limit, max)
+ if err := t.remote.OnStorage(t, requestId, hashes, slots, proofs); err != nil {
+ t.test.Errorf("Remote side rejected our delivery: %v", err)
+ t.term()
+ }
+ return nil
+}
+
+func corruptCodeRequestHandlerV2(t *testPeerV2, id uint64, hashes []common.Hash, max int) error {
+ var bytecodes [][]byte
+ for _, h := range hashes {
+ bytecodes = append(bytecodes, h[:])
+ }
+ if err := t.remote.OnByteCodes(t, id, bytecodes); err != nil {
+ t.logger.Info("remote error on delivery (as expected)", "error", err)
+ t.remote.Unregister(t.id)
+ }
+ return nil
+}
+
+func cappedCodeRequestHandlerV2(t *testPeerV2, id uint64, hashes []common.Hash, max int) error {
+ var bytecodes [][]byte
+ for _, h := range hashes[:1] {
+ bytecodes = append(bytecodes, getCodeByHash(h))
+ }
+ if err := t.remote.OnByteCodes(t, id, bytecodes); err != nil {
+ t.test.Errorf("Remote side rejected our delivery: %v", err)
+ t.term()
+ }
+ return nil
+}
+
+func starvingStorageRequestHandlerV2(t *testPeerV2, requestId uint64, root common.Hash, accounts []common.Hash, origin, limit []byte, max int) error {
+ return defaultStorageRequestHandlerV2(t, requestId, root, accounts, origin, limit, 500)
+}
+
+func starvingAccountRequestHandlerV2(t *testPeerV2, requestId uint64, root common.Hash, origin common.Hash, limit common.Hash, cap int) error {
+ return defaultAccountRequestHandlerV2(t, requestId, root, origin, limit, 500)
+}
+
+func corruptAccountRequestHandlerV2(t *testPeerV2, requestId uint64, root common.Hash, origin common.Hash, limit common.Hash, cap int) error {
+ hashes, accounts, proofs := createAccountRequestResponseV2(t, root, origin, limit, cap)
+ if len(proofs) > 0 {
+ proofs = proofs[1:]
+ }
+ if err := t.remote.OnAccounts(t, requestId, hashes, accounts, proofs); err != nil {
+ t.logger.Info("remote error on delivery (as expected)", "error", err)
+ t.remote.Unregister(t.id)
+ }
+ return nil
+}
+
+func corruptStorageRequestHandlerV2(t *testPeerV2, requestId uint64, root common.Hash, accounts []common.Hash, origin, limit []byte, max int) error {
+ hashes, slots, proofs := createStorageRequestResponseV2(t, root, accounts, origin, limit, max)
+ if len(proofs) > 0 {
+ proofs = proofs[1:]
+ }
+ if err := t.remote.OnStorage(t, requestId, hashes, slots, proofs); err != nil {
+ t.logger.Info("remote error on delivery (as expected)", "error", err)
+ t.remote.Unregister(t.id)
+ }
+ return nil
+}
+
+func noProofStorageRequestHandlerV2(t *testPeerV2, requestId uint64, root common.Hash, accounts []common.Hash, origin, limit []byte, max int) error {
+ hashes, slots, _ := createStorageRequestResponseV2(t, root, accounts, origin, limit, max)
+ if err := t.remote.OnStorage(t, requestId, hashes, slots, nil); err != nil {
+ t.logger.Info("remote error on delivery (as expected)", "error", err)
+ t.remote.Unregister(t.id)
+ }
+ return nil
+}
+
+func setupSyncerV2(scheme string, peers ...*testPeerV2) *SyncerV2 {
+ stateDb := rawdb.NewMemoryDatabase()
+ syncer := NewSyncerV2(stateDb, scheme)
+ for _, peer := range peers {
+ syncer.Register(peer)
+ peer.remote = syncer
+ }
+ return syncer
+}
+
+// verifyFlatState checks that the database contains the snapshot entries for
+// every expected account and storage slot, plus the bytecode for every account
+// that has one. Trie node presence is intentionally not checked: SyncerV2 only
+// downloads flat state.
+func verifyFlatState(t *testing.T, db ethdb.KeyValueStore, accountValues []*kv, storageValues map[common.Hash][]*kv) {
+ t.Helper()
+
+ for _, entry := range accountValues {
+ hash := common.BytesToHash(entry.k)
+ got := rawdb.ReadAccountSnapshot(db, hash)
+ if got == nil {
+ t.Fatalf("missing account snapshot for %x", hash)
+ }
+ var acc types.StateAccount
+ if err := rlp.DecodeBytes(entry.v, &acc); err != nil {
+ t.Fatalf("failed to decode source account %x: %v", hash, err)
+ }
+ want := types.SlimAccountRLP(acc)
+ if !bytes.Equal(got, want) {
+ t.Fatalf("account snapshot mismatch for %x:\n got %x\n want %x", hash, got, want)
+ }
+ if !bytes.Equal(acc.CodeHash, types.EmptyCodeHash.Bytes()) {
+ if !rawdb.HasCode(db, common.BytesToHash(acc.CodeHash)) {
+ t.Fatalf("missing code for hash %x (account %x)", acc.CodeHash, hash)
+ }
+ }
+ }
+ var accounts, slots int
+ for _, entry := range accountValues {
+ accounts++
+ account := common.BytesToHash(entry.k)
+ for _, slot := range storageValues[account] {
+ slotHash := common.BytesToHash(slot.k)
+ got := rawdb.ReadStorageSnapshot(db, account, slotHash)
+ if got == nil {
+ t.Fatalf("missing storage snapshot for account %x slot %x", account, slotHash)
+ }
+ if !bytes.Equal(got, slot.v) {
+ t.Fatalf("storage snapshot mismatch for account %x slot %x:\n got %x\n want %x", account, slotHash, got, slot.v)
+ }
+ slots++
+ }
+ }
+ t.Logf("flat state verified: accounts=%d slots=%d", accounts, slots)
+}
+
+// TestSyncV2 tests a basic sync with one peer.
+func TestSyncV2(t *testing.T) {
+ t.Parallel()
+ testSyncV2(t, rawdb.HashScheme)
+ testSyncV2(t, rawdb.PathScheme)
+}
+
+func testSyncV2(t *testing.T, scheme string) {
+ var (
+ once sync.Once
+ cancel = make(chan struct{})
+ term = func() { once.Do(func() { close(cancel) }) }
+ )
+ nodeScheme, sourceAccountTrie, elems := makeAccountTrieNoStorage(100, scheme)
+
+ mkSource := func(name string) *testPeerV2 {
+ source := newTestPeerV2(name, t, term)
+ source.accountTrie = sourceAccountTrie.Copy()
+ source.accountValues = elems
+ return source
+ }
+ syncer := setupSyncerV2(nodeScheme, mkSource("source"))
+ if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
+ t.Fatalf("sync failed: %v", err)
+ }
+ verifyFlatState(t, syncer.db, elems, nil)
+}
+
+// TestSyncTinyTriePanicV2 tests a basic sync with one peer and a tiny trie.
+func TestSyncTinyTriePanicV2(t *testing.T) {
+ t.Parallel()
+ testSyncTinyTriePanicV2(t, rawdb.HashScheme)
+ testSyncTinyTriePanicV2(t, rawdb.PathScheme)
+}
+
+func testSyncTinyTriePanicV2(t *testing.T, scheme string) {
+ var (
+ once sync.Once
+ cancel = make(chan struct{})
+ term = func() { once.Do(func() { close(cancel) }) }
+ )
+ nodeScheme, sourceAccountTrie, elems := makeAccountTrieNoStorage(1, scheme)
+
+ mkSource := func(name string) *testPeerV2 {
+ source := newTestPeerV2(name, t, term)
+ source.accountTrie = sourceAccountTrie.Copy()
+ source.accountValues = elems
+ return source
+ }
+ syncer := setupSyncerV2(nodeScheme, mkSource("source"))
+ done := checkStall(t, term)
+ if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
+ t.Fatalf("sync failed: %v", err)
+ }
+ close(done)
+ verifyFlatState(t, syncer.db, elems, nil)
+}
+
+// TestMultiSyncV2 tests a basic sync with multiple peers.
+func TestMultiSyncV2(t *testing.T) {
+ t.Parallel()
+ testMultiSyncV2(t, rawdb.HashScheme)
+ testMultiSyncV2(t, rawdb.PathScheme)
+}
+
+func testMultiSyncV2(t *testing.T, scheme string) {
+ var (
+ once sync.Once
+ cancel = make(chan struct{})
+ term = func() { once.Do(func() { close(cancel) }) }
+ )
+ nodeScheme, sourceAccountTrie, elems := makeAccountTrieNoStorage(100, scheme)
+
+ mkSource := func(name string) *testPeerV2 {
+ source := newTestPeerV2(name, t, term)
+ source.accountTrie = sourceAccountTrie.Copy()
+ source.accountValues = elems
+ return source
+ }
+ syncer := setupSyncerV2(nodeScheme, mkSource("sourceA"), mkSource("sourceB"))
+ done := checkStall(t, term)
+ if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
+ t.Fatalf("sync failed: %v", err)
+ }
+ close(done)
+ verifyFlatState(t, syncer.db, elems, nil)
+}
+
+// TestSyncWithStorageV2 tests basic sync using accounts + storage + code.
+func TestSyncWithStorageV2(t *testing.T) {
+ t.Parallel()
+ testSyncWithStorageV2(t, rawdb.HashScheme)
+ testSyncWithStorageV2(t, rawdb.PathScheme)
+}
+
+func testSyncWithStorageV2(t *testing.T, scheme string) {
+ var (
+ once sync.Once
+ cancel = make(chan struct{})
+ term = func() { once.Do(func() { close(cancel) }) }
+ )
+ sourceAccountTrie, elems, storageTries, storageElems := makeAccountTrieWithStorage(scheme, 3, 3000, true, false, false)
+
+ mkSource := func(name string) *testPeerV2 {
+ source := newTestPeerV2(name, t, term)
+ source.accountTrie = sourceAccountTrie.Copy()
+ source.accountValues = elems
+ source.setStorageTries(storageTries)
+ source.storageValues = storageElems
+ return source
+ }
+ syncer := setupSyncerV2(scheme, mkSource("sourceA"))
+ done := checkStall(t, term)
+ if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
+ t.Fatalf("sync failed: %v", err)
+ }
+ close(done)
+ verifyFlatState(t, syncer.db, elems, storageElems)
+}
+
+// TestMultiSyncManyUselessV2 keeps one good peer and several that return empty.
+func TestMultiSyncManyUselessV2(t *testing.T) {
+ t.Parallel()
+ testMultiSyncManyUselessV2(t, rawdb.HashScheme)
+ testMultiSyncManyUselessV2(t, rawdb.PathScheme)
+}
+
+func testMultiSyncManyUselessV2(t *testing.T, scheme string) {
+ var (
+ once sync.Once
+ cancel = make(chan struct{})
+ term = func() { once.Do(func() { close(cancel) }) }
+ )
+ sourceAccountTrie, elems, storageTries, storageElems := makeAccountTrieWithStorage(scheme, 100, 3000, true, false, false)
+
+ mkSource := func(name string, noAccount, noStorage bool) *testPeerV2 {
+ source := newTestPeerV2(name, t, term)
+ source.accountTrie = sourceAccountTrie.Copy()
+ source.accountValues = elems
+ source.setStorageTries(storageTries)
+ source.storageValues = storageElems
+ if noAccount {
+ source.accountRequestHandler = emptyRequestAccountRangeFnV2
+ }
+ if noStorage {
+ source.storageRequestHandler = emptyStorageRequestHandlerV2
+ }
+ return source
+ }
+ syncer := setupSyncerV2(
+ scheme,
+ mkSource("full", false, false),
+ mkSource("noAccounts", true, false),
+ mkSource("noStorage", false, true),
+ )
+ done := checkStall(t, term)
+ if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
+ t.Fatalf("sync failed: %v", err)
+ }
+ close(done)
+ verifyFlatState(t, syncer.db, elems, storageElems)
+}
+
+// TestMultiSyncManyUselessWithLowTimeoutV2 is the same as above but with a very
+// low timeout, exercising the timeout/reschedule paths.
+func TestMultiSyncManyUselessWithLowTimeoutV2(t *testing.T) {
+ t.Parallel()
+ testMultiSyncManyUselessWithLowTimeoutV2(t, rawdb.HashScheme)
+ testMultiSyncManyUselessWithLowTimeoutV2(t, rawdb.PathScheme)
+}
+
+func testMultiSyncManyUselessWithLowTimeoutV2(t *testing.T, scheme string) {
+ var (
+ once sync.Once
+ cancel = make(chan struct{})
+ term = func() { once.Do(func() { close(cancel) }) }
+ )
+ sourceAccountTrie, elems, storageTries, storageElems := makeAccountTrieWithStorage(scheme, 100, 3000, true, false, false)
+
+ mkSource := func(name string, noAccount, noStorage bool) *testPeerV2 {
+ source := newTestPeerV2(name, t, term)
+ source.accountTrie = sourceAccountTrie.Copy()
+ source.accountValues = elems
+ source.setStorageTries(storageTries)
+ source.storageValues = storageElems
+ if !noAccount {
+ source.accountRequestHandler = emptyRequestAccountRangeFnV2
+ }
+ if !noStorage {
+ source.storageRequestHandler = emptyStorageRequestHandlerV2
+ }
+ return source
+ }
+ syncer := setupSyncerV2(
+ scheme,
+ mkSource("full", true, true),
+ mkSource("noAccounts", false, true),
+ mkSource("noStorage", true, false),
+ )
+ syncer.rates.OverrideTTLLimit = time.Millisecond
+
+ done := checkStall(t, term)
+ if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
+ t.Fatalf("sync failed: %v", err)
+ }
+ close(done)
+ verifyFlatState(t, syncer.db, elems, storageElems)
+}
+
+// TestMultiSyncManyUnresponsiveV2 keeps one good peer and several that don't
+// respond at all.
+func TestMultiSyncManyUnresponsiveV2(t *testing.T) {
+ t.Parallel()
+ testMultiSyncManyUnresponsiveV2(t, rawdb.HashScheme)
+ testMultiSyncManyUnresponsiveV2(t, rawdb.PathScheme)
+}
+
+func testMultiSyncManyUnresponsiveV2(t *testing.T, scheme string) {
+ var (
+ once sync.Once
+ cancel = make(chan struct{})
+ term = func() { once.Do(func() { close(cancel) }) }
+ )
+ sourceAccountTrie, elems, storageTries, storageElems := makeAccountTrieWithStorage(scheme, 100, 3000, true, false, false)
+
+ mkSource := func(name string, noAccount, noStorage bool) *testPeerV2 {
+ source := newTestPeerV2(name, t, term)
+ source.accountTrie = sourceAccountTrie.Copy()
+ source.accountValues = elems
+ source.setStorageTries(storageTries)
+ source.storageValues = storageElems
+ if noAccount {
+ source.accountRequestHandler = nonResponsiveRequestAccountRangeFnV2
+ }
+ if noStorage {
+ source.storageRequestHandler = nonResponsiveStorageRequestHandlerV2
+ }
+ return source
+ }
+ syncer := setupSyncerV2(
+ scheme,
+ mkSource("full", false, false),
+ mkSource("noAccounts", true, false),
+ mkSource("noStorage", false, true),
+ )
+ syncer.rates.OverrideTTLLimit = time.Millisecond
+
+ done := checkStall(t, term)
+ if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
+ t.Fatalf("sync failed: %v", err)
+ }
+ close(done)
+ verifyFlatState(t, syncer.db, elems, storageElems)
+}
+
+// TestSyncBoundaryAccountTrieV2 tests sync against a few normal peers, but the
+// account trie has a few boundary elements.
+func TestSyncBoundaryAccountTrieV2(t *testing.T) {
+ t.Parallel()
+ testSyncBoundaryAccountTrieV2(t, rawdb.HashScheme)
+ testSyncBoundaryAccountTrieV2(t, rawdb.PathScheme)
+}
+
+func testSyncBoundaryAccountTrieV2(t *testing.T, scheme string) {
+ var (
+ once sync.Once
+ cancel = make(chan struct{})
+ term = func() { once.Do(func() { close(cancel) }) }
+ )
+ nodeScheme, sourceAccountTrie, elems := makeBoundaryAccountTrie(scheme, 3000)
+
+ mkSource := func(name string) *testPeerV2 {
+ source := newTestPeerV2(name, t, term)
+ source.accountTrie = sourceAccountTrie.Copy()
+ source.accountValues = elems
+ return source
+ }
+ syncer := setupSyncerV2(
+ nodeScheme,
+ mkSource("peer-a"),
+ mkSource("peer-b"),
+ )
+ done := checkStall(t, term)
+ if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
+ t.Fatalf("sync failed: %v", err)
+ }
+ close(done)
+ verifyFlatState(t, syncer.db, elems, nil)
+}
+
+// TestSyncNoStorageAndOneCappedPeerV2 tests sync using accounts and no storage,
+// where one peer is consistently returning very small results.
+func TestSyncNoStorageAndOneCappedPeerV2(t *testing.T) {
+ t.Parallel()
+ testSyncNoStorageAndOneCappedPeerV2(t, rawdb.HashScheme)
+ testSyncNoStorageAndOneCappedPeerV2(t, rawdb.PathScheme)
+}
+
+func testSyncNoStorageAndOneCappedPeerV2(t *testing.T, scheme string) {
+ var (
+ once sync.Once
+ cancel = make(chan struct{})
+ term = func() { once.Do(func() { close(cancel) }) }
+ )
+ nodeScheme, sourceAccountTrie, elems := makeAccountTrieNoStorage(3000, scheme)
+
+ mkSource := func(name string, slow bool) *testPeerV2 {
+ source := newTestPeerV2(name, t, term)
+ source.accountTrie = sourceAccountTrie.Copy()
+ source.accountValues = elems
+ if slow {
+ source.accountRequestHandler = starvingAccountRequestHandlerV2
+ }
+ return source
+ }
+
+ syncer := setupSyncerV2(
+ nodeScheme,
+ mkSource("nice-a", false),
+ mkSource("nice-b", false),
+ mkSource("nice-c", false),
+ mkSource("capped", true),
+ )
+ done := checkStall(t, term)
+ if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
+ t.Fatalf("sync failed: %v", err)
+ }
+ close(done)
+ verifyFlatState(t, syncer.db, elems, nil)
+}
+
+// TestSyncNoStorageAndOneCodeCorruptPeerV2 has one peer that doesn't deliver
+// code requests properly.
+func TestSyncNoStorageAndOneCodeCorruptPeerV2(t *testing.T) {
+ t.Parallel()
+ testSyncNoStorageAndOneCodeCorruptPeerV2(t, rawdb.HashScheme)
+ testSyncNoStorageAndOneCodeCorruptPeerV2(t, rawdb.PathScheme)
+}
+
+func testSyncNoStorageAndOneCodeCorruptPeerV2(t *testing.T, scheme string) {
+ var (
+ once sync.Once
+ cancel = make(chan struct{})
+ term = func() { once.Do(func() { close(cancel) }) }
+ )
+ nodeScheme, sourceAccountTrie, elems := makeAccountTrieNoStorage(3000, scheme)
+
+ mkSource := func(name string, codeFn codeHandlerFuncV2) *testPeerV2 {
+ source := newTestPeerV2(name, t, term)
+ source.accountTrie = sourceAccountTrie.Copy()
+ source.accountValues = elems
+ source.codeRequestHandler = codeFn
+ return source
+ }
+ syncer := setupSyncerV2(
+ nodeScheme,
+ mkSource("capped", cappedCodeRequestHandlerV2),
+ mkSource("corrupt", corruptCodeRequestHandlerV2),
+ )
+ done := checkStall(t, term)
+ if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
+ t.Fatalf("sync failed: %v", err)
+ }
+ close(done)
+ verifyFlatState(t, syncer.db, elems, nil)
+}
+
+func TestSyncNoStorageAndOneAccountCorruptPeerV2(t *testing.T) {
+ t.Parallel()
+ testSyncNoStorageAndOneAccountCorruptPeerV2(t, rawdb.HashScheme)
+ testSyncNoStorageAndOneAccountCorruptPeerV2(t, rawdb.PathScheme)
+}
+
+func testSyncNoStorageAndOneAccountCorruptPeerV2(t *testing.T, scheme string) {
+ var (
+ once sync.Once
+ cancel = make(chan struct{})
+ term = func() { once.Do(func() { close(cancel) }) }
+ )
+ nodeScheme, sourceAccountTrie, elems := makeAccountTrieNoStorage(3000, scheme)
+
+ mkSource := func(name string, accFn accountHandlerFuncV2) *testPeerV2 {
+ source := newTestPeerV2(name, t, term)
+ source.accountTrie = sourceAccountTrie.Copy()
+ source.accountValues = elems
+ source.accountRequestHandler = accFn
+ return source
+ }
+ syncer := setupSyncerV2(
+ nodeScheme,
+ mkSource("capped", starvingAccountRequestHandlerV2),
+ mkSource("corrupt", corruptAccountRequestHandlerV2),
+ )
+ done := checkStall(t, term)
+ if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
+ t.Fatalf("sync failed: %v", err)
+ }
+ close(done)
+ verifyFlatState(t, syncer.db, elems, nil)
+}
+
+// TestSyncNoStorageAndOneCodeCappedPeerV2 has one peer that delivers code
+// hashes one by one.
+func TestSyncNoStorageAndOneCodeCappedPeerV2(t *testing.T) {
+ t.Parallel()
+ testSyncNoStorageAndOneCodeCappedPeerV2(t, rawdb.HashScheme)
+ testSyncNoStorageAndOneCodeCappedPeerV2(t, rawdb.PathScheme)
+}
+
+func testSyncNoStorageAndOneCodeCappedPeerV2(t *testing.T, scheme string) {
+ var (
+ once sync.Once
+ cancel = make(chan struct{})
+ term = func() { once.Do(func() { close(cancel) }) }
+ )
+ nodeScheme, sourceAccountTrie, elems := makeAccountTrieNoStorage(3000, scheme)
+
+ mkSource := func(name string, codeFn codeHandlerFuncV2) *testPeerV2 {
+ source := newTestPeerV2(name, t, term)
+ source.accountTrie = sourceAccountTrie.Copy()
+ source.accountValues = elems
+ source.codeRequestHandler = codeFn
+ return source
+ }
+ var counter int
+ syncer := setupSyncerV2(
+ nodeScheme,
+ mkSource("capped", func(t *testPeerV2, id uint64, hashes []common.Hash, max int) error {
+ counter++
+ return cappedCodeRequestHandlerV2(t, id, hashes, max)
+ }),
+ )
+ done := checkStall(t, term)
+ if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
+ t.Fatalf("sync failed: %v", err)
+ }
+ close(done)
+
+ if threshold := 100; counter > threshold {
+ t.Logf("Error, expected < %d invocations, got %d", threshold, counter)
+ }
+ verifyFlatState(t, syncer.db, elems, nil)
+}
+
+// TestSyncBoundaryStorageTrieV2 tests sync against a few normal peers, but the
+// storage trie has a few boundary elements.
+func TestSyncBoundaryStorageTrieV2(t *testing.T) {
+ t.Parallel()
+ testSyncBoundaryStorageTrieV2(t, rawdb.HashScheme)
+ testSyncBoundaryStorageTrieV2(t, rawdb.PathScheme)
+}
+
+func testSyncBoundaryStorageTrieV2(t *testing.T, scheme string) {
+ var (
+ once sync.Once
+ cancel = make(chan struct{})
+ term = func() { once.Do(func() { close(cancel) }) }
+ )
+ sourceAccountTrie, elems, storageTries, storageElems := makeAccountTrieWithStorage(scheme, 10, 1000, false, true, false)
+
+ mkSource := func(name string) *testPeerV2 {
+ source := newTestPeerV2(name, t, term)
+ source.accountTrie = sourceAccountTrie.Copy()
+ source.accountValues = elems
+ source.setStorageTries(storageTries)
+ source.storageValues = storageElems
+ return source
+ }
+ syncer := setupSyncerV2(
+ scheme,
+ mkSource("peer-a"),
+ mkSource("peer-b"),
+ )
+ done := checkStall(t, term)
+ if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
+ t.Fatalf("sync failed: %v", err)
+ }
+ close(done)
+ verifyFlatState(t, syncer.db, elems, storageElems)
+}
+
+// TestSyncWithStorageAndOneCappedPeerV2 tests sync using accounts + storage,
+// where one peer is consistently returning very small results.
+func TestSyncWithStorageAndOneCappedPeerV2(t *testing.T) {
+ t.Parallel()
+ testSyncWithStorageAndOneCappedPeerV2(t, rawdb.HashScheme)
+ testSyncWithStorageAndOneCappedPeerV2(t, rawdb.PathScheme)
+}
+
+func testSyncWithStorageAndOneCappedPeerV2(t *testing.T, scheme string) {
+ var (
+ once sync.Once
+ cancel = make(chan struct{})
+ term = func() { once.Do(func() { close(cancel) }) }
+ )
+ sourceAccountTrie, elems, storageTries, storageElems := makeAccountTrieWithStorage(scheme, 300, 1000, false, false, false)
+
+ mkSource := func(name string, slow bool) *testPeerV2 {
+ source := newTestPeerV2(name, t, term)
+ source.accountTrie = sourceAccountTrie.Copy()
+ source.accountValues = elems
+ source.setStorageTries(storageTries)
+ source.storageValues = storageElems
+ if slow {
+ source.storageRequestHandler = starvingStorageRequestHandlerV2
+ }
+ return source
+ }
+ syncer := setupSyncerV2(
+ scheme,
+ mkSource("nice-a", false),
+ mkSource("slow", true),
+ )
+ done := checkStall(t, term)
+ if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
+ t.Fatalf("sync failed: %v", err)
+ }
+ close(done)
+ verifyFlatState(t, syncer.db, elems, storageElems)
+}
+
+// TestSyncWithStorageAndCorruptPeerV2 tests sync using accounts + storage,
+// where one peer is sometimes sending bad proofs.
+func TestSyncWithStorageAndCorruptPeerV2(t *testing.T) {
+ t.Parallel()
+ testSyncWithStorageAndCorruptPeerV2(t, rawdb.HashScheme)
+ testSyncWithStorageAndCorruptPeerV2(t, rawdb.PathScheme)
+}
+
+func testSyncWithStorageAndCorruptPeerV2(t *testing.T, scheme string) {
+ var (
+ once sync.Once
+ cancel = make(chan struct{})
+ term = func() { once.Do(func() { close(cancel) }) }
+ )
+ sourceAccountTrie, elems, storageTries, storageElems := makeAccountTrieWithStorage(scheme, 100, 3000, true, false, false)
+
+ mkSource := func(name string, handler storageHandlerFuncV2) *testPeerV2 {
+ source := newTestPeerV2(name, t, term)
+ source.accountTrie = sourceAccountTrie.Copy()
+ source.accountValues = elems
+ source.setStorageTries(storageTries)
+ source.storageValues = storageElems
+ source.storageRequestHandler = handler
+ return source
+ }
+ syncer := setupSyncerV2(
+ scheme,
+ mkSource("nice-a", defaultStorageRequestHandlerV2),
+ mkSource("nice-b", defaultStorageRequestHandlerV2),
+ mkSource("nice-c", defaultStorageRequestHandlerV2),
+ mkSource("corrupt", corruptStorageRequestHandlerV2),
+ )
+ done := checkStall(t, term)
+ if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
+ t.Fatalf("sync failed: %v", err)
+ }
+ close(done)
+ verifyFlatState(t, syncer.db, elems, storageElems)
+}
+
+func TestSyncWithStorageAndNonProvingPeerV2(t *testing.T) {
+ t.Parallel()
+ testSyncWithStorageAndNonProvingPeerV2(t, rawdb.HashScheme)
+ testSyncWithStorageAndNonProvingPeerV2(t, rawdb.PathScheme)
+}
+
+func testSyncWithStorageAndNonProvingPeerV2(t *testing.T, scheme string) {
+ var (
+ once sync.Once
+ cancel = make(chan struct{})
+ term = func() { once.Do(func() { close(cancel) }) }
+ )
+ sourceAccountTrie, elems, storageTries, storageElems := makeAccountTrieWithStorage(scheme, 100, 3000, true, false, false)
+
+ mkSource := func(name string, handler storageHandlerFuncV2) *testPeerV2 {
+ source := newTestPeerV2(name, t, term)
+ source.accountTrie = sourceAccountTrie.Copy()
+ source.accountValues = elems
+ source.setStorageTries(storageTries)
+ source.storageValues = storageElems
+ source.storageRequestHandler = handler
+ return source
+ }
+ syncer := setupSyncerV2(
+ scheme,
+ mkSource("nice-a", defaultStorageRequestHandlerV2),
+ mkSource("nice-b", defaultStorageRequestHandlerV2),
+ mkSource("nice-c", defaultStorageRequestHandlerV2),
+ mkSource("corrupt", noProofStorageRequestHandlerV2),
+ )
+ done := checkStall(t, term)
+ if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
+ t.Fatalf("sync failed: %v", err)
+ }
+ close(done)
+ verifyFlatState(t, syncer.db, elems, storageElems)
+}
+
+// TestSyncWithStorageMisbehavingProveV2 tests basic sync using accounts +
+// storage + code against a peer that insists on delivering full storage sets
+// _and_ proofs.
+func TestSyncWithStorageMisbehavingProveV2(t *testing.T) {
+ t.Parallel()
+ testSyncWithStorageMisbehavingProveV2(t, rawdb.HashScheme)
+ testSyncWithStorageMisbehavingProveV2(t, rawdb.PathScheme)
+}
+
+func testSyncWithStorageMisbehavingProveV2(t *testing.T, scheme string) {
+ var (
+ once sync.Once
+ cancel = make(chan struct{})
+ term = func() { once.Do(func() { close(cancel) }) }
+ )
+ nodeScheme, sourceAccountTrie, elems, storageTries, storageElems := makeAccountTrieWithStorageWithUniqueStorage(scheme, 10, 30, false)
+
+ mkSource := func(name string) *testPeerV2 {
+ source := newTestPeerV2(name, t, term)
+ source.accountTrie = sourceAccountTrie.Copy()
+ source.accountValues = elems
+ source.setStorageTries(storageTries)
+ source.storageValues = storageElems
+ source.storageRequestHandler = proofHappyStorageRequestHandlerV2
+ return source
+ }
+ syncer := setupSyncerV2(nodeScheme, mkSource("sourceA"))
+ if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err != nil {
+ t.Fatalf("sync failed: %v", err)
+ }
+ verifyFlatState(t, syncer.db, elems, storageElems)
+}
+
+// TestSyncWithUnevenStorageV2 tests sync where the storage trie is not even
+// and with a few empty ranges.
+func TestSyncWithUnevenStorageV2(t *testing.T) {
+ t.Parallel()
+ testSyncWithUnevenStorageV2(t, rawdb.HashScheme)
+ testSyncWithUnevenStorageV2(t, rawdb.PathScheme)
+}
+
+func testSyncWithUnevenStorageV2(t *testing.T, scheme string) {
+ var (
+ once sync.Once
+ cancel = make(chan struct{})
+ term = func() { once.Do(func() { close(cancel) }) }
+ )
+ accountTrie, accounts, storageTries, storageElems := makeAccountTrieWithStorage(scheme, 3, 256, false, false, true)
+
+ mkSource := func(name string) *testPeerV2 {
+ source := newTestPeerV2(name, t, term)
+ source.accountTrie = accountTrie.Copy()
+ source.accountValues = accounts
+ source.setStorageTries(storageTries)
+ source.storageValues = storageElems
+ source.storageRequestHandler = func(t *testPeerV2, reqId uint64, root common.Hash, accounts []common.Hash, origin, limit []byte, max int) error {
+ return defaultStorageRequestHandlerV2(t, reqId, root, accounts, origin, limit, 128)
+ }
+ return source
+ }
+ syncer := setupSyncerV2(scheme, mkSource("source"))
+ if err := syncer.Sync(accountTrie.Hash(), cancel); err != nil {
+ t.Fatalf("sync failed: %v", err)
+ }
+ verifyFlatState(t, syncer.db, accounts, storageElems)
+}
+
+// TestSyncBloatedProofV2 tests a scenario where the peer ships only one value
+// but inflates the proof with the entire trie. If the attack is successful the
+// remote side does not do any follow-up requests.
+func TestSyncBloatedProofV2(t *testing.T) {
+ t.Parallel()
+ testSyncBloatedProofV2(t, rawdb.HashScheme)
+ testSyncBloatedProofV2(t, rawdb.PathScheme)
+}
+
+func testSyncBloatedProofV2(t *testing.T, scheme string) {
+ var (
+ once sync.Once
+ cancel = make(chan struct{})
+ term = func() { once.Do(func() { close(cancel) }) }
+ )
+ nodeScheme, sourceAccountTrie, elems := makeAccountTrieNoStorage(100, scheme)
+ source := newTestPeerV2("source", t, term)
+ source.accountTrie = sourceAccountTrie.Copy()
+ source.accountValues = elems
+
+ source.accountRequestHandler = func(t *testPeerV2, requestId uint64, root common.Hash, origin common.Hash, limit common.Hash, cap int) error {
+ var (
+ keys []common.Hash
+ vals [][]byte
+ )
+ for _, entry := range t.accountValues {
+ if bytes.Compare(entry.k, origin[:]) < 0 {
+ continue
+ }
+ if bytes.Compare(entry.k, limit[:]) > 0 {
+ continue
+ }
+ keys = append(keys, common.BytesToHash(entry.k))
+ vals = append(vals, entry.v)
+ }
+ proof := trienode.NewProofSet()
+ if err := t.accountTrie.Prove(origin[:], proof); err != nil {
+ t.logger.Error("Could not prove origin", "origin", origin, "error", err)
+ }
+ for _, entry := range t.accountValues {
+ if err := t.accountTrie.Prove(entry.k, proof); err != nil {
+ t.logger.Error("Could not prove item", "error", err)
+ }
+ }
+ if len(keys) > 2 {
+ keys = append(keys[:1], keys[2:]...)
+ vals = append(vals[:1], vals[2:]...)
+ }
+ if err := t.remote.OnAccounts(t, requestId, keys, vals, proof.List()); err != nil {
+ t.logger.Info("remote error on delivery (as expected)", "error", err)
+ t.term()
+ }
+ return nil
+ }
+ syncer := setupSyncerV2(nodeScheme, source)
+ if err := syncer.Sync(sourceAccountTrie.Hash(), cancel); err == nil {
+ t.Fatal("No error returned from incomplete/cancelled sync")
+ }
+}