eth, cmd: integrate the sync monitor into syncer service

This commit is contained in:
jsvisa 2025-09-17 23:27:12 +08:00 committed by Felix Lange
parent 4b219b2c88
commit 8b640a3b68
4 changed files with 78 additions and 75 deletions

View file

@ -38,6 +38,7 @@ import (
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth/catalyst" "github.com/ethereum/go-ethereum/eth/catalyst"
"github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/eth/syncer"
"github.com/ethereum/go-ethereum/internal/flags" "github.com/ethereum/go-ethereum/internal/flags"
"github.com/ethereum/go-ethereum/internal/telemetry/tracesetup" "github.com/ethereum/go-ethereum/internal/telemetry/tracesetup"
"github.com/ethereum/go-ethereum/internal/version" "github.com/ethereum/go-ethereum/internal/version"
@ -276,16 +277,19 @@ func makeFullNode(ctx *cli.Context) *node.Node {
if cfg.Ethstats.URL != "" { if cfg.Ethstats.URL != "" {
utils.RegisterEthStatsService(stack, backend, cfg.Ethstats.URL) utils.RegisterEthStatsService(stack, backend, cfg.Ethstats.URL)
} }
// Configure synchronization override service // Configure synchronization override service
var synctarget common.Hash syncConfig := syncer.Config{
ExitWhenSynced: ctx.Bool(utils.ExitWhenSyncedFlag.Name),
}
if ctx.IsSet(utils.SyncTargetFlag.Name) { if ctx.IsSet(utils.SyncTargetFlag.Name) {
target := ctx.String(utils.SyncTargetFlag.Name) target := ctx.String(utils.SyncTargetFlag.Name)
if !common.IsHexHash(target) { if !common.IsHexHash(target) {
utils.Fatalf("sync target hash is not a valid hex hash: %s", target) utils.Fatalf("sync target hash is not a valid hex hash: %s", target)
} }
synctarget = common.HexToHash(target) syncConfig.TargetBlock = common.HexToHash(target)
} }
utils.RegisterSyncOverrideService(stack, eth, synctarget, ctx.Bool(utils.ExitWhenSyncedFlag.Name)) utils.RegisterSyncOverrideService(stack, eth, syncConfig)
if ctx.IsSet(utils.DeveloperFlag.Name) { if ctx.IsSet(utils.DeveloperFlag.Name) {
// Start dev mode. // Start dev mode.

View file

@ -22,13 +22,10 @@ import (
"os" "os"
"slices" "slices"
"sort" "sort"
"time"
"github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/console/prompt" "github.com/ethereum/go-ethereum/console/prompt"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/internal/debug" "github.com/ethereum/go-ethereum/internal/debug"
"github.com/ethereum/go-ethereum/internal/flags" "github.com/ethereum/go-ethereum/internal/flags"
@ -386,28 +383,4 @@ func startNode(ctx *cli.Context, stack *node.Node, isConsole bool) {
} }
} }
}() }()
// Spawn a standalone goroutine for status synchronization monitoring,
// close the node when synchronization is complete if user required.
if ctx.Bool(utils.ExitWhenSyncedFlag.Name) {
go func() {
sub := stack.EventMux().Subscribe(downloader.DoneEvent{})
defer sub.Unsubscribe()
for {
event := <-sub.Chan()
if event == nil {
continue
}
done, ok := event.Data.(downloader.DoneEvent)
if !ok {
continue
}
if timestamp := time.Unix(int64(done.Latest.Time), 0); time.Since(timestamp) < 10*time.Minute {
log.Info("Synchronisation completed", "latestnum", done.Latest.Number, "latesthash", done.Latest.Hash(),
"age", common.PrettyAge(timestamp))
stack.Close()
}
}
}()
}
} }

View file

@ -2228,13 +2228,13 @@ func RegisterFilterAPI(stack *node.Node, backend ethapi.Backend, ethcfg *ethconf
} }
// RegisterSyncOverrideService adds the synchronization override service into node. // RegisterSyncOverrideService adds the synchronization override service into node.
func RegisterSyncOverrideService(stack *node.Node, eth *eth.Ethereum, target common.Hash, exitWhenSynced bool) { func RegisterSyncOverrideService(stack *node.Node, eth *eth.Ethereum, config syncer.Config) {
if target != (common.Hash{}) { if config.TargetBlock != (common.Hash{}) {
log.Info("Registered sync override service", "hash", target, "exitWhenSynced", exitWhenSynced) log.Info("Registered sync override service", "hash", config.TargetBlock, "exitWhenSynced", config.ExitWhenSynced)
} else { } else {
log.Info("Registered sync override service") log.Info("Registered sync override service")
} }
syncer.Register(stack, eth, target, exitWhenSynced) syncer.Register(stack, eth, config)
} }
// SetupMetrics configures the metrics system. // SetupMetrics configures the metrics system.

View file

@ -26,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/node"
@ -37,32 +38,40 @@ type syncReq struct {
errc chan error errc chan error
} }
type Config struct {
TargetBlock common.Hash // if set, sync is triggered at startup
ExitWhenSynced bool // if true, the node shuts down after sync has finished
}
// Syncer is an auxiliary service that allows Geth to perform full sync // Syncer is an auxiliary service that allows Geth to perform full sync
// alone without consensus-layer attached. Users must specify a valid block hash // alone without consensus-layer attached. Users must specify a valid block hash
// as the sync target. // as the sync target.
// //
// Additionally, the syncer can be used to monitor state synchronization.
// It will exit once the specified target has been reached or when the
// most recent chain head is caught up.
//
// This tool can be applied to different networks, no matter it's pre-merge or // This tool can be applied to different networks, no matter it's pre-merge or
// post-merge, but only for full-sync. // post-merge, but only for full-sync.
type Syncer struct { type Syncer struct {
stack *node.Node stack *node.Node
backend *eth.Ethereum backend *eth.Ethereum
target common.Hash request chan *syncReq
request chan *syncReq closed chan struct{}
closed chan struct{} wg sync.WaitGroup
wg sync.WaitGroup
exitWhenSynced bool config Config
} }
// Register registers the synchronization override service into the node // Register registers the synchronization override service into the node
// stack for launching and stopping the service controlled by node. // stack for launching and stopping the service controlled by node.
func Register(stack *node.Node, backend *eth.Ethereum, target common.Hash, exitWhenSynced bool) (*Syncer, error) { func Register(stack *node.Node, backend *eth.Ethereum, cfg Config) (*Syncer, error) {
s := &Syncer{ s := &Syncer{
stack: stack, stack: stack,
backend: backend, backend: backend,
target: target, request: make(chan *syncReq),
request: make(chan *syncReq), closed: make(chan struct{}),
closed: make(chan struct{}), config: cfg,
exitWhenSynced: exitWhenSynced,
} }
stack.RegisterAPIs(s.APIs()) stack.RegisterAPIs(s.APIs())
stack.RegisterLifecycle(s) stack.RegisterLifecycle(s)
@ -88,9 +97,11 @@ func (s *Syncer) run() {
var ( var (
target *types.Header target *types.Header
ticker = time.NewTicker(time.Second * 5) syncCh = make(chan downloader.SyncEvent, 10)
) )
defer ticker.Stop() sub := s.backend.Downloader().SubscribeSyncEvents(syncCh)
defer sub.Unsubscribe()
for { for {
select { select {
case req := <-s.request: case req := <-s.request:
@ -137,35 +148,50 @@ func (s *Syncer) run() {
} }
} }
case <-ticker.C: case ev := <-syncCh:
if target == nil { if ev.Type == downloader.SyncStarted {
log.Debug("Synchronization started")
continue continue
} }
if ev.Type == downloader.SyncFailed {
log.Debug("Synchronization failed", "err", ev.Err)
continue
}
head := s.backend.BlockChain().CurrentHeader()
if head != nil {
// Set the finalized and safe markers relative to the current head.
// The finalized marker is set two epochs behind the target,
// and the safe marker is set one epoch behind the target.
if header := s.backend.BlockChain().GetHeaderByNumber(head.Number.Uint64() - params.EpochLength*2); header != nil {
if final := s.backend.BlockChain().CurrentFinalBlock(); final == nil || final.Number.Cmp(header.Number) < 0 {
s.backend.BlockChain().SetFinalized(header)
}
}
if header := s.backend.BlockChain().GetHeaderByNumber(head.Number.Uint64() - params.EpochLength); header != nil {
if safe := s.backend.BlockChain().CurrentSafeBlock(); safe == nil || safe.Number.Cmp(header.Number) < 0 {
s.backend.BlockChain().SetSafe(header)
}
}
}
// Terminate the node if the target has been reached // Terminate the node if the target has been reached
if s.exitWhenSynced { if s.config.ExitWhenSynced {
if block := s.backend.BlockChain().GetBlockByHash(target.Hash()); block != nil { var synced bool
log.Info("Sync target reached", "number", block.NumberU64(), "hash", block.Hash()) var block *types.Header
go s.stack.Close() // async since we need to close ourselves if target != nil {
return tb := s.backend.BlockChain().GetBlockByHash(target.Hash())
synced = tb != nil
block = tb.Header()
} else {
timestamp := time.Unix(int64(ev.Latest.Time), 0)
synced = time.Since(timestamp) < 10*time.Minute
block = ev.Latest
} }
}
// Set the finalized and safe markers relative to the current head. if synced {
// The finalized marker is set two epochs behind the target, log.Info("Sync target reached", "number", block.Number.Uint64(), "hash", block.Hash())
// and the safe marker is set one epoch behind the target. go s.stack.Close() // async since we need to close ourselves
head := s.backend.BlockChain().CurrentHeader()
if head == nil {
continue
}
if header := s.backend.BlockChain().GetHeaderByNumber(head.Number.Uint64() - params.EpochLength*2); header != nil {
if final := s.backend.BlockChain().CurrentFinalBlock(); final == nil || final.Number.Cmp(header.Number) < 0 {
s.backend.BlockChain().SetFinalized(header)
}
}
if header := s.backend.BlockChain().GetHeaderByNumber(head.Number.Uint64() - params.EpochLength); header != nil {
if safe := s.backend.BlockChain().CurrentSafeBlock(); safe == nil || safe.Number.Cmp(header.Number) < 0 {
s.backend.BlockChain().SetSafe(header)
} }
} }
@ -179,10 +205,10 @@ func (s *Syncer) run() {
func (s *Syncer) Start() error { func (s *Syncer) Start() error {
s.wg.Add(1) s.wg.Add(1)
go s.run() go s.run()
if s.target == (common.Hash{}) { if s.config.TargetBlock == (common.Hash{}) {
return nil return nil
} }
return s.Sync(s.target) return s.Sync(s.config.TargetBlock)
} }
// Stop terminates the synchronization service and stop all background activities. // Stop terminates the synchronization service and stop all background activities.