eth/protocols/snap: route remaining v1 dispatch sites through registerV1 hooks

This commit is contained in:
jonny rhea 2026-05-13 17:55:14 -05:00
parent b4484ede85
commit 4e06edf989
2 changed files with 359 additions and 340 deletions

View file

@ -339,7 +339,7 @@ type Syncer struct {
root common.Hash // Current state trie root being synced
tasks []*accountTask // Current account task set being synced
snapped bool // Flag to signal that snap phase is done
snapped bool // True once account-range download is complete.
healer *healTask // Current state healing task being executed
update chan struct{} // Notification channel for possible sync progression
@ -402,9 +402,12 @@ type Syncer struct {
// Version-specific hooks installed by registerV1/V2. Each one exists
// because shared code in sync.go needs to dispatch into version-specific
// code without knowing which version is running.
syncFn func(root common.Hash, cancel chan struct{}) error
revertVersionRequests func(peer string)
onBytecodesAfterSync func(peer SyncPeer, id uint64, bytecodes [][]byte) error
syncFn func(root common.Hash, cancel chan struct{}) error
revertVersionRequests func(peer string)
onBytecodesAfterSync func(peer SyncPeer, id uint64, bytecodes [][]byte) error
forwardAccountTask func(task *accountTask)
registerVersionIdler func(id string)
unregisterVersionIdler func(id string)
pend sync.WaitGroup // Tracks network request goroutines for graceful shutdown
lock sync.RWMutex // Protects fields that can change outside of sync (peers, reqs, root)
@ -456,8 +459,7 @@ func (s *Syncer) Register(peer SyncPeer) error {
s.accountIdlers[id] = struct{}{}
s.storageIdlers[id] = struct{}{}
s.bytecodeIdlers[id] = struct{}{}
s.trienodeHealIdlers[id] = struct{}{}
s.bytecodeHealIdlers[id] = struct{}{}
s.registerVersionIdler(id)
s.lock.Unlock()
// Notify any active syncs that a new peer can be assigned data
@ -484,8 +486,7 @@ func (s *Syncer) Unregister(id string) error {
delete(s.accountIdlers, id)
delete(s.storageIdlers, id)
delete(s.bytecodeIdlers, id)
delete(s.trienodeHealIdlers, id)
delete(s.bytecodeHealIdlers, id)
s.unregisterVersionIdler(id)
s.lock.Unlock()
// Notify any active syncs that pending requests need to be reverted
@ -1276,337 +1277,6 @@ func (s *Syncer) processBytecodeResponse(res *bytecodeResponse) {
// task assigners to pick up and fill.
}
// processStorageResponse integrates an already validated storage response
// into the account tasks.
func (s *Syncer) processStorageResponse(res *storageResponse) {
// Switch the subtask from pending to idle
if res.subTask != nil {
res.subTask.req = nil
}
batch := ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.storageBytes += common.StorageSize(len(key) + len(value))
},
}
var (
slots int
oldStorageBytes = s.storageBytes
)
// Iterate over all the accounts and reconstruct their storage tries from the
// delivered slots
for i, account := range res.accounts {
// If the account was not delivered, reschedule it
if i >= len(res.hashes) {
res.mainTask.stateTasks[account] = res.roots[i]
continue
}
// State was delivered, if complete mark as not needed any more, otherwise
// mark the account as needing healing
for j, hash := range res.mainTask.res.hashes {
if account != hash {
continue
}
acc := res.mainTask.res.accounts[j]
// If the packet contains multiple contract storage slots, all
// but the last are surely complete. The last contract may be
// chunked, so check it's continuation flag.
if res.subTask == nil && res.mainTask.needState[j] && (i < len(res.hashes)-1 || !res.cont) {
res.mainTask.needState[j] = false
res.mainTask.pend--
res.mainTask.stateCompleted[account] = struct{}{} // mark it as completed
smallStorageGauge.Inc(1)
}
// If the last contract was chunked, mark it as needing healing
// to avoid writing it out to disk prematurely.
if res.subTask == nil && !res.mainTask.needHeal[j] && i == len(res.hashes)-1 && res.cont {
res.mainTask.needHeal[j] = true
}
// If the last contract was chunked, we need to switch to large
// contract handling mode
if res.subTask == nil && i == len(res.hashes)-1 && res.cont {
// If we haven't yet started a large-contract retrieval, create
// the subtasks for it within the main account task
if tasks, ok := res.mainTask.SubTasks[account]; !ok {
var (
keys = res.hashes[i]
chunks = uint64(storageConcurrency)
lastKey common.Hash
)
if len(keys) > 0 {
lastKey = keys[len(keys)-1]
}
// If the number of slots remaining is low, decrease the
// number of chunks. Somewhere on the order of 10-15K slots
// fit into a packet of 500KB. A key/slot pair is maximum 64
// bytes, so pessimistically maxRequestSize/64 = 8K.
//
// Chunk so that at least 2 packets are needed to fill a task.
if estimate, err := estimateRemainingSlots(len(keys), lastKey); err == nil {
if n := estimate / (2 * (maxRequestSize / 64)); n+1 < chunks {
chunks = n + 1
}
log.Debug("Chunked large contract", "initiators", len(keys), "tail", lastKey, "remaining", estimate, "chunks", chunks)
} else {
log.Debug("Chunked large contract", "initiators", len(keys), "tail", lastKey, "chunks", chunks)
}
r := newHashRange(lastKey, chunks)
if chunks == 1 {
smallStorageGauge.Inc(1)
} else {
largeStorageGauge.Inc(1)
}
// Our first task is the one that was just filled by this response.
batch := ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.storageBytes += common.StorageSize(len(key) + len(value))
},
}
var tr genTrie
if s.scheme == rawdb.HashScheme {
tr = newHashTrie(batch)
}
if s.scheme == rawdb.PathScheme {
// Keep the left boundary as it's the first range.
tr = newPathTrie(account, false, s.db, batch)
}
tasks = append(tasks, &storageTask{
Next: common.Hash{},
Last: r.End(),
root: acc.Root,
genBatch: batch,
genTrie: tr,
})
for r.Next() {
batch := ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.storageBytes += common.StorageSize(len(key) + len(value))
},
}
var tr genTrie
if s.scheme == rawdb.HashScheme {
tr = newHashTrie(batch)
}
if s.scheme == rawdb.PathScheme {
tr = newPathTrie(account, true, s.db, batch)
}
tasks = append(tasks, &storageTask{
Next: r.Start(),
Last: r.End(),
root: acc.Root,
genBatch: batch,
genTrie: tr,
})
}
for _, task := range tasks {
log.Debug("Created storage sync task", "account", account, "root", acc.Root, "from", task.Next, "last", task.Last)
}
res.mainTask.SubTasks[account] = tasks
// Since we've just created the sub-tasks, this response
// is surely for the first one (zero origin)
res.subTask = tasks[0]
}
}
// If we're in large contract delivery mode, forward the subtask
if res.subTask != nil {
// Ensure the response doesn't overflow into the subsequent task
last := res.subTask.Last.Big()
// Find the first overflowing key. While at it, mark res as complete
// if we find the range to include or pass the 'last'
index := sort.Search(len(res.hashes[i]), func(k int) bool {
cmp := res.hashes[i][k].Big().Cmp(last)
if cmp >= 0 {
res.cont = false
}
return cmp > 0
})
if index >= 0 {
// cut off excess
res.hashes[i] = res.hashes[i][:index]
res.slots[i] = res.slots[i][:index]
}
// Forward the relevant storage chunk (even if created just now)
if res.cont {
res.subTask.Next = incHash(res.hashes[i][len(res.hashes[i])-1])
} else {
res.subTask.done = true
}
}
}
// Iterate over all the complete contracts, reconstruct the trie nodes and
// push them to disk. If the contract is chunked, the trie nodes will be
// reconstructed later.
slots += len(res.hashes[i])
if i < len(res.hashes)-1 || res.subTask == nil {
// no need to make local reassignment of account: this closure does not outlive the loop
var tr genTrie
if s.scheme == rawdb.HashScheme {
tr = newHashTrie(batch)
}
if s.scheme == rawdb.PathScheme {
// Keep the left boundary as it's complete
tr = newPathTrie(account, false, s.db, batch)
}
for j := 0; j < len(res.hashes[i]); j++ {
tr.update(res.hashes[i][j][:], res.slots[i][j])
}
tr.commit(true)
}
// Persist the received storage segments. These flat state maybe
// outdated during the sync, but it can be fixed later during the
// snapshot generation.
for j := 0; j < len(res.hashes[i]); j++ {
rawdb.WriteStorageSnapshot(batch, account, res.hashes[i][j], res.slots[i][j])
// If we're storing large contracts, generate the trie nodes
// on the fly to not trash the gluing points
if i == len(res.hashes)-1 && res.subTask != nil {
res.subTask.genTrie.update(res.hashes[i][j][:], res.slots[i][j])
}
}
}
// Large contracts could have generated new trie nodes, flush them to disk
if res.subTask != nil {
if res.subTask.done {
root := res.subTask.genTrie.commit(res.subTask.Last == common.MaxHash)
if err := res.subTask.genBatch.Write(); err != nil {
log.Error("Failed to persist stack slots", "err", err)
}
res.subTask.genBatch.Reset()
// If the chunk's root is an overflown but full delivery,
// clear the heal request.
accountHash := res.accounts[len(res.accounts)-1]
if root == res.subTask.root && rawdb.HasTrieNode(s.db, accountHash, nil, root, s.scheme) {
for i, account := range res.mainTask.res.hashes {
if account == accountHash {
res.mainTask.needHeal[i] = false
skipStorageHealingGauge.Inc(1)
}
}
}
} else if res.subTask.genBatch.ValueSize() > batchSizeThreshold {
res.subTask.genTrie.commit(false)
if err := res.subTask.genBatch.Write(); err != nil {
log.Error("Failed to persist stack slots", "err", err)
}
res.subTask.genBatch.Reset()
}
}
// Flush anything written just now and update the stats
if err := batch.Write(); err != nil {
log.Crit("Failed to persist storage slots", "err", err)
}
s.storageSynced += uint64(slots)
log.Debug("Persisted set of storage slots", "accounts", len(res.hashes), "slots", slots, "bytes", s.storageBytes-oldStorageBytes)
// If this delivery completed the last pending task, forward the account task
// to the next chunk
if res.mainTask.pend == 0 {
s.forwardAccountTask(res.mainTask)
return
}
// Some accounts are still incomplete, leave as is for the storage and contract
// task assigners to pick up and fill.
}
// forwardAccountTask takes a filled account task and persists anything available
// into the database, after which it forwards the next account marker so that the
// task's next chunk may be filled.
func (s *Syncer) forwardAccountTask(task *accountTask) {
// Remove any pending delivery
res := task.res
if res == nil {
return // nothing to forward
}
task.res = nil
// Persist the received account segments. These flat state maybe
// outdated during the sync, but it can be fixed later during the
// snapshot generation.
oldAccountBytes := s.accountBytes
batch := ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.accountBytes += common.StorageSize(len(key) + len(value))
},
}
for i, hash := range res.hashes {
if task.needCode[i] || task.needState[i] {
break
}
slim := types.SlimAccountRLP(*res.accounts[i])
rawdb.WriteAccountSnapshot(batch, hash, slim)
if !task.needHeal[i] {
// If the storage task is complete, drop it into the stack trie
// to generate account trie nodes for it
full, err := types.FullAccountRLP(slim) // TODO(karalabe): Slim parsing can be omitted
if err != nil {
panic(err) // Really shouldn't ever happen
}
task.genTrie.update(hash[:], full)
} else {
// If the storage task is incomplete, explicitly delete the corresponding
// account item from the account trie to ensure that all nodes along the
// path to the incomplete storage trie are cleaned up.
if err := task.genTrie.delete(hash[:]); err != nil {
panic(err) // Really shouldn't ever happen
}
}
}
// Flush anything written just now and update the stats
if err := batch.Write(); err != nil {
log.Crit("Failed to persist accounts", "err", err)
}
s.accountSynced += uint64(len(res.accounts))
// Task filling persisted, push it the chunk marker forward to the first
// account still missing data.
for i, hash := range res.hashes {
if task.needCode[i] || task.needState[i] {
return
}
task.Next = incHash(hash)
// Remove the completion flag once the account range is pushed
// forward. The leftover accounts will be skipped in the next
// cycle.
delete(task.stateCompleted, hash)
}
// All accounts marked as complete, track if the entire task is done
task.done = !res.cont
// Error out if there is any leftover completion flag.
if task.done && len(task.stateCompleted) != 0 {
panic(fmt.Errorf("storage completion flags should be emptied, %d left", len(task.stateCompleted)))
}
// Stack trie could have generated trie nodes, push them to disk (we need to
// flush after finalizing task.done. It's fine even if we crash and lose this
// write as it will only cause more data to be downloaded during heal.
if task.done {
task.genTrie.commit(task.Last == common.MaxHash)
if err := task.genBatch.Write(); err != nil {
log.Error("Failed to persist stack account", "err", err)
}
task.genBatch.Reset()
} else if task.genBatch.ValueSize() > batchSizeThreshold {
task.genTrie.commit(false)
if err := task.genBatch.Write(); err != nil {
log.Error("Failed to persist stack account", "err", err)
}
task.genBatch.Reset()
}
log.Debug("Persisted range of accounts", "accounts", len(res.accounts), "bytes", s.accountBytes-oldAccountBytes)
}
// OnAccounts is a callback method to invoke when a range of accounts are
// received from a remote peer.
func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, accounts [][]byte, proof [][]byte) error {

View file

@ -70,6 +70,9 @@ func (s *Syncer) registerV1() {
s.syncFn = s.syncV1
s.revertVersionRequests = s.revertHealRequests
s.onBytecodesAfterSync = s.onHealByteCodes
s.forwardAccountTask = s.forwardAccountTaskV1
s.registerVersionIdler = s.registerV1Idler
s.unregisterVersionIdler = s.unregisterV1Idler
// V1 specific state.
s.trienodeHealIdlers = make(map[string]struct{})
@ -80,6 +83,20 @@ func (s *Syncer) registerV1() {
s.stateWriter = s.db.NewBatch()
}
// registerV1Idler is the registerVersionIdler hook implementation for snap/1.
// It marks the peer as idle for v1-specific heal idler buckets.
func (s *Syncer) registerV1Idler(id string) {
s.trienodeHealIdlers[id] = struct{}{}
s.bytecodeHealIdlers[id] = struct{}{}
}
// unregisterV1Idler is the unregisterVersionIdler hook implementation for snap/1.
// It removes the peer from v1-specific heal idler buckets.
func (s *Syncer) unregisterV1Idler(id string) {
delete(s.trienodeHealIdlers, id)
delete(s.bytecodeHealIdlers, id)
}
// trienodeHealRequest tracks a pending state trie request to ensure responses
// are to actual requests and to validate any security constraints.
//
@ -942,6 +959,338 @@ func (s *Syncer) onHealState(paths [][]byte, value []byte) error {
return nil
}
// processStorageResponseV1 integrates an already validated storage response
// into the account tasks. Called only from syncV1's select loop.
func (s *Syncer) processStorageResponseV1(res *storageResponse) {
// Switch the subtask from pending to idle
if res.subTask != nil {
res.subTask.req = nil
}
batch := ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.storageBytes += common.StorageSize(len(key) + len(value))
},
}
var (
slots int
oldStorageBytes = s.storageBytes
)
// Iterate over all the accounts and reconstruct their storage tries from the
// delivered slots
for i, account := range res.accounts {
// If the account was not delivered, reschedule it
if i >= len(res.hashes) {
res.mainTask.stateTasks[account] = res.roots[i]
continue
}
// State was delivered, if complete mark as not needed any more, otherwise
// mark the account as needing healing
for j, hash := range res.mainTask.res.hashes {
if account != hash {
continue
}
acc := res.mainTask.res.accounts[j]
// If the packet contains multiple contract storage slots, all
// but the last are surely complete. The last contract may be
// chunked, so check it's continuation flag.
if res.subTask == nil && res.mainTask.needState[j] && (i < len(res.hashes)-1 || !res.cont) {
res.mainTask.needState[j] = false
res.mainTask.pend--
res.mainTask.stateCompleted[account] = struct{}{} // mark it as completed
smallStorageGauge.Inc(1)
}
// If the last contract was chunked, mark it as needing healing
// to avoid writing it out to disk prematurely.
if res.subTask == nil && !res.mainTask.needHeal[j] && i == len(res.hashes)-1 && res.cont {
res.mainTask.needHeal[j] = true
}
// If the last contract was chunked, we need to switch to large
// contract handling mode
if res.subTask == nil && i == len(res.hashes)-1 && res.cont {
// If we haven't yet started a large-contract retrieval, create
// the subtasks for it within the main account task
if tasks, ok := res.mainTask.SubTasks[account]; !ok {
var (
keys = res.hashes[i]
chunks = uint64(storageConcurrency)
lastKey common.Hash
)
if len(keys) > 0 {
lastKey = keys[len(keys)-1]
}
// If the number of slots remaining is low, decrease the
// number of chunks. Somewhere on the order of 10-15K slots
// fit into a packet of 500KB. A key/slot pair is maximum 64
// bytes, so pessimistically maxRequestSize/64 = 8K.
//
// Chunk so that at least 2 packets are needed to fill a task.
if estimate, err := estimateRemainingSlots(len(keys), lastKey); err == nil {
if n := estimate / (2 * (maxRequestSize / 64)); n+1 < chunks {
chunks = n + 1
}
log.Debug("Chunked large contract", "initiators", len(keys), "tail", lastKey, "remaining", estimate, "chunks", chunks)
} else {
log.Debug("Chunked large contract", "initiators", len(keys), "tail", lastKey, "chunks", chunks)
}
r := newHashRange(lastKey, chunks)
if chunks == 1 {
smallStorageGauge.Inc(1)
} else {
largeStorageGauge.Inc(1)
}
// Our first task is the one that was just filled by this response.
batch := ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.storageBytes += common.StorageSize(len(key) + len(value))
},
}
var tr genTrie
if s.scheme == rawdb.HashScheme {
tr = newHashTrie(batch)
}
if s.scheme == rawdb.PathScheme {
// Keep the left boundary as it's the first range.
tr = newPathTrie(account, false, s.db, batch)
}
tasks = append(tasks, &storageTask{
Next: common.Hash{},
Last: r.End(),
root: acc.Root,
genBatch: batch,
genTrie: tr,
})
for r.Next() {
batch := ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.storageBytes += common.StorageSize(len(key) + len(value))
},
}
var tr genTrie
if s.scheme == rawdb.HashScheme {
tr = newHashTrie(batch)
}
if s.scheme == rawdb.PathScheme {
tr = newPathTrie(account, true, s.db, batch)
}
tasks = append(tasks, &storageTask{
Next: r.Start(),
Last: r.End(),
root: acc.Root,
genBatch: batch,
genTrie: tr,
})
}
for _, task := range tasks {
log.Debug("Created storage sync task", "account", account, "root", acc.Root, "from", task.Next, "last", task.Last)
}
res.mainTask.SubTasks[account] = tasks
// Since we've just created the sub-tasks, this response
// is surely for the first one (zero origin)
res.subTask = tasks[0]
}
}
// If we're in large contract delivery mode, forward the subtask
if res.subTask != nil {
// Ensure the response doesn't overflow into the subsequent task
last := res.subTask.Last.Big()
// Find the first overflowing key. While at it, mark res as complete
// if we find the range to include or pass the 'last'
index := sort.Search(len(res.hashes[i]), func(k int) bool {
cmp := res.hashes[i][k].Big().Cmp(last)
if cmp >= 0 {
res.cont = false
}
return cmp > 0
})
if index >= 0 {
// cut off excess
res.hashes[i] = res.hashes[i][:index]
res.slots[i] = res.slots[i][:index]
}
// Forward the relevant storage chunk (even if created just now)
if res.cont {
res.subTask.Next = incHash(res.hashes[i][len(res.hashes[i])-1])
} else {
res.subTask.done = true
}
}
}
// Iterate over all the complete contracts, reconstruct the trie nodes and
// push them to disk. If the contract is chunked, the trie nodes will be
// reconstructed later.
slots += len(res.hashes[i])
if i < len(res.hashes)-1 || res.subTask == nil {
// no need to make local reassignment of account: this closure does not outlive the loop
var tr genTrie
if s.scheme == rawdb.HashScheme {
tr = newHashTrie(batch)
}
if s.scheme == rawdb.PathScheme {
// Keep the left boundary as it's complete
tr = newPathTrie(account, false, s.db, batch)
}
for j := 0; j < len(res.hashes[i]); j++ {
tr.update(res.hashes[i][j][:], res.slots[i][j])
}
tr.commit(true)
}
// Persist the received storage segments. These flat state maybe
// outdated during the sync, but it can be fixed later during the
// snapshot generation.
for j := 0; j < len(res.hashes[i]); j++ {
rawdb.WriteStorageSnapshot(batch, account, res.hashes[i][j], res.slots[i][j])
// If we're storing large contracts, generate the trie nodes
// on the fly to not trash the gluing points
if i == len(res.hashes)-1 && res.subTask != nil {
res.subTask.genTrie.update(res.hashes[i][j][:], res.slots[i][j])
}
}
}
// Large contracts could have generated new trie nodes, flush them to disk
if res.subTask != nil {
if res.subTask.done {
root := res.subTask.genTrie.commit(res.subTask.Last == common.MaxHash)
if err := res.subTask.genBatch.Write(); err != nil {
log.Error("Failed to persist stack slots", "err", err)
}
res.subTask.genBatch.Reset()
// If the chunk's root is an overflown but full delivery,
// clear the heal request.
accountHash := res.accounts[len(res.accounts)-1]
if root == res.subTask.root && rawdb.HasTrieNode(s.db, accountHash, nil, root, s.scheme) {
for i, account := range res.mainTask.res.hashes {
if account == accountHash {
res.mainTask.needHeal[i] = false
skipStorageHealingGauge.Inc(1)
}
}
}
} else if res.subTask.genBatch.ValueSize() > batchSizeThreshold {
res.subTask.genTrie.commit(false)
if err := res.subTask.genBatch.Write(); err != nil {
log.Error("Failed to persist stack slots", "err", err)
}
res.subTask.genBatch.Reset()
}
}
// Flush anything written just now and update the stats
if err := batch.Write(); err != nil {
log.Crit("Failed to persist storage slots", "err", err)
}
s.storageSynced += uint64(slots)
log.Debug("Persisted set of storage slots", "accounts", len(res.hashes), "slots", slots, "bytes", s.storageBytes-oldStorageBytes)
// If this delivery completed the last pending task, forward the account task
// to the next chunk
if res.mainTask.pend == 0 {
s.forwardAccountTask(res.mainTask)
return
}
// Some accounts are still incomplete, leave as is for the storage and contract
// task assigners to pick up and fill.
}
// forwardAccountTaskV1 takes a filled account task and persists anything available
// into the database, after which it forwards the next account marker so that the
// task's next chunk may be filled. Installed as Syncer.forwardAccountTask by
// registerV1.
func (s *Syncer) forwardAccountTaskV1(task *accountTask) {
// Remove any pending delivery
res := task.res
if res == nil {
return // nothing to forward
}
task.res = nil
// Persist the received account segments. These flat state maybe
// outdated during the sync, but it can be fixed later during the
// snapshot generation.
oldAccountBytes := s.accountBytes
batch := ethdb.HookedBatch{
Batch: s.db.NewBatch(),
OnPut: func(key []byte, value []byte) {
s.accountBytes += common.StorageSize(len(key) + len(value))
},
}
for i, hash := range res.hashes {
if task.needCode[i] || task.needState[i] {
break
}
slim := types.SlimAccountRLP(*res.accounts[i])
rawdb.WriteAccountSnapshot(batch, hash, slim)
if !task.needHeal[i] {
// If the storage task is complete, drop it into the stack trie
// to generate account trie nodes for it
full, err := types.FullAccountRLP(slim) // TODO(karalabe): Slim parsing can be omitted
if err != nil {
panic(err) // Really shouldn't ever happen
}
task.genTrie.update(hash[:], full)
} else {
// If the storage task is incomplete, explicitly delete the corresponding
// account item from the account trie to ensure that all nodes along the
// path to the incomplete storage trie are cleaned up.
if err := task.genTrie.delete(hash[:]); err != nil {
panic(err) // Really shouldn't ever happen
}
}
}
// Flush anything written just now and update the stats
if err := batch.Write(); err != nil {
log.Crit("Failed to persist accounts", "err", err)
}
s.accountSynced += uint64(len(res.accounts))
// Task filling persisted, push it the chunk marker forward to the first
// account still missing data.
for i, hash := range res.hashes {
if task.needCode[i] || task.needState[i] {
return
}
task.Next = incHash(hash)
// Remove the completion flag once the account range is pushed
// forward. The leftover accounts will be skipped in the next
// cycle.
delete(task.stateCompleted, hash)
}
// All accounts marked as complete, track if the entire task is done
task.done = !res.cont
// Error out if there is any leftover completion flag.
if task.done && len(task.stateCompleted) != 0 {
panic(fmt.Errorf("storage completion flags should be emptied, %d left", len(task.stateCompleted)))
}
// Stack trie could have generated trie nodes, push them to disk (we need to
// flush after finalizing task.done. It's fine even if we crash and lose this
// write as it will only cause more data to be downloaded during heal.
if task.done {
task.genTrie.commit(task.Last == common.MaxHash)
if err := task.genBatch.Write(); err != nil {
log.Error("Failed to persist stack account", "err", err)
}
task.genBatch.Reset()
} else if task.genBatch.ValueSize() > batchSizeThreshold {
task.genTrie.commit(false)
if err := task.genBatch.Write(); err != nil {
log.Error("Failed to persist stack account", "err", err)
}
task.genBatch.Reset()
}
log.Debug("Persisted range of accounts", "accounts", len(res.accounts), "bytes", s.accountBytes-oldAccountBytes)
}
// reportV1 routes between the shared sync-phase logger and the v1 heal-phase
// logger based on whether account tasks are still pending. Called from syncV1's
// main loop.
@ -1302,7 +1651,7 @@ func (s *Syncer) syncV1(root common.Hash, cancel chan struct{}) error {
case res := <-bytecodeResps:
s.processBytecodeResponse(res)
case res := <-storageResps:
s.processStorageResponse(res)
s.processStorageResponseV1(res)
case res := <-trienodeHealResps:
s.processTrienodeHealResponse(res)
case res := <-bytecodeHealResps: