From 8b640a3b68a3c6d827edc260460f357f9bd3d08e Mon Sep 17 00:00:00 2001 From: jsvisa Date: Wed, 17 Sep 2025 23:27:12 +0800 Subject: [PATCH] eth, cmd: integrate the sync monitor into syncer service --- cmd/geth/config.go | 10 ++-- cmd/geth/main.go | 27 ----------- cmd/utils/flags.go | 8 ++-- eth/syncer/syncer.go | 108 +++++++++++++++++++++++++++---------------- 4 files changed, 78 insertions(+), 75 deletions(-) diff --git a/cmd/geth/config.go b/cmd/geth/config.go index 8e2db32d76..40458186f4 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -38,6 +38,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/eth/catalyst" "github.com/ethereum/go-ethereum/eth/ethconfig" + "github.com/ethereum/go-ethereum/eth/syncer" "github.com/ethereum/go-ethereum/internal/flags" "github.com/ethereum/go-ethereum/internal/telemetry/tracesetup" "github.com/ethereum/go-ethereum/internal/version" @@ -276,16 +277,19 @@ func makeFullNode(ctx *cli.Context) *node.Node { if cfg.Ethstats.URL != "" { utils.RegisterEthStatsService(stack, backend, cfg.Ethstats.URL) } + // Configure synchronization override service - var synctarget common.Hash + syncConfig := syncer.Config{ + ExitWhenSynced: ctx.Bool(utils.ExitWhenSyncedFlag.Name), + } if ctx.IsSet(utils.SyncTargetFlag.Name) { target := ctx.String(utils.SyncTargetFlag.Name) if !common.IsHexHash(target) { utils.Fatalf("sync target hash is not a valid hex hash: %s", target) } - synctarget = common.HexToHash(target) + syncConfig.TargetBlock = common.HexToHash(target) } - utils.RegisterSyncOverrideService(stack, eth, synctarget, ctx.Bool(utils.ExitWhenSyncedFlag.Name)) + utils.RegisterSyncOverrideService(stack, eth, syncConfig) if ctx.IsSet(utils.DeveloperFlag.Name) { // Start dev mode. diff --git a/cmd/geth/main.go b/cmd/geth/main.go index ae869ec970..2a5b6b627c 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -22,13 +22,10 @@ import ( "os" "slices" "sort" - "time" "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/cmd/utils" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/console/prompt" - "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/internal/debug" "github.com/ethereum/go-ethereum/internal/flags" @@ -386,28 +383,4 @@ func startNode(ctx *cli.Context, stack *node.Node, isConsole bool) { } } }() - - // Spawn a standalone goroutine for status synchronization monitoring, - // close the node when synchronization is complete if user required. - if ctx.Bool(utils.ExitWhenSyncedFlag.Name) { - go func() { - sub := stack.EventMux().Subscribe(downloader.DoneEvent{}) - defer sub.Unsubscribe() - for { - event := <-sub.Chan() - if event == nil { - continue - } - done, ok := event.Data.(downloader.DoneEvent) - if !ok { - continue - } - if timestamp := time.Unix(int64(done.Latest.Time), 0); time.Since(timestamp) < 10*time.Minute { - log.Info("Synchronisation completed", "latestnum", done.Latest.Number, "latesthash", done.Latest.Hash(), - "age", common.PrettyAge(timestamp)) - stack.Close() - } - } - }() - } } diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 9d996f15cb..c1d5fc076c 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -2228,13 +2228,13 @@ func RegisterFilterAPI(stack *node.Node, backend ethapi.Backend, ethcfg *ethconf } // RegisterSyncOverrideService adds the synchronization override service into node. -func RegisterSyncOverrideService(stack *node.Node, eth *eth.Ethereum, target common.Hash, exitWhenSynced bool) { - if target != (common.Hash{}) { - log.Info("Registered sync override service", "hash", target, "exitWhenSynced", exitWhenSynced) +func RegisterSyncOverrideService(stack *node.Node, eth *eth.Ethereum, config syncer.Config) { + if config.TargetBlock != (common.Hash{}) { + log.Info("Registered sync override service", "hash", config.TargetBlock, "exitWhenSynced", config.ExitWhenSynced) } else { log.Info("Registered sync override service") } - syncer.Register(stack, eth, target, exitWhenSynced) + syncer.Register(stack, eth, config) } // SetupMetrics configures the metrics system. diff --git a/eth/syncer/syncer.go b/eth/syncer/syncer.go index c0d54b953b..b04d8f22e8 100644 --- a/eth/syncer/syncer.go +++ b/eth/syncer/syncer.go @@ -26,6 +26,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth" + "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" @@ -37,32 +38,40 @@ type syncReq struct { errc chan error } +type Config struct { + TargetBlock common.Hash // if set, sync is triggered at startup + ExitWhenSynced bool // if true, the node shuts down after sync has finished +} + // Syncer is an auxiliary service that allows Geth to perform full sync // alone without consensus-layer attached. Users must specify a valid block hash // as the sync target. // +// Additionally, the syncer can be used to monitor state synchronization. +// It will exit once the specified target has been reached or when the +// most recent chain head is caught up. +// // This tool can be applied to different networks, no matter it's pre-merge or // post-merge, but only for full-sync. type Syncer struct { - stack *node.Node - backend *eth.Ethereum - target common.Hash - request chan *syncReq - closed chan struct{} - wg sync.WaitGroup - exitWhenSynced bool + stack *node.Node + backend *eth.Ethereum + request chan *syncReq + closed chan struct{} + wg sync.WaitGroup + + config Config } // Register registers the synchronization override service into the node // stack for launching and stopping the service controlled by node. -func Register(stack *node.Node, backend *eth.Ethereum, target common.Hash, exitWhenSynced bool) (*Syncer, error) { +func Register(stack *node.Node, backend *eth.Ethereum, cfg Config) (*Syncer, error) { s := &Syncer{ - stack: stack, - backend: backend, - target: target, - request: make(chan *syncReq), - closed: make(chan struct{}), - exitWhenSynced: exitWhenSynced, + stack: stack, + backend: backend, + request: make(chan *syncReq), + closed: make(chan struct{}), + config: cfg, } stack.RegisterAPIs(s.APIs()) stack.RegisterLifecycle(s) @@ -88,9 +97,11 @@ func (s *Syncer) run() { var ( target *types.Header - ticker = time.NewTicker(time.Second * 5) + syncCh = make(chan downloader.SyncEvent, 10) ) - defer ticker.Stop() + sub := s.backend.Downloader().SubscribeSyncEvents(syncCh) + defer sub.Unsubscribe() + for { select { case req := <-s.request: @@ -137,35 +148,50 @@ func (s *Syncer) run() { } } - case <-ticker.C: - if target == nil { + case ev := <-syncCh: + if ev.Type == downloader.SyncStarted { + log.Debug("Synchronization started") continue } + if ev.Type == downloader.SyncFailed { + log.Debug("Synchronization failed", "err", ev.Err) + continue + } + + head := s.backend.BlockChain().CurrentHeader() + if head != nil { + // Set the finalized and safe markers relative to the current head. + // The finalized marker is set two epochs behind the target, + // and the safe marker is set one epoch behind the target. + if header := s.backend.BlockChain().GetHeaderByNumber(head.Number.Uint64() - params.EpochLength*2); header != nil { + if final := s.backend.BlockChain().CurrentFinalBlock(); final == nil || final.Number.Cmp(header.Number) < 0 { + s.backend.BlockChain().SetFinalized(header) + } + } + if header := s.backend.BlockChain().GetHeaderByNumber(head.Number.Uint64() - params.EpochLength); header != nil { + if safe := s.backend.BlockChain().CurrentSafeBlock(); safe == nil || safe.Number.Cmp(header.Number) < 0 { + s.backend.BlockChain().SetSafe(header) + } + } + } // Terminate the node if the target has been reached - if s.exitWhenSynced { - if block := s.backend.BlockChain().GetBlockByHash(target.Hash()); block != nil { - log.Info("Sync target reached", "number", block.NumberU64(), "hash", block.Hash()) - go s.stack.Close() // async since we need to close ourselves - return + if s.config.ExitWhenSynced { + var synced bool + var block *types.Header + if target != nil { + tb := s.backend.BlockChain().GetBlockByHash(target.Hash()) + synced = tb != nil + block = tb.Header() + } else { + timestamp := time.Unix(int64(ev.Latest.Time), 0) + synced = time.Since(timestamp) < 10*time.Minute + block = ev.Latest } - } - // Set the finalized and safe markers relative to the current head. - // The finalized marker is set two epochs behind the target, - // and the safe marker is set one epoch behind the target. - head := s.backend.BlockChain().CurrentHeader() - if head == nil { - continue - } - if header := s.backend.BlockChain().GetHeaderByNumber(head.Number.Uint64() - params.EpochLength*2); header != nil { - if final := s.backend.BlockChain().CurrentFinalBlock(); final == nil || final.Number.Cmp(header.Number) < 0 { - s.backend.BlockChain().SetFinalized(header) - } - } - if header := s.backend.BlockChain().GetHeaderByNumber(head.Number.Uint64() - params.EpochLength); header != nil { - if safe := s.backend.BlockChain().CurrentSafeBlock(); safe == nil || safe.Number.Cmp(header.Number) < 0 { - s.backend.BlockChain().SetSafe(header) + if synced { + log.Info("Sync target reached", "number", block.Number.Uint64(), "hash", block.Hash()) + go s.stack.Close() // async since we need to close ourselves } } @@ -179,10 +205,10 @@ func (s *Syncer) run() { func (s *Syncer) Start() error { s.wg.Add(1) go s.run() - if s.target == (common.Hash{}) { + if s.config.TargetBlock == (common.Hash{}) { return nil } - return s.Sync(s.target) + return s.Sync(s.config.TargetBlock) } // Stop terminates the synchronization service and stop all background activities.