go-ethereum/eth/syncer/syncer.go
Delweng 1abbae239d
Some checks are pending
/ Linux Build (push) Waiting to run
/ Linux Build (arm) (push) Waiting to run
/ Keeper Build (push) Waiting to run
/ Windows Build (push) Waiting to run
/ Docker Image (push) Waiting to run
eth,node: replace the deprecated TypeMux with Feed (#32585)
replace the not used event.Typemux to event.Feed

---------

Co-authored-by: Felix Lange <fjl@twurst.com>
2026-05-08 10:12:46 +08:00

251 lines
7.2 KiB
Go

// Copyright 2025 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package syncer
import (
"errors"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/beacon/params"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"
)
type syncReq struct {
hash common.Hash
errc chan error
}
type Config struct {
TargetBlock common.Hash // if set, sync is triggered at startup
ExitWhenSynced bool // if true, the node shuts down after sync has finished
}
// Syncer is an auxiliary service that allows Geth to perform full sync
// alone without consensus-layer attached. Users must specify a valid block hash
// as the sync target.
//
// Additionally, the syncer can be used to monitor state synchronization.
// It will exit once the specified target has been reached or when the
// most recent chain head is caught up.
//
// This tool can be applied to different networks, no matter it's pre-merge or
// post-merge, but only for full-sync.
type Syncer struct {
stack *node.Node
backend *eth.Ethereum
request chan *syncReq
closed chan struct{}
wg sync.WaitGroup
config Config
}
// Register registers the synchronization override service into the node
// stack for launching and stopping the service controlled by node.
func Register(stack *node.Node, backend *eth.Ethereum, cfg Config) (*Syncer, error) {
s := &Syncer{
stack: stack,
backend: backend,
request: make(chan *syncReq),
closed: make(chan struct{}),
config: cfg,
}
stack.RegisterAPIs(s.APIs())
stack.RegisterLifecycle(s)
return s, nil
}
// APIs return the collection of RPC services the ethereum package offers.
// NOTE, some of these services probably need to be moved to somewhere else.
func (s *Syncer) APIs() []rpc.API {
return []rpc.API{
{
Namespace: "debug",
Service: NewAPI(s),
},
}
}
// run is the main loop that monitors sync requests from users and initiates
// sync operations when necessary. It also checks whether the specified target
// has been reached and shuts down Geth if requested by the user.
func (s *Syncer) run() {
defer s.wg.Done()
var (
target *types.Header
syncCh = make(chan downloader.SyncEvent, 10)
)
sub := s.backend.Downloader().SubscribeSyncEvents(syncCh)
defer sub.Unsubscribe()
for {
select {
case req := <-s.request:
var (
resync bool
retries int
logged bool
)
for {
if retries >= 10 {
req.errc <- fmt.Errorf("sync target is not available, %x", req.hash)
break
}
select {
case <-s.closed:
req.errc <- errors.New("syncer closed")
return
default:
}
header, err := s.backend.Downloader().GetHeader(req.hash)
if err != nil {
if !logged {
logged = true
log.Info("Waiting for peers to retrieve sync target", "hash", req.hash)
}
time.Sleep(time.Second * time.Duration(retries+1))
retries++
continue
}
if target != nil && header.Number.Cmp(target.Number) <= 0 {
req.errc <- fmt.Errorf("stale sync target, current: %d, received: %d", target.Number, header.Number)
break
}
target = header
resync = true
break
}
if resync {
if mode := s.backend.Downloader().ConfigSyncMode(); mode != ethconfig.FullSync {
req.errc <- fmt.Errorf("unsupported syncmode %v, please relaunch geth with --syncmode full", mode)
} else {
req.errc <- s.backend.Downloader().BeaconDevSync(target)
}
}
case ev := <-syncCh:
if ev.Type == downloader.SyncStarted {
log.Debug("Synchronization started")
continue
}
if ev.Type == downloader.SyncFailed {
log.Debug("Synchronization failed", "err", ev.Err)
continue
}
head := s.backend.BlockChain().CurrentHeader()
if head != nil {
// Set the finalized and safe markers relative to the current head.
// The finalized marker is set two epochs behind the target,
// and the safe marker is set one epoch behind the target.
if header := s.backend.BlockChain().GetHeaderByNumber(head.Number.Uint64() - params.EpochLength*2); header != nil {
if final := s.backend.BlockChain().CurrentFinalBlock(); final == nil || final.Number.Cmp(header.Number) < 0 {
s.backend.BlockChain().SetFinalized(header)
}
}
if header := s.backend.BlockChain().GetHeaderByNumber(head.Number.Uint64() - params.EpochLength); header != nil {
if safe := s.backend.BlockChain().CurrentSafeBlock(); safe == nil || safe.Number.Cmp(header.Number) < 0 {
s.backend.BlockChain().SetSafe(header)
}
}
}
// Terminate the node if the target has been reached
if s.config.ExitWhenSynced {
var synced bool
var block *types.Header
if target != nil {
tb := s.backend.BlockChain().GetBlockByHash(target.Hash())
synced = tb != nil
block = tb.Header()
} else {
timestamp := time.Unix(int64(ev.Latest.Time), 0)
synced = time.Since(timestamp) < 10*time.Minute
block = ev.Latest
}
if synced {
log.Info("Sync target reached", "number", block.Number.Uint64(), "hash", block.Hash())
go s.stack.Close() // async since we need to close ourselves
}
}
case <-s.closed:
return
}
}
}
// Start launches the synchronization service.
func (s *Syncer) Start() error {
s.wg.Add(1)
go s.run()
if s.config.TargetBlock == (common.Hash{}) {
return nil
}
return s.Sync(s.config.TargetBlock)
}
// Stop terminates the synchronization service and stop all background activities.
// This function can only be called for one time.
func (s *Syncer) Stop() error {
close(s.closed)
s.wg.Wait()
return nil
}
// Sync sets the synchronization target. Notably, setting a target lower than the
// previous one is not allowed, as backward synchronization is not supported.
func (s *Syncer) Sync(hash common.Hash) error {
req := &syncReq{
hash: hash,
errc: make(chan error, 1),
}
select {
case s.request <- req:
return <-req.errc
case <-s.closed:
return errors.New("syncer is closed")
}
}
// API is the collection of synchronization service APIs for debugging the
// protocol.
type API struct {
s *Syncer
}
// NewAPI creates a new debug API instance.
func NewAPI(s *Syncer) *API {
return &API{s: s}
}
// Sync initiates a full sync to the target block hash.
func (api *API) Sync(target common.Hash) error {
return api.s.Sync(target)
}