From 038c1859d859afe8416c47a25c850c0a1b849d10 Mon Sep 17 00:00:00 2001
From: jonny rhea <5555162+jrhea@users.noreply.github.com>
Date: Wed, 13 May 2026 14:35:13 -0500
Subject: [PATCH 1/8] eth/protocols/snap: add sync_v1.go and move heal-related
types to sync_v1.go
---
eth/protocols/snap/sync.go | 148 -----------------------------
eth/protocols/snap/sync_v1.go | 173 ++++++++++++++++++++++++++++++++++
2 files changed, 173 insertions(+), 148 deletions(-)
create mode 100644 eth/protocols/snap/sync_v1.go
diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go
index 841bfb446e..eb6d7339dd 100644
--- a/eth/protocols/snap/sync.go
+++ b/eth/protocols/snap/sync.go
@@ -228,73 +228,6 @@ type storageResponse struct {
cont bool // Whether the last storage range has a continuation
}
-// trienodeHealRequest tracks a pending state trie request to ensure responses
-// are to actual requests and to validate any security constraints.
-//
-// Concurrency note: trie node requests and responses are handled concurrently from
-// the main runloop to allow Keccak256 hash verifications on the peer's thread and
-// to drop on invalid response. The request struct must contain all the data to
-// construct the response without accessing runloop internals (i.e. task). That
-// is only included to allow the runloop to match a response to the task being
-// synced without having yet another set of maps.
-type trienodeHealRequest struct {
- peer string // Peer to which this request is assigned
- id uint64 // Request ID of this request
- time time.Time // Timestamp when the request was sent
-
- deliver chan *trienodeHealResponse // Channel to deliver successful response on
- revert chan *trienodeHealRequest // Channel to deliver request failure on
- cancel chan struct{} // Channel to track sync cancellation
- timeout *time.Timer // Timer to track delivery timeout
- stale chan struct{} // Channel to signal the request was dropped
-
- paths []string // Trie node paths for identifying trie node
- hashes []common.Hash // Trie node hashes to validate responses
-
- task *healTask // Task which this request is filling (only access fields through the runloop!!)
-}
-
-// trienodeHealResponse is an already verified remote response to a trie node request.
-type trienodeHealResponse struct {
- task *healTask // Task which this request is filling
-
- paths []string // Paths of the trie nodes
- hashes []common.Hash // Hashes of the trie nodes to avoid double hashing
- nodes [][]byte // Actual trie nodes to store into the database (nil = missing)
-}
-
-// bytecodeHealRequest tracks a pending bytecode request to ensure responses are to
-// actual requests and to validate any security constraints.
-//
-// Concurrency note: bytecode requests and responses are handled concurrently from
-// the main runloop to allow Keccak256 hash verifications on the peer's thread and
-// to drop on invalid response. The request struct must contain all the data to
-// construct the response without accessing runloop internals (i.e. task). That
-// is only included to allow the runloop to match a response to the task being
-// synced without having yet another set of maps.
-type bytecodeHealRequest struct {
- peer string // Peer to which this request is assigned
- id uint64 // Request ID of this request
- time time.Time // Timestamp when the request was sent
-
- deliver chan *bytecodeHealResponse // Channel to deliver successful response on
- revert chan *bytecodeHealRequest // Channel to deliver request failure on
- cancel chan struct{} // Channel to track sync cancellation
- timeout *time.Timer // Timer to track delivery timeout
- stale chan struct{} // Channel to signal the request was dropped
-
- hashes []common.Hash // Bytecode hashes to validate responses
- task *healTask // Task which this request is filling (only access fields through the runloop!!)
-}
-
-// bytecodeHealResponse is an already verified remote response to a bytecode request.
-type bytecodeHealResponse struct {
- task *healTask // Task which this request is filling
-
- hashes []common.Hash // Hashes of the bytecode to avoid double hashing
- codes [][]byte // Actual bytecodes to store into the database (nil = missing)
-}
-
// accountTask represents the sync task for a chunk of the account snapshot.
type accountTask struct {
// These fields get serialized to key-value store on shutdown
@@ -367,14 +300,6 @@ type storageTask struct {
done bool // Flag whether the task can be removed
}
-// healTask represents the sync task for healing the snap-synced chunk boundaries.
-type healTask struct {
- scheduler *trie.Sync // State trie sync scheduler defining the tasks
-
- trieTasks map[string]common.Hash // Set of trie node tasks currently queued for retrieval, indexed by node path
- codeTasks map[common.Hash]struct{} // Set of byte code tasks currently queued for retrieval, indexed by code hash
-}
-
// SyncProgress is a database entry to allow suspending and resuming a snapshot state
// sync. Opposed to full and fast sync, there is no way to restart a suspended
// snap sync without prior knowledge of the suspension point.
@@ -396,13 +321,6 @@ type SyncProgress struct {
BytecodeHealBytes common.StorageSize // Number of bytecodes persisted to disk
}
-// SyncPending is analogous to SyncProgress, but it's used to report on pending
-// ephemeral sync progress that doesn't get persisted into the database.
-type SyncPending struct {
- TrienodeHeal uint64 // Number of state trie nodes pending
- BytecodeHeal uint64 // Number of bytecodes pending
-}
-
// SyncPeer abstracts out the methods required for a peer to be synced against
// with the goal of allowing the construction of mock peers without the full
// blown networking.
@@ -3225,72 +3143,6 @@ func (s *capacitySort) Swap(i, j int) {
s.caps[i], s.caps[j] = s.caps[j], s.caps[i]
}
-// healRequestSort implements the Sort interface, allowing sorting trienode
-// heal requests, which is a prerequisite for merging storage-requests.
-type healRequestSort struct {
- paths []string
- hashes []common.Hash
- syncPaths []trie.SyncPath
-}
-
-func (t *healRequestSort) Len() int {
- return len(t.hashes)
-}
-
-func (t *healRequestSort) Less(i, j int) bool {
- a := t.syncPaths[i]
- b := t.syncPaths[j]
- switch bytes.Compare(a[0], b[0]) {
- case -1:
- return true
- case 1:
- return false
- }
- // identical first part
- if len(a) < len(b) {
- return true
- }
- if len(b) < len(a) {
- return false
- }
- if len(a) == 2 {
- return bytes.Compare(a[1], b[1]) < 0
- }
- return false
-}
-
-func (t *healRequestSort) Swap(i, j int) {
- t.paths[i], t.paths[j] = t.paths[j], t.paths[i]
- t.hashes[i], t.hashes[j] = t.hashes[j], t.hashes[i]
- t.syncPaths[i], t.syncPaths[j] = t.syncPaths[j], t.syncPaths[i]
-}
-
-// Merge merges the pathsets, so that several storage requests concerning the
-// same account are merged into one, to reduce bandwidth.
-// OBS: This operation is moot if t has not first been sorted.
-func (t *healRequestSort) Merge() []TrieNodePathSet {
- var result []TrieNodePathSet
- for _, path := range t.syncPaths {
- pathset := TrieNodePathSet(path)
- if len(path) == 1 {
- // It's an account reference.
- result = append(result, pathset)
- } else {
- // It's a storage reference.
- end := len(result) - 1
- if len(result) == 0 || !bytes.Equal(pathset[0], result[end][0]) {
- // The account doesn't match last, create a new entry.
- result = append(result, pathset)
- } else {
- // It's the same account as the previous one, add to the storage
- // paths of that request.
- result[end] = append(result[end], pathset[1])
- }
- }
- }
- return result
-}
-
// sortByAccountPath takes hashes and paths, and sorts them. After that, it generates
// the TrieNodePaths and merges paths which belongs to the same account path.
func sortByAccountPath(paths []string, hashes []common.Hash) ([]string, []common.Hash, []trie.SyncPath, []TrieNodePathSet) {
diff --git a/eth/protocols/snap/sync_v1.go b/eth/protocols/snap/sync_v1.go
new file mode 100644
index 0000000000..ae0982601a
--- /dev/null
+++ b/eth/protocols/snap/sync_v1.go
@@ -0,0 +1,173 @@
+// Copyright 2026 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package snap
+
+import (
+ "bytes"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/trie"
+)
+
+// trienodeHealRequest tracks a pending state trie request to ensure responses
+// are to actual requests and to validate any security constraints.
+//
+// Concurrency note: trie node requests and responses are handled concurrently from
+// the main runloop to allow Keccak256 hash verifications on the peer's thread and
+// to drop on invalid response. The request struct must contain all the data to
+// construct the response without accessing runloop internals (i.e. task). That
+// is only included to allow the runloop to match a response to the task being
+// synced without having yet another set of maps.
+type trienodeHealRequest struct {
+ peer string // Peer to which this request is assigned
+ id uint64 // Request ID of this request
+ time time.Time // Timestamp when the request was sent
+
+ deliver chan *trienodeHealResponse // Channel to deliver successful response on
+ revert chan *trienodeHealRequest // Channel to deliver request failure on
+ cancel chan struct{} // Channel to track sync cancellation
+ timeout *time.Timer // Timer to track delivery timeout
+ stale chan struct{} // Channel to signal the request was dropped
+
+ paths []string // Trie node paths for identifying trie node
+ hashes []common.Hash // Trie node hashes to validate responses
+
+ task *healTask // Task which this request is filling (only access fields through the runloop!!)
+}
+
+// trienodeHealResponse is an already verified remote response to a trie node request.
+type trienodeHealResponse struct {
+ task *healTask // Task which this request is filling
+
+ paths []string // Paths of the trie nodes
+ hashes []common.Hash // Hashes of the trie nodes to avoid double hashing
+ nodes [][]byte // Actual trie nodes to store into the database (nil = missing)
+}
+
+// bytecodeHealRequest tracks a pending bytecode request to ensure responses are to
+// actual requests and to validate any security constraints.
+//
+// Concurrency note: bytecode requests and responses are handled concurrently from
+// the main runloop to allow Keccak256 hash verifications on the peer's thread and
+// to drop on invalid response. The request struct must contain all the data to
+// construct the response without accessing runloop internals (i.e. task). That
+// is only included to allow the runloop to match a response to the task being
+// synced without having yet another set of maps.
+type bytecodeHealRequest struct {
+ peer string // Peer to which this request is assigned
+ id uint64 // Request ID of this request
+ time time.Time // Timestamp when the request was sent
+
+ deliver chan *bytecodeHealResponse // Channel to deliver successful response on
+ revert chan *bytecodeHealRequest // Channel to deliver request failure on
+ cancel chan struct{} // Channel to track sync cancellation
+ timeout *time.Timer // Timer to track delivery timeout
+ stale chan struct{} // Channel to signal the request was dropped
+
+ hashes []common.Hash // Bytecode hashes to validate responses
+ task *healTask // Task which this request is filling (only access fields through the runloop!!)
+}
+
+// bytecodeHealResponse is an already verified remote response to a bytecode request.
+type bytecodeHealResponse struct {
+ task *healTask // Task which this request is filling
+
+ hashes []common.Hash // Hashes of the bytecode to avoid double hashing
+ codes [][]byte // Actual bytecodes to store into the database (nil = missing)
+}
+
+// healTask represents the sync task for healing the snap-synced chunk boundaries.
+type healTask struct {
+ scheduler *trie.Sync // State trie sync scheduler defining the tasks
+
+ trieTasks map[string]common.Hash // Set of trie node tasks currently queued for retrieval, indexed by node path
+ codeTasks map[common.Hash]struct{} // Set of byte code tasks currently queued for retrieval, indexed by code hash
+}
+
+// SyncPending is analogous to SyncProgress, but it's used to report on pending
+// ephemeral sync progress that doesn't get persisted into the database.
+type SyncPending struct {
+ TrienodeHeal uint64 // Number of state trie nodes pending
+ BytecodeHeal uint64 // Number of bytecodes pending
+}
+
+// healRequestSort implements the Sort interface, allowing sorting trienode
+// heal requests, which is a prerequisite for merging storage-requests.
+type healRequestSort struct {
+ paths []string
+ hashes []common.Hash
+ syncPaths []trie.SyncPath
+}
+
+func (t *healRequestSort) Len() int {
+ return len(t.hashes)
+}
+
+func (t *healRequestSort) Less(i, j int) bool {
+ a := t.syncPaths[i]
+ b := t.syncPaths[j]
+ switch bytes.Compare(a[0], b[0]) {
+ case -1:
+ return true
+ case 1:
+ return false
+ }
+ // identical first part
+ if len(a) < len(b) {
+ return true
+ }
+ if len(b) < len(a) {
+ return false
+ }
+ if len(a) == 2 {
+ return bytes.Compare(a[1], b[1]) < 0
+ }
+ return false
+}
+
+func (t *healRequestSort) Swap(i, j int) {
+ t.paths[i], t.paths[j] = t.paths[j], t.paths[i]
+ t.hashes[i], t.hashes[j] = t.hashes[j], t.hashes[i]
+ t.syncPaths[i], t.syncPaths[j] = t.syncPaths[j], t.syncPaths[i]
+}
+
+// Merge merges the pathsets, so that several storage requests concerning the
+// same account are merged into one, to reduce bandwidth.
+// OBS: This operation is moot if t has not first been sorted.
+func (t *healRequestSort) Merge() []TrieNodePathSet {
+ var result []TrieNodePathSet
+ for _, path := range t.syncPaths {
+ pathset := TrieNodePathSet(path)
+ if len(path) == 1 {
+ // It's an account reference.
+ result = append(result, pathset)
+ } else {
+ // It's a storage reference.
+ end := len(result) - 1
+ if len(result) == 0 || !bytes.Equal(pathset[0], result[end][0]) {
+ // The account doesn't match last, create a new entry.
+ result = append(result, pathset)
+ } else {
+ // It's the same account as the previous one, add to the storage
+ // paths of that request.
+ result[end] = append(result[end], pathset[1])
+ }
+ }
+ }
+ return result
+}
From 3618763da58b8c6850efe7cc0bdbc10d7013b6eb Mon Sep 17 00:00:00 2001
From: jonny rhea <5555162+jrhea@users.noreply.github.com>
Date: Wed, 13 May 2026 14:45:26 -0500
Subject: [PATCH 2/8] move heal task assignment, resp processing and revert
helpers to sync_v1.go
---
eth/protocols/snap/sync.go | 708 ---------------------------------
eth/protocols/snap/sync_v1.go | 718 ++++++++++++++++++++++++++++++++++
2 files changed, 718 insertions(+), 708 deletions(-)
diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go
index eb6d7339dd..ff52661f13 100644
--- a/eth/protocols/snap/sync.go
+++ b/eth/protocols/snap/sync.go
@@ -21,7 +21,6 @@ import (
"encoding/json"
"errors"
"fmt"
- gomath "math"
"math/big"
"math/rand"
"sort"
@@ -1290,250 +1289,6 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st
}
}
-// assignTrienodeHealTasks attempts to match idle peers to trie node requests to
-// heal any trie errors caused by the snap sync's chunked retrieval model.
-func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fail chan *trienodeHealRequest, cancel chan struct{}) {
- s.lock.Lock()
- defer s.lock.Unlock()
-
- // Sort the peers by download capacity to use faster ones if many available
- idlers := &capacitySort{
- ids: make([]string, 0, len(s.trienodeHealIdlers)),
- caps: make([]int, 0, len(s.trienodeHealIdlers)),
- }
- targetTTL := s.rates.TargetTimeout()
- for id := range s.trienodeHealIdlers {
- if _, ok := s.statelessPeers[id]; ok {
- continue
- }
- idlers.ids = append(idlers.ids, id)
- idlers.caps = append(idlers.caps, s.rates.Capacity(id, TrieNodesMsg, targetTTL))
- }
- if len(idlers.ids) == 0 {
- return
- }
- sort.Sort(sort.Reverse(idlers))
-
- // Iterate over pending tasks and try to find a peer to retrieve with
- for len(s.healer.trieTasks) > 0 || s.healer.scheduler.Pending() > 0 {
- // If there are not enough trie tasks queued to fully assign, fill the
- // queue from the state sync scheduler. The trie synced schedules these
- // together with bytecodes, so we need to queue them combined.
- var (
- have = len(s.healer.trieTasks) + len(s.healer.codeTasks)
- want = maxTrieRequestCount + maxCodeRequestCount
- )
- if have < want {
- paths, hashes, codes := s.healer.scheduler.Missing(want - have)
- for i, path := range paths {
- s.healer.trieTasks[path] = hashes[i]
- }
- for _, hash := range codes {
- s.healer.codeTasks[hash] = struct{}{}
- }
- }
- // If all the heal tasks are bytecodes or already downloading, bail
- if len(s.healer.trieTasks) == 0 {
- return
- }
- // Task pending retrieval, try to find an idle peer. If no such peer
- // exists, we probably assigned tasks for all (or they are stateless).
- // Abort the entire assignment mechanism.
- if len(idlers.ids) == 0 {
- return
- }
- var (
- idle = idlers.ids[0]
- peer = s.peers[idle]
- cap = idlers.caps[0]
- )
- idlers.ids, idlers.caps = idlers.ids[1:], idlers.caps[1:]
-
- // Matched a pending task to an idle peer, allocate a unique request id
- var reqid uint64
- for {
- reqid = uint64(rand.Int63())
- if reqid == 0 {
- continue
- }
- if _, ok := s.trienodeHealReqs[reqid]; ok {
- continue
- }
- break
- }
- // Generate the network query and send it to the peer
- if cap > maxTrieRequestCount {
- cap = maxTrieRequestCount
- }
- cap = int(float64(cap) / s.trienodeHealThrottle)
- if cap <= 0 {
- cap = 1
- }
- var (
- hashes = make([]common.Hash, 0, cap)
- paths = make([]string, 0, cap)
- pathsets = make([]TrieNodePathSet, 0, cap)
- )
- for path, hash := range s.healer.trieTasks {
- delete(s.healer.trieTasks, path)
-
- paths = append(paths, path)
- hashes = append(hashes, hash)
- if len(paths) >= cap {
- break
- }
- }
- // Group requests by account hash
- paths, hashes, _, pathsets = sortByAccountPath(paths, hashes)
- req := &trienodeHealRequest{
- peer: idle,
- id: reqid,
- time: time.Now(),
- deliver: success,
- revert: fail,
- cancel: cancel,
- stale: make(chan struct{}),
- paths: paths,
- hashes: hashes,
- task: s.healer,
- }
- req.timeout = time.AfterFunc(s.rates.TargetTimeout(), func() {
- peer.Log().Debug("Trienode heal request timed out", "reqid", reqid)
- s.rates.Update(idle, TrieNodesMsg, 0, 0)
- s.scheduleRevertTrienodeHealRequest(req)
- })
- s.trienodeHealReqs[reqid] = req
- delete(s.trienodeHealIdlers, idle)
-
- s.pend.Add(1)
- go func(root common.Hash) {
- defer s.pend.Done()
-
- // Attempt to send the remote request and revert if it fails
- if err := peer.RequestTrieNodes(reqid, root, len(paths), pathsets, maxRequestSize); err != nil {
- log.Debug("Failed to request trienode healers", "err", err)
- s.scheduleRevertTrienodeHealRequest(req)
- }
- }(s.root)
- }
-}
-
-// assignBytecodeHealTasks attempts to match idle peers to bytecode requests to
-// heal any trie errors caused by the snap sync's chunked retrieval model.
-func (s *Syncer) assignBytecodeHealTasks(success chan *bytecodeHealResponse, fail chan *bytecodeHealRequest, cancel chan struct{}) {
- s.lock.Lock()
- defer s.lock.Unlock()
-
- // Sort the peers by download capacity to use faster ones if many available
- idlers := &capacitySort{
- ids: make([]string, 0, len(s.bytecodeHealIdlers)),
- caps: make([]int, 0, len(s.bytecodeHealIdlers)),
- }
- targetTTL := s.rates.TargetTimeout()
- for id := range s.bytecodeHealIdlers {
- if _, ok := s.statelessPeers[id]; ok {
- continue
- }
- idlers.ids = append(idlers.ids, id)
- idlers.caps = append(idlers.caps, s.rates.Capacity(id, ByteCodesMsg, targetTTL))
- }
- if len(idlers.ids) == 0 {
- return
- }
- sort.Sort(sort.Reverse(idlers))
-
- // Iterate over pending tasks and try to find a peer to retrieve with
- for len(s.healer.codeTasks) > 0 || s.healer.scheduler.Pending() > 0 {
- // If there are not enough trie tasks queued to fully assign, fill the
- // queue from the state sync scheduler. The trie synced schedules these
- // together with trie nodes, so we need to queue them combined.
- var (
- have = len(s.healer.trieTasks) + len(s.healer.codeTasks)
- want = maxTrieRequestCount + maxCodeRequestCount
- )
- if have < want {
- paths, hashes, codes := s.healer.scheduler.Missing(want - have)
- for i, path := range paths {
- s.healer.trieTasks[path] = hashes[i]
- }
- for _, hash := range codes {
- s.healer.codeTasks[hash] = struct{}{}
- }
- }
- // If all the heal tasks are trienodes or already downloading, bail
- if len(s.healer.codeTasks) == 0 {
- return
- }
- // Task pending retrieval, try to find an idle peer. If no such peer
- // exists, we probably assigned tasks for all (or they are stateless).
- // Abort the entire assignment mechanism.
- if len(idlers.ids) == 0 {
- return
- }
- var (
- idle = idlers.ids[0]
- peer = s.peers[idle]
- cap = idlers.caps[0]
- )
- idlers.ids, idlers.caps = idlers.ids[1:], idlers.caps[1:]
-
- // Matched a pending task to an idle peer, allocate a unique request id
- var reqid uint64
- for {
- reqid = uint64(rand.Int63())
- if reqid == 0 {
- continue
- }
- if _, ok := s.bytecodeHealReqs[reqid]; ok {
- continue
- }
- break
- }
- // Generate the network query and send it to the peer
- if cap > maxCodeRequestCount {
- cap = maxCodeRequestCount
- }
- hashes := make([]common.Hash, 0, cap)
- for hash := range s.healer.codeTasks {
- delete(s.healer.codeTasks, hash)
-
- hashes = append(hashes, hash)
- if len(hashes) >= cap {
- break
- }
- }
- req := &bytecodeHealRequest{
- peer: idle,
- id: reqid,
- time: time.Now(),
- deliver: success,
- revert: fail,
- cancel: cancel,
- stale: make(chan struct{}),
- hashes: hashes,
- task: s.healer,
- }
- req.timeout = time.AfterFunc(s.rates.TargetTimeout(), func() {
- peer.Log().Debug("Bytecode heal request timed out", "reqid", reqid)
- s.rates.Update(idle, ByteCodesMsg, 0, 0)
- s.scheduleRevertBytecodeHealRequest(req)
- })
- s.bytecodeHealReqs[reqid] = req
- delete(s.bytecodeHealIdlers, idle)
-
- s.pend.Add(1)
- go func() {
- defer s.pend.Done()
-
- // Attempt to send the remote request and revert if it fails
- if err := peer.RequestByteCodes(reqid, hashes, maxRequestSize); err != nil {
- log.Debug("Failed to request bytecode healers", "err", err)
- s.scheduleRevertBytecodeHealRequest(req)
- }
- }()
- }
-}
-
// revertRequests locates all the currently pending requests from a particular
// peer and reverts them, rescheduling for others to fulfill.
func (s *Syncer) revertRequests(peer string) {
@@ -1728,96 +1483,6 @@ func (s *Syncer) revertStorageRequest(req *storageRequest) {
}
}
-// scheduleRevertTrienodeHealRequest asks the event loop to clean up a trienode heal
-// request and return all failed retrieval tasks to the scheduler for reassignment.
-func (s *Syncer) scheduleRevertTrienodeHealRequest(req *trienodeHealRequest) {
- select {
- case req.revert <- req:
- // Sync event loop notified
- case <-req.cancel:
- // Sync cycle got cancelled
- case <-req.stale:
- // Request already reverted
- }
-}
-
-// revertTrienodeHealRequest cleans up a trienode heal request and returns all
-// failed retrieval tasks to the scheduler for reassignment.
-//
-// Note, this needs to run on the event runloop thread to reschedule to idle peers.
-// On peer threads, use scheduleRevertTrienodeHealRequest.
-func (s *Syncer) revertTrienodeHealRequest(req *trienodeHealRequest) {
- log.Debug("Reverting trienode heal request", "peer", req.peer)
- select {
- case <-req.stale:
- log.Trace("Trienode heal request already reverted", "peer", req.peer, "reqid", req.id)
- return
- default:
- }
- close(req.stale)
-
- // Remove the request from the tracked set and restore the peer to the
- // idle pool so it can be reassigned work (skip if peer already left).
- s.lock.Lock()
- delete(s.trienodeHealReqs, req.id)
- if _, ok := s.peers[req.peer]; ok {
- s.trienodeHealIdlers[req.peer] = struct{}{}
- }
- s.lock.Unlock()
-
- // If there's a timeout timer still running, abort it and mark the trie node
- // retrievals as not-pending, ready for rescheduling
- req.timeout.Stop()
- for i, path := range req.paths {
- req.task.trieTasks[path] = req.hashes[i]
- }
-}
-
-// scheduleRevertBytecodeHealRequest asks the event loop to clean up a bytecode heal
-// request and return all failed retrieval tasks to the scheduler for reassignment.
-func (s *Syncer) scheduleRevertBytecodeHealRequest(req *bytecodeHealRequest) {
- select {
- case req.revert <- req:
- // Sync event loop notified
- case <-req.cancel:
- // Sync cycle got cancelled
- case <-req.stale:
- // Request already reverted
- }
-}
-
-// revertBytecodeHealRequest cleans up a bytecode heal request and returns all
-// failed retrieval tasks to the scheduler for reassignment.
-//
-// Note, this needs to run on the event runloop thread to reschedule to idle peers.
-// On peer threads, use scheduleRevertBytecodeHealRequest.
-func (s *Syncer) revertBytecodeHealRequest(req *bytecodeHealRequest) {
- log.Debug("Reverting bytecode heal request", "peer", req.peer)
- select {
- case <-req.stale:
- log.Trace("Bytecode heal request already reverted", "peer", req.peer, "reqid", req.id)
- return
- default:
- }
- close(req.stale)
-
- // Remove the request from the tracked set and restore the peer to the
- // idle pool so it can be reassigned work (skip if peer already left).
- s.lock.Lock()
- delete(s.bytecodeHealReqs, req.id)
- if _, ok := s.peers[req.peer]; ok {
- s.bytecodeHealIdlers[req.peer] = struct{}{}
- }
- s.lock.Unlock()
-
- // If there's a timeout timer still running, abort it and mark the code
- // retrievals as not-pending, ready for rescheduling
- req.timeout.Stop()
- for _, hash := range req.hashes {
- req.task.codeTasks[hash] = struct{}{}
- }
-}
-
// processAccountResponse integrates an already validated account range response
// into the account tasks.
func (s *Syncer) processAccountResponse(res *accountResponse) {
@@ -2224,128 +1889,6 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
// task assigners to pick up and fill.
}
-// processTrienodeHealResponse integrates an already validated trienode response
-// into the healer tasks.
-func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
- var (
- start = time.Now()
- fills int
- )
- for i, hash := range res.hashes {
- node := res.nodes[i]
-
- // If the trie node was not delivered, reschedule it
- if node == nil {
- res.task.trieTasks[res.paths[i]] = res.hashes[i]
- continue
- }
- fills++
-
- // Push the trie node into the state syncer
- s.trienodeHealSynced++
- s.trienodeHealBytes += common.StorageSize(len(node))
-
- err := s.healer.scheduler.ProcessNode(trie.NodeSyncResult{Path: res.paths[i], Data: node})
- switch err {
- case nil:
- case trie.ErrAlreadyProcessed:
- s.trienodeHealDups++
- case trie.ErrNotRequested:
- s.trienodeHealNops++
- default:
- log.Error("Invalid trienode processed", "hash", hash, "err", err)
- }
- }
- s.commitHealer(false)
-
- // Calculate the processing rate of one filled trie node
- rate := float64(fills) / (float64(time.Since(start)) / float64(time.Second))
-
- // Update the currently measured trienode queueing and processing throughput.
- //
- // The processing rate needs to be updated uniformly independent if we've
- // processed 1x100 trie nodes or 100x1 to keep the rate consistent even in
- // the face of varying network packets. As such, we cannot just measure the
- // time it took to process N trie nodes and update once, we need one update
- // per trie node.
- //
- // Naively, that would be:
- //
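- //   for i:=0; i<fills; i++ {
- //     healRate = (1-measurementImpact)*oldRate + measurementImpact*newRate
- //   }
- //
- // Essentially, a recursive expansion of HR = (1-MI)*HR + MI*NR.
- //
- // We can expand that formula for the Nth item as:
- //   HR(N) = (1-MI)^N*OR + (1-MI)^(N-1)*MI*NR + (1-MI)^(N-2)*MI*NR + ... + (1-MI)^0*MI*NR
- //
- // The above is a geometric sequence that can be summed to:
- //   HR(N) = (1-MI)^N*(OR-NR) + NR
- s.trienodeHealRate = gomath.Pow(1-trienodeHealRateMeasurementImpact, float64(fills))*(s.trienodeHealRate-rate) + rate
-
- pending := s.trienodeHealPend.Load()
- if time.Since(s.trienodeHealThrottled) > time.Second {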
- // Periodically adjust the trie node throttler
- if float64(pending) > 2*s.trienodeHealRate {
- s.trienodeHealThrottle *= trienodeHealThrottleIncrease
- } else {
- s.trienodeHealThrottle /= trienodeHealThrottleDecrease
- }
- if s.trienodeHealThrottle > maxTrienodeHealThrottle {
- s.trienodeHealThrottle = maxTrienodeHealThrottle
- } else if s.trienodeHealThrottle < minTrienodeHealThrottle {
- s.trienodeHealThrottle = minTrienodeHealThrottle
- }
- s.trienodeHealThrottled = time.Now()
-
- log.Debug("Updated trie node heal throttler", "rate", s.trienodeHealRate, "pending", pending, "throttle", s.trienodeHealThrottle)
- }
-}
-
-func (s *Syncer) commitHealer(force bool) {
- if !force && s.healer.scheduler.MemSize() < ethdb.IdealBatchSize {
- return
- }
- batch := s.db.NewBatch()
- if err := s.healer.scheduler.Commit(batch); err != nil {
- log.Crit("Failed to commit healing data", "err", err)
- }
- if err := batch.Write(); err != nil {
- log.Crit("Failed to persist healing data", "err", err)
- }
- log.Debug("Persisted set of healing data", "type", "trienodes", "bytes", common.StorageSize(batch.ValueSize()))
-}
-
-// processBytecodeHealResponse integrates an already validated bytecode response
-// into the healer tasks.
-func (s *Syncer) processBytecodeHealResponse(res *bytecodeHealResponse) {
- for i, hash := range res.hashes {
- node := res.codes[i]
-
- // If the trie node was not delivered, reschedule it
- if node == nil {
- res.task.codeTasks[hash] = struct{}{}
- continue
- }
- // Push the trie node into the state syncer
- s.bytecodeHealSynced++
- s.bytecodeHealBytes += common.StorageSize(len(node))
-
- err := s.healer.scheduler.ProcessCode(trie.CodeSyncResult{Hash: hash, Data: node})
- switch err {
- case nil:
- case trie.ErrAlreadyProcessed:
- s.bytecodeHealDups++
- case trie.ErrNotRequested:
- s.bytecodeHealNops++
- default:
- log.Error("Invalid bytecode processed", "hash", hash, "err", err)
- }
- }
- s.commitHealer(false)
-}
-
// forwardAccountTask takes a filled account task and persists anything available
// into the database, after which it forwards the next account marker so that the
// task's next chunk may be filled.
@@ -2797,238 +2340,6 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
return nil
}
-// OnTrieNodes is a callback method to invoke when a batch of trie nodes
-// are received from a remote peer.
-func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error {
- var size common.StorageSize
- for _, node := range trienodes {
- size += common.StorageSize(len(node))
- }
- logger := peer.Log().New("reqid", id)
- logger.Trace("Delivering set of healing trienodes", "trienodes", len(trienodes), "bytes", size)
-
- // Whether or not the response is valid, we can mark the peer as idle and
- // notify the scheduler to assign a new task. If the response is invalid,
- // we'll drop the peer in a bit.
- defer func() {
- s.lock.Lock()
- defer s.lock.Unlock()
- if _, ok := s.peers[peer.ID()]; ok {
- s.trienodeHealIdlers[peer.ID()] = struct{}{}
- }
- select {
- case s.update <- struct{}{}:
- default:
- }
- }()
- s.lock.Lock()
- // Ensure the response is for a valid request
- req, ok := s.trienodeHealReqs[id]
- if !ok {
- // Request stale, perhaps the peer timed out but came through in the end
- logger.Warn("Unexpected trienode heal packet")
- s.lock.Unlock()
- return nil
- }
- delete(s.trienodeHealReqs, id)
- s.rates.Update(peer.ID(), TrieNodesMsg, time.Since(req.time), len(trienodes))
-
- // Clean up the request timeout timer, we'll see how to proceed further based
- // on the actual delivered content
- if !req.timeout.Stop() {
- // The timeout is already triggered, and this request will be reverted+rescheduled
- s.lock.Unlock()
- return nil
- }
-
- // Response is valid, but check if peer is signalling that it does not have
- // the requested data. For bytecode range queries that means the peer is not
- // yet synced.
- if len(trienodes) == 0 {
- logger.Debug("Peer rejected trienode heal request")
- s.statelessPeers[peer.ID()] = struct{}{}
- s.lock.Unlock()
-
- // Signal this request as failed, and ready for rescheduling
- s.scheduleRevertTrienodeHealRequest(req)
- return nil
- }
- s.lock.Unlock()
-
- // Cross reference the requested trienodes with the response to find gaps
- // that the serving node is missing
- var (
- hasher = crypto.NewKeccakState()
- hash = make([]byte, 32)
- nodes = make([][]byte, len(req.hashes))
- fills uint64
- )
- for i, j := 0, 0; i < len(trienodes); i++ {
- // Find the next hash that we've been served, leaving misses with nils
- hasher.Reset()
- hasher.Write(trienodes[i])
- hasher.Read(hash)
-
- for j < len(req.hashes) && !bytes.Equal(hash, req.hashes[j][:]) {
- j++
- }
- if j < len(req.hashes) {
- nodes[j] = trienodes[i]
- fills++
- j++
- continue
- }
- // We've either ran out of hashes, or got unrequested data
- logger.Warn("Unexpected healing trienodes", "count", len(trienodes)-i)
-
- // Signal this request as failed, and ready for rescheduling
- s.scheduleRevertTrienodeHealRequest(req)
- return errors.New("unexpected healing trienode")
- }
- // Response validated, send it to the scheduler for filling
- s.trienodeHealPend.Add(fills)
- defer func() {
- s.trienodeHealPend.Add(^(fills - 1))
- }()
- response := &trienodeHealResponse{
- paths: req.paths,
- task: req.task,
- hashes: req.hashes,
- nodes: nodes,
- }
- select {
- case req.deliver <- response:
- case <-req.cancel:
- case <-req.stale:
- }
- return nil
-}
-
-// onHealByteCodes is a callback method to invoke when a batch of contract
-// bytes codes are received from a remote peer in the healing phase.
-func (s *Syncer) onHealByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error {
- var size common.StorageSize
- for _, code := range bytecodes {
- size += common.StorageSize(len(code))
- }
- logger := peer.Log().New("reqid", id)
- logger.Trace("Delivering set of healing bytecodes", "bytecodes", len(bytecodes), "bytes", size)
-
- // Whether or not the response is valid, we can mark the peer as idle and
- // notify the scheduler to assign a new task. If the response is invalid,
- // we'll drop the peer in a bit.
- defer func() {
- s.lock.Lock()
- defer s.lock.Unlock()
- if _, ok := s.peers[peer.ID()]; ok {
- s.bytecodeHealIdlers[peer.ID()] = struct{}{}
- }
- select {
- case s.update <- struct{}{}:
- default:
- }
- }()
- s.lock.Lock()
- // Ensure the response is for a valid request
- req, ok := s.bytecodeHealReqs[id]
- if !ok {
- // Request stale, perhaps the peer timed out but came through in the end
- logger.Warn("Unexpected bytecode heal packet")
- s.lock.Unlock()
- return nil
- }
- delete(s.bytecodeHealReqs, id)
- s.rates.Update(peer.ID(), ByteCodesMsg, time.Since(req.time), len(bytecodes))
-
- // Clean up the request timeout timer, we'll see how to proceed further based
- // on the actual delivered content
- if !req.timeout.Stop() {
- // The timeout is already triggered, and this request will be reverted+rescheduled
- s.lock.Unlock()
- return nil
- }
-
- // Response is valid, but check if peer is signalling that it does not have
- // the requested data. For bytecode range queries that means the peer is not
- // yet synced.
- if len(bytecodes) == 0 {
- logger.Debug("Peer rejected bytecode heal request")
- s.statelessPeers[peer.ID()] = struct{}{}
- s.lock.Unlock()
-
- // Signal this request as failed, and ready for rescheduling
- s.scheduleRevertBytecodeHealRequest(req)
- return nil
- }
- s.lock.Unlock()
-
- // Cross reference the requested bytecodes with the response to find gaps
- // that the serving node is missing
- hasher := crypto.NewKeccakState()
- hash := make([]byte, 32)
-
- codes := make([][]byte, len(req.hashes))
- for i, j := 0, 0; i < len(bytecodes); i++ {
- // Find the next hash that we've been served, leaving misses with nils
- hasher.Reset()
- hasher.Write(bytecodes[i])
- hasher.Read(hash)
-
- for j < len(req.hashes) && !bytes.Equal(hash, req.hashes[j][:]) {
- j++
- }
- if j < len(req.hashes) {
- codes[j] = bytecodes[i]
- j++
- continue
- }
- // We've either ran out of hashes, or got unrequested data
- logger.Warn("Unexpected healing bytecodes", "count", len(bytecodes)-i)
- // Signal this request as failed, and ready for rescheduling
- s.scheduleRevertBytecodeHealRequest(req)
- return errors.New("unexpected healing bytecode")
- }
- // Response validated, send it to the scheduler for filling
- response := &bytecodeHealResponse{
- task: req.task,
- hashes: req.hashes,
- codes: codes,
- }
- select {
- case req.deliver <- response:
- case <-req.cancel:
- case <-req.stale:
- }
- return nil
-}
-
-// onHealState is a callback method to invoke when a flat state(account
-// or storage slot) is downloaded during the healing stage. The flat states
-// can be persisted blindly and can be fixed later in the generation stage.
-// Note it's not concurrent safe, please handle the concurrent issue outside.
-func (s *Syncer) onHealState(paths [][]byte, value []byte) error {
- if len(paths) == 1 {
- var account types.StateAccount
- if err := rlp.DecodeBytes(value, &account); err != nil {
- return nil // Returning the error here would drop the remote peer
- }
- blob := types.SlimAccountRLP(account)
- rawdb.WriteAccountSnapshot(s.stateWriter, common.BytesToHash(paths[0]), blob)
- s.accountHealed += 1
- s.accountHealedBytes += common.StorageSize(1 + common.HashLength + len(blob))
- }
- if len(paths) == 2 {
- rawdb.WriteStorageSnapshot(s.stateWriter, common.BytesToHash(paths[0]), common.BytesToHash(paths[1]), value)
- s.storageHealed += 1
- s.storageHealedBytes += common.StorageSize(1 + 2*common.HashLength + len(value))
- }
- if s.stateWriter.ValueSize() > ethdb.IdealBatchSize {
- s.stateWriter.Write() // It's fine to ignore the error here
- s.stateWriter.Reset()
- }
- return nil
-}
-
// hashSpace is the total size of the 256 bit hash space for accounts.
var hashSpace = new(big.Int).Exp(common.Big2, common.Big256, nil)
@@ -3087,25 +2398,6 @@ func (s *Syncer) reportSyncProgress(force bool) {
"accounts", accounts, "slots", storage, "codes", bytecode, "eta", common.PrettyDuration(estTime-elapsed))
}
-// reportHealProgress calculates various status reports and provides it to the user.
-func (s *Syncer) reportHealProgress(force bool) {
- // Don't report all the events, just occasionally
- if !force && time.Since(s.logTime) < 8*time.Second {
- return
- }
- s.logTime = time.Now()
-
- // Create a mega progress report
- var (
- trienode = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(s.trienodeHealSynced), s.trienodeHealBytes.TerminalString())
- bytecode = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(s.bytecodeHealSynced), s.bytecodeHealBytes.TerminalString())
- accounts = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(s.accountHealed), s.accountHealedBytes.TerminalString())
- storage = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(s.storageHealed), s.storageHealedBytes.TerminalString())
- )
- log.Info("Syncing: state healing in progress", "accounts", accounts, "slots", storage,
- "codes", bytecode, "nodes", trienode, "pending", s.healer.scheduler.Pending())
-}
-
// estimateRemainingSlots tries to determine roughly how many slots are left in
// a contract storage, based on the number of keys and the last hash. This method
// assumes that the hashes are lexicographically ordered and evenly distributed.
diff --git a/eth/protocols/snap/sync_v1.go b/eth/protocols/snap/sync_v1.go
index ae0982601a..d6cd5ab5ca 100644
--- a/eth/protocols/snap/sync_v1.go
+++ b/eth/protocols/snap/sync_v1.go
@@ -18,9 +18,20 @@ package snap
import (
"bytes"
+ "errors"
+ "fmt"
+ gomath "math"
+ "math/rand"
+ "sort"
"time"
"github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/rawdb"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
)
@@ -171,3 +182,710 @@ func (t *healRequestSort) Merge() []TrieNodePathSet {
}
return result
}
+
+// assignTrienodeHealTasks attempts to match idle peers to trie node requests to
+// heal any trie errors caused by the snap sync's chunked retrieval model.
+func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fail chan *trienodeHealRequest, cancel chan struct{}) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ // Sort the peers by download capacity to use faster ones if many available
+ idlers := &capacitySort{
+ ids: make([]string, 0, len(s.trienodeHealIdlers)),
+ caps: make([]int, 0, len(s.trienodeHealIdlers)),
+ }
+ targetTTL := s.rates.TargetTimeout()
+ for id := range s.trienodeHealIdlers {
+ if _, ok := s.statelessPeers[id]; ok {
+ continue
+ }
+ idlers.ids = append(idlers.ids, id)
+ idlers.caps = append(idlers.caps, s.rates.Capacity(id, TrieNodesMsg, targetTTL))
+ }
+ if len(idlers.ids) == 0 {
+ return
+ }
+ sort.Sort(sort.Reverse(idlers))
+
+ // Iterate over pending tasks and try to find a peer to retrieve with
+ for len(s.healer.trieTasks) > 0 || s.healer.scheduler.Pending() > 0 {
+ // If there are not enough trie tasks queued to fully assign, fill the
+ // queue from the state sync scheduler. The trie syncer schedules these
+ // together with bytecodes, so we need to queue them combined.
+ var (
+ have = len(s.healer.trieTasks) + len(s.healer.codeTasks)
+ want = maxTrieRequestCount + maxCodeRequestCount
+ )
+ if have < want {
+ paths, hashes, codes := s.healer.scheduler.Missing(want - have)
+ for i, path := range paths {
+ s.healer.trieTasks[path] = hashes[i]
+ }
+ for _, hash := range codes {
+ s.healer.codeTasks[hash] = struct{}{}
+ }
+ }
+ // If all the heal tasks are bytecodes or already downloading, bail
+ if len(s.healer.trieTasks) == 0 {
+ return
+ }
+ // Task pending retrieval, try to find an idle peer. If no such peer
+ // exists, we probably assigned tasks for all (or they are stateless).
+ // Abort the entire assignment mechanism.
+ if len(idlers.ids) == 0 {
+ return
+ }
+ var (
+ idle = idlers.ids[0]
+ peer = s.peers[idle]
+ cap = idlers.caps[0]
+ )
+ idlers.ids, idlers.caps = idlers.ids[1:], idlers.caps[1:]
+
+ // Matched a pending task to an idle peer, allocate a unique request id
+ var reqid uint64
+ for {
+ reqid = uint64(rand.Int63())
+ if reqid == 0 {
+ continue
+ }
+ if _, ok := s.trienodeHealReqs[reqid]; ok {
+ continue
+ }
+ break
+ }
+ // Generate the network query and send it to the peer
+ if cap > maxTrieRequestCount {
+ cap = maxTrieRequestCount
+ }
+ cap = int(float64(cap) / s.trienodeHealThrottle)
+ if cap <= 0 {
+ cap = 1
+ }
+ var (
+ hashes = make([]common.Hash, 0, cap)
+ paths = make([]string, 0, cap)
+ pathsets = make([]TrieNodePathSet, 0, cap)
+ )
+ for path, hash := range s.healer.trieTasks {
+ delete(s.healer.trieTasks, path)
+
+ paths = append(paths, path)
+ hashes = append(hashes, hash)
+ if len(paths) >= cap {
+ break
+ }
+ }
+ // Group requests by account hash
+ paths, hashes, _, pathsets = sortByAccountPath(paths, hashes)
+ req := &trienodeHealRequest{
+ peer: idle,
+ id: reqid,
+ time: time.Now(),
+ deliver: success,
+ revert: fail,
+ cancel: cancel,
+ stale: make(chan struct{}),
+ paths: paths,
+ hashes: hashes,
+ task: s.healer,
+ }
+ req.timeout = time.AfterFunc(s.rates.TargetTimeout(), func() {
+ peer.Log().Debug("Trienode heal request timed out", "reqid", reqid)
+ s.rates.Update(idle, TrieNodesMsg, 0, 0)
+ s.scheduleRevertTrienodeHealRequest(req)
+ })
+ s.trienodeHealReqs[reqid] = req
+ delete(s.trienodeHealIdlers, idle)
+
+ s.pend.Add(1)
+ go func(root common.Hash) {
+ defer s.pend.Done()
+
+ // Attempt to send the remote request and revert if it fails
+ if err := peer.RequestTrieNodes(reqid, root, len(paths), pathsets, maxRequestSize); err != nil {
+ log.Debug("Failed to request trienode healers", "err", err)
+ s.scheduleRevertTrienodeHealRequest(req)
+ }
+ }(s.root)
+ }
+}
+
+// assignBytecodeHealTasks attempts to match idle peers to bytecode requests to
+// heal any trie errors caused by the snap sync's chunked retrieval model.
+func (s *Syncer) assignBytecodeHealTasks(success chan *bytecodeHealResponse, fail chan *bytecodeHealRequest, cancel chan struct{}) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ // Sort the peers by download capacity to use faster ones if many available
+ idlers := &capacitySort{
+ ids: make([]string, 0, len(s.bytecodeHealIdlers)),
+ caps: make([]int, 0, len(s.bytecodeHealIdlers)),
+ }
+ targetTTL := s.rates.TargetTimeout()
+ for id := range s.bytecodeHealIdlers {
+ if _, ok := s.statelessPeers[id]; ok {
+ continue
+ }
+ idlers.ids = append(idlers.ids, id)
+ idlers.caps = append(idlers.caps, s.rates.Capacity(id, ByteCodesMsg, targetTTL))
+ }
+ if len(idlers.ids) == 0 {
+ return
+ }
+ sort.Sort(sort.Reverse(idlers))
+
+ // Iterate over pending tasks and try to find a peer to retrieve with
+ for len(s.healer.codeTasks) > 0 || s.healer.scheduler.Pending() > 0 {
+ // If there are not enough trie tasks queued to fully assign, fill the
+ // queue from the state sync scheduler. The trie syncer schedules these
+ // together with trie nodes, so we need to queue them combined.
+ var (
+ have = len(s.healer.trieTasks) + len(s.healer.codeTasks)
+ want = maxTrieRequestCount + maxCodeRequestCount
+ )
+ if have < want {
+ paths, hashes, codes := s.healer.scheduler.Missing(want - have)
+ for i, path := range paths {
+ s.healer.trieTasks[path] = hashes[i]
+ }
+ for _, hash := range codes {
+ s.healer.codeTasks[hash] = struct{}{}
+ }
+ }
+ // If all the heal tasks are trienodes or already downloading, bail
+ if len(s.healer.codeTasks) == 0 {
+ return
+ }
+ // Task pending retrieval, try to find an idle peer. If no such peer
+ // exists, we probably assigned tasks for all (or they are stateless).
+ // Abort the entire assignment mechanism.
+ if len(idlers.ids) == 0 {
+ return
+ }
+ var (
+ idle = idlers.ids[0]
+ peer = s.peers[idle]
+ cap = idlers.caps[0]
+ )
+ idlers.ids, idlers.caps = idlers.ids[1:], idlers.caps[1:]
+
+ // Matched a pending task to an idle peer, allocate a unique request id
+ var reqid uint64
+ for {
+ reqid = uint64(rand.Int63())
+ if reqid == 0 {
+ continue
+ }
+ if _, ok := s.bytecodeHealReqs[reqid]; ok {
+ continue
+ }
+ break
+ }
+ // Generate the network query and send it to the peer
+ if cap > maxCodeRequestCount {
+ cap = maxCodeRequestCount
+ }
+ hashes := make([]common.Hash, 0, cap)
+ for hash := range s.healer.codeTasks {
+ delete(s.healer.codeTasks, hash)
+
+ hashes = append(hashes, hash)
+ if len(hashes) >= cap {
+ break
+ }
+ }
+ req := &bytecodeHealRequest{
+ peer: idle,
+ id: reqid,
+ time: time.Now(),
+ deliver: success,
+ revert: fail,
+ cancel: cancel,
+ stale: make(chan struct{}),
+ hashes: hashes,
+ task: s.healer,
+ }
+ req.timeout = time.AfterFunc(s.rates.TargetTimeout(), func() {
+ peer.Log().Debug("Bytecode heal request timed out", "reqid", reqid)
+ s.rates.Update(idle, ByteCodesMsg, 0, 0)
+ s.scheduleRevertBytecodeHealRequest(req)
+ })
+ s.bytecodeHealReqs[reqid] = req
+ delete(s.bytecodeHealIdlers, idle)
+
+ s.pend.Add(1)
+ go func() {
+ defer s.pend.Done()
+
+ // Attempt to send the remote request and revert if it fails
+ if err := peer.RequestByteCodes(reqid, hashes, maxRequestSize); err != nil {
+ log.Debug("Failed to request bytecode healers", "err", err)
+ s.scheduleRevertBytecodeHealRequest(req)
+ }
+ }()
+ }
+}
+
+// scheduleRevertTrienodeHealRequest asks the event loop to clean up a trienode heal
+// request and return all failed retrieval tasks to the scheduler for reassignment.
+func (s *Syncer) scheduleRevertTrienodeHealRequest(req *trienodeHealRequest) {
+ select {
+ case req.revert <- req:
+ // Sync event loop notified
+ case <-req.cancel:
+ // Sync cycle got cancelled
+ case <-req.stale:
+ // Request already reverted
+ }
+}
+
+// revertTrienodeHealRequest cleans up a trienode heal request and returns all
+// failed retrieval tasks to the scheduler for reassignment.
+//
+// Note, this needs to run on the event runloop thread to reschedule to idle peers.
+// On peer threads, use scheduleRevertTrienodeHealRequest.
+func (s *Syncer) revertTrienodeHealRequest(req *trienodeHealRequest) {
+ log.Debug("Reverting trienode heal request", "peer", req.peer)
+ select {
+ case <-req.stale:
+ log.Trace("Trienode heal request already reverted", "peer", req.peer, "reqid", req.id)
+ return
+ default:
+ }
+ close(req.stale)
+
+ // Remove the request from the tracked set and restore the peer to the
+ // idle pool so it can be reassigned work (skip if peer already left).
+ s.lock.Lock()
+ delete(s.trienodeHealReqs, req.id)
+ if _, ok := s.peers[req.peer]; ok {
+ s.trienodeHealIdlers[req.peer] = struct{}{}
+ }
+ s.lock.Unlock()
+
+ // If there's a timeout timer still running, abort it and mark the trie node
+ // retrievals as not-pending, ready for rescheduling
+ req.timeout.Stop()
+ for i, path := range req.paths {
+ req.task.trieTasks[path] = req.hashes[i]
+ }
+}
+
+// scheduleRevertBytecodeHealRequest asks the event loop to clean up a bytecode heal
+// request and return all failed retrieval tasks to the scheduler for reassignment.
+func (s *Syncer) scheduleRevertBytecodeHealRequest(req *bytecodeHealRequest) {
+ select {
+ case req.revert <- req:
+ // Sync event loop notified
+ case <-req.cancel:
+ // Sync cycle got cancelled
+ case <-req.stale:
+ // Request already reverted
+ }
+}
+
+// revertBytecodeHealRequest cleans up a bytecode heal request and returns all
+// failed retrieval tasks to the scheduler for reassignment.
+//
+// Note, this needs to run on the event runloop thread to reschedule to idle peers.
+// On peer threads, use scheduleRevertBytecodeHealRequest.
+func (s *Syncer) revertBytecodeHealRequest(req *bytecodeHealRequest) {
+ log.Debug("Reverting bytecode heal request", "peer", req.peer)
+ select {
+ case <-req.stale:
+ log.Trace("Bytecode heal request already reverted", "peer", req.peer, "reqid", req.id)
+ return
+ default:
+ }
+ close(req.stale)
+
+ // Remove the request from the tracked set and restore the peer to the
+ // idle pool so it can be reassigned work (skip if peer already left).
+ s.lock.Lock()
+ delete(s.bytecodeHealReqs, req.id)
+ if _, ok := s.peers[req.peer]; ok {
+ s.bytecodeHealIdlers[req.peer] = struct{}{}
+ }
+ s.lock.Unlock()
+
+ // If there's a timeout timer still running, abort it and mark the code
+ // retrievals as not-pending, ready for rescheduling
+ req.timeout.Stop()
+ for _, hash := range req.hashes {
+ req.task.codeTasks[hash] = struct{}{}
+ }
+}
+
+// processTrienodeHealResponse integrates an already validated trienode response
+// into the healer tasks.
+func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
+	var (
+		start = time.Now()
+		fills int
+	)
+	for i, hash := range res.hashes {
+		node := res.nodes[i]
+
+		// If the trie node was not delivered, reschedule it
+		if node == nil {
+			res.task.trieTasks[res.paths[i]] = res.hashes[i]
+			continue
+		}
+		fills++
+
+		// Push the trie node into the state syncer
+		s.trienodeHealSynced++
+		s.trienodeHealBytes += common.StorageSize(len(node))
+
+		err := s.healer.scheduler.ProcessNode(trie.NodeSyncResult{Path: res.paths[i], Data: node})
+		switch err {
+		case nil:
+		case trie.ErrAlreadyProcessed:
+			s.trienodeHealDups++
+		case trie.ErrNotRequested:
+			s.trienodeHealNops++
+		default:
+			log.Error("Invalid trienode processed", "hash", hash, "err", err)
+		}
+	}
+	s.commitHealer(false)
+
+	// Calculate the processing rate of one filled trie node
+	rate := float64(fills) / (float64(time.Since(start)) / float64(time.Second))
+
+	// Update the currently measured trienode queueing and processing throughput.
+	//
+	// The processing rate needs to be updated uniformly independent if we've
+	// processed 1x100 trie nodes or 100x1 to keep the rate consistent even in
+	// the face of varying network packets. As such, we cannot just measure the
+	// time it took to process N trie nodes and update once, we need one update
+	// per trie node.
+	//
+	// Naively, that would be:
+	//
+	//   for i:=0; i<fills; i++ {
+	//     healRate = (1-measurementImpact)*oldRate + measurementImpact*newRate
+	//   }
+	//
+	// Essentially, a recursive expansion of HR = (1-MI)*HR + MI*NR.
+	//
+	// We can expand that formula for the Nth item as:
+	//   HR(N) = (1-MI)^N*OR + (1-MI)^(N-1)*MI*NR + (1-MI)^(N-2)*MI*NR + ... + (1-MI)^0*MI*NR
+	//
+	// The above is a geometric sequence that can be summed to:
+	//   HR(N) = (1-MI)^N*(OR-NR) + NR
+	s.trienodeHealRate = gomath.Pow(1-trienodeHealRateMeasurementImpact, float64(fills))*(s.trienodeHealRate-rate) + rate
+
+	pending := s.trienodeHealPend.Load()
+	if time.Since(s.trienodeHealThrottled) > time.Second {
+		// Periodically adjust the trie node throttler
+		if float64(pending) > 2*s.trienodeHealRate {
+			s.trienodeHealThrottle *= trienodeHealThrottleIncrease
+		} else {
+			s.trienodeHealThrottle /= trienodeHealThrottleDecrease
+		}
+		if s.trienodeHealThrottle > maxTrienodeHealThrottle {
+			s.trienodeHealThrottle = maxTrienodeHealThrottle
+		} else if s.trienodeHealThrottle < minTrienodeHealThrottle {
+			s.trienodeHealThrottle = minTrienodeHealThrottle
+		}
+		s.trienodeHealThrottled = time.Now()
+
+		log.Debug("Updated trie node heal throttler", "rate", s.trienodeHealRate, "pending", pending, "throttle", s.trienodeHealThrottle)
+	}
+}
+
+func (s *Syncer) commitHealer(force bool) {
+ if !force && s.healer.scheduler.MemSize() < ethdb.IdealBatchSize {
+ return
+ }
+ batch := s.db.NewBatch()
+ if err := s.healer.scheduler.Commit(batch); err != nil {
+ log.Crit("Failed to commit healing data", "err", err)
+ }
+ if err := batch.Write(); err != nil {
+ log.Crit("Failed to persist healing data", "err", err)
+ }
+ log.Debug("Persisted set of healing data", "type", "trienodes", "bytes", common.StorageSize(batch.ValueSize()))
+}
+
+// processBytecodeHealResponse integrates an already validated bytecode response
+// into the healer tasks.
+func (s *Syncer) processBytecodeHealResponse(res *bytecodeHealResponse) {
+ for i, hash := range res.hashes {
+ node := res.codes[i]
+
+ // If the bytecode was not delivered, reschedule it
+ if node == nil {
+ res.task.codeTasks[hash] = struct{}{}
+ continue
+ }
+ // Push the bytecode into the state syncer
+ s.bytecodeHealSynced++
+ s.bytecodeHealBytes += common.StorageSize(len(node))
+
+ err := s.healer.scheduler.ProcessCode(trie.CodeSyncResult{Hash: hash, Data: node})
+ switch err {
+ case nil:
+ case trie.ErrAlreadyProcessed:
+ s.bytecodeHealDups++
+ case trie.ErrNotRequested:
+ s.bytecodeHealNops++
+ default:
+ log.Error("Invalid bytecode processed", "hash", hash, "err", err)
+ }
+ }
+ s.commitHealer(false) // not forced: only flushes once enough data is buffered
+}
+
+// OnTrieNodes is a callback method to invoke when a batch of trie nodes are
+// received from a remote peer. It errors only if unrequested data was delivered.
+func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error {
+ var size common.StorageSize
+ for _, node := range trienodes {
+ size += common.StorageSize(len(node))
+ }
+ logger := peer.Log().New("reqid", id)
+ logger.Trace("Delivering set of healing trienodes", "trienodes", len(trienodes), "bytes", size)
+
+ // Whether or not the response is valid, we can mark the peer as idle and
+ // notify the scheduler to assign a new task. If the response is invalid,
+ // we'll drop the peer in a bit.
+ defer func() {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+ if _, ok := s.peers[peer.ID()]; ok {
+ s.trienodeHealIdlers[peer.ID()] = struct{}{}
+ }
+ select {
+ case s.update <- struct{}{}:
+ default: // non-blocking notification, skip if the send would block
+ }
+ }()
+ s.lock.Lock()
+ // Ensure the response is for a valid request
+ req, ok := s.trienodeHealReqs[id]
+ if !ok {
+ // Request stale, perhaps the peer timed out but came through in the end
+ logger.Warn("Unexpected trienode heal packet")
+ s.lock.Unlock()
+ return nil
+ }
+ delete(s.trienodeHealReqs, id)
+ s.rates.Update(peer.ID(), TrieNodesMsg, time.Since(req.time), len(trienodes))
+
+ // Clean up the request timeout timer, we'll see how to proceed further based
+ // on the actual delivered content
+ if !req.timeout.Stop() {
+ // The timeout is already triggered, and this request will be reverted+rescheduled
+ s.lock.Unlock()
+ return nil
+ }
+
+ // Response is valid, but check if peer is signalling that it does not have
+ // the requested data. In that case the peer is marked as stateless and the
+ // request is handed back for rescheduling.
+ if len(trienodes) == 0 {
+ logger.Debug("Peer rejected trienode heal request")
+ s.statelessPeers[peer.ID()] = struct{}{}
+ s.lock.Unlock()
+
+ // Signal this request as failed, and ready for rescheduling
+ s.scheduleRevertTrienodeHealRequest(req)
+ return nil
+ }
+ s.lock.Unlock()
+
+ // Cross reference the requested trienodes with the response to find gaps
+ // that the serving node is missing
+ var (
+ hasher = crypto.NewKeccakState()
+ hash = make([]byte, 32)
+ nodes = make([][]byte, len(req.hashes))
+ fills uint64
+ )
+ for i, j := 0, 0; i < len(trienodes); i++ {
+ // Find the next hash that we've been served, leaving misses with nils
+ hasher.Reset()
+ hasher.Write(trienodes[i])
+ hasher.Read(hash)
+
+ for j < len(req.hashes) && !bytes.Equal(hash, req.hashes[j][:]) {
+ j++
+ }
+ if j < len(req.hashes) {
+ nodes[j] = trienodes[i]
+ fills++
+ j++
+ continue
+ }
+ // We've either ran out of hashes, or got unrequested data
+ logger.Warn("Unexpected healing trienodes", "count", len(trienodes)-i)
+
+ // Signal this request as failed, and ready for rescheduling
+ s.scheduleRevertTrienodeHealRequest(req)
+ return errors.New("unexpected healing trienode")
+ }
+ // Response validated, send it to the scheduler for filling
+ s.trienodeHealPend.Add(fills)
+ defer func() {
+ s.trienodeHealPend.Add(^(fills - 1)) // unsigned subtract: ^(fills-1) is the two's complement of fills
+ }()
+ response := &trienodeHealResponse{
+ paths: req.paths,
+ task: req.task,
+ hashes: req.hashes,
+ nodes: nodes,
+ }
+ select {
+ case req.deliver <- response:
+ case <-req.cancel:
+ case <-req.stale:
+ }
+ return nil
+}
+
+// onHealByteCodes is a callback method to invoke when a batch of contract
+// bytes codes are received from a remote peer in the healing phase.
+func (s *Syncer) onHealByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error {
+ var size common.StorageSize
+ for _, code := range bytecodes {
+ size += common.StorageSize(len(code))
+ }
+ logger := peer.Log().New("reqid", id)
+ logger.Trace("Delivering set of healing bytecodes", "bytecodes", len(bytecodes), "bytes", size)
+
+ // Whether or not the response is valid, we can mark the peer as idle and
+ // notify the scheduler to assign a new task. If the response is invalid,
+ // we'll drop the peer in a bit.
+ defer func() {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+ if _, ok := s.peers[peer.ID()]; ok {
+ s.bytecodeHealIdlers[peer.ID()] = struct{}{}
+ }
+ select {
+ case s.update <- struct{}{}:
+ default:
+ }
+ }()
+ s.lock.Lock()
+ // Ensure the response is for a valid request
+ req, ok := s.bytecodeHealReqs[id]
+ if !ok {
+ // Request stale, perhaps the peer timed out but came through in the end
+ logger.Warn("Unexpected bytecode heal packet")
+ s.lock.Unlock()
+ return nil
+ }
+ delete(s.bytecodeHealReqs, id)
+ s.rates.Update(peer.ID(), ByteCodesMsg, time.Since(req.time), len(bytecodes))
+
+ // Clean up the request timeout timer, we'll see how to proceed further based
+ // on the actual delivered content
+ if !req.timeout.Stop() {
+ // The timeout is already triggered, and this request will be reverted+rescheduled
+ s.lock.Unlock()
+ return nil
+ }
+
+ // Response is valid, but check if peer is signalling that it does not have
+ // the requested data. In that case the peer is marked as stateless and the
+ // request is handed back for rescheduling.
+ if len(bytecodes) == 0 {
+ logger.Debug("Peer rejected bytecode heal request")
+ s.statelessPeers[peer.ID()] = struct{}{}
+ s.lock.Unlock()
+
+ // Signal this request as failed, and ready for rescheduling
+ s.scheduleRevertBytecodeHealRequest(req)
+ return nil
+ }
+ s.lock.Unlock()
+
+ // Cross reference the requested bytecodes with the response to find gaps
+ // that the serving node is missing
+ hasher := crypto.NewKeccakState()
+ hash := make([]byte, 32)
+
+ codes := make([][]byte, len(req.hashes))
+ for i, j := 0, 0; i < len(bytecodes); i++ {
+ // Find the next hash that we've been served, leaving misses with nils
+ hasher.Reset()
+ hasher.Write(bytecodes[i])
+ hasher.Read(hash)
+
+ for j < len(req.hashes) && !bytes.Equal(hash, req.hashes[j][:]) {
+ j++
+ }
+ if j < len(req.hashes) {
+ codes[j] = bytecodes[i]
+ j++
+ continue
+ }
+ // We've either ran out of hashes, or got unrequested data
+ logger.Warn("Unexpected healing bytecodes", "count", len(bytecodes)-i)
+ // Signal this request as failed, and ready for rescheduling
+ s.scheduleRevertBytecodeHealRequest(req)
+ return errors.New("unexpected healing bytecode")
+ }
+ // Response validated, send it to the scheduler for filling
+ response := &bytecodeHealResponse{
+ task: req.task,
+ hashes: req.hashes,
+ codes: codes,
+ }
+ select {
+ case req.deliver <- response:
+ case <-req.cancel:
+ case <-req.stale:
+ }
+ return nil
+}
+
+// onHealState is a callback method to invoke when a flat state(account
+// or storage slot) is downloaded during the healing stage. The flat states
+// can be persisted blindly and can be fixed later in the generation stage.
+// Note it's not concurrent safe, please handle the concurrent issue outside.
+func (s *Syncer) onHealState(paths [][]byte, value []byte) error {
+ if len(paths) == 1 {
+ var account types.StateAccount
+ if err := rlp.DecodeBytes(value, &account); err != nil {
+ return nil // Returning the error here would drop the remote peer
+ }
+ blob := types.SlimAccountRLP(account)
+ rawdb.WriteAccountSnapshot(s.stateWriter, common.BytesToHash(paths[0]), blob)
+ s.accountHealed += 1
+ s.accountHealedBytes += common.StorageSize(1 + common.HashLength + len(blob))
+ }
+ if len(paths) == 2 {
+ rawdb.WriteStorageSnapshot(s.stateWriter, common.BytesToHash(paths[0]), common.BytesToHash(paths[1]), value)
+ s.storageHealed += 1
+ s.storageHealedBytes += common.StorageSize(1 + 2*common.HashLength + len(value))
+ }
+ if s.stateWriter.ValueSize() > ethdb.IdealBatchSize {
+ s.stateWriter.Write() // It's fine to ignore the error here
+ s.stateWriter.Reset()
+ }
+ return nil
+}
+
+// reportHealProgress calculates various status reports and provides it to the user.
+func (s *Syncer) reportHealProgress(force bool) {
+ // Don't report all the events, just occasionally
+ if !force && time.Since(s.logTime) < 8*time.Second {
+ return
+ }
+ s.logTime = time.Now()
+
+ // Create a mega progress report
+ var (
+ trienode = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(s.trienodeHealSynced), s.trienodeHealBytes.TerminalString())
+ bytecode = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(s.bytecodeHealSynced), s.bytecodeHealBytes.TerminalString())
+ accounts = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(s.accountHealed), s.accountHealedBytes.TerminalString())
+ storage = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(s.storageHealed), s.storageHealedBytes.TerminalString())
+ )
+ log.Info("Syncing: state healing in progress", "accounts", accounts, "slots", storage,
+ "codes", bytecode, "nodes", trienode, "pending", s.healer.scheduler.Pending())
+}
From 0e9f9ff1c814f64d81ea81c65dadbcadedbf9ce7 Mon Sep 17 00:00:00 2001
From: jonny rhea <5555162+jrhea@users.noreply.github.com>
Date: Wed, 13 May 2026 15:00:28 -0500
Subject: [PATCH 3/8] eth/protocols/snap: rename loadSyncStatus/saveSyncStatus
to v1 variants and move to sync_v1.go
---
eth/protocols/snap/sync.go | 176 +---------------------------------
eth/protocols/snap/sync_v1.go | 173 +++++++++++++++++++++++++++++++++
2 files changed, 175 insertions(+), 174 deletions(-)
diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go
index ff52661f13..8ab40ffc41 100644
--- a/eth/protocols/snap/sync.go
+++ b/eth/protocols/snap/sync.go
@@ -18,7 +18,6 @@ package snap
import (
"bytes"
- "encoding/json"
"errors"
"fmt"
"math/big"
@@ -538,7 +537,7 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
s.startTime = time.Now()
}
// Retrieve the previous sync status from LevelDB and abort if already synced
- s.loadSyncStatus()
+ s.loadSyncStatusV1()
if len(s.tasks) == 0 && s.healer.scheduler.Pending() == 0 {
log.Debug("Snapshot sync already completed")
return nil
@@ -548,7 +547,7 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
s.forwardAccountTask(task)
}
s.cleanAccountTasks()
- s.saveSyncStatus()
+ s.saveSyncStatusV1()
}()
log.Debug("Starting snapshot sync cycle", "root", root)
@@ -687,177 +686,6 @@ func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
}
}
-// loadSyncStatus retrieves a previously aborted sync status from the database,
-// or generates a fresh one if none is available.
-func (s *Syncer) loadSyncStatus() {
- var progress SyncProgress
-
- if status := rawdb.ReadSnapshotSyncStatus(s.db); status != nil {
- if err := json.Unmarshal(status, &progress); err != nil {
- log.Error("Failed to decode snap sync status", "err", err)
- } else {
- for _, task := range progress.Tasks {
- log.Debug("Scheduled account sync task", "from", task.Next, "last", task.Last)
- }
- s.tasks = progress.Tasks
- for _, task := range s.tasks {
- // Restore the completed storages
- task.stateCompleted = make(map[common.Hash]struct{})
- for _, hash := range task.StorageCompleted {
- task.stateCompleted[hash] = struct{}{}
- }
- task.StorageCompleted = nil
-
- // Allocate batch for account trie generation
- task.genBatch = ethdb.HookedBatch{
- Batch: s.db.NewBatch(),
- OnPut: func(key []byte, value []byte) {
- s.accountBytes += common.StorageSize(len(key) + len(value))
- },
- }
- if s.scheme == rawdb.HashScheme {
- task.genTrie = newHashTrie(task.genBatch)
- }
- if s.scheme == rawdb.PathScheme {
- task.genTrie = newPathTrie(common.Hash{}, task.Next != common.Hash{}, s.db, task.genBatch)
- }
- // Restore leftover storage tasks
- for accountHash, subtasks := range task.SubTasks {
- for _, subtask := range subtasks {
- subtask.genBatch = ethdb.HookedBatch{
- Batch: s.db.NewBatch(),
- OnPut: func(key []byte, value []byte) {
- s.storageBytes += common.StorageSize(len(key) + len(value))
- },
- }
- if s.scheme == rawdb.HashScheme {
- subtask.genTrie = newHashTrie(subtask.genBatch)
- }
- if s.scheme == rawdb.PathScheme {
- subtask.genTrie = newPathTrie(accountHash, subtask.Next != common.Hash{}, s.db, subtask.genBatch)
- }
- }
- }
- }
- s.lock.Lock()
- defer s.lock.Unlock()
-
- s.snapped = len(s.tasks) == 0
-
- s.accountSynced = progress.AccountSynced
- s.accountBytes = progress.AccountBytes
- s.bytecodeSynced = progress.BytecodeSynced
- s.bytecodeBytes = progress.BytecodeBytes
- s.storageSynced = progress.StorageSynced
- s.storageBytes = progress.StorageBytes
-
- s.trienodeHealSynced = progress.TrienodeHealSynced
- s.trienodeHealBytes = progress.TrienodeHealBytes
- s.bytecodeHealSynced = progress.BytecodeHealSynced
- s.bytecodeHealBytes = progress.BytecodeHealBytes
- return
- }
- }
- // Either we've failed to decode the previous state, or there was none.
- // Start a fresh sync by chunking up the account range and scheduling
- // them for retrieval.
- s.tasks = nil
- s.accountSynced, s.accountBytes = 0, 0
- s.bytecodeSynced, s.bytecodeBytes = 0, 0
- s.storageSynced, s.storageBytes = 0, 0
- s.trienodeHealSynced, s.trienodeHealBytes = 0, 0
- s.bytecodeHealSynced, s.bytecodeHealBytes = 0, 0
-
- var next common.Hash
- step := new(big.Int).Sub(
- new(big.Int).Div(
- new(big.Int).Exp(common.Big2, common.Big256, nil),
- big.NewInt(int64(accountConcurrency)),
- ), common.Big1,
- )
- for i := 0; i < accountConcurrency; i++ {
- last := common.BigToHash(new(big.Int).Add(next.Big(), step))
- if i == accountConcurrency-1 {
- // Make sure we don't overflow if the step is not a proper divisor
- last = common.MaxHash
- }
- batch := ethdb.HookedBatch{
- Batch: s.db.NewBatch(),
- OnPut: func(key []byte, value []byte) {
- s.accountBytes += common.StorageSize(len(key) + len(value))
- },
- }
- var tr genTrie
- if s.scheme == rawdb.HashScheme {
- tr = newHashTrie(batch)
- }
- if s.scheme == rawdb.PathScheme {
- tr = newPathTrie(common.Hash{}, next != common.Hash{}, s.db, batch)
- }
- s.tasks = append(s.tasks, &accountTask{
- Next: next,
- Last: last,
- SubTasks: make(map[common.Hash][]*storageTask),
- genBatch: batch,
- stateCompleted: make(map[common.Hash]struct{}),
- genTrie: tr,
- })
- log.Debug("Created account sync task", "from", next, "last", last)
- next = common.BigToHash(new(big.Int).Add(last.Big(), common.Big1))
- }
-}
-
-// saveSyncStatus marshals the remaining sync tasks into leveldb.
-func (s *Syncer) saveSyncStatus() {
- // Serialize any partial progress to disk before spinning down
- for _, task := range s.tasks {
- // Claim the right boundary as incomplete before flushing the
- // accumulated nodes in batch, the nodes on right boundary
- // will be discarded and cleaned up by this call.
- task.genTrie.commit(false)
- if err := task.genBatch.Write(); err != nil {
- log.Error("Failed to persist account slots", "err", err)
- }
- for _, subtasks := range task.SubTasks {
- for _, subtask := range subtasks {
- // Same for account trie, discard and cleanup the
- // incomplete right boundary.
- subtask.genTrie.commit(false)
- if err := subtask.genBatch.Write(); err != nil {
- log.Error("Failed to persist storage slots", "err", err)
- }
- }
- }
- // Save the account hashes of completed storage.
- task.StorageCompleted = make([]common.Hash, 0, len(task.stateCompleted))
- for hash := range task.stateCompleted {
- task.StorageCompleted = append(task.StorageCompleted, hash)
- }
- if len(task.StorageCompleted) > 0 {
- log.Debug("Leftover completed storages", "number", len(task.StorageCompleted), "next", task.Next, "last", task.Last)
- }
- }
- // Store the actual progress markers
- progress := &SyncProgress{
- Tasks: s.tasks,
- AccountSynced: s.accountSynced,
- AccountBytes: s.accountBytes,
- BytecodeSynced: s.bytecodeSynced,
- BytecodeBytes: s.bytecodeBytes,
- StorageSynced: s.storageSynced,
- StorageBytes: s.storageBytes,
- TrienodeHealSynced: s.trienodeHealSynced,
- TrienodeHealBytes: s.trienodeHealBytes,
- BytecodeHealSynced: s.bytecodeHealSynced,
- BytecodeHealBytes: s.bytecodeHealBytes,
- }
- status, err := json.Marshal(progress)
- if err != nil {
- panic(err) // This can only fail during implementation
- }
- rawdb.WriteSnapshotSyncStatus(s.db, status)
-}
-
// Progress returns the snap sync status statistics.
func (s *Syncer) Progress() (*SyncProgress, *SyncPending) {
s.lock.Lock()
diff --git a/eth/protocols/snap/sync_v1.go b/eth/protocols/snap/sync_v1.go
index d6cd5ab5ca..7f8fafb0d6 100644
--- a/eth/protocols/snap/sync_v1.go
+++ b/eth/protocols/snap/sync_v1.go
@@ -18,9 +18,11 @@ package snap
import (
"bytes"
+ "encoding/json"
"errors"
"fmt"
gomath "math"
+ "math/big"
"math/rand"
"sort"
"time"
@@ -889,3 +891,174 @@ func (s *Syncer) reportHealProgress(force bool) {
log.Info("Syncing: state healing in progress", "accounts", accounts, "slots", storage,
"codes", bytecode, "nodes", trienode, "pending", s.healer.scheduler.Pending())
}
+
+// loadSyncStatusV1 restores the tasks and progress counters persisted by a previously
+// aborted sync, or schedules a fresh set of account tasks if none can be decoded.
+func (s *Syncer) loadSyncStatusV1() {
+ var progress SyncProgress
+
+ if status := rawdb.ReadSnapshotSyncStatus(s.db); status != nil {
+ if err := json.Unmarshal(status, &progress); err != nil {
+ log.Error("Failed to decode snap sync status", "err", err)
+ } else {
+ for _, task := range progress.Tasks {
+ log.Debug("Scheduled account sync task", "from", task.Next, "last", task.Last)
+ }
+ s.tasks = progress.Tasks
+ for _, task := range s.tasks {
+ // Rebuild the completed-storage set from its decoded slice form
+ task.stateCompleted = make(map[common.Hash]struct{})
+ for _, hash := range task.StorageCompleted {
+ task.stateCompleted[hash] = struct{}{}
+ }
+ task.StorageCompleted = nil
+
+ // Allocate a batch for account trie generation; its hook counts the bytes put
+ task.genBatch = ethdb.HookedBatch{
+ Batch: s.db.NewBatch(),
+ OnPut: func(key []byte, value []byte) {
+ s.accountBytes += common.StorageSize(len(key) + len(value))
+ },
+ }
+ if s.scheme == rawdb.HashScheme {
+ task.genTrie = newHashTrie(task.genBatch)
+ }
+ if s.scheme == rawdb.PathScheme {
+ task.genTrie = newPathTrie(common.Hash{}, task.Next != common.Hash{}, s.db, task.genBatch)
+ }
+ // Re-create the batch and trie generator of every leftover storage task
+ for accountHash, subtasks := range task.SubTasks {
+ for _, subtask := range subtasks {
+ subtask.genBatch = ethdb.HookedBatch{
+ Batch: s.db.NewBatch(),
+ OnPut: func(key []byte, value []byte) {
+ s.storageBytes += common.StorageSize(len(key) + len(value))
+ },
+ }
+ if s.scheme == rawdb.HashScheme {
+ subtask.genTrie = newHashTrie(subtask.genBatch)
+ }
+ if s.scheme == rawdb.PathScheme {
+ subtask.genTrie = newPathTrie(accountHash, subtask.Next != common.Hash{}, s.db, subtask.genBatch)
+ }
+ }
+ }
+ }
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ s.snapped = len(s.tasks) == 0
+
+ s.accountSynced = progress.AccountSynced
+ s.accountBytes = progress.AccountBytes
+ s.bytecodeSynced = progress.BytecodeSynced
+ s.bytecodeBytes = progress.BytecodeBytes
+ s.storageSynced = progress.StorageSynced
+ s.storageBytes = progress.StorageBytes
+
+ s.trienodeHealSynced = progress.TrienodeHealSynced
+ s.trienodeHealBytes = progress.TrienodeHealBytes
+ s.bytecodeHealSynced = progress.BytecodeHealSynced
+ s.bytecodeHealBytes = progress.BytecodeHealBytes
+ return
+ }
+ }
+ // Either decoding the previous state failed, or there was none. Reset
+ // the counters and start a fresh sync by splitting the account hash
+ // space into accountConcurrency consecutive ranges, one task each.
+ s.tasks = nil
+ s.accountSynced, s.accountBytes = 0, 0
+ s.bytecodeSynced, s.bytecodeBytes = 0, 0
+ s.storageSynced, s.storageBytes = 0, 0
+ s.trienodeHealSynced, s.trienodeHealBytes = 0, 0
+ s.bytecodeHealSynced, s.bytecodeHealBytes = 0, 0
+
+ var next common.Hash
+ step := new(big.Int).Sub(
+ new(big.Int).Div(
+ new(big.Int).Exp(common.Big2, common.Big256, nil),
+ big.NewInt(int64(accountConcurrency)),
+ ), common.Big1,
+ )
+ for i := 0; i < accountConcurrency; i++ {
+ last := common.BigToHash(new(big.Int).Add(next.Big(), step))
+ if i == accountConcurrency-1 {
+ // Pin the final range to the max hash in case the step is not a proper divisor
+ last = common.MaxHash
+ }
+ batch := ethdb.HookedBatch{
+ Batch: s.db.NewBatch(),
+ OnPut: func(key []byte, value []byte) {
+ s.accountBytes += common.StorageSize(len(key) + len(value))
+ },
+ }
+ var tr genTrie
+ if s.scheme == rawdb.HashScheme {
+ tr = newHashTrie(batch)
+ }
+ if s.scheme == rawdb.PathScheme {
+ tr = newPathTrie(common.Hash{}, next != common.Hash{}, s.db, batch)
+ }
+ s.tasks = append(s.tasks, &accountTask{
+ Next: next,
+ Last: last,
+ SubTasks: make(map[common.Hash][]*storageTask),
+ genBatch: batch,
+ stateCompleted: make(map[common.Hash]struct{}),
+ genTrie: tr,
+ })
+ log.Debug("Created account sync task", "from", next, "last", last)
+ next = common.BigToHash(new(big.Int).Add(last.Big(), common.Big1))
+ }
+}
+
+// saveSyncStatusV1 flushes the pending trie batches and stores the remaining tasks and counters as JSON in the database.
+func (s *Syncer) saveSyncStatusV1() {
+ // Flush any partial progress to disk before spinning down
+ for _, task := range s.tasks {
+ // Claim the right boundary as incomplete before flushing the
+ // accumulated nodes in the batch; the nodes on the right boundary
+ // will be discarded and cleaned up by this call.
+ task.genTrie.commit(false)
+ if err := task.genBatch.Write(); err != nil {
+ log.Error("Failed to persist account slots", "err", err)
+ }
+ for _, subtasks := range task.SubTasks {
+ for _, subtask := range subtasks {
+ // Same as for the account trie: discard and clean up
+ // the incomplete right boundary of the storage trie.
+ subtask.genTrie.commit(false)
+ if err := subtask.genBatch.Write(); err != nil {
+ log.Error("Failed to persist storage slots", "err", err)
+ }
+ }
+ }
+ // Copy the account hashes of completed storages from the set into the slice field.
+ task.StorageCompleted = make([]common.Hash, 0, len(task.stateCompleted))
+ for hash := range task.stateCompleted {
+ task.StorageCompleted = append(task.StorageCompleted, hash)
+ }
+ if len(task.StorageCompleted) > 0 {
+ log.Debug("Leftover completed storages", "number", len(task.StorageCompleted), "next", task.Next, "last", task.Last)
+ }
+ }
+ // Store the remaining tasks together with the progress counters
+ progress := &SyncProgress{
+ Tasks: s.tasks,
+ AccountSynced: s.accountSynced,
+ AccountBytes: s.accountBytes,
+ BytecodeSynced: s.bytecodeSynced,
+ BytecodeBytes: s.bytecodeBytes,
+ StorageSynced: s.storageSynced,
+ StorageBytes: s.storageBytes,
+ TrienodeHealSynced: s.trienodeHealSynced,
+ TrienodeHealBytes: s.trienodeHealBytes,
+ BytecodeHealSynced: s.bytecodeHealSynced,
+ BytecodeHealBytes: s.bytecodeHealBytes,
+ }
+ status, err := json.Marshal(progress)
+ if err != nil {
+ panic(err) // This can only fail during implementation
+ }
+ rawdb.WriteSnapshotSyncStatus(s.db, status)
+}
From 2c19ece19a9b4bdf9235305c5dc9c9ad319d4df4 Mon Sep 17 00:00:00 2001
From: jonny rhea <5555162+jrhea@users.noreply.github.com>
Date: Wed, 13 May 2026 15:13:51 -0500
Subject: [PATCH 4/8] eth/protocols/snap: extract syncV1 main loop into
sync_v1.go
---
eth/protocols/snap/sync.go | 165 +-------------------------------
eth/protocols/snap/sync_v1.go | 171 ++++++++++++++++++++++++++++++++++
2 files changed, 172 insertions(+), 164 deletions(-)
diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go
index 8ab40ffc41..16d0a7e2f4 100644
--- a/eth/protocols/snap/sync.go
+++ b/eth/protocols/snap/sync.go
@@ -30,7 +30,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/core/rawdb"
- "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
@@ -521,169 +520,7 @@ func (s *Syncer) Unregister(id string) error {
// Previously downloaded segments will not be redownloaded of fixed, rather any
// errors will be healed after the leaves are fully accumulated.
func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
- // Move the trie root from any previous value, revert stateless markers for
- // any peers and initialize the syncer if it was not yet run
- s.lock.Lock()
- s.root = root
- s.healer = &healTask{
- scheduler: state.NewStateSync(root, s.db, s.onHealState, s.scheme),
- trieTasks: make(map[string]common.Hash),
- codeTasks: make(map[common.Hash]struct{}),
- }
- s.statelessPeers = make(map[string]struct{})
- s.lock.Unlock()
-
- if s.startTime.IsZero() {
- s.startTime = time.Now()
- }
- // Retrieve the previous sync status from LevelDB and abort if already synced
- s.loadSyncStatusV1()
- if len(s.tasks) == 0 && s.healer.scheduler.Pending() == 0 {
- log.Debug("Snapshot sync already completed")
- return nil
- }
- defer func() { // Persist any progress, independent of failure
- for _, task := range s.tasks {
- s.forwardAccountTask(task)
- }
- s.cleanAccountTasks()
- s.saveSyncStatusV1()
- }()
-
- log.Debug("Starting snapshot sync cycle", "root", root)
-
- // Flush out the last committed raw states
- defer func() {
- if s.stateWriter.ValueSize() > 0 {
- s.stateWriter.Write()
- s.stateWriter.Reset()
- }
- }()
- defer s.report(true)
- // commit any trie- and bytecode-healing data.
- defer s.commitHealer(true)
-
- // Whether sync completed or not, disregard any future packets
- defer func() {
- log.Debug("Terminating snapshot sync cycle", "root", root)
- s.lock.Lock()
- s.accountReqs = make(map[uint64]*accountRequest)
- s.storageReqs = make(map[uint64]*storageRequest)
- s.bytecodeReqs = make(map[uint64]*bytecodeRequest)
- s.trienodeHealReqs = make(map[uint64]*trienodeHealRequest)
- s.bytecodeHealReqs = make(map[uint64]*bytecodeHealRequest)
- s.lock.Unlock()
- }()
- // Keep scheduling sync tasks
- peerJoin := make(chan string, 16)
- peerJoinSub := s.peerJoin.Subscribe(peerJoin)
- defer peerJoinSub.Unsubscribe()
-
- peerDrop := make(chan string, 16)
- peerDropSub := s.peerDrop.Subscribe(peerDrop)
- defer peerDropSub.Unsubscribe()
-
- // Create a set of unique channels for this sync cycle. We need these to be
- // ephemeral so a data race doesn't accidentally deliver something stale on
- // a persistent channel across syncs (yup, this happened)
- var (
- accountReqFails = make(chan *accountRequest)
- storageReqFails = make(chan *storageRequest)
- bytecodeReqFails = make(chan *bytecodeRequest)
- accountResps = make(chan *accountResponse)
- storageResps = make(chan *storageResponse)
- bytecodeResps = make(chan *bytecodeResponse)
- trienodeHealReqFails = make(chan *trienodeHealRequest)
- bytecodeHealReqFails = make(chan *bytecodeHealRequest)
- trienodeHealResps = make(chan *trienodeHealResponse)
- bytecodeHealResps = make(chan *bytecodeHealResponse)
- )
- for {
- // Remove all completed tasks and terminate sync if everything's done
- s.cleanStorageTasks()
- s.cleanAccountTasks()
- if len(s.tasks) == 0 && s.healer.scheduler.Pending() == 0 {
- // State healing phase completed, record the elapsed time in metrics.
- // Note: healing may be rerun in subsequent cycles to fill gaps between
- // pivot states (e.g., if chain sync takes longer).
- if !s.healStartTime.IsZero() {
- stateHealTimeGauge.Inc(int64(time.Since(s.healStartTime)))
- log.Info("State healing phase is completed", "elapsed", common.PrettyDuration(time.Since(s.healStartTime)))
- s.healStartTime = time.Time{}
- }
- return nil
- }
- // Assign all the data retrieval tasks to any free peers
- s.assignAccountTasks(accountResps, accountReqFails, cancel)
- s.assignBytecodeTasks(bytecodeResps, bytecodeReqFails, cancel)
- s.assignStorageTasks(storageResps, storageReqFails, cancel)
-
- if len(s.tasks) == 0 {
- // State sync phase completed, record the elapsed time in metrics.
- // Note: the initial state sync runs only once, regardless of whether
- // a new cycle is started later. Any state differences in subsequent
- // cycles will be handled by the state healer.
- s.syncTimeOnce.Do(func() {
- stateSyncTimeGauge.Update(int64(time.Since(s.startTime)))
- log.Info("State sync phase is completed", "elapsed", common.PrettyDuration(time.Since(s.startTime)))
- })
- if s.healStartTime.IsZero() {
- s.healStartTime = time.Now()
- }
- s.assignTrienodeHealTasks(trienodeHealResps, trienodeHealReqFails, cancel)
- s.assignBytecodeHealTasks(bytecodeHealResps, bytecodeHealReqFails, cancel)
- }
- // Update sync progress
- s.lock.Lock()
- s.extProgress = &SyncProgress{
- AccountSynced: s.accountSynced,
- AccountBytes: s.accountBytes,
- BytecodeSynced: s.bytecodeSynced,
- BytecodeBytes: s.bytecodeBytes,
- StorageSynced: s.storageSynced,
- StorageBytes: s.storageBytes,
- TrienodeHealSynced: s.trienodeHealSynced,
- TrienodeHealBytes: s.trienodeHealBytes,
- BytecodeHealSynced: s.bytecodeHealSynced,
- BytecodeHealBytes: s.bytecodeHealBytes,
- }
- s.lock.Unlock()
- // Wait for something to happen
- select {
- case <-s.update:
- // Something happened (new peer, delivery, timeout), recheck tasks
- case <-peerJoin:
- // A new peer joined, try to schedule it new tasks
- case id := <-peerDrop:
- s.revertRequests(id)
- case <-cancel:
- return ErrCancelled
-
- case req := <-accountReqFails:
- s.revertAccountRequest(req)
- case req := <-bytecodeReqFails:
- s.revertBytecodeRequest(req)
- case req := <-storageReqFails:
- s.revertStorageRequest(req)
- case req := <-trienodeHealReqFails:
- s.revertTrienodeHealRequest(req)
- case req := <-bytecodeHealReqFails:
- s.revertBytecodeHealRequest(req)
-
- case res := <-accountResps:
- s.processAccountResponse(res)
- case res := <-bytecodeResps:
- s.processBytecodeResponse(res)
- case res := <-storageResps:
- s.processStorageResponse(res)
- case res := <-trienodeHealResps:
- s.processTrienodeHealResponse(res)
- case res := <-bytecodeHealResps:
- s.processBytecodeHealResponse(res)
- }
- // Report stats if something meaningful happened
- s.report(false)
- }
+ return s.syncV1(root, cancel)
}
// Progress returns the snap sync status statistics.
diff --git a/eth/protocols/snap/sync_v1.go b/eth/protocols/snap/sync_v1.go
index 7f8fafb0d6..1754782651 100644
--- a/eth/protocols/snap/sync_v1.go
+++ b/eth/protocols/snap/sync_v1.go
@@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
+ "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
@@ -1062,3 +1063,173 @@ func (s *Syncer) saveSyncStatusV1() {
}
rawdb.WriteSnapshotSyncStatus(s.db, status)
}
+
+// syncV1 runs the snap/1 download-and-heal loop for the given state root. It
+// fetches account ranges, storage slots and bytecodes; once no account tasks
+// remain, it requests trie nodes and bytecodes to heal the state. It returns
+// nil when everything is done (or was already) and ErrCancelled on cancel.
+func (s *Syncer) syncV1(root common.Hash, cancel chan struct{}) error {
+ // Under the lock: move the trie root from any previous value, create a
+ // fresh heal task for it and clear the stateless markers of all peers
+ s.lock.Lock()
+ s.root = root
+ s.healer = &healTask{
+ scheduler: state.NewStateSync(root, s.db, s.onHealState, s.scheme),
+ trieTasks: make(map[string]common.Hash),
+ codeTasks: make(map[common.Hash]struct{}),
+ }
+ s.statelessPeers = make(map[string]struct{})
+ s.lock.Unlock()
+
+ if s.startTime.IsZero() {
+ s.startTime = time.Now()
+ }
+ // Retrieve the previous sync status from the database and return early if already synced
+ s.loadSyncStatusV1()
+ if len(s.tasks) == 0 && s.healer.scheduler.Pending() == 0 {
+ log.Debug("Snapshot sync already completed")
+ return nil
+ }
+ defer func() { // Persist any progress, independent of failure
+ for _, task := range s.tasks {
+ s.forwardAccountTask(task)
+ }
+ s.cleanAccountTasks()
+ s.saveSyncStatusV1()
+ }()
+
+ log.Debug("Starting snapshot sync cycle", "root", root)
+
+ // Flush out the last committed raw states on exit
+ defer func() {
+ if s.stateWriter.ValueSize() > 0 {
+ s.stateWriter.Write()
+ s.stateWriter.Reset()
+ }
+ }()
+ defer s.report(true)
+ // Commit any trie- and bytecode-healing data on exit.
+ defer s.commitHealer(true)
+
+ // Whether sync completed or not, reset the request maps to disregard any future packets
+ defer func() {
+ log.Debug("Terminating snapshot sync cycle", "root", root)
+ s.lock.Lock()
+ s.accountReqs = make(map[uint64]*accountRequest)
+ s.storageReqs = make(map[uint64]*storageRequest)
+ s.bytecodeReqs = make(map[uint64]*bytecodeRequest)
+ s.trienodeHealReqs = make(map[uint64]*trienodeHealRequest)
+ s.bytecodeHealReqs = make(map[uint64]*bytecodeHealRequest)
+ s.lock.Unlock()
+ }()
+ // Subscribe to peer join and drop events for the duration of this cycle
+ peerJoin := make(chan string, 16)
+ peerJoinSub := s.peerJoin.Subscribe(peerJoin)
+ defer peerJoinSub.Unsubscribe()
+
+ peerDrop := make(chan string, 16)
+ peerDropSub := s.peerDrop.Subscribe(peerDrop)
+ defer peerDropSub.Unsubscribe()
+
+ // Create a set of unique channels for this sync cycle. We need these to be
+ // ephemeral so a data race doesn't accidentally deliver something stale on
+ // a persistent channel across syncs (yup, this happened)
+ var (
+ accountReqFails = make(chan *accountRequest)
+ storageReqFails = make(chan *storageRequest)
+ bytecodeReqFails = make(chan *bytecodeRequest)
+ accountResps = make(chan *accountResponse)
+ storageResps = make(chan *storageResponse)
+ bytecodeResps = make(chan *bytecodeResponse)
+ trienodeHealReqFails = make(chan *trienodeHealRequest)
+ bytecodeHealReqFails = make(chan *bytecodeHealRequest)
+ trienodeHealResps = make(chan *trienodeHealResponse)
+ bytecodeHealResps = make(chan *bytecodeHealResponse)
+ )
+ for {
+ // Remove all completed tasks and terminate sync if everything's done
+ s.cleanStorageTasks()
+ s.cleanAccountTasks()
+ if len(s.tasks) == 0 && s.healer.scheduler.Pending() == 0 {
+ // State healing phase completed, add the elapsed time to the metric.
+ // Note: healing may be rerun in subsequent cycles to fill gaps between
+ // pivot states (e.g., if chain sync takes longer).
+ if !s.healStartTime.IsZero() {
+ stateHealTimeGauge.Inc(int64(time.Since(s.healStartTime)))
+ log.Info("State healing phase is completed", "elapsed", common.PrettyDuration(time.Since(s.healStartTime)))
+ s.healStartTime = time.Time{}
+ }
+ return nil
+ }
+ // Assign all the data retrieval tasks to any free peers
+ s.assignAccountTasks(accountResps, accountReqFails, cancel)
+ s.assignBytecodeTasks(bytecodeResps, bytecodeReqFails, cancel)
+ s.assignStorageTasks(storageResps, storageReqFails, cancel)
+
+ if len(s.tasks) == 0 {
+ // State sync phase completed, record the elapsed time in metrics.
+ // Note: the initial state sync runs only once, regardless of whether
+ // a new cycle is started later. Any state differences in subsequent
+ // cycles will be handled by the state healer.
+ s.syncTimeOnce.Do(func() {
+ stateSyncTimeGauge.Update(int64(time.Since(s.startTime)))
+ log.Info("State sync phase is completed", "elapsed", common.PrettyDuration(time.Since(s.startTime)))
+ })
+ if s.healStartTime.IsZero() {
+ s.healStartTime = time.Now()
+ }
+ s.assignTrienodeHealTasks(trienodeHealResps, trienodeHealReqFails, cancel)
+ s.assignBytecodeHealTasks(bytecodeHealResps, bytecodeHealReqFails, cancel)
+ }
+ // Update sync progress with a copy of the counters, taken under the lock
+ s.lock.Lock()
+ s.extProgress = &SyncProgress{
+ AccountSynced: s.accountSynced,
+ AccountBytes: s.accountBytes,
+ BytecodeSynced: s.bytecodeSynced,
+ BytecodeBytes: s.bytecodeBytes,
+ StorageSynced: s.storageSynced,
+ StorageBytes: s.storageBytes,
+ TrienodeHealSynced: s.trienodeHealSynced,
+ TrienodeHealBytes: s.trienodeHealBytes,
+ BytecodeHealSynced: s.bytecodeHealSynced,
+ BytecodeHealBytes: s.bytecodeHealBytes,
+ }
+ s.lock.Unlock()
+ // Wait for something to happen
+ select {
+ case <-s.update:
+ // Something happened (new peer, delivery, timeout), recheck tasks
+ case <-peerJoin:
+ // A new peer joined, try to schedule it new tasks
+ case id := <-peerDrop:
+ s.revertRequests(id)
+ case <-cancel:
+ return ErrCancelled
+
+ case req := <-accountReqFails:
+ s.revertAccountRequest(req)
+ case req := <-bytecodeReqFails:
+ s.revertBytecodeRequest(req)
+ case req := <-storageReqFails:
+ s.revertStorageRequest(req)
+ case req := <-trienodeHealReqFails:
+ s.revertTrienodeHealRequest(req)
+ case req := <-bytecodeHealReqFails:
+ s.revertBytecodeHealRequest(req)
+
+ case res := <-accountResps:
+ s.processAccountResponse(res)
+ case res := <-bytecodeResps:
+ s.processBytecodeResponse(res)
+ case res := <-storageResps:
+ s.processStorageResponse(res)
+ case res := <-trienodeHealResps:
+ s.processTrienodeHealResponse(res)
+ case res := <-bytecodeHealResps:
+ s.processBytecodeHealResponse(res)
+ }
+ // Report stats if something meaningful happened
+ s.report(false)
+ }
+}
From b4484ede857d4c0e6cd52f0c58b778e52f9a897d Mon Sep 17 00:00:00 2001
From: jonny rhea <5555162+jrhea@users.noreply.github.com>
Date: Wed, 13 May 2026 17:03:04 -0500
Subject: [PATCH 5/8] eth/protocols/snap: install v1 dispatch hooks via
registerV1
---
eth/protocols/snap/sync.go | 79 +++++++--------------------------
eth/protocols/snap/sync_v1.go | 83 ++++++++++++++++++++++++++++++++++-
2 files changed, 97 insertions(+), 65 deletions(-)
diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go
index 16d0a7e2f4..1a95134658 100644
--- a/eth/protocols/snap/sync.go
+++ b/eth/protocols/snap/sync.go
@@ -68,29 +68,6 @@ const (
// waste bandwidth.
maxTrieRequestCount = maxRequestSize / 512
- // trienodeHealRateMeasurementImpact is the impact a single measurement has on
- // the local node's trienode processing capacity. A value closer to 0 reacts
- // slower to sudden changes, but it is also more stable against temporary hiccups.
- trienodeHealRateMeasurementImpact = 0.005
-
- // minTrienodeHealThrottle is the minimum divisor for throttling trie node
- // heal requests to avoid overloading the local node and excessively expanding
- // the state trie breadth wise.
- minTrienodeHealThrottle = 1
-
- // maxTrienodeHealThrottle is the maximum divisor for throttling trie node
- // heal requests to avoid overloading the local node and exessively expanding
- // the state trie bedth wise.
- maxTrienodeHealThrottle = maxTrieRequestCount
-
- // trienodeHealThrottleIncrease is the multiplier for the throttle when the
- // rate of arriving data is higher than the rate of processing it.
- trienodeHealThrottleIncrease = 1.33
-
- // trienodeHealThrottleDecrease is the divisor for the throttle when the
- // rate of arriving data is lower than the rate of processing it.
- trienodeHealThrottleDecrease = 1.25
-
// batchSizeThreshold is the maximum size allowed for gentrie batch.
batchSizeThreshold = 8 * 1024 * 1024
)
@@ -422,6 +399,13 @@ type Syncer struct {
syncTimeOnce sync.Once // Ensure that the state sync time is uploaded only once
logTime time.Time // Time instance when status was last reported
+ // Version-specific hooks installed by registerV1/V2. Each one exists
+ // because shared code in sync.go needs to dispatch into version-specific
+ // code without knowing which version is running.
+ syncFn func(root common.Hash, cancel chan struct{}) error
+ revertVersionRequests func(peer string)
+ onBytecodesAfterSync func(peer SyncPeer, id uint64, bytecodes [][]byte) error
+
pend sync.WaitGroup // Tracks network request goroutines for graceful shutdown
lock sync.RWMutex // Protects fields that can change outside of sync (peers, reqs, root)
}
@@ -429,7 +413,7 @@ type Syncer struct {
// NewSyncer creates a new snapshot syncer to download the Ethereum state over the
// snap protocol.
func NewSyncer(db ethdb.KeyValueStore, scheme string) *Syncer {
- return &Syncer{
+ s := &Syncer{
db: db,
scheme: scheme,
@@ -447,16 +431,10 @@ func NewSyncer(db ethdb.KeyValueStore, scheme string) *Syncer {
storageReqs: make(map[uint64]*storageRequest),
bytecodeReqs: make(map[uint64]*bytecodeRequest),
- trienodeHealIdlers: make(map[string]struct{}),
- bytecodeHealIdlers: make(map[string]struct{}),
-
- trienodeHealReqs: make(map[uint64]*trienodeHealRequest),
- bytecodeHealReqs: make(map[uint64]*bytecodeHealRequest),
- trienodeHealThrottle: maxTrienodeHealThrottle, // Tune downward instead of insta-filling with junk
- stateWriter: db.NewBatch(),
-
extProgress: new(SyncProgress),
}
+ s.registerV1()
+ return s
}
// Register injects a new data source into the syncer's peerset.
@@ -520,7 +498,7 @@ func (s *Syncer) Unregister(id string) error {
// Previously downloaded segments will not be redownloaded of fixed, rather any
// errors will be healed after the leaves are fully accumulated.
func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
- return s.syncV1(root, cancel)
+ return s.syncFn(root, cancel)
}
// Progress returns the snap sync status statistics.
@@ -957,7 +935,7 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st
// revertRequests locates all the currently pending requests from a particular
// peer and reverts them, rescheduling for others to fulfill.
func (s *Syncer) revertRequests(peer string) {
- // Gather the requests first, revertals need the lock too
+ // Gather the shared requests first, revertals need the lock too
s.lock.Lock()
var accountReqs []*accountRequest
for _, req := range s.accountReqs {
@@ -977,21 +955,9 @@ func (s *Syncer) revertRequests(peer string) {
storageReqs = append(storageReqs, req)
}
}
- var trienodeHealReqs []*trienodeHealRequest
- for _, req := range s.trienodeHealReqs {
- if req.peer == peer {
- trienodeHealReqs = append(trienodeHealReqs, req)
- }
- }
- var bytecodeHealReqs []*bytecodeHealRequest
- for _, req := range s.bytecodeHealReqs {
- if req.peer == peer {
- bytecodeHealReqs = append(bytecodeHealReqs, req)
- }
- }
s.lock.Unlock()
- // Revert all the requests matching the peer
+ // Revert all the shared requests matching the peer
for _, req := range accountReqs {
s.revertAccountRequest(req)
}
@@ -1001,12 +967,8 @@ func (s *Syncer) revertRequests(peer string) {
for _, req := range storageReqs {
s.revertStorageRequest(req)
}
- for _, req := range trienodeHealReqs {
- s.revertTrienodeHealRequest(req)
- }
- for _, req := range bytecodeHealReqs {
- s.revertBytecodeHealRequest(req)
- }
+ // Version-specific request maps (heal for v1, access lists for v2).
+ s.revertVersionRequests(peer)
}
// scheduleRevertAccountRequest asks the event loop to clean up an account range
@@ -1755,7 +1717,7 @@ func (s *Syncer) OnByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error
if syncing {
return s.onByteCodes(peer, id, bytecodes)
}
- return s.onHealByteCodes(peer, id, bytecodes)
+ return s.onBytecodesAfterSync(peer, id, bytecodes)
}
// onByteCodes is a callback method to invoke when a batch of contract
@@ -2008,15 +1970,6 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
// hashSpace is the total size of the 256 bit hash space for accounts.
var hashSpace = new(big.Int).Exp(common.Big2, common.Big256, nil)
-// report calculates various status reports and provides it to the user.
-func (s *Syncer) report(force bool) {
- if len(s.tasks) > 0 {
- s.reportSyncProgress(force)
- return
- }
- s.reportHealProgress(force)
-}
-
// reportSyncProgress calculates various status reports and provides it to the user.
func (s *Syncer) reportSyncProgress(force bool) {
// Don't report all the events, just occasionally
diff --git a/eth/protocols/snap/sync_v1.go b/eth/protocols/snap/sync_v1.go
index 1754782651..88bae04218 100644
--- a/eth/protocols/snap/sync_v1.go
+++ b/eth/protocols/snap/sync_v1.go
@@ -38,6 +38,48 @@ import (
"github.com/ethereum/go-ethereum/trie"
)
+const (
+ // trienodeHealRateMeasurementImpact is the impact a single measurement has on
+ // the local node's trienode processing capacity. A value closer to 0 reacts
+ // slower to sudden changes, but it is also more stable against temporary hiccups.
+ trienodeHealRateMeasurementImpact = 0.005
+
+ // minTrienodeHealThrottle is the minimum divisor for throttling trie node
+ // heal requests to avoid overloading the local node and excessively expanding
+ // the state trie breadth wise.
+ minTrienodeHealThrottle = 1
+
+ // maxTrienodeHealThrottle is the maximum divisor for throttling trie node
+ // heal requests to avoid overloading the local node and excessively expanding
+ // the state trie breadth wise.
+ maxTrienodeHealThrottle = maxTrieRequestCount
+
+ // trienodeHealThrottleIncrease is the multiplier for the throttle when the
+ // rate of arriving data is higher than the rate of processing it.
+ trienodeHealThrottleIncrease = 1.33
+
+ // trienodeHealThrottleDecrease is the divisor for the throttle when the
+ // rate of arriving data is lower than the rate of processing it.
+ trienodeHealThrottleDecrease = 1.25
+)
+
+// registerV1 wires the Syncer's version-specific hooks to the snap/1
+// implementation and allocates snap/1 specific state on the Syncer.
+func (s *Syncer) registerV1() {
+ // Dispatcher hooks used from shared code in sync.go.
+ s.syncFn = s.syncV1
+ s.revertVersionRequests = s.revertHealRequests
+ s.onBytecodesAfterSync = s.onHealByteCodes
+
+ // V1 specific state.
+ s.trienodeHealIdlers = make(map[string]struct{})
+ s.bytecodeHealIdlers = make(map[string]struct{})
+ s.trienodeHealReqs = make(map[uint64]*trienodeHealRequest)
+ s.bytecodeHealReqs = make(map[uint64]*bytecodeHealRequest)
+ s.trienodeHealThrottle = maxTrienodeHealThrottle
+ s.stateWriter = s.db.NewBatch()
+}
+
// trienodeHealRequest tracks a pending state trie request to ensure responses
// are to actual requests and to validate any security constraints.
//
@@ -520,6 +562,32 @@ func (s *Syncer) revertBytecodeHealRequest(req *bytecodeHealRequest) {
}
}
+// revertHealRequests reverts in-flight trie-node and bytecode heal requests
+// from the given peer. Installed as Syncer.revertVersionRequests by registerV1.
+func (s *Syncer) revertHealRequests(peer string) {
+ s.lock.Lock()
+ var trienodeHealReqs []*trienodeHealRequest
+ for _, req := range s.trienodeHealReqs {
+ if req.peer == peer {
+ trienodeHealReqs = append(trienodeHealReqs, req)
+ }
+ }
+ var bytecodeHealReqs []*bytecodeHealRequest
+ for _, req := range s.bytecodeHealReqs {
+ if req.peer == peer {
+ bytecodeHealReqs = append(bytecodeHealReqs, req)
+ }
+ }
+ s.lock.Unlock()
+
+ for _, req := range trienodeHealReqs {
+ s.revertTrienodeHealRequest(req)
+ }
+ for _, req := range bytecodeHealReqs {
+ s.revertBytecodeHealRequest(req)
+ }
+}
+
// processTrienodeHealResponse integrates an already validated trienode response
// into the healer tasks.
func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
@@ -874,6 +942,17 @@ func (s *Syncer) onHealState(paths [][]byte, value []byte) error {
return nil
}
+// reportV1 routes between the shared sync-phase logger and the v1 heal-phase
+// logger based on whether account tasks are still pending. Called from syncV1's
+// main loop.
+func (s *Syncer) reportV1(force bool) {
+ if len(s.tasks) > 0 {
+ s.reportSyncProgress(force)
+ return
+ }
+ s.reportHealProgress(force)
+}
+
// reportHealProgress calculates various status reports and provides it to the user.
func (s *Syncer) reportHealProgress(force bool) {
// Don't report all the events, just occasionally
@@ -1107,7 +1186,7 @@ func (s *Syncer) syncV1(root common.Hash, cancel chan struct{}) error {
s.stateWriter.Reset()
}
}()
- defer s.report(true)
+ defer s.reportV1(true)
// commit any trie- and bytecode-healing data.
defer s.commitHealer(true)
@@ -1230,6 +1309,6 @@ func (s *Syncer) syncV1(root common.Hash, cancel chan struct{}) error {
s.processBytecodeHealResponse(res)
}
// Report stats if something meaningful happened
- s.report(false)
+ s.reportV1(false)
}
}
From 4e06edf98966d8a2b570a7f6991625fe75fb47c4 Mon Sep 17 00:00:00 2001
From: jonny rhea <5555162+jrhea@users.noreply.github.com>
Date: Wed, 13 May 2026 17:55:14 -0500
Subject: [PATCH 6/8] eth/protocols/snap: route remaining v1 dispatch sites
through registerV1 hooks
---
eth/protocols/snap/sync.go | 348 +--------------------------------
eth/protocols/snap/sync_v1.go | 351 +++++++++++++++++++++++++++++++++-
2 files changed, 359 insertions(+), 340 deletions(-)
diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go
index 1a95134658..4a83e2d127 100644
--- a/eth/protocols/snap/sync.go
+++ b/eth/protocols/snap/sync.go
@@ -339,7 +339,7 @@ type Syncer struct {
root common.Hash // Current state trie root being synced
tasks []*accountTask // Current account task set being synced
- snapped bool // Flag to signal that snap phase is done
+ snapped bool // True once account-range download is complete.
healer *healTask // Current state healing task being executed
update chan struct{} // Notification channel for possible sync progression
@@ -402,9 +402,12 @@ type Syncer struct {
// Version-specific hooks installed by registerV1/V2. Each one exists
// because shared code in sync.go needs to dispatch into version-specific
// code without knowing which version is running.
- syncFn func(root common.Hash, cancel chan struct{}) error
- revertVersionRequests func(peer string)
- onBytecodesAfterSync func(peer SyncPeer, id uint64, bytecodes [][]byte) error
+ syncFn func(root common.Hash, cancel chan struct{}) error
+ revertVersionRequests func(peer string)
+ onBytecodesAfterSync func(peer SyncPeer, id uint64, bytecodes [][]byte) error
+ forwardAccountTask func(task *accountTask)
+ registerVersionIdler func(id string)
+ unregisterVersionIdler func(id string)
pend sync.WaitGroup // Tracks network request goroutines for graceful shutdown
lock sync.RWMutex // Protects fields that can change outside of sync (peers, reqs, root)
@@ -456,8 +459,7 @@ func (s *Syncer) Register(peer SyncPeer) error {
s.accountIdlers[id] = struct{}{}
s.storageIdlers[id] = struct{}{}
s.bytecodeIdlers[id] = struct{}{}
- s.trienodeHealIdlers[id] = struct{}{}
- s.bytecodeHealIdlers[id] = struct{}{}
+ s.registerVersionIdler(id)
s.lock.Unlock()
// Notify any active syncs that a new peer can be assigned data
@@ -484,8 +486,7 @@ func (s *Syncer) Unregister(id string) error {
delete(s.accountIdlers, id)
delete(s.storageIdlers, id)
delete(s.bytecodeIdlers, id)
- delete(s.trienodeHealIdlers, id)
- delete(s.bytecodeHealIdlers, id)
+ s.unregisterVersionIdler(id)
s.lock.Unlock()
// Notify any active syncs that pending requests need to be reverted
@@ -1276,337 +1277,6 @@ func (s *Syncer) processBytecodeResponse(res *bytecodeResponse) {
// task assigners to pick up and fill.
}
-// processStorageResponse integrates an already validated storage response
-// into the account tasks.
-func (s *Syncer) processStorageResponse(res *storageResponse) {
- // Switch the subtask from pending to idle
- if res.subTask != nil {
- res.subTask.req = nil
- }
- batch := ethdb.HookedBatch{
- Batch: s.db.NewBatch(),
- OnPut: func(key []byte, value []byte) {
- s.storageBytes += common.StorageSize(len(key) + len(value))
- },
- }
- var (
- slots int
- oldStorageBytes = s.storageBytes
- )
- // Iterate over all the accounts and reconstruct their storage tries from the
- // delivered slots
- for i, account := range res.accounts {
- // If the account was not delivered, reschedule it
- if i >= len(res.hashes) {
- res.mainTask.stateTasks[account] = res.roots[i]
- continue
- }
- // State was delivered, if complete mark as not needed any more, otherwise
- // mark the account as needing healing
- for j, hash := range res.mainTask.res.hashes {
- if account != hash {
- continue
- }
- acc := res.mainTask.res.accounts[j]
-
- // If the packet contains multiple contract storage slots, all
- // but the last are surely complete. The last contract may be
- // chunked, so check it's continuation flag.
- if res.subTask == nil && res.mainTask.needState[j] && (i < len(res.hashes)-1 || !res.cont) {
- res.mainTask.needState[j] = false
- res.mainTask.pend--
- res.mainTask.stateCompleted[account] = struct{}{} // mark it as completed
- smallStorageGauge.Inc(1)
- }
- // If the last contract was chunked, mark it as needing healing
- // to avoid writing it out to disk prematurely.
- if res.subTask == nil && !res.mainTask.needHeal[j] && i == len(res.hashes)-1 && res.cont {
- res.mainTask.needHeal[j] = true
- }
- // If the last contract was chunked, we need to switch to large
- // contract handling mode
- if res.subTask == nil && i == len(res.hashes)-1 && res.cont {
- // If we haven't yet started a large-contract retrieval, create
- // the subtasks for it within the main account task
- if tasks, ok := res.mainTask.SubTasks[account]; !ok {
- var (
- keys = res.hashes[i]
- chunks = uint64(storageConcurrency)
- lastKey common.Hash
- )
- if len(keys) > 0 {
- lastKey = keys[len(keys)-1]
- }
- // If the number of slots remaining is low, decrease the
- // number of chunks. Somewhere on the order of 10-15K slots
- // fit into a packet of 500KB. A key/slot pair is maximum 64
- // bytes, so pessimistically maxRequestSize/64 = 8K.
- //
- // Chunk so that at least 2 packets are needed to fill a task.
- if estimate, err := estimateRemainingSlots(len(keys), lastKey); err == nil {
- if n := estimate / (2 * (maxRequestSize / 64)); n+1 < chunks {
- chunks = n + 1
- }
- log.Debug("Chunked large contract", "initiators", len(keys), "tail", lastKey, "remaining", estimate, "chunks", chunks)
- } else {
- log.Debug("Chunked large contract", "initiators", len(keys), "tail", lastKey, "chunks", chunks)
- }
- r := newHashRange(lastKey, chunks)
- if chunks == 1 {
- smallStorageGauge.Inc(1)
- } else {
- largeStorageGauge.Inc(1)
- }
- // Our first task is the one that was just filled by this response.
- batch := ethdb.HookedBatch{
- Batch: s.db.NewBatch(),
- OnPut: func(key []byte, value []byte) {
- s.storageBytes += common.StorageSize(len(key) + len(value))
- },
- }
- var tr genTrie
- if s.scheme == rawdb.HashScheme {
- tr = newHashTrie(batch)
- }
- if s.scheme == rawdb.PathScheme {
- // Keep the left boundary as it's the first range.
- tr = newPathTrie(account, false, s.db, batch)
- }
- tasks = append(tasks, &storageTask{
- Next: common.Hash{},
- Last: r.End(),
- root: acc.Root,
- genBatch: batch,
- genTrie: tr,
- })
- for r.Next() {
- batch := ethdb.HookedBatch{
- Batch: s.db.NewBatch(),
- OnPut: func(key []byte, value []byte) {
- s.storageBytes += common.StorageSize(len(key) + len(value))
- },
- }
- var tr genTrie
- if s.scheme == rawdb.HashScheme {
- tr = newHashTrie(batch)
- }
- if s.scheme == rawdb.PathScheme {
- tr = newPathTrie(account, true, s.db, batch)
- }
- tasks = append(tasks, &storageTask{
- Next: r.Start(),
- Last: r.End(),
- root: acc.Root,
- genBatch: batch,
- genTrie: tr,
- })
- }
- for _, task := range tasks {
- log.Debug("Created storage sync task", "account", account, "root", acc.Root, "from", task.Next, "last", task.Last)
- }
- res.mainTask.SubTasks[account] = tasks
-
- // Since we've just created the sub-tasks, this response
- // is surely for the first one (zero origin)
- res.subTask = tasks[0]
- }
- }
- // If we're in large contract delivery mode, forward the subtask
- if res.subTask != nil {
- // Ensure the response doesn't overflow into the subsequent task
- last := res.subTask.Last.Big()
- // Find the first overflowing key. While at it, mark res as complete
- // if we find the range to include or pass the 'last'
- index := sort.Search(len(res.hashes[i]), func(k int) bool {
- cmp := res.hashes[i][k].Big().Cmp(last)
- if cmp >= 0 {
- res.cont = false
- }
- return cmp > 0
- })
- if index >= 0 {
- // cut off excess
- res.hashes[i] = res.hashes[i][:index]
- res.slots[i] = res.slots[i][:index]
- }
- // Forward the relevant storage chunk (even if created just now)
- if res.cont {
- res.subTask.Next = incHash(res.hashes[i][len(res.hashes[i])-1])
- } else {
- res.subTask.done = true
- }
- }
- }
- // Iterate over all the complete contracts, reconstruct the trie nodes and
- // push them to disk. If the contract is chunked, the trie nodes will be
- // reconstructed later.
- slots += len(res.hashes[i])
-
- if i < len(res.hashes)-1 || res.subTask == nil {
- // no need to make local reassignment of account: this closure does not outlive the loop
- var tr genTrie
- if s.scheme == rawdb.HashScheme {
- tr = newHashTrie(batch)
- }
- if s.scheme == rawdb.PathScheme {
- // Keep the left boundary as it's complete
- tr = newPathTrie(account, false, s.db, batch)
- }
- for j := 0; j < len(res.hashes[i]); j++ {
- tr.update(res.hashes[i][j][:], res.slots[i][j])
- }
- tr.commit(true)
- }
- // Persist the received storage segments. These flat state maybe
- // outdated during the sync, but it can be fixed later during the
- // snapshot generation.
- for j := 0; j < len(res.hashes[i]); j++ {
- rawdb.WriteStorageSnapshot(batch, account, res.hashes[i][j], res.slots[i][j])
-
- // If we're storing large contracts, generate the trie nodes
- // on the fly to not trash the gluing points
- if i == len(res.hashes)-1 && res.subTask != nil {
- res.subTask.genTrie.update(res.hashes[i][j][:], res.slots[i][j])
- }
- }
- }
- // Large contracts could have generated new trie nodes, flush them to disk
- if res.subTask != nil {
- if res.subTask.done {
- root := res.subTask.genTrie.commit(res.subTask.Last == common.MaxHash)
- if err := res.subTask.genBatch.Write(); err != nil {
- log.Error("Failed to persist stack slots", "err", err)
- }
- res.subTask.genBatch.Reset()
-
- // If the chunk's root is an overflown but full delivery,
- // clear the heal request.
- accountHash := res.accounts[len(res.accounts)-1]
- if root == res.subTask.root && rawdb.HasTrieNode(s.db, accountHash, nil, root, s.scheme) {
- for i, account := range res.mainTask.res.hashes {
- if account == accountHash {
- res.mainTask.needHeal[i] = false
- skipStorageHealingGauge.Inc(1)
- }
- }
- }
- } else if res.subTask.genBatch.ValueSize() > batchSizeThreshold {
- res.subTask.genTrie.commit(false)
- if err := res.subTask.genBatch.Write(); err != nil {
- log.Error("Failed to persist stack slots", "err", err)
- }
- res.subTask.genBatch.Reset()
- }
- }
- // Flush anything written just now and update the stats
- if err := batch.Write(); err != nil {
- log.Crit("Failed to persist storage slots", "err", err)
- }
- s.storageSynced += uint64(slots)
-
- log.Debug("Persisted set of storage slots", "accounts", len(res.hashes), "slots", slots, "bytes", s.storageBytes-oldStorageBytes)
-
- // If this delivery completed the last pending task, forward the account task
- // to the next chunk
- if res.mainTask.pend == 0 {
- s.forwardAccountTask(res.mainTask)
- return
- }
- // Some accounts are still incomplete, leave as is for the storage and contract
- // task assigners to pick up and fill.
-}
-
-// forwardAccountTask takes a filled account task and persists anything available
-// into the database, after which it forwards the next account marker so that the
-// task's next chunk may be filled.
-func (s *Syncer) forwardAccountTask(task *accountTask) {
- // Remove any pending delivery
- res := task.res
- if res == nil {
- return // nothing to forward
- }
- task.res = nil
-
- // Persist the received account segments. These flat state maybe
- // outdated during the sync, but it can be fixed later during the
- // snapshot generation.
- oldAccountBytes := s.accountBytes
-
- batch := ethdb.HookedBatch{
- Batch: s.db.NewBatch(),
- OnPut: func(key []byte, value []byte) {
- s.accountBytes += common.StorageSize(len(key) + len(value))
- },
- }
- for i, hash := range res.hashes {
- if task.needCode[i] || task.needState[i] {
- break
- }
- slim := types.SlimAccountRLP(*res.accounts[i])
- rawdb.WriteAccountSnapshot(batch, hash, slim)
-
- if !task.needHeal[i] {
- // If the storage task is complete, drop it into the stack trie
- // to generate account trie nodes for it
- full, err := types.FullAccountRLP(slim) // TODO(karalabe): Slim parsing can be omitted
- if err != nil {
- panic(err) // Really shouldn't ever happen
- }
- task.genTrie.update(hash[:], full)
- } else {
- // If the storage task is incomplete, explicitly delete the corresponding
- // account item from the account trie to ensure that all nodes along the
- // path to the incomplete storage trie are cleaned up.
- if err := task.genTrie.delete(hash[:]); err != nil {
- panic(err) // Really shouldn't ever happen
- }
- }
- }
- // Flush anything written just now and update the stats
- if err := batch.Write(); err != nil {
- log.Crit("Failed to persist accounts", "err", err)
- }
- s.accountSynced += uint64(len(res.accounts))
-
- // Task filling persisted, push it the chunk marker forward to the first
- // account still missing data.
- for i, hash := range res.hashes {
- if task.needCode[i] || task.needState[i] {
- return
- }
- task.Next = incHash(hash)
-
- // Remove the completion flag once the account range is pushed
- // forward. The leftover accounts will be skipped in the next
- // cycle.
- delete(task.stateCompleted, hash)
- }
- // All accounts marked as complete, track if the entire task is done
- task.done = !res.cont
-
- // Error out if there is any leftover completion flag.
- if task.done && len(task.stateCompleted) != 0 {
- panic(fmt.Errorf("storage completion flags should be emptied, %d left", len(task.stateCompleted)))
- }
- // Stack trie could have generated trie nodes, push them to disk (we need to
- // flush after finalizing task.done. It's fine even if we crash and lose this
- // write as it will only cause more data to be downloaded during heal.
- if task.done {
- task.genTrie.commit(task.Last == common.MaxHash)
- if err := task.genBatch.Write(); err != nil {
- log.Error("Failed to persist stack account", "err", err)
- }
- task.genBatch.Reset()
- } else if task.genBatch.ValueSize() > batchSizeThreshold {
- task.genTrie.commit(false)
- if err := task.genBatch.Write(); err != nil {
- log.Error("Failed to persist stack account", "err", err)
- }
- task.genBatch.Reset()
- }
- log.Debug("Persisted range of accounts", "accounts", len(res.accounts), "bytes", s.accountBytes-oldAccountBytes)
-}
-
// OnAccounts is a callback method to invoke when a range of accounts are
// received from a remote peer.
func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, accounts [][]byte, proof [][]byte) error {
diff --git a/eth/protocols/snap/sync_v1.go b/eth/protocols/snap/sync_v1.go
index 88bae04218..cd88d4194d 100644
--- a/eth/protocols/snap/sync_v1.go
+++ b/eth/protocols/snap/sync_v1.go
@@ -70,6 +70,9 @@ func (s *Syncer) registerV1() {
s.syncFn = s.syncV1
s.revertVersionRequests = s.revertHealRequests
s.onBytecodesAfterSync = s.onHealByteCodes
+ s.forwardAccountTask = s.forwardAccountTaskV1
+ s.registerVersionIdler = s.registerV1Idler
+ s.unregisterVersionIdler = s.unregisterV1Idler
// V1 specific state.
s.trienodeHealIdlers = make(map[string]struct{})
@@ -80,6 +83,20 @@ func (s *Syncer) registerV1() {
s.stateWriter = s.db.NewBatch()
}
+// registerV1Idler is the registerVersionIdler hook implementation for snap/1.
+// It marks the peer as idle for v1-specific heal idler buckets.
+func (s *Syncer) registerV1Idler(id string) {
+ s.trienodeHealIdlers[id] = struct{}{}
+ s.bytecodeHealIdlers[id] = struct{}{}
+}
+
+// unregisterV1Idler is the unregisterVersionIdler hook implementation for snap/1.
+// It removes the peer from v1-specific heal idler buckets.
+func (s *Syncer) unregisterV1Idler(id string) {
+ delete(s.trienodeHealIdlers, id)
+ delete(s.bytecodeHealIdlers, id)
+}
+
// trienodeHealRequest tracks a pending state trie request to ensure responses
// are to actual requests and to validate any security constraints.
//
@@ -942,6 +959,338 @@ func (s *Syncer) onHealState(paths [][]byte, value []byte) error {
return nil
}
+// processStorageResponseV1 integrates an already validated storage response
+// into the account tasks. Called only from syncV1's select loop.
+func (s *Syncer) processStorageResponseV1(res *storageResponse) {
+ // Switch the subtask from pending to idle
+ if res.subTask != nil {
+ res.subTask.req = nil
+ }
+ batch := ethdb.HookedBatch{
+ Batch: s.db.NewBatch(),
+ OnPut: func(key []byte, value []byte) {
+ s.storageBytes += common.StorageSize(len(key) + len(value))
+ },
+ }
+ var (
+ slots int
+ oldStorageBytes = s.storageBytes
+ )
+ // Iterate over all the accounts and reconstruct their storage tries from the
+ // delivered slots
+ for i, account := range res.accounts {
+ // If the account was not delivered, reschedule it
+ if i >= len(res.hashes) {
+ res.mainTask.stateTasks[account] = res.roots[i]
+ continue
+ }
+ // State was delivered, if complete mark as not needed any more, otherwise
+ // mark the account as needing healing
+ for j, hash := range res.mainTask.res.hashes {
+ if account != hash {
+ continue
+ }
+ acc := res.mainTask.res.accounts[j]
+
+ // If the packet contains multiple contract storage slots, all
+ // but the last are surely complete. The last contract may be
+ // chunked, so check its continuation flag.
+ if res.subTask == nil && res.mainTask.needState[j] && (i < len(res.hashes)-1 || !res.cont) {
+ res.mainTask.needState[j] = false
+ res.mainTask.pend--
+ res.mainTask.stateCompleted[account] = struct{}{} // mark it as completed
+ smallStorageGauge.Inc(1)
+ }
+ // If the last contract was chunked, mark it as needing healing
+ // to avoid writing it out to disk prematurely.
+ if res.subTask == nil && !res.mainTask.needHeal[j] && i == len(res.hashes)-1 && res.cont {
+ res.mainTask.needHeal[j] = true
+ }
+ // If the last contract was chunked, we need to switch to large
+ // contract handling mode
+ if res.subTask == nil && i == len(res.hashes)-1 && res.cont {
+ // If we haven't yet started a large-contract retrieval, create
+ // the subtasks for it within the main account task
+ if tasks, ok := res.mainTask.SubTasks[account]; !ok {
+ var (
+ keys = res.hashes[i]
+ chunks = uint64(storageConcurrency)
+ lastKey common.Hash
+ )
+ if len(keys) > 0 {
+ lastKey = keys[len(keys)-1]
+ }
+ // If the number of slots remaining is low, decrease the
+ // number of chunks. Somewhere on the order of 10-15K slots
+ // fit into a packet of 500KB. A key/slot pair is maximum 64
+ // bytes, so pessimistically maxRequestSize/64 = 8K.
+ //
+ // Chunk so that at least 2 packets are needed to fill a task.
+ if estimate, err := estimateRemainingSlots(len(keys), lastKey); err == nil {
+ if n := estimate / (2 * (maxRequestSize / 64)); n+1 < chunks {
+ chunks = n + 1
+ }
+ log.Debug("Chunked large contract", "initiators", len(keys), "tail", lastKey, "remaining", estimate, "chunks", chunks)
+ } else {
+ log.Debug("Chunked large contract", "initiators", len(keys), "tail", lastKey, "chunks", chunks)
+ }
+ r := newHashRange(lastKey, chunks)
+ if chunks == 1 {
+ smallStorageGauge.Inc(1)
+ } else {
+ largeStorageGauge.Inc(1)
+ }
+ // Our first task is the one that was just filled by this response.
+ batch := ethdb.HookedBatch{
+ Batch: s.db.NewBatch(),
+ OnPut: func(key []byte, value []byte) {
+ s.storageBytes += common.StorageSize(len(key) + len(value))
+ },
+ }
+ var tr genTrie
+ if s.scheme == rawdb.HashScheme {
+ tr = newHashTrie(batch)
+ }
+ if s.scheme == rawdb.PathScheme {
+ // Keep the left boundary as it's the first range.
+ tr = newPathTrie(account, false, s.db, batch)
+ }
+ tasks = append(tasks, &storageTask{
+ Next: common.Hash{},
+ Last: r.End(),
+ root: acc.Root,
+ genBatch: batch,
+ genTrie: tr,
+ })
+ for r.Next() {
+ batch := ethdb.HookedBatch{
+ Batch: s.db.NewBatch(),
+ OnPut: func(key []byte, value []byte) {
+ s.storageBytes += common.StorageSize(len(key) + len(value))
+ },
+ }
+ var tr genTrie
+ if s.scheme == rawdb.HashScheme {
+ tr = newHashTrie(batch)
+ }
+ if s.scheme == rawdb.PathScheme {
+ tr = newPathTrie(account, true, s.db, batch)
+ }
+ tasks = append(tasks, &storageTask{
+ Next: r.Start(),
+ Last: r.End(),
+ root: acc.Root,
+ genBatch: batch,
+ genTrie: tr,
+ })
+ }
+ for _, task := range tasks {
+ log.Debug("Created storage sync task", "account", account, "root", acc.Root, "from", task.Next, "last", task.Last)
+ }
+ res.mainTask.SubTasks[account] = tasks
+
+ // Since we've just created the sub-tasks, this response
+ // is surely for the first one (zero origin)
+ res.subTask = tasks[0]
+ }
+ }
+ // If we're in large contract delivery mode, forward the subtask
+ if res.subTask != nil {
+ // Ensure the response doesn't overflow into the subsequent task
+ last := res.subTask.Last.Big()
+ // Find the first overflowing key. While at it, mark res as complete
+ // if we find the range to include or pass the 'last'
+ index := sort.Search(len(res.hashes[i]), func(k int) bool {
+ cmp := res.hashes[i][k].Big().Cmp(last)
+ if cmp >= 0 {
+ res.cont = false
+ }
+ return cmp > 0
+ })
+ if index >= 0 {
+ // cut off excess
+ res.hashes[i] = res.hashes[i][:index]
+ res.slots[i] = res.slots[i][:index]
+ }
+ // Forward the relevant storage chunk (even if created just now)
+ if res.cont {
+ res.subTask.Next = incHash(res.hashes[i][len(res.hashes[i])-1])
+ } else {
+ res.subTask.done = true
+ }
+ }
+ }
+ // Iterate over all the complete contracts, reconstruct the trie nodes and
+ // push them to disk. If the contract is chunked, the trie nodes will be
+ // reconstructed later.
+ slots += len(res.hashes[i])
+
+ if i < len(res.hashes)-1 || res.subTask == nil {
+ // no need to make local reassignment of account: this closure does not outlive the loop
+ var tr genTrie
+ if s.scheme == rawdb.HashScheme {
+ tr = newHashTrie(batch)
+ }
+ if s.scheme == rawdb.PathScheme {
+ // Keep the left boundary as it's complete
+ tr = newPathTrie(account, false, s.db, batch)
+ }
+ for j := 0; j < len(res.hashes[i]); j++ {
+ tr.update(res.hashes[i][j][:], res.slots[i][j])
+ }
+ tr.commit(true)
+ }
+ // Persist the received storage segments. This flat state may be
+ // outdated during the sync, but it can be fixed later during the
+ // snapshot generation.
+ for j := 0; j < len(res.hashes[i]); j++ {
+ rawdb.WriteStorageSnapshot(batch, account, res.hashes[i][j], res.slots[i][j])
+
+ // If we're storing large contracts, generate the trie nodes
+ // on the fly to not trash the gluing points
+ if i == len(res.hashes)-1 && res.subTask != nil {
+ res.subTask.genTrie.update(res.hashes[i][j][:], res.slots[i][j])
+ }
+ }
+ }
+ // Large contracts could have generated new trie nodes, flush them to disk
+ if res.subTask != nil {
+ if res.subTask.done {
+ root := res.subTask.genTrie.commit(res.subTask.Last == common.MaxHash)
+ if err := res.subTask.genBatch.Write(); err != nil {
+ log.Error("Failed to persist stack slots", "err", err)
+ }
+ res.subTask.genBatch.Reset()
+
+ // If the chunk's root is an overflown but full delivery,
+ // clear the heal request.
+ accountHash := res.accounts[len(res.accounts)-1]
+ if root == res.subTask.root && rawdb.HasTrieNode(s.db, accountHash, nil, root, s.scheme) {
+ for i, account := range res.mainTask.res.hashes {
+ if account == accountHash {
+ res.mainTask.needHeal[i] = false
+ skipStorageHealingGauge.Inc(1)
+ }
+ }
+ }
+ } else if res.subTask.genBatch.ValueSize() > batchSizeThreshold {
+ res.subTask.genTrie.commit(false)
+ if err := res.subTask.genBatch.Write(); err != nil {
+ log.Error("Failed to persist stack slots", "err", err)
+ }
+ res.subTask.genBatch.Reset()
+ }
+ }
+ // Flush anything written just now and update the stats
+ if err := batch.Write(); err != nil {
+ log.Crit("Failed to persist storage slots", "err", err)
+ }
+ s.storageSynced += uint64(slots)
+
+ log.Debug("Persisted set of storage slots", "accounts", len(res.hashes), "slots", slots, "bytes", s.storageBytes-oldStorageBytes)
+
+ // If this delivery completed the last pending task, forward the account task
+ // to the next chunk
+ if res.mainTask.pend == 0 {
+ s.forwardAccountTask(res.mainTask)
+ return
+ }
+ // Some accounts are still incomplete, leave as is for the storage and contract
+ // task assigners to pick up and fill.
+}
+
+// forwardAccountTaskV1 takes a filled account task and persists anything available
+// into the database, after which it forwards the next account marker so that the
+// task's next chunk may be filled. Installed as Syncer.forwardAccountTask by
+// registerV1.
+func (s *Syncer) forwardAccountTaskV1(task *accountTask) {
+ // Remove any pending delivery
+ res := task.res
+ if res == nil {
+ return // nothing to forward
+ }
+ task.res = nil
+
+ // Persist the received account segments. This flat state may be
+ // outdated during the sync, but it can be fixed later during the
+ // snapshot generation.
+ oldAccountBytes := s.accountBytes
+
+ batch := ethdb.HookedBatch{
+ Batch: s.db.NewBatch(),
+ OnPut: func(key []byte, value []byte) {
+ s.accountBytes += common.StorageSize(len(key) + len(value))
+ },
+ }
+ for i, hash := range res.hashes {
+ if task.needCode[i] || task.needState[i] {
+ break
+ }
+ slim := types.SlimAccountRLP(*res.accounts[i])
+ rawdb.WriteAccountSnapshot(batch, hash, slim)
+
+ if !task.needHeal[i] {
+ // If the storage task is complete, drop it into the stack trie
+ // to generate account trie nodes for it
+ full, err := types.FullAccountRLP(slim) // TODO(karalabe): Slim parsing can be omitted
+ if err != nil {
+ panic(err) // Really shouldn't ever happen
+ }
+ task.genTrie.update(hash[:], full)
+ } else {
+ // If the storage task is incomplete, explicitly delete the corresponding
+ // account item from the account trie to ensure that all nodes along the
+ // path to the incomplete storage trie are cleaned up.
+ if err := task.genTrie.delete(hash[:]); err != nil {
+ panic(err) // Really shouldn't ever happen
+ }
+ }
+ }
+ // Flush anything written just now and update the stats
+ if err := batch.Write(); err != nil {
+ log.Crit("Failed to persist accounts", "err", err)
+ }
+ s.accountSynced += uint64(len(res.accounts))
+
+ // Task filling persisted, push the chunk marker forward to the first
+ // account still missing data.
+ for i, hash := range res.hashes {
+ if task.needCode[i] || task.needState[i] {
+ return
+ }
+ task.Next = incHash(hash)
+
+ // Remove the completion flag once the account range is pushed
+ // forward. The leftover accounts will be skipped in the next
+ // cycle.
+ delete(task.stateCompleted, hash)
+ }
+ // All accounts marked as complete, track if the entire task is done
+ task.done = !res.cont
+
+ // Error out if there is any leftover completion flag.
+ if task.done && len(task.stateCompleted) != 0 {
+ panic(fmt.Errorf("storage completion flags should be emptied, %d left", len(task.stateCompleted)))
+ }
+ // Stack trie could have generated trie nodes, push them to disk (we need to
+ // flush after finalizing task.done). It's fine even if we crash and lose this
+ // write as it will only cause more data to be downloaded during heal.
+ if task.done {
+ task.genTrie.commit(task.Last == common.MaxHash)
+ if err := task.genBatch.Write(); err != nil {
+ log.Error("Failed to persist stack account", "err", err)
+ }
+ task.genBatch.Reset()
+ } else if task.genBatch.ValueSize() > batchSizeThreshold {
+ task.genTrie.commit(false)
+ if err := task.genBatch.Write(); err != nil {
+ log.Error("Failed to persist stack account", "err", err)
+ }
+ task.genBatch.Reset()
+ }
+ log.Debug("Persisted range of accounts", "accounts", len(res.accounts), "bytes", s.accountBytes-oldAccountBytes)
+}
+
// reportV1 routes between the shared sync-phase logger and the v1 heal-phase
// logger based on whether account tasks are still pending. Called from syncV1's
// main loop.
@@ -1302,7 +1651,7 @@ func (s *Syncer) syncV1(root common.Hash, cancel chan struct{}) error {
case res := <-bytecodeResps:
s.processBytecodeResponse(res)
case res := <-storageResps:
- s.processStorageResponse(res)
+ s.processStorageResponseV1(res)
case res := <-trienodeHealResps:
s.processTrienodeHealResponse(res)
case res := <-bytecodeHealResps:
From e1809a1678a1878ed9f47f9da4d41e2e1cffa96a Mon Sep 17 00:00:00 2001
From: jonny rhea <5555162+jrhea@users.noreply.github.com>
Date: Wed, 13 May 2026 18:01:16 -0500
Subject: [PATCH 7/8] eth/protocols/snap: rename sync_test.go to
sync_v1_test.go
---
eth/protocols/snap/{sync_test.go => sync_v1_test.go} | 0
1 file changed, 0 insertions(+), 0 deletions(-)
rename eth/protocols/snap/{sync_test.go => sync_v1_test.go} (100%)
diff --git a/eth/protocols/snap/sync_test.go b/eth/protocols/snap/sync_v1_test.go
similarity index 100%
rename from eth/protocols/snap/sync_test.go
rename to eth/protocols/snap/sync_v1_test.go
From 83f32fbaa43190c0d3bcba252b06f1053290cc0a Mon Sep 17 00:00:00 2001
From: jonny rhea <5555162+jrhea@users.noreply.github.com>
Date: Wed, 13 May 2026 18:10:22 -0500
Subject: [PATCH 8/8] eth/protocols/snap: move maxTrieRequestCount and
batchSizeThreshold to sync_v1.go
---
eth/protocols/snap/sync.go | 9 ---------
eth/protocols/snap/sync_v1.go | 9 +++++++++
2 files changed, 9 insertions(+), 9 deletions(-)
diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go
index 4a83e2d127..d1d8c968c5 100644
--- a/eth/protocols/snap/sync.go
+++ b/eth/protocols/snap/sync.go
@@ -61,15 +61,6 @@ const (
// size should be maxRequestSize / 24K. Assuming that most contracts do not
// come close to that, requesting 4x should be a good approximation.
maxCodeRequestCount = maxRequestSize / (24 * 1024) * 4
-
- // maxTrieRequestCount is the maximum number of trie node blobs to request in
- // a single query. If this number is too low, we're not filling responses fully
- // and waste round trip times. If it's too high, we're capping responses and
- // waste bandwidth.
- maxTrieRequestCount = maxRequestSize / 512
-
- // batchSizeThreshold is the maximum size allowed for gentrie batch.
- batchSizeThreshold = 8 * 1024 * 1024
)
var (
diff --git a/eth/protocols/snap/sync_v1.go b/eth/protocols/snap/sync_v1.go
index cd88d4194d..083ced453b 100644
--- a/eth/protocols/snap/sync_v1.go
+++ b/eth/protocols/snap/sync_v1.go
@@ -39,6 +39,15 @@ import (
)
const (
+ // maxTrieRequestCount is the maximum number of trie node blobs to request in
+ // a single query. If this number is too low, we're not filling responses fully
+ // and waste round trip times. If it's too high, we're capping responses and
+ // waste bandwidth.
+ maxTrieRequestCount = maxRequestSize / 512
+
+ // batchSizeThreshold is the maximum size allowed for gentrie batch.
+ batchSizeThreshold = 8 * 1024 * 1024
+
// trienodeHealRateMeasurementImpact is the impact a single measurement has on
// the local node's trienode processing capacity. A value closer to 0 reacts
// slower to sudden changes, but it is also more stable against temporary hiccups.