core, eth: add lock protection in snap sync (#33428)

Fixes #33396, #33397, #33398
This commit is contained in:
rjl493456442 2025-12-19 16:36:48 +08:00 committed by GitHub
parent dd7daace9d
commit bf141fbfb1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 149 additions and 53 deletions

View file

@ -953,7 +953,8 @@ func (bc *BlockChain) rewindPathHead(head *types.Header, root common.Hash) (*typ
// Recover the target state if it's not available yet.
if !bc.HasState(head.Root) {
if err := bc.triedb.Recover(head.Root); err != nil {
log.Crit("Failed to rollback state", "err", err)
log.Error("Failed to rollback state, resetting to genesis", "err", err)
return bc.genesisBlock.Header(), rootNumber
}
}
log.Info("Rewound to block with state", "number", head.Number, "hash", head.Hash())
@ -1115,14 +1116,48 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha
return rootNumber, bc.loadLastState()
}
// SnapSyncCommitHead sets the current head block to the one defined by the hash
// irrelevant what the chain contents were prior.
func (bc *BlockChain) SnapSyncCommitHead(hash common.Hash) error {
// SnapSyncStart disables the underlying databases (such as the trie DB and the
// optional state snapshot) to prevent potential concurrent mutations between
// snap sync and other chain operations.
func (bc *BlockChain) SnapSyncStart() error {
if !bc.chainmu.TryLock() {
return errChainStopped
}
defer bc.chainmu.Unlock()
// Snap sync will directly modify the persistent state, making the entire
// trie database unusable until the state is fully synced. To prevent any
// subsequent state reads, explicitly disable the trie database and state
// syncer is responsible for addressing and correcting any missing state.
if bc.TrieDB().Scheme() == rawdb.PathScheme {
if err := bc.TrieDB().Disable(); err != nil {
return err
}
}
// Snap sync uses the snapshot namespace to store potentially flaky data until
// sync completely heals and finishes. Pause snapshot maintenance in the mean-
// time to prevent access.
if snapshots := bc.Snapshots(); snapshots != nil { // Only nil in tests
snapshots.Disable()
}
return nil
}
// SnapSyncComplete sets the current head block to the block identified by the
// given hash, regardless of the chain contents prior to snap sync. It is
// invoked once snap sync completes and assumes that SnapSyncStart was called
// previously.
func (bc *BlockChain) SnapSyncComplete(hash common.Hash) error {
// Make sure that both the block as well as its state trie exist
block := bc.GetBlockByHash(hash)
if block == nil {
return fmt.Errorf("non existent block [%x..]", hash[:4])
}
if !bc.chainmu.TryLock() {
return errChainStopped
}
defer bc.chainmu.Unlock()
// Reset the trie database with the fresh snap synced state.
root := block.Root()
if bc.triedb.Scheme() == rawdb.PathScheme {
@ -1133,19 +1168,16 @@ func (bc *BlockChain) SnapSyncCommitHead(hash common.Hash) error {
if !bc.HasState(root) {
return fmt.Errorf("non existent state [%x..]", root[:4])
}
// If all checks out, manually set the head block.
if !bc.chainmu.TryLock() {
return errChainStopped
}
bc.currentBlock.Store(block.Header())
headBlockGauge.Update(int64(block.NumberU64()))
bc.chainmu.Unlock()
// Destroy any existing state snapshot and regenerate it in the background,
// also resuming the normal maintenance of any previously paused snapshot.
if bc.snaps != nil {
bc.snaps.Rebuild(root)
}
// If all checks out, manually set the head block.
bc.currentBlock.Store(block.Header())
headBlockGauge.Update(int64(block.NumberU64()))
log.Info("Committed new head block", "number", block.Number(), "hash", hash)
return nil
}

View file

@ -36,7 +36,6 @@ import (
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/internal/version"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/miner"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/params"
@ -81,20 +80,6 @@ const (
beaconUpdateWarnFrequency = 5 * time.Minute
)
var (
// Number of blobs requested via getBlobsV2
getBlobsRequestedCounter = metrics.NewRegisteredCounter("engine/getblobs/requested", nil)
// Number of blobs requested via getBlobsV2 that are present in the blobpool
getBlobsAvailableCounter = metrics.NewRegisteredCounter("engine/getblobs/available", nil)
// Number of times getBlobsV2 responded with “hit”
getBlobsV2RequestHit = metrics.NewRegisteredCounter("engine/getblobs/hit", nil)
// Number of times getBlobsV2 responded with “miss”
getBlobsV2RequestMiss = metrics.NewRegisteredCounter("engine/getblobs/miss", nil)
)
type ConsensusAPI struct {
eth *eth.Ethereum
@ -137,6 +122,9 @@ type ConsensusAPI struct {
// NewConsensusAPI creates a new consensus api for the given backend.
// The underlying blockchain needs to have a valid terminal total difficulty set.
//
// This function creates a long-lived object with an attached background thread.
// For testing or other short-term use cases, please use newConsensusAPIWithoutHeartbeat.
func NewConsensusAPI(eth *eth.Ethereum) *ConsensusAPI {
api := newConsensusAPIWithoutHeartbeat(eth)
go api.heartbeat()
@ -818,7 +806,7 @@ func (api *ConsensusAPI) delayPayloadImport(block *types.Block) engine.PayloadSt
return engine.PayloadStatusV1{Status: engine.SYNCING}
}
// Either no beacon sync was started yet, or it rejected the delivered
// payload as non-integratable on top of the existing sync. We'll just
// payload as non-integratable on top of the existing sync. We'll just
// have to rely on the beacon client to forcefully update the head with
// a forkchoice update request.
if api.eth.Downloader().ConfigSyncMode() == ethconfig.FullSync {
@ -916,8 +904,6 @@ func (api *ConsensusAPI) invalid(err error, latestValid *types.Header) engine.Pa
// heartbeat loops indefinitely, and checks if there have been beacon client updates
// received in the last while. If not - or if there were, but strange ones - it warns the
// user that something might be off with their consensus node.
//
// TODO(karalabe): Spin this goroutine down somehow
func (api *ConsensusAPI) heartbeat() {
// Sleep a bit on startup since there's obviously no beacon client yet
// attached, so no need to print scary warnings to the user.

33
eth/catalyst/metrics.go Normal file
View file

@ -0,0 +1,33 @@
// Copyright 2025 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package catalyst
import "github.com/ethereum/go-ethereum/metrics"
var (
// Number of blobs requested via getBlobsV2
getBlobsRequestedCounter = metrics.NewRegisteredCounter("engine/getblobs/requested", nil)
// Number of blobs requested via getBlobsV2 that are present in the blobpool
getBlobsAvailableCounter = metrics.NewRegisteredCounter("engine/getblobs/available", nil)
// Number of times getBlobsV2 responded with “hit”
getBlobsV2RequestHit = metrics.NewRegisteredCounter("engine/getblobs/hit", nil)
// Number of times getBlobsV2 responded with “miss”
getBlobsV2RequestMiss = metrics.NewRegisteredCounter("engine/getblobs/miss", nil)
)

View file

@ -61,6 +61,7 @@ func (b *beaconBackfiller) suspend() *types.Header {
b.lock.Unlock()
if !filling {
log.Debug("Backfiller was inactive")
return filled // Return the filled header on the previous sync completion
}
// A previous filling should be running, though it may happen that it hasn't
@ -73,6 +74,7 @@ func (b *beaconBackfiller) suspend() *types.Header {
// Now that we're sure the downloader successfully started up, we can cancel
// it safely without running the risk of data races.
b.downloader.Cancel()
log.Debug("Backfiller has been suspended")
// Sync cycle was just terminated, retrieve and return the last filled header.
// Can't use `filled` as that contains a stale value from before cancellation.
@ -86,6 +88,7 @@ func (b *beaconBackfiller) resume() {
// If a previous filling cycle is still running, just ignore this start
// request. // TODO(karalabe): We should make this channel driven
b.lock.Unlock()
log.Debug("Backfiller is running")
return
}
b.filling = true
@ -114,7 +117,9 @@ func (b *beaconBackfiller) resume() {
if b.success != nil {
b.success()
}
log.Debug("Backfilling completed")
}()
log.Debug("Backfilling started")
}
// SetBadBlockCallback sets the callback to run when a bad block is hit by the

View file

@ -193,8 +193,12 @@ type BlockChain interface {
// CurrentSnapBlock retrieves the head snap block from the local chain.
CurrentSnapBlock() *types.Header
// SnapSyncCommitHead directly commits the head block to a certain entity.
SnapSyncCommitHead(common.Hash) error
// SnapSyncStart explicitly notifies the chain that snap sync is scheduled and
// marks chain mutations as disallowed.
SnapSyncStart() error
// SnapSyncComplete directly commits the head block to a certain entity.
SnapSyncComplete(common.Hash) error
// InsertHeadersBeforeCutoff inserts a batch of headers before the configured
// chain cutoff into the ancient store.
@ -361,28 +365,21 @@ func (d *Downloader) synchronise(beaconPing chan struct{}) (err error) {
if d.notified.CompareAndSwap(false, true) {
log.Info("Block synchronisation started")
}
mode := d.moder.get()
// Obtain the sync mode used in this cycle
mode := d.moder.get(true)
defer func() {
if err == nil && mode == ethconfig.SnapSync {
d.moder.disableSnap()
log.Info("Disabled snap-sync after the initial sync cycle")
}
}()
// Disable chain mutations when snap sync is selected, ensuring the
// downloader is the sole mutator.
if mode == ethconfig.SnapSync {
// Snap sync will directly modify the persistent state, making the entire
// trie database unusable until the state is fully synced. To prevent any
// subsequent state reads, explicitly disable the trie database and state
// syncer is responsible to address and correct any state missing.
if d.blockchain.TrieDB().Scheme() == rawdb.PathScheme {
if err := d.blockchain.TrieDB().Disable(); err != nil {
return err
}
}
// Snap sync uses the snapshot namespace to store potentially flaky data until
// sync completely heals and finishes. Pause snapshot maintenance in the mean-
// time to prevent access.
if snapshots := d.blockchain.Snapshots(); snapshots != nil { // Only nil in tests
snapshots.Disable()
if err := d.blockchain.SnapSyncStart(); err != nil {
return err
}
}
// Reset the queue, peer set and wake channels to clean any internal leftover state
@ -427,7 +424,7 @@ func (d *Downloader) getMode() SyncMode {
// ConfigSyncMode returns the sync mode configured for the node.
// The actual running sync mode can differ from this.
func (d *Downloader) ConfigSyncMode() SyncMode {
return d.moder.get()
return d.moder.get(false)
}
// syncToHead starts a block synchronization based on the hash chain from
@ -1086,7 +1083,7 @@ func (d *Downloader) commitPivotBlock(result *fetchResult) error {
if _, err := d.blockchain.InsertReceiptChain([]*types.Block{block}, []rlp.RawValue{result.Receipts}, d.ancientLimit); err != nil {
return err
}
if err := d.blockchain.SnapSyncCommitHead(block.Hash()); err != nil {
if err := d.blockchain.SnapSyncComplete(block.Hash()); err != nil {
return err
}
d.committed.Store(true)

View file

@ -64,6 +64,12 @@ var errSyncMerged = errors.New("sync merged")
// should abort and restart with the new state.
var errSyncReorged = errors.New("sync reorged")
// errSyncTrimmed is an internal helper error to signal that the local chain
// has been trimmed (e.g., via debug_setHead explicitly) and the skeleton chain
// is no longer linked with the local chain. In this case, the skeleton sync
// should be re-scheduled again.
var errSyncTrimmed = errors.New("sync trimmed")
// errTerminated is returned if the sync mechanism was terminated for this run of
// the process. This is usually the case when Geth is shutting down and some events
// might still be propagating.
@ -296,6 +302,11 @@ func (s *skeleton) startup() {
// head to force a cleanup.
head = newhead
case err == errSyncTrimmed:
// The skeleton chain is not linked with the local chain anymore,
// restart the sync.
head = nil
case err == errTerminated:
// Sync was requested to be terminated from within, stop and
// return (no need to pass a message, was already done internally)
@ -486,7 +497,22 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) {
// is still running, it will pick it up. If it already terminated,
// a new cycle needs to be spun up.
if linked {
s.filler.resume()
linked = len(s.progress.Subchains) == 1 &&
rawdb.HasHeader(s.db, s.progress.Subchains[0].Next, s.scratchHead) &&
rawdb.HasBody(s.db, s.progress.Subchains[0].Next, s.scratchHead) &&
rawdb.HasReceipts(s.db, s.progress.Subchains[0].Next, s.scratchHead)
if linked {
// The skeleton chain has been extended and is still linked with the local
// chain, try to re-schedule the backfiller if it's already terminated.
s.filler.resume()
} else {
// The skeleton chain is no longer linked to the local chain for some reason
// (e.g. debug_setHead was used to trim the local chain). Re-schedule the
// skeleton sync to fill the chain gap.
log.Warn("Local chain has been trimmed", "tailnumber", s.scratchHead, "tailhash", s.progress.Subchains[0].Next)
return nil, errSyncTrimmed
}
}
case req := <-requestFails:
@ -649,9 +675,19 @@ func (s *skeleton) processNewHead(head *types.Header, final *types.Header) error
// Not a noop / double head announce, abort with a reorg
return fmt.Errorf("%w, tail: %d, head: %d, newHead: %d", errChainReorged, lastchain.Tail, lastchain.Head, number)
}
// Terminate the sync if the chain head is gapped
if lastchain.Head+1 < number {
return fmt.Errorf("%w, head: %d, newHead: %d", errChainGapped, lastchain.Head, number)
}
// Ignore the duplicated beacon header announcement
if lastchain.Head == number {
local := rawdb.ReadSkeletonHeader(s.db, number)
if local != nil && local.Hash() == head.Hash() {
log.Debug("Ignored the identical beacon header", "number", number, "hash", local.Hash())
return nil
}
}
// Terminate the sync if the chain head is forked
if parent := rawdb.ReadSkeletonHeader(s.db, number-1); parent.Hash() != head.ParentHash {
return fmt.Errorf("%w, ancestor: %d, hash: %s, want: %s", errChainForked, number-1, parent.Hash(), head.ParentHash)
}
@ -669,6 +705,7 @@ func (s *skeleton) processNewHead(head *types.Header, final *types.Header) error
if err := batch.Write(); err != nil {
log.Crit("Failed to write skeleton sync status", "err", err)
}
log.Debug("Extended beacon header chain", "number", head.Number, "hash", head.Hash())
return nil
}
@ -1206,6 +1243,7 @@ func (s *skeleton) cleanStales(filled *types.Header) error {
if err := batch.Write(); err != nil {
log.Crit("Failed to write beacon trim data", "err", err)
}
log.Debug("Cleaned stale beacon headers", "start", start, "end", end)
return nil
}

View file

@ -75,7 +75,7 @@ func newSyncModer(mode ethconfig.SyncMode, chain BlockChain, disk ethdb.KeyValue
// get retrieves the current sync mode, either explicitly set, or derived
// from the chain status.
func (m *syncModer) get() ethconfig.SyncMode {
func (m *syncModer) get(report bool) ethconfig.SyncMode {
m.lock.Lock()
defer m.lock.Unlock()
@ -83,12 +83,16 @@ func (m *syncModer) get() ethconfig.SyncMode {
if m.mode == ethconfig.SnapSync {
return ethconfig.SnapSync
}
logger := log.Debug
if report {
logger = log.Info
}
// We are probably in full sync, but we might have rewound to before the
// snap sync pivot, check if we should re-enable snap sync.
head := m.chain.CurrentBlock()
if pivot := rawdb.ReadLastPivotNumber(m.disk); pivot != nil {
if head.Number.Uint64() < *pivot {
log.Info("Reenabled snap-sync as chain is lagging behind the pivot", "head", head.Number, "pivot", pivot)
logger("Reenabled snap-sync as chain is lagging behind the pivot", "head", head.Number, "pivot", pivot)
return ethconfig.SnapSync
}
}
@ -96,7 +100,7 @@ func (m *syncModer) get() ethconfig.SyncMode {
// the head state, forcefully rerun the snap sync. Note it doesn't mean the
// persistent state is corrupted, just mismatch with the head block.
if !m.chain.HasState(head.Root) {
log.Info("Reenabled snap-sync as chain is stateless")
logger("Reenabled snap-sync as chain is stateless")
return ethconfig.SnapSync
}
// Nope, we're really full syncing

View file

@ -148,6 +148,7 @@ func (g *generator) stop() {
g.abort <- ch
<-ch
g.running = false
log.Debug("Snapshot generation has been terminated")
}
// completed returns the flag indicating if the whole generation is done.