diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go
index 8ab40ffc41..16d0a7e2f4 100644
--- a/eth/protocols/snap/sync.go
+++ b/eth/protocols/snap/sync.go
@@ -30,7 +30,6 @@ import (
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/common/math"
 	"github.com/ethereum/go-ethereum/core/rawdb"
-	"github.com/ethereum/go-ethereum/core/state"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/ethdb"
@@ -521,169 +520,7 @@ func (s *Syncer) Unregister(id string) error {
 // Previously downloaded segments will not be redownloaded of fixed, rather any
 // errors will be healed after the leaves are fully accumulated.
 func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
-	// Move the trie root from any previous value, revert stateless markers for
-	// any peers and initialize the syncer if it was not yet run
-	s.lock.Lock()
-	s.root = root
-	s.healer = &healTask{
-		scheduler: state.NewStateSync(root, s.db, s.onHealState, s.scheme),
-		trieTasks: make(map[string]common.Hash),
-		codeTasks: make(map[common.Hash]struct{}),
-	}
-	s.statelessPeers = make(map[string]struct{})
-	s.lock.Unlock()
-
-	if s.startTime.IsZero() {
-		s.startTime = time.Now()
-	}
-	// Retrieve the previous sync status from LevelDB and abort if already synced
-	s.loadSyncStatusV1()
-	if len(s.tasks) == 0 && s.healer.scheduler.Pending() == 0 {
-		log.Debug("Snapshot sync already completed")
-		return nil
-	}
-	defer func() { // Persist any progress, independent of failure
-		for _, task := range s.tasks {
-			s.forwardAccountTask(task)
-		}
-		s.cleanAccountTasks()
-		s.saveSyncStatusV1()
-	}()
-
-	log.Debug("Starting snapshot sync cycle", "root", root)
-
-	// Flush out the last committed raw states
-	defer func() {
-		if s.stateWriter.ValueSize() > 0 {
-			s.stateWriter.Write()
-			s.stateWriter.Reset()
-		}
-	}()
-	defer s.report(true)
-	// commit any trie- and bytecode-healing data.
-	defer s.commitHealer(true)
-
-	// Whether sync completed or not, disregard any future packets
-	defer func() {
-		log.Debug("Terminating snapshot sync cycle", "root", root)
-		s.lock.Lock()
-		s.accountReqs = make(map[uint64]*accountRequest)
-		s.storageReqs = make(map[uint64]*storageRequest)
-		s.bytecodeReqs = make(map[uint64]*bytecodeRequest)
-		s.trienodeHealReqs = make(map[uint64]*trienodeHealRequest)
-		s.bytecodeHealReqs = make(map[uint64]*bytecodeHealRequest)
-		s.lock.Unlock()
-	}()
-	// Keep scheduling sync tasks
-	peerJoin := make(chan string, 16)
-	peerJoinSub := s.peerJoin.Subscribe(peerJoin)
-	defer peerJoinSub.Unsubscribe()
-
-	peerDrop := make(chan string, 16)
-	peerDropSub := s.peerDrop.Subscribe(peerDrop)
-	defer peerDropSub.Unsubscribe()
-
-	// Create a set of unique channels for this sync cycle. We need these to be
-	// ephemeral so a data race doesn't accidentally deliver something stale on
-	// a persistent channel across syncs (yup, this happened)
-	var (
-		accountReqFails      = make(chan *accountRequest)
-		storageReqFails      = make(chan *storageRequest)
-		bytecodeReqFails     = make(chan *bytecodeRequest)
-		accountResps         = make(chan *accountResponse)
-		storageResps         = make(chan *storageResponse)
-		bytecodeResps        = make(chan *bytecodeResponse)
-		trienodeHealReqFails = make(chan *trienodeHealRequest)
-		bytecodeHealReqFails = make(chan *bytecodeHealRequest)
-		trienodeHealResps    = make(chan *trienodeHealResponse)
-		bytecodeHealResps    = make(chan *bytecodeHealResponse)
-	)
-	for {
-		// Remove all completed tasks and terminate sync if everything's done
-		s.cleanStorageTasks()
-		s.cleanAccountTasks()
-		if len(s.tasks) == 0 && s.healer.scheduler.Pending() == 0 {
-			// State healing phase completed, record the elapsed time in metrics.
-			// Note: healing may be rerun in subsequent cycles to fill gaps between
-			// pivot states (e.g., if chain sync takes longer).
-			if !s.healStartTime.IsZero() {
-				stateHealTimeGauge.Inc(int64(time.Since(s.healStartTime)))
-				log.Info("State healing phase is completed", "elapsed", common.PrettyDuration(time.Since(s.healStartTime)))
-				s.healStartTime = time.Time{}
-			}
-			return nil
-		}
-		// Assign all the data retrieval tasks to any free peers
-		s.assignAccountTasks(accountResps, accountReqFails, cancel)
-		s.assignBytecodeTasks(bytecodeResps, bytecodeReqFails, cancel)
-		s.assignStorageTasks(storageResps, storageReqFails, cancel)
-
-		if len(s.tasks) == 0 {
-			// State sync phase completed, record the elapsed time in metrics.
-			// Note: the initial state sync runs only once, regardless of whether
-			// a new cycle is started later. Any state differences in subsequent
-			// cycles will be handled by the state healer.
-			s.syncTimeOnce.Do(func() {
-				stateSyncTimeGauge.Update(int64(time.Since(s.startTime)))
-				log.Info("State sync phase is completed", "elapsed", common.PrettyDuration(time.Since(s.startTime)))
-			})
-			if s.healStartTime.IsZero() {
-				s.healStartTime = time.Now()
-			}
-			s.assignTrienodeHealTasks(trienodeHealResps, trienodeHealReqFails, cancel)
-			s.assignBytecodeHealTasks(bytecodeHealResps, bytecodeHealReqFails, cancel)
-		}
-		// Update sync progress
-		s.lock.Lock()
-		s.extProgress = &SyncProgress{
-			AccountSynced:      s.accountSynced,
-			AccountBytes:       s.accountBytes,
-			BytecodeSynced:     s.bytecodeSynced,
-			BytecodeBytes:      s.bytecodeBytes,
-			StorageSynced:      s.storageSynced,
-			StorageBytes:       s.storageBytes,
-			TrienodeHealSynced: s.trienodeHealSynced,
-			TrienodeHealBytes:  s.trienodeHealBytes,
-			BytecodeHealSynced: s.bytecodeHealSynced,
-			BytecodeHealBytes:  s.bytecodeHealBytes,
-		}
-		s.lock.Unlock()
-		// Wait for something to happen
-		select {
-		case <-s.update:
-			// Something happened (new peer, delivery, timeout), recheck tasks
-		case <-peerJoin:
-			// A new peer joined, try to schedule it new tasks
-		case id := <-peerDrop:
-			s.revertRequests(id)
-		case <-cancel:
-			return ErrCancelled
-
-		case req := <-accountReqFails:
-			s.revertAccountRequest(req)
-		case req := <-bytecodeReqFails:
-			s.revertBytecodeRequest(req)
-		case req := <-storageReqFails:
-			s.revertStorageRequest(req)
-		case req := <-trienodeHealReqFails:
-			s.revertTrienodeHealRequest(req)
-		case req := <-bytecodeHealReqFails:
-			s.revertBytecodeHealRequest(req)
-
-		case res := <-accountResps:
-			s.processAccountResponse(res)
-		case res := <-bytecodeResps:
-			s.processBytecodeResponse(res)
-		case res := <-storageResps:
-			s.processStorageResponse(res)
-		case res := <-trienodeHealResps:
-			s.processTrienodeHealResponse(res)
-		case res := <-bytecodeHealResps:
-			s.processBytecodeHealResponse(res)
-		}
-		// Report stats if something meaningful happened
-		s.report(false)
-	}
+	return s.syncV1(root, cancel) // delegate to the snap/1 sync loop (sync_v1.go)
 }
 
 // Progress returns the snap sync status statistics.
diff --git a/eth/protocols/snap/sync_v1.go b/eth/protocols/snap/sync_v1.go
index 7f8fafb0d6..1754782651 100644
--- a/eth/protocols/snap/sync_v1.go
+++ b/eth/protocols/snap/sync_v1.go
@@ -29,6 +29,7 @@ import (
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/rawdb"
+	"github.com/ethereum/go-ethereum/core/state"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/ethdb"
@@ -1062,3 +1063,173 @@ func (s *Syncer) saveSyncStatusV1() {
 	}
 	rawdb.WriteSnapshotSyncStatus(s.db, status)
 }
+
+// syncV1 runs the snap/1 download-and-heal loop for the given state root. State
+// sync fetches account ranges, storage slots and bytecodes; once all account
+// tasks are complete, healing requests trie nodes and bytecodes to fix gaps left
+// by the incremental trie generation. Returns nil when done, ErrCancelled on cancel.
+func (s *Syncer) syncV1(root common.Hash, cancel chan struct{}) error {
+	// Point the syncer at the new root, start a fresh healer for it and forget
+	// which peers were previously marked stateless
+	s.lock.Lock()
+	s.root = root
+	s.healer = &healTask{
+		scheduler: state.NewStateSync(root, s.db, s.onHealState, s.scheme),
+		trieTasks: make(map[string]common.Hash),
+		codeTasks: make(map[common.Hash]struct{}),
+	}
+	s.statelessPeers = make(map[string]struct{})
+	s.lock.Unlock()
+
+	if s.startTime.IsZero() { // keep the earlier start time if one is already set
+		s.startTime = time.Now()
+	}
+	// Retrieve the previous sync status from the database and abort if already synced
+	s.loadSyncStatusV1()
+	if len(s.tasks) == 0 && s.healer.scheduler.Pending() == 0 {
+		log.Debug("Snapshot sync already completed")
+		return nil
+	}
+	defer func() { // Persist any progress, independent of failure (first defer, so it runs last)
+		for _, task := range s.tasks {
+			s.forwardAccountTask(task)
+		}
+		s.cleanAccountTasks()
+		s.saveSyncStatusV1()
+	}()
+
+	log.Debug("Starting snapshot sync cycle", "root", root)
+
+	// Flush out the last committed raw states (skipped when the writer is empty)
+	defer func() {
+		if s.stateWriter.ValueSize() > 0 {
+			s.stateWriter.Write()
+			s.stateWriter.Reset()
+		}
+	}()
+	defer s.report(true) // NOTE(review): true presumably forces the report; confirm in report()
+	// Commit any trie- and bytecode-healing data (deferred, so it runs before the report above).
+	defer s.commitHealer(true)
+
+	// Whether sync completed or not, disregard any future packets
+	defer func() {
+		log.Debug("Terminating snapshot sync cycle", "root", root)
+		s.lock.Lock()
+		s.accountReqs = make(map[uint64]*accountRequest)
+		s.storageReqs = make(map[uint64]*storageRequest)
+		s.bytecodeReqs = make(map[uint64]*bytecodeRequest)
+		s.trienodeHealReqs = make(map[uint64]*trienodeHealRequest)
+		s.bytecodeHealReqs = make(map[uint64]*bytecodeHealRequest)
+		s.lock.Unlock()
+	}()
+	// Subscribe to peer join/drop events for the duration of this sync cycle
+	peerJoin := make(chan string, 16)
+	peerJoinSub := s.peerJoin.Subscribe(peerJoin)
+	defer peerJoinSub.Unsubscribe()
+
+	peerDrop := make(chan string, 16)
+	peerDropSub := s.peerDrop.Subscribe(peerDrop)
+	defer peerDropSub.Unsubscribe()
+
+	// Create a set of unique channels for this sync cycle. We need these to be
+	// ephemeral so a data race doesn't accidentally deliver something stale on
+	// a persistent channel across syncs (yup, this happened)
+	var (
+		accountReqFails      = make(chan *accountRequest)
+		storageReqFails      = make(chan *storageRequest)
+		bytecodeReqFails     = make(chan *bytecodeRequest)
+		accountResps         = make(chan *accountResponse)
+		storageResps         = make(chan *storageResponse)
+		bytecodeResps        = make(chan *bytecodeResponse)
+		trienodeHealReqFails = make(chan *trienodeHealRequest)
+		bytecodeHealReqFails = make(chan *bytecodeHealRequest)
+		trienodeHealResps    = make(chan *trienodeHealResponse)
+		bytecodeHealResps    = make(chan *bytecodeHealResponse)
+	)
+	for {
+		// Remove all completed tasks and terminate sync if everything's done
+		s.cleanStorageTasks()
+		s.cleanAccountTasks()
+		if len(s.tasks) == 0 && s.healer.scheduler.Pending() == 0 {
+			// State healing phase completed, record the elapsed time in metrics.
+			// Note: healing may be rerun in subsequent cycles to fill gaps between
+			// pivot states (e.g., if chain sync takes longer).
+			if !s.healStartTime.IsZero() {
+				stateHealTimeGauge.Inc(int64(time.Since(s.healStartTime)))
+				log.Info("State healing phase is completed", "elapsed", common.PrettyDuration(time.Since(s.healStartTime)))
+				s.healStartTime = time.Time{} // reset so a later healing run is timed afresh
+			}
+			return nil
+		}
+		// Assign all the data retrieval tasks to any free peers
+		s.assignAccountTasks(accountResps, accountReqFails, cancel)
+		s.assignBytecodeTasks(bytecodeResps, bytecodeReqFails, cancel)
+		s.assignStorageTasks(storageResps, storageReqFails, cancel)
+
+		if len(s.tasks) == 0 {
+			// State sync phase completed, record the elapsed time in metrics.
+			// Note: the initial state sync runs only once, regardless of whether
+			// a new cycle is started later. Any state differences in subsequent
+			// cycles will be handled by the state healer.
+			s.syncTimeOnce.Do(func() {
+				stateSyncTimeGauge.Update(int64(time.Since(s.startTime)))
+				log.Info("State sync phase is completed", "elapsed", common.PrettyDuration(time.Since(s.startTime)))
+			})
+			if s.healStartTime.IsZero() { // healing timer starts on the first pass with no account tasks left
+				s.healStartTime = time.Now()
+			}
+			s.assignTrienodeHealTasks(trienodeHealResps, trienodeHealReqFails, cancel)
+			s.assignBytecodeHealTasks(bytecodeHealResps, bytecodeHealReqFails, cancel)
+		}
+		// Update sync progress (a snapshot of the counters, swapped in under the lock)
+		s.lock.Lock()
+		s.extProgress = &SyncProgress{
+			AccountSynced:      s.accountSynced,
+			AccountBytes:       s.accountBytes,
+			BytecodeSynced:     s.bytecodeSynced,
+			BytecodeBytes:      s.bytecodeBytes,
+			StorageSynced:      s.storageSynced,
+			StorageBytes:       s.storageBytes,
+			TrienodeHealSynced: s.trienodeHealSynced,
+			TrienodeHealBytes:  s.trienodeHealBytes,
+			BytecodeHealSynced: s.bytecodeHealSynced,
+			BytecodeHealBytes:  s.bytecodeHealBytes,
+		}
+		s.lock.Unlock()
+		// Wait for something to happen; every case falls through to another loop pass
+		select {
+		case <-s.update:
+			// Something happened (new peer, delivery, timeout), recheck tasks
+		case <-peerJoin:
+			// A new peer joined, try to schedule it new tasks
+		case id := <-peerDrop:
+			s.revertRequests(id)
+		case <-cancel:
+			return ErrCancelled // the deferred cleanup above still runs
+
+		case req := <-accountReqFails:
+			s.revertAccountRequest(req)
+		case req := <-bytecodeReqFails:
+			s.revertBytecodeRequest(req)
+		case req := <-storageReqFails:
+			s.revertStorageRequest(req)
+		case req := <-trienodeHealReqFails:
+			s.revertTrienodeHealRequest(req)
+		case req := <-bytecodeHealReqFails:
+			s.revertBytecodeHealRequest(req)
+
+		case res := <-accountResps:
+			s.processAccountResponse(res)
+		case res := <-bytecodeResps:
+			s.processBytecodeResponse(res)
+		case res := <-storageResps:
+			s.processStorageResponse(res)
+		case res := <-trienodeHealResps:
+			s.processTrienodeHealResponse(res)
+		case res := <-bytecodeHealResps:
+			s.processBytecodeHealResponse(res)
+		}
+		// Report stats if something meaningful happened
+		s.report(false)
+	}
+}