From 4e06edf98966d8a2b570a7f6991625fe75fb47c4 Mon Sep 17 00:00:00 2001 From: jonny rhea <5555162+jrhea@users.noreply.github.com> Date: Wed, 13 May 2026 17:55:14 -0500 Subject: [PATCH] eth/protocols/snap: route remaining v1 dispatch sites through registerV1 hooks --- eth/protocols/snap/sync.go | 348 +-------------------------------- eth/protocols/snap/sync_v1.go | 351 +++++++++++++++++++++++++++++++++- 2 files changed, 359 insertions(+), 340 deletions(-) diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index 1a95134658..4a83e2d127 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -339,7 +339,7 @@ type Syncer struct { root common.Hash // Current state trie root being synced tasks []*accountTask // Current account task set being synced - snapped bool // Flag to signal that snap phase is done + snapped bool // True once account-range download is complete. healer *healTask // Current state healing task being executed update chan struct{} // Notification channel for possible sync progression @@ -402,9 +402,12 @@ type Syncer struct { // Version-specific hooks installed by registerV1/V2. Each one exists // because shared code in sync.go needs to dispatch into version-specific // code without knowing which version is running. - syncFn func(root common.Hash, cancel chan struct{}) error - revertVersionRequests func(peer string) - onBytecodesAfterSync func(peer SyncPeer, id uint64, bytecodes [][]byte) error + syncFn func(root common.Hash, cancel chan struct{}) error + revertVersionRequests func(peer string) + onBytecodesAfterSync func(peer SyncPeer, id uint64, bytecodes [][]byte) error + forwardAccountTask func(task *accountTask) + registerVersionIdler func(id string) + unregisterVersionIdler func(id string) pend sync.WaitGroup // Tracks network request goroutines for graceful shutdown lock sync.RWMutex // Protects fields that can change outside of sync (peers, reqs, root) @@ -456,8 +459,7 @@ func (s *Syncer) Register(peer SyncPeer) error { s.accountIdlers[id] = struct{}{} s.storageIdlers[id] = struct{}{} s.bytecodeIdlers[id] = struct{}{} - s.trienodeHealIdlers[id] = struct{}{} - s.bytecodeHealIdlers[id] = struct{}{} + s.registerVersionIdler(id) s.lock.Unlock() // Notify any active syncs that a new peer can be assigned data @@ -484,8 +486,7 @@ func (s *Syncer) Unregister(id string) error { delete(s.accountIdlers, id) delete(s.storageIdlers, id) delete(s.bytecodeIdlers, id) - delete(s.trienodeHealIdlers, id) - delete(s.bytecodeHealIdlers, id) + s.unregisterVersionIdler(id) s.lock.Unlock() // Notify any active syncs that pending requests need to be reverted @@ -1276,337 +1277,6 @@ func (s *Syncer) processBytecodeResponse(res *bytecodeResponse) { // task assigners to pick up and fill. } -// processStorageResponse integrates an already validated storage response -// into the account tasks. -func (s *Syncer) processStorageResponse(res *storageResponse) { - // Switch the subtask from pending to idle - if res.subTask != nil { - res.subTask.req = nil - } - batch := ethdb.HookedBatch{ - Batch: s.db.NewBatch(), - OnPut: func(key []byte, value []byte) { - s.storageBytes += common.StorageSize(len(key) + len(value)) - }, - } - var ( - slots int - oldStorageBytes = s.storageBytes - ) - // Iterate over all the accounts and reconstruct their storage tries from the - // delivered slots - for i, account := range res.accounts { - // If the account was not delivered, reschedule it - if i >= len(res.hashes) { - res.mainTask.stateTasks[account] = res.roots[i] - continue - } - // State was delivered, if complete mark as not needed any more, otherwise - // mark the account as needing healing - for j, hash := range res.mainTask.res.hashes { - if account != hash { - continue - } - acc := res.mainTask.res.accounts[j] - - // If the packet contains multiple contract storage slots, all - // but the last are surely complete. The last contract may be - // chunked, so check it's continuation flag. - if res.subTask == nil && res.mainTask.needState[j] && (i < len(res.hashes)-1 || !res.cont) { - res.mainTask.needState[j] = false - res.mainTask.pend-- - res.mainTask.stateCompleted[account] = struct{}{} // mark it as completed - smallStorageGauge.Inc(1) - } - // If the last contract was chunked, mark it as needing healing - // to avoid writing it out to disk prematurely. - if res.subTask == nil && !res.mainTask.needHeal[j] && i == len(res.hashes)-1 && res.cont { - res.mainTask.needHeal[j] = true - } - // If the last contract was chunked, we need to switch to large - // contract handling mode - if res.subTask == nil && i == len(res.hashes)-1 && res.cont { - // If we haven't yet started a large-contract retrieval, create - // the subtasks for it within the main account task - if tasks, ok := res.mainTask.SubTasks[account]; !ok { - var ( - keys = res.hashes[i] - chunks = uint64(storageConcurrency) - lastKey common.Hash - ) - if len(keys) > 0 { - lastKey = keys[len(keys)-1] - } - // If the number of slots remaining is low, decrease the - // number of chunks. Somewhere on the order of 10-15K slots - // fit into a packet of 500KB. A key/slot pair is maximum 64 - // bytes, so pessimistically maxRequestSize/64 = 8K. - // - // Chunk so that at least 2 packets are needed to fill a task. - if estimate, err := estimateRemainingSlots(len(keys), lastKey); err == nil { - if n := estimate / (2 * (maxRequestSize / 64)); n+1 < chunks { - chunks = n + 1 - } - log.Debug("Chunked large contract", "initiators", len(keys), "tail", lastKey, "remaining", estimate, "chunks", chunks) - } else { - log.Debug("Chunked large contract", "initiators", len(keys), "tail", lastKey, "chunks", chunks) - } - r := newHashRange(lastKey, chunks) - if chunks == 1 { - smallStorageGauge.Inc(1) - } else { - largeStorageGauge.Inc(1) - } - // Our first task is the one that was just filled by this response. - batch := ethdb.HookedBatch{ - Batch: s.db.NewBatch(), - OnPut: func(key []byte, value []byte) { - s.storageBytes += common.StorageSize(len(key) + len(value)) - }, - } - var tr genTrie - if s.scheme == rawdb.HashScheme { - tr = newHashTrie(batch) - } - if s.scheme == rawdb.PathScheme { - // Keep the left boundary as it's the first range. - tr = newPathTrie(account, false, s.db, batch) - } - tasks = append(tasks, &storageTask{ - Next: common.Hash{}, - Last: r.End(), - root: acc.Root, - genBatch: batch, - genTrie: tr, - }) - for r.Next() { - batch := ethdb.HookedBatch{ - Batch: s.db.NewBatch(), - OnPut: func(key []byte, value []byte) { - s.storageBytes += common.StorageSize(len(key) + len(value)) - }, - } - var tr genTrie - if s.scheme == rawdb.HashScheme { - tr = newHashTrie(batch) - } - if s.scheme == rawdb.PathScheme { - tr = newPathTrie(account, true, s.db, batch) - } - tasks = append(tasks, &storageTask{ - Next: r.Start(), - Last: r.End(), - root: acc.Root, - genBatch: batch, - genTrie: tr, - }) - } - for _, task := range tasks { - log.Debug("Created storage sync task", "account", account, "root", acc.Root, "from", task.Next, "last", task.Last) - } - res.mainTask.SubTasks[account] = tasks - - // Since we've just created the sub-tasks, this response - // is surely for the first one (zero origin) - res.subTask = tasks[0] - } - } - // If we're in large contract delivery mode, forward the subtask - if res.subTask != nil { - // Ensure the response doesn't overflow into the subsequent task - last := res.subTask.Last.Big() - // Find the first overflowing key. While at it, mark res as complete - // if we find the range to include or pass the 'last' - index := sort.Search(len(res.hashes[i]), func(k int) bool { - cmp := res.hashes[i][k].Big().Cmp(last) - if cmp >= 0 { - res.cont = false - } - return cmp > 0 - }) - if index >= 0 { - // cut off excess - res.hashes[i] = res.hashes[i][:index] - res.slots[i] = res.slots[i][:index] - } - // Forward the relevant storage chunk (even if created just now) - if res.cont { - res.subTask.Next = incHash(res.hashes[i][len(res.hashes[i])-1]) - } else { - res.subTask.done = true - } - } - } - // Iterate over all the complete contracts, reconstruct the trie nodes and - // push them to disk. If the contract is chunked, the trie nodes will be - // reconstructed later. - slots += len(res.hashes[i]) - - if i < len(res.hashes)-1 || res.subTask == nil { - // no need to make local reassignment of account: this closure does not outlive the loop - var tr genTrie - if s.scheme == rawdb.HashScheme { - tr = newHashTrie(batch) - } - if s.scheme == rawdb.PathScheme { - // Keep the left boundary as it's complete - tr = newPathTrie(account, false, s.db, batch) - } - for j := 0; j < len(res.hashes[i]); j++ { - tr.update(res.hashes[i][j][:], res.slots[i][j]) - } - tr.commit(true) - } - // Persist the received storage segments. These flat state maybe - // outdated during the sync, but it can be fixed later during the - // snapshot generation. - for j := 0; j < len(res.hashes[i]); j++ { - rawdb.WriteStorageSnapshot(batch, account, res.hashes[i][j], res.slots[i][j]) - - // If we're storing large contracts, generate the trie nodes - // on the fly to not trash the gluing points - if i == len(res.hashes)-1 && res.subTask != nil { - res.subTask.genTrie.update(res.hashes[i][j][:], res.slots[i][j]) - } - } - } - // Large contracts could have generated new trie nodes, flush them to disk - if res.subTask != nil { - if res.subTask.done { - root := res.subTask.genTrie.commit(res.subTask.Last == common.MaxHash) - if err := res.subTask.genBatch.Write(); err != nil { - log.Error("Failed to persist stack slots", "err", err) - } - res.subTask.genBatch.Reset() - - // If the chunk's root is an overflown but full delivery, - // clear the heal request. - accountHash := res.accounts[len(res.accounts)-1] - if root == res.subTask.root && rawdb.HasTrieNode(s.db, accountHash, nil, root, s.scheme) { - for i, account := range res.mainTask.res.hashes { - if account == accountHash { - res.mainTask.needHeal[i] = false - skipStorageHealingGauge.Inc(1) - } - } - } - } else if res.subTask.genBatch.ValueSize() > batchSizeThreshold { - res.subTask.genTrie.commit(false) - if err := res.subTask.genBatch.Write(); err != nil { - log.Error("Failed to persist stack slots", "err", err) - } - res.subTask.genBatch.Reset() - } - } - // Flush anything written just now and update the stats - if err := batch.Write(); err != nil { - log.Crit("Failed to persist storage slots", "err", err) - } - s.storageSynced += uint64(slots) - - log.Debug("Persisted set of storage slots", "accounts", len(res.hashes), "slots", slots, "bytes", s.storageBytes-oldStorageBytes) - - // If this delivery completed the last pending task, forward the account task - // to the next chunk - if res.mainTask.pend == 0 { - s.forwardAccountTask(res.mainTask) - return - } - // Some accounts are still incomplete, leave as is for the storage and contract - // task assigners to pick up and fill. -} - -// forwardAccountTask takes a filled account task and persists anything available -// into the database, after which it forwards the next account marker so that the -// task's next chunk may be filled. -func (s *Syncer) forwardAccountTask(task *accountTask) { - // Remove any pending delivery - res := task.res - if res == nil { - return // nothing to forward - } - task.res = nil - - // Persist the received account segments. These flat state maybe - // outdated during the sync, but it can be fixed later during the - // snapshot generation. - oldAccountBytes := s.accountBytes - - batch := ethdb.HookedBatch{ - Batch: s.db.NewBatch(), - OnPut: func(key []byte, value []byte) { - s.accountBytes += common.StorageSize(len(key) + len(value)) - }, - } - for i, hash := range res.hashes { - if task.needCode[i] || task.needState[i] { - break - } - slim := types.SlimAccountRLP(*res.accounts[i]) - rawdb.WriteAccountSnapshot(batch, hash, slim) - - if !task.needHeal[i] { - // If the storage task is complete, drop it into the stack trie - // to generate account trie nodes for it - full, err := types.FullAccountRLP(slim) // TODO(karalabe): Slim parsing can be omitted - if err != nil { - panic(err) // Really shouldn't ever happen - } - task.genTrie.update(hash[:], full) - } else { - // If the storage task is incomplete, explicitly delete the corresponding - // account item from the account trie to ensure that all nodes along the - // path to the incomplete storage trie are cleaned up. - if err := task.genTrie.delete(hash[:]); err != nil { - panic(err) // Really shouldn't ever happen - } - } - } - // Flush anything written just now and update the stats - if err := batch.Write(); err != nil { - log.Crit("Failed to persist accounts", "err", err) - } - s.accountSynced += uint64(len(res.accounts)) - - // Task filling persisted, push it the chunk marker forward to the first - // account still missing data. - for i, hash := range res.hashes { - if task.needCode[i] || task.needState[i] { - return - } - task.Next = incHash(hash) - - // Remove the completion flag once the account range is pushed - // forward. The leftover accounts will be skipped in the next - // cycle. - delete(task.stateCompleted, hash) - } - // All accounts marked as complete, track if the entire task is done - task.done = !res.cont - - // Error out if there is any leftover completion flag. - if task.done && len(task.stateCompleted) != 0 { - panic(fmt.Errorf("storage completion flags should be emptied, %d left", len(task.stateCompleted))) - } - // Stack trie could have generated trie nodes, push them to disk (we need to - // flush after finalizing task.done. It's fine even if we crash and lose this - // write as it will only cause more data to be downloaded during heal. - if task.done { - task.genTrie.commit(task.Last == common.MaxHash) - if err := task.genBatch.Write(); err != nil { - log.Error("Failed to persist stack account", "err", err) - } - task.genBatch.Reset() - } else if task.genBatch.ValueSize() > batchSizeThreshold { - task.genTrie.commit(false) - if err := task.genBatch.Write(); err != nil { - log.Error("Failed to persist stack account", "err", err) - } - task.genBatch.Reset() - } - log.Debug("Persisted range of accounts", "accounts", len(res.accounts), "bytes", s.accountBytes-oldAccountBytes) -} - // OnAccounts is a callback method to invoke when a range of accounts are // received from a remote peer. func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, accounts [][]byte, proof [][]byte) error { diff --git a/eth/protocols/snap/sync_v1.go b/eth/protocols/snap/sync_v1.go index 88bae04218..cd88d4194d 100644 --- a/eth/protocols/snap/sync_v1.go +++ b/eth/protocols/snap/sync_v1.go @@ -70,6 +70,9 @@ func (s *Syncer) registerV1() { s.syncFn = s.syncV1 s.revertVersionRequests = s.revertHealRequests s.onBytecodesAfterSync = s.onHealByteCodes + s.forwardAccountTask = s.forwardAccountTaskV1 + s.registerVersionIdler = s.registerV1Idler + s.unregisterVersionIdler = s.unregisterV1Idler // V1 specific state. s.trienodeHealIdlers = make(map[string]struct{}) @@ -80,6 +83,20 @@ func (s *Syncer) registerV1() { s.stateWriter = s.db.NewBatch() } +// registerV1Idler is the registerVersionIdler hook implementation for snap/1. +// It marks the peer as idle for v1-specific heal idler buckets. +func (s *Syncer) registerV1Idler(id string) { + s.trienodeHealIdlers[id] = struct{}{} + s.bytecodeHealIdlers[id] = struct{}{} +} + +// unregisterV1Idler is the unregisterVersionIdler hook implementation for snap/1. +// It removes the peer from v1-specific heal idler buckets. +func (s *Syncer) unregisterV1Idler(id string) { + delete(s.trienodeHealIdlers, id) + delete(s.bytecodeHealIdlers, id) +} + // trienodeHealRequest tracks a pending state trie request to ensure responses // are to actual requests and to validate any security constraints. // @@ -942,6 +959,338 @@ func (s *Syncer) onHealState(paths [][]byte, value []byte) error { return nil } +// processStorageResponseV1 integrates an already validated storage response +// into the account tasks. Called only from syncV1's select loop. +func (s *Syncer) processStorageResponseV1(res *storageResponse) { + // Switch the subtask from pending to idle + if res.subTask != nil { + res.subTask.req = nil + } + batch := ethdb.HookedBatch{ + Batch: s.db.NewBatch(), + OnPut: func(key []byte, value []byte) { + s.storageBytes += common.StorageSize(len(key) + len(value)) + }, + } + var ( + slots int + oldStorageBytes = s.storageBytes + ) + // Iterate over all the accounts and reconstruct their storage tries from the + // delivered slots + for i, account := range res.accounts { + // If the account was not delivered, reschedule it + if i >= len(res.hashes) { + res.mainTask.stateTasks[account] = res.roots[i] + continue + } + // State was delivered, if complete mark as not needed any more, otherwise + // mark the account as needing healing + for j, hash := range res.mainTask.res.hashes { + if account != hash { + continue + } + acc := res.mainTask.res.accounts[j] + + // If the packet contains multiple contract storage slots, all + // but the last are surely complete. The last contract may be + // chunked, so check it's continuation flag. + if res.subTask == nil && res.mainTask.needState[j] && (i < len(res.hashes)-1 || !res.cont) { + res.mainTask.needState[j] = false + res.mainTask.pend-- + res.mainTask.stateCompleted[account] = struct{}{} // mark it as completed + smallStorageGauge.Inc(1) + } + // If the last contract was chunked, mark it as needing healing + // to avoid writing it out to disk prematurely. + if res.subTask == nil && !res.mainTask.needHeal[j] && i == len(res.hashes)-1 && res.cont { + res.mainTask.needHeal[j] = true + } + // If the last contract was chunked, we need to switch to large + // contract handling mode + if res.subTask == nil && i == len(res.hashes)-1 && res.cont { + // If we haven't yet started a large-contract retrieval, create + // the subtasks for it within the main account task + if tasks, ok := res.mainTask.SubTasks[account]; !ok { + var ( + keys = res.hashes[i] + chunks = uint64(storageConcurrency) + lastKey common.Hash + ) + if len(keys) > 0 { + lastKey = keys[len(keys)-1] + } + // If the number of slots remaining is low, decrease the + // number of chunks. Somewhere on the order of 10-15K slots + // fit into a packet of 500KB. A key/slot pair is maximum 64 + // bytes, so pessimistically maxRequestSize/64 = 8K. + // + // Chunk so that at least 2 packets are needed to fill a task. + if estimate, err := estimateRemainingSlots(len(keys), lastKey); err == nil { + if n := estimate / (2 * (maxRequestSize / 64)); n+1 < chunks { + chunks = n + 1 + } + log.Debug("Chunked large contract", "initiators", len(keys), "tail", lastKey, "remaining", estimate, "chunks", chunks) + } else { + log.Debug("Chunked large contract", "initiators", len(keys), "tail", lastKey, "chunks", chunks) + } + r := newHashRange(lastKey, chunks) + if chunks == 1 { + smallStorageGauge.Inc(1) + } else { + largeStorageGauge.Inc(1) + } + // Our first task is the one that was just filled by this response. + batch := ethdb.HookedBatch{ + Batch: s.db.NewBatch(), + OnPut: func(key []byte, value []byte) { + s.storageBytes += common.StorageSize(len(key) + len(value)) + }, + } + var tr genTrie + if s.scheme == rawdb.HashScheme { + tr = newHashTrie(batch) + } + if s.scheme == rawdb.PathScheme { + // Keep the left boundary as it's the first range. + tr = newPathTrie(account, false, s.db, batch) + } + tasks = append(tasks, &storageTask{ + Next: common.Hash{}, + Last: r.End(), + root: acc.Root, + genBatch: batch, + genTrie: tr, + }) + for r.Next() { + batch := ethdb.HookedBatch{ + Batch: s.db.NewBatch(), + OnPut: func(key []byte, value []byte) { + s.storageBytes += common.StorageSize(len(key) + len(value)) + }, + } + var tr genTrie + if s.scheme == rawdb.HashScheme { + tr = newHashTrie(batch) + } + if s.scheme == rawdb.PathScheme { + tr = newPathTrie(account, true, s.db, batch) + } + tasks = append(tasks, &storageTask{ + Next: r.Start(), + Last: r.End(), + root: acc.Root, + genBatch: batch, + genTrie: tr, + }) + } + for _, task := range tasks { + log.Debug("Created storage sync task", "account", account, "root", acc.Root, "from", task.Next, "last", task.Last) + } + res.mainTask.SubTasks[account] = tasks + + // Since we've just created the sub-tasks, this response + // is surely for the first one (zero origin) + res.subTask = tasks[0] + } + } + // If we're in large contract delivery mode, forward the subtask + if res.subTask != nil { + // Ensure the response doesn't overflow into the subsequent task + last := res.subTask.Last.Big() + // Find the first overflowing key. While at it, mark res as complete + // if we find the range to include or pass the 'last' + index := sort.Search(len(res.hashes[i]), func(k int) bool { + cmp := res.hashes[i][k].Big().Cmp(last) + if cmp >= 0 { + res.cont = false + } + return cmp > 0 + }) + if index >= 0 { + // cut off excess + res.hashes[i] = res.hashes[i][:index] + res.slots[i] = res.slots[i][:index] + } + // Forward the relevant storage chunk (even if created just now) + if res.cont { + res.subTask.Next = incHash(res.hashes[i][len(res.hashes[i])-1]) + } else { + res.subTask.done = true + } + } + } + // Iterate over all the complete contracts, reconstruct the trie nodes and + // push them to disk. If the contract is chunked, the trie nodes will be + // reconstructed later. + slots += len(res.hashes[i]) + + if i < len(res.hashes)-1 || res.subTask == nil { + // no need to make local reassignment of account: this closure does not outlive the loop + var tr genTrie + if s.scheme == rawdb.HashScheme { + tr = newHashTrie(batch) + } + if s.scheme == rawdb.PathScheme { + // Keep the left boundary as it's complete + tr = newPathTrie(account, false, s.db, batch) + } + for j := 0; j < len(res.hashes[i]); j++ { + tr.update(res.hashes[i][j][:], res.slots[i][j]) + } + tr.commit(true) + } + // Persist the received storage segments. These flat state maybe + // outdated during the sync, but it can be fixed later during the + // snapshot generation. + for j := 0; j < len(res.hashes[i]); j++ { + rawdb.WriteStorageSnapshot(batch, account, res.hashes[i][j], res.slots[i][j]) + + // If we're storing large contracts, generate the trie nodes + // on the fly to not trash the gluing points + if i == len(res.hashes)-1 && res.subTask != nil { + res.subTask.genTrie.update(res.hashes[i][j][:], res.slots[i][j]) + } + } + } + // Large contracts could have generated new trie nodes, flush them to disk + if res.subTask != nil { + if res.subTask.done { + root := res.subTask.genTrie.commit(res.subTask.Last == common.MaxHash) + if err := res.subTask.genBatch.Write(); err != nil { + log.Error("Failed to persist stack slots", "err", err) + } + res.subTask.genBatch.Reset() + + // If the chunk's root is an overflown but full delivery, + // clear the heal request. + accountHash := res.accounts[len(res.accounts)-1] + if root == res.subTask.root && rawdb.HasTrieNode(s.db, accountHash, nil, root, s.scheme) { + for i, account := range res.mainTask.res.hashes { + if account == accountHash { + res.mainTask.needHeal[i] = false + skipStorageHealingGauge.Inc(1) + } + } + } + } else if res.subTask.genBatch.ValueSize() > batchSizeThreshold { + res.subTask.genTrie.commit(false) + if err := res.subTask.genBatch.Write(); err != nil { + log.Error("Failed to persist stack slots", "err", err) + } + res.subTask.genBatch.Reset() + } + } + // Flush anything written just now and update the stats + if err := batch.Write(); err != nil { + log.Crit("Failed to persist storage slots", "err", err) + } + s.storageSynced += uint64(slots) + + log.Debug("Persisted set of storage slots", "accounts", len(res.hashes), "slots", slots, "bytes", s.storageBytes-oldStorageBytes) + + // If this delivery completed the last pending task, forward the account task + // to the next chunk + if res.mainTask.pend == 0 { + s.forwardAccountTask(res.mainTask) + return + } + // Some accounts are still incomplete, leave as is for the storage and contract + // task assigners to pick up and fill. +} + +// forwardAccountTaskV1 takes a filled account task and persists anything available +// into the database, after which it forwards the next account marker so that the +// task's next chunk may be filled. Installed as Syncer.forwardAccountTask by +// registerV1. +func (s *Syncer) forwardAccountTaskV1(task *accountTask) { + // Remove any pending delivery + res := task.res + if res == nil { + return // nothing to forward + } + task.res = nil + + // Persist the received account segments. These flat state maybe + // outdated during the sync, but it can be fixed later during the + // snapshot generation. + oldAccountBytes := s.accountBytes + + batch := ethdb.HookedBatch{ + Batch: s.db.NewBatch(), + OnPut: func(key []byte, value []byte) { + s.accountBytes += common.StorageSize(len(key) + len(value)) + }, + } + for i, hash := range res.hashes { + if task.needCode[i] || task.needState[i] { + break + } + slim := types.SlimAccountRLP(*res.accounts[i]) + rawdb.WriteAccountSnapshot(batch, hash, slim) + + if !task.needHeal[i] { + // If the storage task is complete, drop it into the stack trie + // to generate account trie nodes for it + full, err := types.FullAccountRLP(slim) // TODO(karalabe): Slim parsing can be omitted + if err != nil { + panic(err) // Really shouldn't ever happen + } + task.genTrie.update(hash[:], full) + } else { + // If the storage task is incomplete, explicitly delete the corresponding + // account item from the account trie to ensure that all nodes along the + // path to the incomplete storage trie are cleaned up. + if err := task.genTrie.delete(hash[:]); err != nil { + panic(err) // Really shouldn't ever happen + } + } + } + // Flush anything written just now and update the stats + if err := batch.Write(); err != nil { + log.Crit("Failed to persist accounts", "err", err) + } + s.accountSynced += uint64(len(res.accounts)) + + // Task filling persisted, push it the chunk marker forward to the first + // account still missing data. + for i, hash := range res.hashes { + if task.needCode[i] || task.needState[i] { + return + } + task.Next = incHash(hash) + + // Remove the completion flag once the account range is pushed + // forward. The leftover accounts will be skipped in the next + // cycle. + delete(task.stateCompleted, hash) + } + // All accounts marked as complete, track if the entire task is done + task.done = !res.cont + + // Error out if there is any leftover completion flag. + if task.done && len(task.stateCompleted) != 0 { + panic(fmt.Errorf("storage completion flags should be emptied, %d left", len(task.stateCompleted))) + } + // Stack trie could have generated trie nodes, push them to disk (we need to + // flush after finalizing task.done. It's fine even if we crash and lose this + // write as it will only cause more data to be downloaded during heal. + if task.done { + task.genTrie.commit(task.Last == common.MaxHash) + if err := task.genBatch.Write(); err != nil { + log.Error("Failed to persist stack account", "err", err) + } + task.genBatch.Reset() + } else if task.genBatch.ValueSize() > batchSizeThreshold { + task.genTrie.commit(false) + if err := task.genBatch.Write(); err != nil { + log.Error("Failed to persist stack account", "err", err) + } + task.genBatch.Reset() + } + log.Debug("Persisted range of accounts", "accounts", len(res.accounts), "bytes", s.accountBytes-oldAccountBytes) +} + // reportV1 routes between the shared sync-phase logger and the v1 heal-phase // logger based on whether account tasks are still pending. Called from syncV1's // main loop. @@ -1302,7 +1651,7 @@ func (s *Syncer) syncV1(root common.Hash, cancel chan struct{}) error { case res := <-bytecodeResps: s.processBytecodeResponse(res) case res := <-storageResps: - s.processStorageResponse(res) + s.processStorageResponseV1(res) case res := <-trienodeHealResps: s.processTrienodeHealResponse(res) case res := <-bytecodeHealResps: