eth/protocols/snap: install v1 dispatch hooks via registerV1

This commit is contained in:
jonny rhea 2026-05-13 17:03:04 -05:00
parent 2c19ece19a
commit b4484ede85
2 changed files with 97 additions and 65 deletions

View file

@ -68,29 +68,6 @@ const (
// waste bandwidth. // waste bandwidth.
maxTrieRequestCount = maxRequestSize / 512 maxTrieRequestCount = maxRequestSize / 512
// trienodeHealRateMeasurementImpact is the impact a single measurement has on
// the local node's trienode processing capacity. A value closer to 0 reacts
// slower to sudden changes, but it is also more stable against temporary hiccups.
trienodeHealRateMeasurementImpact = 0.005
// minTrienodeHealThrottle is the minimum divisor for throttling trie node
// heal requests to avoid overloading the local node and excessively expanding
// the state trie breadth wise.
minTrienodeHealThrottle = 1
// maxTrienodeHealThrottle is the maximum divisor for throttling trie node
// heal requests to avoid overloading the local node and exessively expanding
// the state trie bedth wise.
maxTrienodeHealThrottle = maxTrieRequestCount
// trienodeHealThrottleIncrease is the multiplier for the throttle when the
// rate of arriving data is higher than the rate of processing it.
trienodeHealThrottleIncrease = 1.33
// trienodeHealThrottleDecrease is the divisor for the throttle when the
// rate of arriving data is lower than the rate of processing it.
trienodeHealThrottleDecrease = 1.25
// batchSizeThreshold is the maximum size allowed for gentrie batch. // batchSizeThreshold is the maximum size allowed for gentrie batch.
batchSizeThreshold = 8 * 1024 * 1024 batchSizeThreshold = 8 * 1024 * 1024
) )
@ -422,6 +399,13 @@ type Syncer struct {
syncTimeOnce sync.Once // Ensure that the state sync time is uploaded only once syncTimeOnce sync.Once // Ensure that the state sync time is uploaded only once
logTime time.Time // Time instance when status was last reported logTime time.Time // Time instance when status was last reported
// Version-specific hooks installed by registerV1/V2. Each one exists
// because shared code in sync.go needs to dispatch into version-specific
// code without knowing which version is running.
syncFn func(root common.Hash, cancel chan struct{}) error
revertVersionRequests func(peer string)
onBytecodesAfterSync func(peer SyncPeer, id uint64, bytecodes [][]byte) error
pend sync.WaitGroup // Tracks network request goroutines for graceful shutdown pend sync.WaitGroup // Tracks network request goroutines for graceful shutdown
lock sync.RWMutex // Protects fields that can change outside of sync (peers, reqs, root) lock sync.RWMutex // Protects fields that can change outside of sync (peers, reqs, root)
} }
@ -429,7 +413,7 @@ type Syncer struct {
// NewSyncer creates a new snapshot syncer to download the Ethereum state over the // NewSyncer creates a new snapshot syncer to download the Ethereum state over the
// snap protocol. // snap protocol.
func NewSyncer(db ethdb.KeyValueStore, scheme string) *Syncer { func NewSyncer(db ethdb.KeyValueStore, scheme string) *Syncer {
return &Syncer{ s := &Syncer{
db: db, db: db,
scheme: scheme, scheme: scheme,
@ -447,16 +431,10 @@ func NewSyncer(db ethdb.KeyValueStore, scheme string) *Syncer {
storageReqs: make(map[uint64]*storageRequest), storageReqs: make(map[uint64]*storageRequest),
bytecodeReqs: make(map[uint64]*bytecodeRequest), bytecodeReqs: make(map[uint64]*bytecodeRequest),
trienodeHealIdlers: make(map[string]struct{}),
bytecodeHealIdlers: make(map[string]struct{}),
trienodeHealReqs: make(map[uint64]*trienodeHealRequest),
bytecodeHealReqs: make(map[uint64]*bytecodeHealRequest),
trienodeHealThrottle: maxTrienodeHealThrottle, // Tune downward instead of insta-filling with junk
stateWriter: db.NewBatch(),
extProgress: new(SyncProgress), extProgress: new(SyncProgress),
} }
s.registerV1()
return s
} }
// Register injects a new data source into the syncer's peerset. // Register injects a new data source into the syncer's peerset.
@ -520,7 +498,7 @@ func (s *Syncer) Unregister(id string) error {
// Previously downloaded segments will not be redownloaded of fixed, rather any // Previously downloaded segments will not be redownloaded of fixed, rather any
// errors will be healed after the leaves are fully accumulated. // errors will be healed after the leaves are fully accumulated.
func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error { func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
return s.syncV1(root, cancel) return s.syncFn(root, cancel)
} }
// Progress returns the snap sync status statistics. // Progress returns the snap sync status statistics.
@ -957,7 +935,7 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st
// revertRequests locates all the currently pending requests from a particular // revertRequests locates all the currently pending requests from a particular
// peer and reverts them, rescheduling for others to fulfill. // peer and reverts them, rescheduling for others to fulfill.
func (s *Syncer) revertRequests(peer string) { func (s *Syncer) revertRequests(peer string) {
// Gather the requests first, revertals need the lock too // Gather the shared requests first, revertals need the lock too
s.lock.Lock() s.lock.Lock()
var accountReqs []*accountRequest var accountReqs []*accountRequest
for _, req := range s.accountReqs { for _, req := range s.accountReqs {
@ -977,21 +955,9 @@ func (s *Syncer) revertRequests(peer string) {
storageReqs = append(storageReqs, req) storageReqs = append(storageReqs, req)
} }
} }
var trienodeHealReqs []*trienodeHealRequest
for _, req := range s.trienodeHealReqs {
if req.peer == peer {
trienodeHealReqs = append(trienodeHealReqs, req)
}
}
var bytecodeHealReqs []*bytecodeHealRequest
for _, req := range s.bytecodeHealReqs {
if req.peer == peer {
bytecodeHealReqs = append(bytecodeHealReqs, req)
}
}
s.lock.Unlock() s.lock.Unlock()
// Revert all the requests matching the peer // Revert all the shared requests matching the peer
for _, req := range accountReqs { for _, req := range accountReqs {
s.revertAccountRequest(req) s.revertAccountRequest(req)
} }
@ -1001,12 +967,8 @@ func (s *Syncer) revertRequests(peer string) {
for _, req := range storageReqs { for _, req := range storageReqs {
s.revertStorageRequest(req) s.revertStorageRequest(req)
} }
for _, req := range trienodeHealReqs { // Version-specific request maps (heal for v1, access lists for v2).
s.revertTrienodeHealRequest(req) s.revertVersionRequests(peer)
}
for _, req := range bytecodeHealReqs {
s.revertBytecodeHealRequest(req)
}
} }
// scheduleRevertAccountRequest asks the event loop to clean up an account range // scheduleRevertAccountRequest asks the event loop to clean up an account range
@ -1755,7 +1717,7 @@ func (s *Syncer) OnByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error
if syncing { if syncing {
return s.onByteCodes(peer, id, bytecodes) return s.onByteCodes(peer, id, bytecodes)
} }
return s.onHealByteCodes(peer, id, bytecodes) return s.onBytecodesAfterSync(peer, id, bytecodes)
} }
// onByteCodes is a callback method to invoke when a batch of contract // onByteCodes is a callback method to invoke when a batch of contract
@ -2008,15 +1970,6 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
// hashSpace is the total size of the 256 bit hash space for accounts. // hashSpace is the total size of the 256 bit hash space for accounts.
var hashSpace = new(big.Int).Exp(common.Big2, common.Big256, nil) var hashSpace = new(big.Int).Exp(common.Big2, common.Big256, nil)
// report calculates various status reports and provides it to the user.
func (s *Syncer) report(force bool) {
if len(s.tasks) > 0 {
s.reportSyncProgress(force)
return
}
s.reportHealProgress(force)
}
// reportSyncProgress calculates various status reports and provides it to the user. // reportSyncProgress calculates various status reports and provides it to the user.
func (s *Syncer) reportSyncProgress(force bool) { func (s *Syncer) reportSyncProgress(force bool) {
// Don't report all the events, just occasionally // Don't report all the events, just occasionally

View file

@ -38,6 +38,48 @@ import (
"github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie"
) )
const (
// trienodeHealRateMeasurementImpact is the impact a single measurement has on
// the local node's trienode processing capacity. A value closer to 0 reacts
// slower to sudden changes, but it is also more stable against temporary hiccups.
trienodeHealRateMeasurementImpact = 0.005
// minTrienodeHealThrottle is the minimum divisor for throttling trie node
// heal requests to avoid overloading the local node and excessively expanding
// the state trie breadth wise.
minTrienodeHealThrottle = 1
// maxTrienodeHealThrottle is the maximum divisor for throttling trie node
// heal requests to avoid overloading the local node and exessively expanding
// the state trie bedth wise.
maxTrienodeHealThrottle = maxTrieRequestCount
// trienodeHealThrottleIncrease is the multiplier for the throttle when the
// rate of arriving data is higher than the rate of processing it.
trienodeHealThrottleIncrease = 1.33
// trienodeHealThrottleDecrease is the divisor for the throttle when the
// rate of arriving data is lower than the rate of processing it.
trienodeHealThrottleDecrease = 1.25
)
// registerV1 wires the Syncer's version-specific hooks to the snap/1
// implementation and allocates snap/1 specific state on the Syncer.
func (s *Syncer) registerV1() {
// Dispatcher hooks used from shared code in sync.go.
s.syncFn = s.syncV1
s.revertVersionRequests = s.revertHealRequests
s.onBytecodesAfterSync = s.onHealByteCodes
// V1 specific state.
s.trienodeHealIdlers = make(map[string]struct{})
s.bytecodeHealIdlers = make(map[string]struct{})
s.trienodeHealReqs = make(map[uint64]*trienodeHealRequest)
s.bytecodeHealReqs = make(map[uint64]*bytecodeHealRequest)
s.trienodeHealThrottle = maxTrienodeHealThrottle
s.stateWriter = s.db.NewBatch()
}
// trienodeHealRequest tracks a pending state trie request to ensure responses // trienodeHealRequest tracks a pending state trie request to ensure responses
// are to actual requests and to validate any security constraints. // are to actual requests and to validate any security constraints.
// //
@ -520,6 +562,32 @@ func (s *Syncer) revertBytecodeHealRequest(req *bytecodeHealRequest) {
} }
} }
// revertHealRequests reverts in-flight trie-node and bytecode heal requests
// from the given peer. Installed as Syncer.revertVersionRequests by registerV1.
func (s *Syncer) revertHealRequests(peer string) {
s.lock.Lock()
var trienodeHealReqs []*trienodeHealRequest
for _, req := range s.trienodeHealReqs {
if req.peer == peer {
trienodeHealReqs = append(trienodeHealReqs, req)
}
}
var bytecodeHealReqs []*bytecodeHealRequest
for _, req := range s.bytecodeHealReqs {
if req.peer == peer {
bytecodeHealReqs = append(bytecodeHealReqs, req)
}
}
s.lock.Unlock()
for _, req := range trienodeHealReqs {
s.revertTrienodeHealRequest(req)
}
for _, req := range bytecodeHealReqs {
s.revertBytecodeHealRequest(req)
}
}
// processTrienodeHealResponse integrates an already validated trienode response // processTrienodeHealResponse integrates an already validated trienode response
// into the healer tasks. // into the healer tasks.
func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) { func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
@ -874,6 +942,17 @@ func (s *Syncer) onHealState(paths [][]byte, value []byte) error {
return nil return nil
} }
// reportV1 routes between the shared sync-phase logger and the v1 heal-phase
// logger based on whether account tasks are still pending. Called from syncV1's
// main loop.
func (s *Syncer) reportV1(force bool) {
if len(s.tasks) > 0 {
s.reportSyncProgress(force)
return
}
s.reportHealProgress(force)
}
// reportHealProgress calculates various status reports and provides it to the user. // reportHealProgress calculates various status reports and provides it to the user.
func (s *Syncer) reportHealProgress(force bool) { func (s *Syncer) reportHealProgress(force bool) {
// Don't report all the events, just occasionally // Don't report all the events, just occasionally
@ -1107,7 +1186,7 @@ func (s *Syncer) syncV1(root common.Hash, cancel chan struct{}) error {
s.stateWriter.Reset() s.stateWriter.Reset()
} }
}() }()
defer s.report(true) defer s.reportV1(true)
// commit any trie- and bytecode-healing data. // commit any trie- and bytecode-healing data.
defer s.commitHealer(true) defer s.commitHealer(true)
@ -1230,6 +1309,6 @@ func (s *Syncer) syncV1(root common.Hash, cancel chan struct{}) error {
s.processBytecodeHealResponse(res) s.processBytecodeHealResponse(res)
} }
// Report stats if something meaningful happened // Report stats if something meaningful happened
s.report(false) s.reportV1(false)
} }
} }