diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index 16d0a7e2f4..1a95134658 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -68,29 +68,6 @@ const ( // waste bandwidth. maxTrieRequestCount = maxRequestSize / 512 - // trienodeHealRateMeasurementImpact is the impact a single measurement has on - // the local node's trienode processing capacity. A value closer to 0 reacts - // slower to sudden changes, but it is also more stable against temporary hiccups. - trienodeHealRateMeasurementImpact = 0.005 - - // minTrienodeHealThrottle is the minimum divisor for throttling trie node - // heal requests to avoid overloading the local node and excessively expanding - // the state trie breadth wise. - minTrienodeHealThrottle = 1 - - // maxTrienodeHealThrottle is the maximum divisor for throttling trie node - // heal requests to avoid overloading the local node and exessively expanding - // the state trie bedth wise. - maxTrienodeHealThrottle = maxTrieRequestCount - - // trienodeHealThrottleIncrease is the multiplier for the throttle when the - // rate of arriving data is higher than the rate of processing it. - trienodeHealThrottleIncrease = 1.33 - - // trienodeHealThrottleDecrease is the divisor for the throttle when the - // rate of arriving data is lower than the rate of processing it. - trienodeHealThrottleDecrease = 1.25 - // batchSizeThreshold is the maximum size allowed for gentrie batch. batchSizeThreshold = 8 * 1024 * 1024 ) @@ -422,6 +399,13 @@ type Syncer struct { syncTimeOnce sync.Once // Ensure that the state sync time is uploaded only once logTime time.Time // Time instance when status was last reported + // Version-specific hooks installed by registerV1/V2. Each one exists + // because shared code in sync.go needs to dispatch into version-specific + // code without knowing which version is running. + syncFn func(root common.Hash, cancel chan struct{}) error + revertVersionRequests func(peer string) + onBytecodesAfterSync func(peer SyncPeer, id uint64, bytecodes [][]byte) error + pend sync.WaitGroup // Tracks network request goroutines for graceful shutdown lock sync.RWMutex // Protects fields that can change outside of sync (peers, reqs, root) } @@ -429,7 +413,7 @@ type Syncer struct { // NewSyncer creates a new snapshot syncer to download the Ethereum state over the // snap protocol. func NewSyncer(db ethdb.KeyValueStore, scheme string) *Syncer { - return &Syncer{ + s := &Syncer{ db: db, scheme: scheme, @@ -447,16 +431,10 @@ func NewSyncer(db ethdb.KeyValueStore, scheme string) *Syncer { storageReqs: make(map[uint64]*storageRequest), bytecodeReqs: make(map[uint64]*bytecodeRequest), - trienodeHealIdlers: make(map[string]struct{}), - bytecodeHealIdlers: make(map[string]struct{}), - - trienodeHealReqs: make(map[uint64]*trienodeHealRequest), - bytecodeHealReqs: make(map[uint64]*bytecodeHealRequest), - trienodeHealThrottle: maxTrienodeHealThrottle, // Tune downward instead of insta-filling with junk - stateWriter: db.NewBatch(), - extProgress: new(SyncProgress), } + s.registerV1() + return s } // Register injects a new data source into the syncer's peerset. @@ -520,7 +498,7 @@ func (s *Syncer) Unregister(id string) error { // Previously downloaded segments will not be redownloaded of fixed, rather any // errors will be healed after the leaves are fully accumulated. func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error { - return s.syncV1(root, cancel) + return s.syncFn(root, cancel) } // Progress returns the snap sync status statistics. @@ -957,7 +935,7 @@ func (s *Syncer) assignStorageTasks(success chan *storageResponse, fail chan *st // revertRequests locates all the currently pending requests from a particular // peer and reverts them, rescheduling for others to fulfill. func (s *Syncer) revertRequests(peer string) { - // Gather the requests first, revertals need the lock too + // Gather the shared requests first, revertals need the lock too s.lock.Lock() var accountReqs []*accountRequest for _, req := range s.accountReqs { @@ -977,21 +955,9 @@ func (s *Syncer) revertRequests(peer string) { storageReqs = append(storageReqs, req) } } - var trienodeHealReqs []*trienodeHealRequest - for _, req := range s.trienodeHealReqs { - if req.peer == peer { - trienodeHealReqs = append(trienodeHealReqs, req) - } - } - var bytecodeHealReqs []*bytecodeHealRequest - for _, req := range s.bytecodeHealReqs { - if req.peer == peer { - bytecodeHealReqs = append(bytecodeHealReqs, req) - } - } s.lock.Unlock() - // Revert all the requests matching the peer + // Revert all the shared requests matching the peer for _, req := range accountReqs { s.revertAccountRequest(req) } @@ -1001,12 +967,8 @@ func (s *Syncer) revertRequests(peer string) { for _, req := range storageReqs { s.revertStorageRequest(req) } - for _, req := range trienodeHealReqs { - s.revertTrienodeHealRequest(req) - } - for _, req := range bytecodeHealReqs { - s.revertBytecodeHealRequest(req) - } + // Version-specific request maps (heal for v1, access lists for v2). + s.revertVersionRequests(peer) } // scheduleRevertAccountRequest asks the event loop to clean up an account range @@ -1755,7 +1717,7 @@ func (s *Syncer) OnByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error if syncing { return s.onByteCodes(peer, id, bytecodes) } - return s.onHealByteCodes(peer, id, bytecodes) + return s.onBytecodesAfterSync(peer, id, bytecodes) } // onByteCodes is a callback method to invoke when a batch of contract @@ -2008,15 +1970,6 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo // hashSpace is the total size of the 256 bit hash space for accounts. var hashSpace = new(big.Int).Exp(common.Big2, common.Big256, nil) -// report calculates various status reports and provides it to the user. -func (s *Syncer) report(force bool) { - if len(s.tasks) > 0 { - s.reportSyncProgress(force) - return - } - s.reportHealProgress(force) -} - // reportSyncProgress calculates various status reports and provides it to the user. func (s *Syncer) reportSyncProgress(force bool) { // Don't report all the events, just occasionally diff --git a/eth/protocols/snap/sync_v1.go b/eth/protocols/snap/sync_v1.go index 1754782651..88bae04218 100644 --- a/eth/protocols/snap/sync_v1.go +++ b/eth/protocols/snap/sync_v1.go @@ -38,6 +38,48 @@ import ( "github.com/ethereum/go-ethereum/trie" ) +const ( + // trienodeHealRateMeasurementImpact is the impact a single measurement has on + // the local node's trienode processing capacity. A value closer to 0 reacts + // slower to sudden changes, but it is also more stable against temporary hiccups. + trienodeHealRateMeasurementImpact = 0.005 + + // minTrienodeHealThrottle is the minimum divisor for throttling trie node + // heal requests to avoid overloading the local node and excessively expanding + // the state trie breadth wise. + minTrienodeHealThrottle = 1 + + // maxTrienodeHealThrottle is the maximum divisor for throttling trie node + // heal requests to avoid overloading the local node and exessively expanding + // the state trie bedth wise. + maxTrienodeHealThrottle = maxTrieRequestCount + + // trienodeHealThrottleIncrease is the multiplier for the throttle when the + // rate of arriving data is higher than the rate of processing it. + trienodeHealThrottleIncrease = 1.33 + + // trienodeHealThrottleDecrease is the divisor for the throttle when the + // rate of arriving data is lower than the rate of processing it. + trienodeHealThrottleDecrease = 1.25 +) + +// registerV1 wires the Syncer's version-specific hooks to the snap/1 +// implementation and allocates snap/1 specific state on the Syncer. +func (s *Syncer) registerV1() { + // Dispatcher hooks used from shared code in sync.go. + s.syncFn = s.syncV1 + s.revertVersionRequests = s.revertHealRequests + s.onBytecodesAfterSync = s.onHealByteCodes + + // V1 specific state. + s.trienodeHealIdlers = make(map[string]struct{}) + s.bytecodeHealIdlers = make(map[string]struct{}) + s.trienodeHealReqs = make(map[uint64]*trienodeHealRequest) + s.bytecodeHealReqs = make(map[uint64]*bytecodeHealRequest) + s.trienodeHealThrottle = maxTrienodeHealThrottle + s.stateWriter = s.db.NewBatch() +} + // trienodeHealRequest tracks a pending state trie request to ensure responses // are to actual requests and to validate any security constraints. // @@ -520,6 +562,32 @@ func (s *Syncer) revertBytecodeHealRequest(req *bytecodeHealRequest) { } } +// revertHealRequests reverts in-flight trie-node and bytecode heal requests +// from the given peer. Installed as Syncer.revertVersionRequests by registerV1. +func (s *Syncer) revertHealRequests(peer string) { + s.lock.Lock() + var trienodeHealReqs []*trienodeHealRequest + for _, req := range s.trienodeHealReqs { + if req.peer == peer { + trienodeHealReqs = append(trienodeHealReqs, req) + } + } + var bytecodeHealReqs []*bytecodeHealRequest + for _, req := range s.bytecodeHealReqs { + if req.peer == peer { + bytecodeHealReqs = append(bytecodeHealReqs, req) + } + } + s.lock.Unlock() + + for _, req := range trienodeHealReqs { + s.revertTrienodeHealRequest(req) + } + for _, req := range bytecodeHealReqs { + s.revertBytecodeHealRequest(req) + } +} + // processTrienodeHealResponse integrates an already validated trienode response // into the healer tasks. func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) { @@ -874,6 +942,17 @@ func (s *Syncer) onHealState(paths [][]byte, value []byte) error { return nil } +// reportV1 routes between the shared sync-phase logger and the v1 heal-phase +// logger based on whether account tasks are still pending. Called from syncV1's +// main loop. +func (s *Syncer) reportV1(force bool) { + if len(s.tasks) > 0 { + s.reportSyncProgress(force) + return + } + s.reportHealProgress(force) +} + // reportHealProgress calculates various status reports and provides it to the user. func (s *Syncer) reportHealProgress(force bool) { // Don't report all the events, just occasionally @@ -1107,7 +1186,7 @@ func (s *Syncer) syncV1(root common.Hash, cancel chan struct{}) error { s.stateWriter.Reset() } }() - defer s.report(true) + defer s.reportV1(true) // commit any trie- and bytecode-healing data. defer s.commitHealer(true) @@ -1230,6 +1309,6 @@ func (s *Syncer) syncV1(root common.Hash, cancel chan struct{}) error { s.processBytecodeHealResponse(res) } // Report stats if something meaningful happened - s.report(false) + s.reportV1(false) } }