go-ethereum/eth/downloader/beaconsync.go
CPerezz cdb4d77819
core, eth: fix end-to-end partial state sync pipeline
Fix several interacting issues that prevented partial state nodes from
syncing and following the chain on bal-devnet-2:

1. Stale pivot deadlock: Replace unconditional pivot suppression with
   rate-limited advances (2-minute cooldown). This prevents the restart
   loop bug while allowing recovery when the initial pivot is too stale
   for peers to serve.

2. Storage root resolution: Add snap-based resolver that queries peers
   for untracked contracts' storage roots during BAL processing. This
   lets the computed state root converge toward the header root.

3. SetCanonical for partial state: When the computed root differs from
   the header root (expected when untracked contracts have unresolved
   storage roots), check HasState(partialState.Root()) instead of only
   HasState(block.Root()). Guard against zero root during snap sync.

4. Canonical hash backfill: AdvancePartialHead now writes canonical
   hashes for all blocks between the pivot and snap head, fixing the
   "final block not in canonical chain" error caused by
   InsertReceiptChain skipping blocks whose bodies already exist.

5. Gap block processing: After snap sync completes, process accumulated
   blocks between the sync head and chain tip using their persisted BALs
   before entering steady-state chain following.

6. Computed root chaining: Use partialState.Root() (actual computed root)
   as parentRoot for subsequent blocks, not the header root. This ensures
   correct trie chaining when computed != header root.

Tested end-to-end on bal-devnet-2: snap sync completes, gap blocks
processed, canonical head advances at chain tip (~1 block/12s).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-17 12:05:26 +02:00

448 lines
17 KiB
Go

// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package downloader
import (
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/log"
)
// beaconBackfiller is the chain and state backfilling that can be commenced once
// the skeleton syncer has successfully reverse downloaded all the headers up to
// the genesis block or an existing header in the database. Its operation is fully
// directed by the skeleton sync's head/tail events.
type beaconBackfiller struct {
downloader *Downloader // Downloader to direct via this callback implementation
success func() // Callback to run on successful sync cycle completion
filling bool // Flag whether the downloader is backfilling or not
started chan struct{} // Notification channel whether the downloader inited
lock sync.Mutex // Mutex protecting the sync lock
}
// newBeaconBackfiller is a helper method to create the backfiller.
func newBeaconBackfiller(dl *Downloader, success func()) backfiller {
return &beaconBackfiller{
downloader: dl,
success: success,
}
}
// suspend cancels any background downloader threads and returns the last header
// that has been successfully backfilled (potentially in a previous run), or the
// genesis.
func (b *beaconBackfiller) suspend() *types.Header {
// If no filling is running, don't waste cycles
b.lock.Lock()
filling := b.filling
started := b.started
b.lock.Unlock()
if !filling {
// Sync cycle was inactive, retrieve and return the latest snap block
// as the filled header.
log.Debug("Backfiller was inactive")
return b.downloader.blockchain.CurrentSnapBlock()
}
// A previous filling should be running, though it may happen that it hasn't
// yet started (being done on a new goroutine). Many concurrent beacon head
// announcements can lead to sync start/stop thrashing. In that case we need
// to wait for initialization before we can safely cancel it. It is safe to
// read this channel multiple times, it gets closed on startup.
<-started
// For partial state nodes during snap sync, don't cancel the sync on every
// beacon head update. The state sync needs uninterrupted time to complete,
// otherwise the constant cancel/restart cycle prevents progress.
// We skip cancellation when:
// 1. We're in partial state mode (partialFilter is set)
// 2. We're in snap sync mode OR the second state sync (pivot→HEAD) is running
// 3. State sync is actively running (synchronising is true)
if b.downloader.partialFilter != nil &&
(b.downloader.getMode() == ethconfig.SnapSync || b.downloader.partialHeadSyncing.Load()) &&
b.downloader.synchronising.Load() {
log.Debug("Backfiller suspend: partial state snap sync in progress, skipping cancel")
return b.downloader.blockchain.CurrentSnapBlock()
}
// Now that we're sure the downloader successfully started up, we can cancel
// it safely without running the risk of data races.
b.downloader.Cancel()
log.Debug("Backfiller has been suspended")
// Sync cycle was just terminated, retrieve and return the last filled header.
return b.downloader.blockchain.CurrentSnapBlock()
}
// resume starts the downloader threads for backfilling state and chain data.
func (b *beaconBackfiller) resume() {
// For partial state nodes, don't start new sync cycles after the initial
// snap sync completes. The partialSyncComplete flag is set after
// AdvancePartialHead succeeds, indicating new blocks should come via
// Engine API with BAL instead of sync.
if b.downloader.partialFilter != nil && b.downloader.partialSyncComplete.Load() {
log.Debug("Backfiller resume: partial state sync complete, skipping new cycle")
return
}
b.lock.Lock()
if b.filling {
// If a previous filling cycle is still running, just ignore this start
// request. // TODO(karalabe): We should make this channel driven
b.lock.Unlock()
log.Debug("Backfiller is running")
return
}
b.filling = true
b.started = make(chan struct{})
b.lock.Unlock()
// Start the backfilling on its own thread since the downloader does not have
// its own lifecycle runloop.
go func() {
// Set the backfiller to non-filling when download completes
defer func() {
b.lock.Lock()
b.filling = false
b.lock.Unlock()
}()
// If the downloader fails, report an error as in beacon chain mode there
// should be no errors as long as the chain we're syncing to is valid.
if err := b.downloader.synchronise(b.started); err != nil {
log.Error("Beacon backfilling failed", "err", err)
return
}
// Synchronization succeeded. Since this happens async, notify the outer
// context to enable transaction propagation.
if b.success != nil {
b.success()
}
log.Debug("Backfilling completed")
}()
log.Debug("Backfilling started")
}
// SetBadBlockCallback sets the callback to run when a bad block is hit by the
// block processor. This method is not thread safe and should be set only once
// on startup before system events are fired.
func (d *Downloader) SetBadBlockCallback(onBadBlock badBlockFn) {
d.badBlock = onBadBlock
}
// BeaconSync is the post-merge version of the chain synchronization, where the
// chain is not downloaded from genesis onward, rather from trusted head announces
// backwards.
//
// Internally backfilling and state sync is done the same way, but the header
// retrieval and scheduling is replaced.
func (d *Downloader) BeaconSync(head *types.Header, final *types.Header) error {
return d.beaconSync(head, final, true)
}
// BeaconExtend is an optimistic version of BeaconSync, where an attempt is made
// to extend the current beacon chain with a new header, but in case of a mismatch,
// the old sync will not be terminated and reorged, rather the new head is dropped.
//
// This is useful if a beacon client is feeding us large chunks of payloads to run,
// but is not setting the head after each.
func (d *Downloader) BeaconExtend(head *types.Header) error {
return d.beaconSync(head, nil, false)
}
// beaconSync is the post-merge version of the chain synchronization, where the
// chain is not downloaded from genesis onward, rather from trusted head announces
// backwards.
//
// Internally backfilling and state sync is done the same way, but the header
// retrieval and scheduling is replaced.
func (d *Downloader) beaconSync(head *types.Header, final *types.Header, force bool) error {
// Signal the skeleton sync to switch to a new head, however it wants
return d.skeleton.Sync(head, final, force)
}
// findBeaconAncestor tries to locate the common ancestor link of the local chain
// and the beacon chain just requested. In the general case when our node was in
// sync and on the correct chain, checking the top N links should already get us
// a match. In the rare scenario when we ended up on a long reorganisation (i.e.
// none of the head links match), we do a binary search to find the ancestor.
func (d *Downloader) findBeaconAncestor() (uint64, error) {
// Figure out the current local head position
var chainHead *types.Header
switch d.getMode() {
case ethconfig.FullSync:
chainHead = d.blockchain.CurrentBlock()
case ethconfig.SnapSync:
chainHead = d.blockchain.CurrentSnapBlock()
default:
panic("unknown sync mode")
}
number := chainHead.Number.Uint64()
// Retrieve the skeleton bounds and ensure they are linked to the local chain
beaconHead, beaconTail, _, err := d.skeleton.Bounds()
if err != nil {
// This is a programming error. The chain backfiller was called with an
// invalid beacon sync state. Ideally we would panic here, but erroring
// gives us at least a remote chance to recover. It's still a big fault!
log.Error("Failed to retrieve beacon bounds", "err", err)
return 0, err
}
log.Debug("Searching beacon ancestor", "local", number, "beaconhead", beaconHead.Number, "beacontail", beaconTail.Number)
var linked bool
switch d.getMode() {
case ethconfig.FullSync:
linked = d.blockchain.HasBlock(beaconTail.ParentHash, beaconTail.Number.Uint64()-1)
case ethconfig.SnapSync:
linked = d.blockchain.HasFastBlock(beaconTail.ParentHash, beaconTail.Number.Uint64()-1)
default:
panic("unknown sync mode")
}
if !linked {
// This is a programming error. The chain backfiller was called with a
// tail that's not linked to the local chain. Whilst this should never
// happen, there might be some weirdnesses if beacon sync backfilling
// races with the user (or beacon client) calling setHead. Whilst panic
// would be the ideal thing to do, it is safer long term to attempt a
// recovery and fix any noticed issue after the fact.
log.Error("Beacon sync linkup unavailable", "number", beaconTail.Number.Uint64()-1, "hash", beaconTail.ParentHash)
return 0, fmt.Errorf("beacon linkup unavailable locally: %d [%x]", beaconTail.Number.Uint64()-1, beaconTail.ParentHash)
}
// Binary search to find the ancestor
start, end := beaconTail.Number.Uint64()-1, number
if number := beaconHead.Number.Uint64(); end > number {
// This shouldn't really happen in a healthy network, but if the consensus
// clients feeds us a shorter chain as the canonical, we should not attempt
// to access non-existent skeleton items.
log.Warn("Beacon head lower than local chain", "beacon", number, "local", end)
end = number
}
for start+1 < end {
// Split our chain interval in two, and request the hash to cross check
check := (start + end) / 2
h := d.skeleton.Header(check)
if h == nil {
return 0, fmt.Errorf("filled skeleton header is missing: %d", check)
}
n := h.Number.Uint64()
var known bool
switch d.getMode() {
case ethconfig.FullSync:
known = d.blockchain.HasBlock(h.Hash(), n)
case ethconfig.SnapSync:
known = d.blockchain.HasFastBlock(h.Hash(), n)
default:
panic("unknown sync mode")
}
if !known {
end = check
continue
}
start = check
}
log.Debug("Found beacon ancestor", "number", start)
return start, nil
}
// fetchHeaders feeds skeleton headers to the downloader queue for scheduling
// until sync errors or is finished.
func (d *Downloader) fetchHeaders(from uint64) error {
head, tail, _, err := d.skeleton.Bounds()
if err != nil {
return err
}
// A part of headers are not in the skeleton space, try to resolve
// them from the local chain. Note the range should be very short
// and it should only happen when there are less than 64 post-merge
// blocks in the network.
var localHeaders []*types.Header
if from < tail.Number.Uint64() {
count := tail.Number.Uint64() - from
if count > uint64(fsMinFullBlocks) {
return fmt.Errorf("invalid origin (%d) of beacon sync (%d)", from, tail.Number)
}
localHeaders = d.readHeaderRange(tail, int(count))
log.Warn("Retrieved beacon headers from local", "from", from, "count", count)
}
fsHeaderContCheckTimer := time.NewTimer(fsHeaderContCheck)
defer fsHeaderContCheckTimer.Stop()
// Verify the header at configured chain cutoff, ensuring it's matched with
// the configured hash. Skip the check if the configured cutoff is even higher
// than the sync target, which is definitely not a common case.
//
// The hash validation is only performed when chainCutoffHash is non-zero.
// Static cutoffs (e.g. --history.chain postmerge) set a well-known hash;
// dynamic cutoffs (e.g. chain retention = HEAD-N) clear the hash to zero
// because the cutoff block changes every sync cycle and has no predetermined hash.
if d.chainCutoffNumber != 0 && d.chainCutoffNumber >= from && d.chainCutoffNumber <= head.Number.Uint64() {
h := d.skeleton.Header(d.chainCutoffNumber)
if h == nil {
if d.chainCutoffNumber < tail.Number.Uint64() {
dist := tail.Number.Uint64() - d.chainCutoffNumber
if len(localHeaders) >= int(dist) {
h = localHeaders[dist-1]
}
}
}
if h == nil {
return fmt.Errorf("header at chain cutoff is not available, cutoff: %d", d.chainCutoffNumber)
}
if d.chainCutoffHash != (common.Hash{}) && h.Hash() != d.chainCutoffHash {
return fmt.Errorf("header at chain cutoff mismatched, want: %v, got: %v", d.chainCutoffHash, h.Hash())
}
}
for {
// Some beacon headers might have appeared since the last cycle, make
// sure we're always syncing to all available ones
head, _, _, err = d.skeleton.Bounds()
if err != nil {
return err
}
// If the pivot became stale (older than 2*64-8 (bit of wiggle room)),
// move it ahead to HEAD-64
d.pivotLock.Lock()
if d.pivotHeader != nil {
if head.Number.Uint64() > d.pivotHeader.Number.Uint64()+2*uint64(fsMinFullBlocks)-8 {
// For partial state nodes, rate-limit pivot advances (max once per 2 min)
// to avoid the restart loop bug, while still recovering from stale pivots.
if d.partialFilter != nil {
if !d.lastPivotAdvance.IsZero() && time.Since(d.lastPivotAdvance) < 2*time.Minute {
log.Debug("Partial state: suppressing pivot move in fetchHeaders (cooldown active)",
"current", d.pivotHeader.Number, "head", head.Number,
"cooldownLeft", 2*time.Minute-time.Since(d.lastPivotAdvance))
} else {
number := head.Number.Uint64() - uint64(fsMinFullBlocks)
log.Info("Partial state: advancing stale pivot in fetchHeaders",
"old", d.pivotHeader.Number, "new", number)
if d.pivotHeader = d.skeleton.Header(number); d.pivotHeader == nil {
if number < tail.Number.Uint64() {
dist := tail.Number.Uint64() - number
if len(localHeaders) >= int(dist) {
d.pivotHeader = localHeaders[dist-1]
}
}
}
if d.pivotHeader == nil {
log.Error("Pivot header is not found", "number", number)
d.pivotLock.Unlock()
return errNoPivotHeader
}
rawdb.WriteLastPivotNumber(d.stateDB, d.pivotHeader.Number.Uint64())
d.lastPivotAdvance = time.Now()
}
} else {
// Retrieve the next pivot header, either from skeleton chain
// or the filled chain
number := head.Number.Uint64() - uint64(fsMinFullBlocks)
log.Warn("Pivot seemingly stale, moving", "old", d.pivotHeader.Number, "new", number)
if d.pivotHeader = d.skeleton.Header(number); d.pivotHeader == nil {
if number < tail.Number.Uint64() {
dist := tail.Number.Uint64() - number
if len(localHeaders) >= int(dist) {
d.pivotHeader = localHeaders[dist-1]
log.Warn("Retrieved pivot header from local", "number", d.pivotHeader.Number, "hash", d.pivotHeader.Hash(), "latest", head.Number, "oldest", tail.Number)
}
}
}
// Print an error log and return directly in case the pivot header
// is still not found. It means the skeleton chain is not linked
// correctly with local chain.
if d.pivotHeader == nil {
log.Error("Pivot header is not found", "number", number)
d.pivotLock.Unlock()
return errNoPivotHeader
}
// Write out the pivot into the database so a rollback beyond
// it will reenable snap sync and update the state root that
// the state syncer will be downloading
rawdb.WriteLastPivotNumber(d.stateDB, d.pivotHeader.Number.Uint64())
}
}
}
d.pivotLock.Unlock()
// Retrieve a batch of headers and feed it to the header processor
var (
headers = make([]*types.Header, 0, maxHeadersProcess)
hashes = make([]common.Hash, 0, maxHeadersProcess)
)
for i := 0; i < maxHeadersProcess && from <= head.Number.Uint64(); i++ {
header := d.skeleton.Header(from)
// The header is not found in skeleton space, try to find it in local chain.
if header == nil && from < tail.Number.Uint64() {
dist := tail.Number.Uint64() - from
if len(localHeaders) >= int(dist) {
header = localHeaders[dist-1]
}
}
// The header is still missing, the beacon sync is corrupted and bail out
// the error here.
if header == nil {
return fmt.Errorf("missing beacon header %d", from)
}
headers = append(headers, header)
hashes = append(hashes, headers[i].Hash())
from++
}
if len(headers) > 0 {
log.Trace("Scheduling new beacon headers", "count", len(headers), "from", from-uint64(len(headers)))
select {
case d.headerProcCh <- &headerTask{
headers: headers,
hashes: hashes,
}:
case <-d.cancelCh:
return errCanceled
}
}
// If we still have headers to import, loop and keep pushing them
if from <= head.Number.Uint64() {
continue
}
// If the pivot block is committed, signal header sync termination
if d.committed.Load() {
select {
case d.headerProcCh <- nil:
return nil
case <-d.cancelCh:
return errCanceled
}
}
// State sync still going, wait a bit for new headers and retry
log.Trace("Pivot not yet committed, waiting...")
fsHeaderContCheckTimer.Reset(fsHeaderContCheck)
select {
case <-fsHeaderContCheckTimer.C:
case <-d.cancelCh:
return errCanceled
}
}
}