mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-05-24 16:59:26 +00:00
eth/protocols/snap: extract syncV1 main loop into sync_v1.go
This commit is contained in:
parent
0e9f9ff1c8
commit
2c19ece19a
2 changed files with 172 additions and 164 deletions
|
|
@ -30,7 +30,6 @@ import (
|
|||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/common/math"
|
||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||
"github.com/ethereum/go-ethereum/core/state"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
|
|
@ -521,169 +520,7 @@ func (s *Syncer) Unregister(id string) error {
|
|||
// Previously downloaded segments will not be redownloaded of fixed, rather any
|
||||
// errors will be healed after the leaves are fully accumulated.
|
||||
func (s *Syncer) Sync(root common.Hash, cancel chan struct{}) error {
|
||||
// Move the trie root from any previous value, revert stateless markers for
|
||||
// any peers and initialize the syncer if it was not yet run
|
||||
s.lock.Lock()
|
||||
s.root = root
|
||||
s.healer = &healTask{
|
||||
scheduler: state.NewStateSync(root, s.db, s.onHealState, s.scheme),
|
||||
trieTasks: make(map[string]common.Hash),
|
||||
codeTasks: make(map[common.Hash]struct{}),
|
||||
}
|
||||
s.statelessPeers = make(map[string]struct{})
|
||||
s.lock.Unlock()
|
||||
|
||||
if s.startTime.IsZero() {
|
||||
s.startTime = time.Now()
|
||||
}
|
||||
// Retrieve the previous sync status from LevelDB and abort if already synced
|
||||
s.loadSyncStatusV1()
|
||||
if len(s.tasks) == 0 && s.healer.scheduler.Pending() == 0 {
|
||||
log.Debug("Snapshot sync already completed")
|
||||
return nil
|
||||
}
|
||||
defer func() { // Persist any progress, independent of failure
|
||||
for _, task := range s.tasks {
|
||||
s.forwardAccountTask(task)
|
||||
}
|
||||
s.cleanAccountTasks()
|
||||
s.saveSyncStatusV1()
|
||||
}()
|
||||
|
||||
log.Debug("Starting snapshot sync cycle", "root", root)
|
||||
|
||||
// Flush out the last committed raw states
|
||||
defer func() {
|
||||
if s.stateWriter.ValueSize() > 0 {
|
||||
s.stateWriter.Write()
|
||||
s.stateWriter.Reset()
|
||||
}
|
||||
}()
|
||||
defer s.report(true)
|
||||
// commit any trie- and bytecode-healing data.
|
||||
defer s.commitHealer(true)
|
||||
|
||||
// Whether sync completed or not, disregard any future packets
|
||||
defer func() {
|
||||
log.Debug("Terminating snapshot sync cycle", "root", root)
|
||||
s.lock.Lock()
|
||||
s.accountReqs = make(map[uint64]*accountRequest)
|
||||
s.storageReqs = make(map[uint64]*storageRequest)
|
||||
s.bytecodeReqs = make(map[uint64]*bytecodeRequest)
|
||||
s.trienodeHealReqs = make(map[uint64]*trienodeHealRequest)
|
||||
s.bytecodeHealReqs = make(map[uint64]*bytecodeHealRequest)
|
||||
s.lock.Unlock()
|
||||
}()
|
||||
// Keep scheduling sync tasks
|
||||
peerJoin := make(chan string, 16)
|
||||
peerJoinSub := s.peerJoin.Subscribe(peerJoin)
|
||||
defer peerJoinSub.Unsubscribe()
|
||||
|
||||
peerDrop := make(chan string, 16)
|
||||
peerDropSub := s.peerDrop.Subscribe(peerDrop)
|
||||
defer peerDropSub.Unsubscribe()
|
||||
|
||||
// Create a set of unique channels for this sync cycle. We need these to be
|
||||
// ephemeral so a data race doesn't accidentally deliver something stale on
|
||||
// a persistent channel across syncs (yup, this happened)
|
||||
var (
|
||||
accountReqFails = make(chan *accountRequest)
|
||||
storageReqFails = make(chan *storageRequest)
|
||||
bytecodeReqFails = make(chan *bytecodeRequest)
|
||||
accountResps = make(chan *accountResponse)
|
||||
storageResps = make(chan *storageResponse)
|
||||
bytecodeResps = make(chan *bytecodeResponse)
|
||||
trienodeHealReqFails = make(chan *trienodeHealRequest)
|
||||
bytecodeHealReqFails = make(chan *bytecodeHealRequest)
|
||||
trienodeHealResps = make(chan *trienodeHealResponse)
|
||||
bytecodeHealResps = make(chan *bytecodeHealResponse)
|
||||
)
|
||||
for {
|
||||
// Remove all completed tasks and terminate sync if everything's done
|
||||
s.cleanStorageTasks()
|
||||
s.cleanAccountTasks()
|
||||
if len(s.tasks) == 0 && s.healer.scheduler.Pending() == 0 {
|
||||
// State healing phase completed, record the elapsed time in metrics.
|
||||
// Note: healing may be rerun in subsequent cycles to fill gaps between
|
||||
// pivot states (e.g., if chain sync takes longer).
|
||||
if !s.healStartTime.IsZero() {
|
||||
stateHealTimeGauge.Inc(int64(time.Since(s.healStartTime)))
|
||||
log.Info("State healing phase is completed", "elapsed", common.PrettyDuration(time.Since(s.healStartTime)))
|
||||
s.healStartTime = time.Time{}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
// Assign all the data retrieval tasks to any free peers
|
||||
s.assignAccountTasks(accountResps, accountReqFails, cancel)
|
||||
s.assignBytecodeTasks(bytecodeResps, bytecodeReqFails, cancel)
|
||||
s.assignStorageTasks(storageResps, storageReqFails, cancel)
|
||||
|
||||
if len(s.tasks) == 0 {
|
||||
// State sync phase completed, record the elapsed time in metrics.
|
||||
// Note: the initial state sync runs only once, regardless of whether
|
||||
// a new cycle is started later. Any state differences in subsequent
|
||||
// cycles will be handled by the state healer.
|
||||
s.syncTimeOnce.Do(func() {
|
||||
stateSyncTimeGauge.Update(int64(time.Since(s.startTime)))
|
||||
log.Info("State sync phase is completed", "elapsed", common.PrettyDuration(time.Since(s.startTime)))
|
||||
})
|
||||
if s.healStartTime.IsZero() {
|
||||
s.healStartTime = time.Now()
|
||||
}
|
||||
s.assignTrienodeHealTasks(trienodeHealResps, trienodeHealReqFails, cancel)
|
||||
s.assignBytecodeHealTasks(bytecodeHealResps, bytecodeHealReqFails, cancel)
|
||||
}
|
||||
// Update sync progress
|
||||
s.lock.Lock()
|
||||
s.extProgress = &SyncProgress{
|
||||
AccountSynced: s.accountSynced,
|
||||
AccountBytes: s.accountBytes,
|
||||
BytecodeSynced: s.bytecodeSynced,
|
||||
BytecodeBytes: s.bytecodeBytes,
|
||||
StorageSynced: s.storageSynced,
|
||||
StorageBytes: s.storageBytes,
|
||||
TrienodeHealSynced: s.trienodeHealSynced,
|
||||
TrienodeHealBytes: s.trienodeHealBytes,
|
||||
BytecodeHealSynced: s.bytecodeHealSynced,
|
||||
BytecodeHealBytes: s.bytecodeHealBytes,
|
||||
}
|
||||
s.lock.Unlock()
|
||||
// Wait for something to happen
|
||||
select {
|
||||
case <-s.update:
|
||||
// Something happened (new peer, delivery, timeout), recheck tasks
|
||||
case <-peerJoin:
|
||||
// A new peer joined, try to schedule it new tasks
|
||||
case id := <-peerDrop:
|
||||
s.revertRequests(id)
|
||||
case <-cancel:
|
||||
return ErrCancelled
|
||||
|
||||
case req := <-accountReqFails:
|
||||
s.revertAccountRequest(req)
|
||||
case req := <-bytecodeReqFails:
|
||||
s.revertBytecodeRequest(req)
|
||||
case req := <-storageReqFails:
|
||||
s.revertStorageRequest(req)
|
||||
case req := <-trienodeHealReqFails:
|
||||
s.revertTrienodeHealRequest(req)
|
||||
case req := <-bytecodeHealReqFails:
|
||||
s.revertBytecodeHealRequest(req)
|
||||
|
||||
case res := <-accountResps:
|
||||
s.processAccountResponse(res)
|
||||
case res := <-bytecodeResps:
|
||||
s.processBytecodeResponse(res)
|
||||
case res := <-storageResps:
|
||||
s.processStorageResponse(res)
|
||||
case res := <-trienodeHealResps:
|
||||
s.processTrienodeHealResponse(res)
|
||||
case res := <-bytecodeHealResps:
|
||||
s.processBytecodeHealResponse(res)
|
||||
}
|
||||
// Report stats if something meaningful happened
|
||||
s.report(false)
|
||||
}
|
||||
return s.syncV1(root, cancel)
|
||||
}
|
||||
|
||||
// Progress returns the snap sync status statistics.
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ import (
|
|||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||
"github.com/ethereum/go-ethereum/core/state"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
|
|
@ -1062,3 +1063,173 @@ func (s *Syncer) saveSyncStatusV1() {
|
|||
}
|
||||
rawdb.WriteSnapshotSyncStatus(s.db, status)
|
||||
}
|
||||
|
||||
// syncV1 runs the snap/1 download-and-heal loop. State sync proceeds by
|
||||
// fetching account ranges, storage slots, bytecodes; once all account tasks
|
||||
// are complete, healing requests trie nodes and bytecodes to fix gaps left
|
||||
// by the incremental trie generation.
|
||||
func (s *Syncer) syncV1(root common.Hash, cancel chan struct{}) error {
|
||||
// Move the trie root from any previous value, revert stateless markers for
|
||||
// any peers and initialize the syncer if it was not yet run
|
||||
s.lock.Lock()
|
||||
s.root = root
|
||||
s.healer = &healTask{
|
||||
scheduler: state.NewStateSync(root, s.db, s.onHealState, s.scheme),
|
||||
trieTasks: make(map[string]common.Hash),
|
||||
codeTasks: make(map[common.Hash]struct{}),
|
||||
}
|
||||
s.statelessPeers = make(map[string]struct{})
|
||||
s.lock.Unlock()
|
||||
|
||||
if s.startTime.IsZero() {
|
||||
s.startTime = time.Now()
|
||||
}
|
||||
// Retrieve the previous sync status from LevelDB and abort if already synced
|
||||
s.loadSyncStatusV1()
|
||||
if len(s.tasks) == 0 && s.healer.scheduler.Pending() == 0 {
|
||||
log.Debug("Snapshot sync already completed")
|
||||
return nil
|
||||
}
|
||||
defer func() { // Persist any progress, independent of failure
|
||||
for _, task := range s.tasks {
|
||||
s.forwardAccountTask(task)
|
||||
}
|
||||
s.cleanAccountTasks()
|
||||
s.saveSyncStatusV1()
|
||||
}()
|
||||
|
||||
log.Debug("Starting snapshot sync cycle", "root", root)
|
||||
|
||||
// Flush out the last committed raw states
|
||||
defer func() {
|
||||
if s.stateWriter.ValueSize() > 0 {
|
||||
s.stateWriter.Write()
|
||||
s.stateWriter.Reset()
|
||||
}
|
||||
}()
|
||||
defer s.report(true)
|
||||
// commit any trie- and bytecode-healing data.
|
||||
defer s.commitHealer(true)
|
||||
|
||||
// Whether sync completed or not, disregard any future packets
|
||||
defer func() {
|
||||
log.Debug("Terminating snapshot sync cycle", "root", root)
|
||||
s.lock.Lock()
|
||||
s.accountReqs = make(map[uint64]*accountRequest)
|
||||
s.storageReqs = make(map[uint64]*storageRequest)
|
||||
s.bytecodeReqs = make(map[uint64]*bytecodeRequest)
|
||||
s.trienodeHealReqs = make(map[uint64]*trienodeHealRequest)
|
||||
s.bytecodeHealReqs = make(map[uint64]*bytecodeHealRequest)
|
||||
s.lock.Unlock()
|
||||
}()
|
||||
// Keep scheduling sync tasks
|
||||
peerJoin := make(chan string, 16)
|
||||
peerJoinSub := s.peerJoin.Subscribe(peerJoin)
|
||||
defer peerJoinSub.Unsubscribe()
|
||||
|
||||
peerDrop := make(chan string, 16)
|
||||
peerDropSub := s.peerDrop.Subscribe(peerDrop)
|
||||
defer peerDropSub.Unsubscribe()
|
||||
|
||||
// Create a set of unique channels for this sync cycle. We need these to be
|
||||
// ephemeral so a data race doesn't accidentally deliver something stale on
|
||||
// a persistent channel across syncs (yup, this happened)
|
||||
var (
|
||||
accountReqFails = make(chan *accountRequest)
|
||||
storageReqFails = make(chan *storageRequest)
|
||||
bytecodeReqFails = make(chan *bytecodeRequest)
|
||||
accountResps = make(chan *accountResponse)
|
||||
storageResps = make(chan *storageResponse)
|
||||
bytecodeResps = make(chan *bytecodeResponse)
|
||||
trienodeHealReqFails = make(chan *trienodeHealRequest)
|
||||
bytecodeHealReqFails = make(chan *bytecodeHealRequest)
|
||||
trienodeHealResps = make(chan *trienodeHealResponse)
|
||||
bytecodeHealResps = make(chan *bytecodeHealResponse)
|
||||
)
|
||||
for {
|
||||
// Remove all completed tasks and terminate sync if everything's done
|
||||
s.cleanStorageTasks()
|
||||
s.cleanAccountTasks()
|
||||
if len(s.tasks) == 0 && s.healer.scheduler.Pending() == 0 {
|
||||
// State healing phase completed, record the elapsed time in metrics.
|
||||
// Note: healing may be rerun in subsequent cycles to fill gaps between
|
||||
// pivot states (e.g., if chain sync takes longer).
|
||||
if !s.healStartTime.IsZero() {
|
||||
stateHealTimeGauge.Inc(int64(time.Since(s.healStartTime)))
|
||||
log.Info("State healing phase is completed", "elapsed", common.PrettyDuration(time.Since(s.healStartTime)))
|
||||
s.healStartTime = time.Time{}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
// Assign all the data retrieval tasks to any free peers
|
||||
s.assignAccountTasks(accountResps, accountReqFails, cancel)
|
||||
s.assignBytecodeTasks(bytecodeResps, bytecodeReqFails, cancel)
|
||||
s.assignStorageTasks(storageResps, storageReqFails, cancel)
|
||||
|
||||
if len(s.tasks) == 0 {
|
||||
// State sync phase completed, record the elapsed time in metrics.
|
||||
// Note: the initial state sync runs only once, regardless of whether
|
||||
// a new cycle is started later. Any state differences in subsequent
|
||||
// cycles will be handled by the state healer.
|
||||
s.syncTimeOnce.Do(func() {
|
||||
stateSyncTimeGauge.Update(int64(time.Since(s.startTime)))
|
||||
log.Info("State sync phase is completed", "elapsed", common.PrettyDuration(time.Since(s.startTime)))
|
||||
})
|
||||
if s.healStartTime.IsZero() {
|
||||
s.healStartTime = time.Now()
|
||||
}
|
||||
s.assignTrienodeHealTasks(trienodeHealResps, trienodeHealReqFails, cancel)
|
||||
s.assignBytecodeHealTasks(bytecodeHealResps, bytecodeHealReqFails, cancel)
|
||||
}
|
||||
// Update sync progress
|
||||
s.lock.Lock()
|
||||
s.extProgress = &SyncProgress{
|
||||
AccountSynced: s.accountSynced,
|
||||
AccountBytes: s.accountBytes,
|
||||
BytecodeSynced: s.bytecodeSynced,
|
||||
BytecodeBytes: s.bytecodeBytes,
|
||||
StorageSynced: s.storageSynced,
|
||||
StorageBytes: s.storageBytes,
|
||||
TrienodeHealSynced: s.trienodeHealSynced,
|
||||
TrienodeHealBytes: s.trienodeHealBytes,
|
||||
BytecodeHealSynced: s.bytecodeHealSynced,
|
||||
BytecodeHealBytes: s.bytecodeHealBytes,
|
||||
}
|
||||
s.lock.Unlock()
|
||||
// Wait for something to happen
|
||||
select {
|
||||
case <-s.update:
|
||||
// Something happened (new peer, delivery, timeout), recheck tasks
|
||||
case <-peerJoin:
|
||||
// A new peer joined, try to schedule it new tasks
|
||||
case id := <-peerDrop:
|
||||
s.revertRequests(id)
|
||||
case <-cancel:
|
||||
return ErrCancelled
|
||||
|
||||
case req := <-accountReqFails:
|
||||
s.revertAccountRequest(req)
|
||||
case req := <-bytecodeReqFails:
|
||||
s.revertBytecodeRequest(req)
|
||||
case req := <-storageReqFails:
|
||||
s.revertStorageRequest(req)
|
||||
case req := <-trienodeHealReqFails:
|
||||
s.revertTrienodeHealRequest(req)
|
||||
case req := <-bytecodeHealReqFails:
|
||||
s.revertBytecodeHealRequest(req)
|
||||
|
||||
case res := <-accountResps:
|
||||
s.processAccountResponse(res)
|
||||
case res := <-bytecodeResps:
|
||||
s.processBytecodeResponse(res)
|
||||
case res := <-storageResps:
|
||||
s.processStorageResponse(res)
|
||||
case res := <-trienodeHealResps:
|
||||
s.processTrienodeHealResponse(res)
|
||||
case res := <-bytecodeHealResps:
|
||||
s.processBytecodeHealResponse(res)
|
||||
}
|
||||
// Report stats if something meaningful happened
|
||||
s.report(false)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue