From 1abbae239d9a9f5797a5967350023c0f6b6aabb9 Mon Sep 17 00:00:00 2001 From: Delweng Date: Fri, 8 May 2026 10:12:46 +0800 Subject: [PATCH] eth,node: replace the deprecated TypeMux with Feed (#32585) replace the not used event.Typemux to event.Feed --------- Co-authored-by: Felix Lange --- cmd/geth/config.go | 10 ++- cmd/geth/main.go | 27 -------- cmd/utils/flags.go | 8 +-- eth/backend.go | 7 +- eth/downloader/api.go | 22 +++--- eth/downloader/downloader.go | 28 +++++--- eth/downloader/downloader_test.go | 3 +- eth/downloader/events.go | 24 +++++-- eth/handler.go | 35 ++++------ eth/syncer/syncer.go | 108 ++++++++++++++++++------------ node/node.go | 9 --- 11 files changed, 140 insertions(+), 141 deletions(-) diff --git a/cmd/geth/config.go b/cmd/geth/config.go index 8e2db32d76..40458186f4 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -38,6 +38,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/eth/catalyst" "github.com/ethereum/go-ethereum/eth/ethconfig" + "github.com/ethereum/go-ethereum/eth/syncer" "github.com/ethereum/go-ethereum/internal/flags" "github.com/ethereum/go-ethereum/internal/telemetry/tracesetup" "github.com/ethereum/go-ethereum/internal/version" @@ -276,16 +277,19 @@ func makeFullNode(ctx *cli.Context) *node.Node { if cfg.Ethstats.URL != "" { utils.RegisterEthStatsService(stack, backend, cfg.Ethstats.URL) } + // Configure synchronization override service - var synctarget common.Hash + syncConfig := syncer.Config{ + ExitWhenSynced: ctx.Bool(utils.ExitWhenSyncedFlag.Name), + } if ctx.IsSet(utils.SyncTargetFlag.Name) { target := ctx.String(utils.SyncTargetFlag.Name) if !common.IsHexHash(target) { utils.Fatalf("sync target hash is not a valid hex hash: %s", target) } - synctarget = common.HexToHash(target) + syncConfig.TargetBlock = common.HexToHash(target) } - utils.RegisterSyncOverrideService(stack, eth, synctarget, ctx.Bool(utils.ExitWhenSyncedFlag.Name)) + utils.RegisterSyncOverrideService(stack, eth, syncConfig) if ctx.IsSet(utils.DeveloperFlag.Name) { // Start dev mode. diff --git a/cmd/geth/main.go b/cmd/geth/main.go index c8d7abc65b..850e26d161 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -22,13 +22,10 @@ import ( "os" "slices" "sort" - "time" "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/cmd/utils" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/console/prompt" - "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/internal/debug" "github.com/ethereum/go-ethereum/internal/flags" @@ -387,28 +384,4 @@ func startNode(ctx *cli.Context, stack *node.Node, isConsole bool) { } } }() - - // Spawn a standalone goroutine for status synchronization monitoring, - // close the node when synchronization is complete if user required. - if ctx.Bool(utils.ExitWhenSyncedFlag.Name) { - go func() { - sub := stack.EventMux().Subscribe(downloader.DoneEvent{}) - defer sub.Unsubscribe() - for { - event := <-sub.Chan() - if event == nil { - continue - } - done, ok := event.Data.(downloader.DoneEvent) - if !ok { - continue - } - if timestamp := time.Unix(int64(done.Latest.Time), 0); time.Since(timestamp) < 10*time.Minute { - log.Info("Synchronisation completed", "latestnum", done.Latest.Number, "latesthash", done.Latest.Hash(), - "age", common.PrettyAge(timestamp)) - stack.Close() - } - } - }() - } } diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index cc4c3bff5c..ea0f6f5ee4 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -2237,13 +2237,13 @@ func RegisterFilterAPI(stack *node.Node, backend ethapi.Backend, ethcfg *ethconf } // RegisterSyncOverrideService adds the synchronization override service into node. -func RegisterSyncOverrideService(stack *node.Node, eth *eth.Ethereum, target common.Hash, exitWhenSynced bool) { - if target != (common.Hash{}) { - log.Info("Registered sync override service", "hash", target, "exitWhenSynced", exitWhenSynced) +func RegisterSyncOverrideService(stack *node.Node, eth *eth.Ethereum, config syncer.Config) { + if config.TargetBlock != (common.Hash{}) { + log.Info("Registered sync override service", "hash", config.TargetBlock, "exitWhenSynced", config.ExitWhenSynced) } else { log.Info("Registered sync override service") } - syncer.Register(stack, eth, target, exitWhenSynced) + syncer.Register(stack, eth, config) } // SetupMetrics configures the metrics system. diff --git a/eth/backend.go b/eth/backend.go index 6cfd1f6fa0..af8b04bda6 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -49,7 +49,6 @@ import ( "github.com/ethereum/go-ethereum/eth/protocols/snap" "github.com/ethereum/go-ethereum/eth/tracers" "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/internal/ethapi" "github.com/ethereum/go-ethereum/internal/shutdowncheck" "github.com/ethereum/go-ethereum/internal/version" @@ -105,7 +104,6 @@ type Ethereum struct { // DB interfaces chainDb ethdb.Database // Block chain database - eventMux *event.TypeMux engine consensus.Engine accountManager *accounts.Manager @@ -194,7 +192,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { eth := &Ethereum{ config: config, chainDb: chainDb, - eventMux: stack.EventMux(), accountManager: stack.AccountManager(), engine: engine, networkID: networkID, @@ -344,7 +341,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { Network: networkID, Sync: config.SyncMode, BloomCache: uint64(cacheLimit), - EventMux: eth.eventMux, RequiredBlocks: config.RequiredBlocks, }); err != nil { return nil, err @@ -405,7 +401,7 @@ func (s *Ethereum) APIs() []rpc.API { Service: NewMinerAPI(s), }, { Namespace: "eth", - Service: downloader.NewDownloaderAPI(s.handler.downloader, s.blockchain, s.eventMux), + Service: downloader.NewDownloaderAPI(s.handler.downloader, s.blockchain), }, { Namespace: "admin", Service: NewAdminAPI(s), @@ -600,7 +596,6 @@ func (s *Ethereum) Stop() error { s.shutdownTracker.Stop() s.chainDb.Close() - s.eventMux.Stop() return nil } diff --git a/eth/downloader/api.go b/eth/downloader/api.go index 1fea35775e..6033e44474 100644 --- a/eth/downloader/api.go +++ b/eth/downloader/api.go @@ -23,7 +23,6 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/rpc" ) @@ -33,20 +32,18 @@ import ( type DownloaderAPI struct { d *Downloader chain *core.BlockChain - mux *event.TypeMux installSyncSubscription chan chan interface{} uninstallSyncSubscription chan *uninstallSyncSubscriptionRequest } // NewDownloaderAPI creates a new DownloaderAPI. The API has an internal event loop that -// listens for events from the downloader through the global event mux. In case it receives one of +// listens for events from the downloader through the event feed. In case it receives one of // these events it broadcasts it to all syncing subscriptions that are installed through the // installSyncSubscription channel. -func NewDownloaderAPI(d *Downloader, chain *core.BlockChain, m *event.TypeMux) *DownloaderAPI { +func NewDownloaderAPI(d *Downloader, chain *core.BlockChain) *DownloaderAPI { api := &DownloaderAPI{ d: d, chain: chain, - mux: m, installSyncSubscription: make(chan chan interface{}), uninstallSyncSubscription: make(chan *uninstallSyncSubscriptionRequest), } @@ -66,7 +63,8 @@ func NewDownloaderAPI(d *Downloader, chain *core.BlockChain, m *event.TypeMux) * // receive is {false}. func (api *DownloaderAPI) eventLoop() { var ( - sub = api.mux.Subscribe(StartEvent{}) + events = make(chan SyncEvent, 16) + sub = api.d.SubscribeSyncEvents(events) syncSubscriptions = make(map[chan interface{}]struct{}) checkInterval = time.Second * 60 checkTimer = time.NewTimer(checkInterval) @@ -90,6 +88,7 @@ func (api *DownloaderAPI) eventLoop() { } ) defer checkTimer.Stop() + defer sub.Unsubscribe() for { select { @@ -101,14 +100,13 @@ func (api *DownloaderAPI) eventLoop() { case u := <-api.uninstallSyncSubscription: delete(syncSubscriptions, u.c) close(u.uninstalled) - case event := <-sub.Chan(): - if event == nil { - return - } - switch event.Data.(type) { - case StartEvent: + case ev := <-events: + if ev.Type == SyncStarted { started = true } + case <-sub.Err(): + // The downloader is terminated or other internal error occurs + return case <-checkTimer.C: if !started { checkTimer.Reset(checkInterval) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 1de0933842..4a575d6856 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -97,9 +97,12 @@ type headerTask struct { } type Downloader struct { - mode atomic.Uint32 // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode - moder *syncModer // Sync mode management, deliver the appropriate sync mode choice for each cycle - mux *event.TypeMux // Event multiplexer to announce sync operation events + mode atomic.Uint32 // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode + moder *syncModer // Sync mode management, deliver the appropriate sync mode choice for each cycle + + // Event feed for downloader events + feed event.FeedOf[SyncEvent] + scope event.SubscriptionScope queue *queue // Scheduler for selecting the hashes to download peers *peerSet // Set of active peers from which download can proceed @@ -229,12 +232,11 @@ type BlockChain interface { } // New creates a new downloader to fetch hashes and blocks from remote peers. -func New(stateDb ethdb.Database, mode ethconfig.SyncMode, mux *event.TypeMux, chain BlockChain, dropPeer peerDropFn, success func()) *Downloader { +func New(stateDb ethdb.Database, mode ethconfig.SyncMode, chain BlockChain, dropPeer peerDropFn, success func()) *Downloader { cutoffNumber, cutoffHash := chain.HistoryPruningCutoff() dl := &Downloader{ stateDB: stateDb, moder: newSyncModer(mode, chain, stateDb), - mux: mux, queue: newQueue(blockCacheMaxItems, blockCacheInitialItems), peers: newPeerSet(), blockchain: chain, @@ -427,20 +429,25 @@ func (d *Downloader) ConfigSyncMode() SyncMode { return d.moder.get(false) } +// SubscribeSyncEvents creates a subscription for downloader sync events +func (d *Downloader) SubscribeSyncEvents(ch chan<- SyncEvent) event.Subscription { + return d.scope.Track(d.feed.Subscribe(ch)) +} + // syncToHead starts a block synchronization based on the hash chain from // the specified head hash. func (d *Downloader) syncToHead() (err error) { - d.mux.Post(StartEvent{}) + mode := d.getMode() + d.feed.Send(SyncEvent{Type: SyncStarted, Mode: mode}) defer func() { // reset on error if err != nil { - d.mux.Post(FailedEvent{err}) + d.feed.Send(SyncEvent{Type: SyncFailed, Mode: mode, Err: err}) } else { latest := d.blockchain.CurrentHeader() - d.mux.Post(DoneEvent{latest}) + d.feed.Send(SyncEvent{Type: SyncCompleted, Mode: mode, Latest: latest}) } }() - mode := d.getMode() log.Debug("Backfilling with the network", "mode", mode) defer func(start time.Time) { @@ -662,6 +669,9 @@ func (d *Downloader) Cancel() { // Terminate interrupts the downloader, canceling all pending operations. // The downloader cannot be reused after calling Terminate. func (d *Downloader) Terminate() { + // Unsubscribe all subscriptions registered from downloader + d.scope.Close() + // Close the termination channel (make sure double close is allowed) d.quitLock.Lock() select { diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 6d5d159631..e6c477cd33 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -32,7 +32,6 @@ import ( "github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" @@ -75,7 +74,7 @@ func newTesterWithNotification(t *testing.T, mode ethconfig.SyncMode, success fu chain: chain, peers: make(map[string]*downloadTesterPeer), } - tester.downloader = New(db, mode, new(event.TypeMux), tester.chain, tester.dropPeer, success) + tester.downloader = New(db, mode, tester.chain, tester.dropPeer, success) return tester } diff --git a/eth/downloader/events.go b/eth/downloader/events.go index 25255a3a72..0fb380a857 100644 --- a/eth/downloader/events.go +++ b/eth/downloader/events.go @@ -16,10 +16,24 @@ package downloader -import "github.com/ethereum/go-ethereum/core/types" +import ( + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/ethconfig" +) -type DoneEvent struct { - Latest *types.Header +// SyncEventType represents the type of sync event +type SyncEventType int + +const ( + SyncStarted SyncEventType = iota + SyncFailed + SyncCompleted +) + +// SyncEvent represents a downloader synchronization event +type SyncEvent struct { + Type SyncEventType + Mode ethconfig.SyncMode + Err error // Set when Type is SyncFailed + Latest *types.Header // Set when Type is SyncCompleted } -type StartEvent struct{} -type FailedEvent struct{ Err error } diff --git a/eth/handler.go b/eth/handler.go index 27b5e60697..76df635fb0 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -107,7 +107,6 @@ type handlerConfig struct { Network uint64 // Network identifier to advertise Sync ethconfig.SyncMode // Whether to snap or full sync BloomCache uint64 // Megabytes to alloc for snap sync bloom - EventMux *event.TypeMux // Legacy event mux, deprecate for `feed` RequiredBlocks map[uint64]common.Hash // Hard coded map of required block hashes for sync challenges } @@ -126,7 +125,6 @@ type handler struct { peers *peerSet txBroadcastKey [16]byte - eventMux *event.TypeMux txsCh chan core.NewTxsEvent txsSub event.Subscription blockRange *blockRangeState @@ -144,14 +142,9 @@ type handler struct { // newHandler returns a handler for all Ethereum chain management protocol. func newHandler(config *handlerConfig) (*handler, error) { - // Create the protocol manager with the base fields - if config.EventMux == nil { - config.EventMux = new(event.TypeMux) // Nicety initialization for tests - } h := &handler{ nodeID: config.NodeID, networkID: config.Network, - eventMux: config.EventMux, database: config.Database, txpool: config.TxPool, chain: config.Chain, @@ -163,7 +156,7 @@ func newHandler(config *handlerConfig) (*handler, error) { handlerStartCh: make(chan struct{}), } // Construct the downloader (long sync) - h.downloader = downloader.New(config.Database, config.Sync, h.eventMux, h.chain, h.removePeer, h.enableSyncedFeatures) + h.downloader = downloader.New(config.Database, config.Sync, h.chain, h.removePeer, h.enableSyncedFeatures) // If snap sync is requested but snapshots are disabled, fail loudly if h.downloader.ConfigSyncMode() == ethconfig.SnapSync && (config.Chain.Snapshots() == nil && config.Chain.TrieDB().Scheme() == rawdb.HashScheme) { @@ -420,7 +413,7 @@ func (h *handler) Start(maxPeers int) { // broadcast block range h.wg.Add(1) - h.blockRange = newBlockRangeState(h.chain, h.eventMux) + h.blockRange = newBlockRangeState(h.chain, h.downloader) go h.blockRangeLoop(h.blockRange) // start sync handlers @@ -536,16 +529,19 @@ type blockRangeState struct { next atomic.Pointer[eth.BlockRangeUpdatePacket] headCh chan core.ChainHeadEvent headSub event.Subscription - syncSub *event.TypeMuxSubscription + syncCh chan downloader.SyncEvent + syncSub event.Subscription } -func newBlockRangeState(chain *core.BlockChain, typeMux *event.TypeMux) *blockRangeState { +func newBlockRangeState(chain *core.BlockChain, dl *downloader.Downloader) *blockRangeState { headCh := make(chan core.ChainHeadEvent, chainHeadChanSize) headSub := chain.SubscribeChainHeadEvent(headCh) - syncSub := typeMux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{}) + syncCh := make(chan downloader.SyncEvent, 16) + syncSub := dl.SubscribeSyncEvents(syncCh) st := &blockRangeState{ headCh: headCh, headSub: headSub, + syncCh: syncCh, syncSub: syncSub, } st.update(chain, chain.CurrentBlock()) @@ -561,11 +557,8 @@ func (h *handler) blockRangeLoop(st *blockRangeState) { for { select { - case ev := <-st.syncSub.Chan(): - if ev == nil { - continue - } - if _, ok := ev.Data.(downloader.StartEvent); ok && h.downloader.ConfigSyncMode() == ethconfig.SnapSync { + case ev := <-st.syncCh: + if ev.Type == downloader.SyncStarted && ev.Mode == ethconfig.SnapSync { h.blockRangeWhileSnapSyncing(st) } case <-st.headCh: @@ -593,12 +586,8 @@ func (h *handler) blockRangeWhileSnapSyncing(st *blockRangeState) { h.broadcastBlockRange(st) } // back to processing head block updates when sync is done - case ev := <-st.syncSub.Chan(): - if ev == nil { - continue - } - switch ev.Data.(type) { - case downloader.FailedEvent, downloader.DoneEvent: + case ev := <-st.syncCh: + if ev.Type == downloader.SyncFailed || ev.Type == downloader.SyncCompleted { return } // ignore head updates, but exit when the subscription ends diff --git a/eth/syncer/syncer.go b/eth/syncer/syncer.go index c0d54b953b..b04d8f22e8 100644 --- a/eth/syncer/syncer.go +++ b/eth/syncer/syncer.go @@ -26,6 +26,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth" + "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" @@ -37,32 +38,40 @@ type syncReq struct { errc chan error } +type Config struct { + TargetBlock common.Hash // if set, sync is triggered at startup + ExitWhenSynced bool // if true, the node shuts down after sync has finished +} + // Syncer is an auxiliary service that allows Geth to perform full sync // alone without consensus-layer attached. Users must specify a valid block hash // as the sync target. // +// Additionally, the syncer can be used to monitor state synchronization. +// It will exit once the specified target has been reached or when the +// most recent chain head is caught up. +// // This tool can be applied to different networks, no matter it's pre-merge or // post-merge, but only for full-sync. type Syncer struct { - stack *node.Node - backend *eth.Ethereum - target common.Hash - request chan *syncReq - closed chan struct{} - wg sync.WaitGroup - exitWhenSynced bool + stack *node.Node + backend *eth.Ethereum + request chan *syncReq + closed chan struct{} + wg sync.WaitGroup + + config Config } // Register registers the synchronization override service into the node // stack for launching and stopping the service controlled by node. -func Register(stack *node.Node, backend *eth.Ethereum, target common.Hash, exitWhenSynced bool) (*Syncer, error) { +func Register(stack *node.Node, backend *eth.Ethereum, cfg Config) (*Syncer, error) { s := &Syncer{ - stack: stack, - backend: backend, - target: target, - request: make(chan *syncReq), - closed: make(chan struct{}), - exitWhenSynced: exitWhenSynced, + stack: stack, + backend: backend, + request: make(chan *syncReq), + closed: make(chan struct{}), + config: cfg, } stack.RegisterAPIs(s.APIs()) stack.RegisterLifecycle(s) @@ -88,9 +97,11 @@ func (s *Syncer) run() { var ( target *types.Header - ticker = time.NewTicker(time.Second * 5) + syncCh = make(chan downloader.SyncEvent, 10) ) - defer ticker.Stop() + sub := s.backend.Downloader().SubscribeSyncEvents(syncCh) + defer sub.Unsubscribe() + for { select { case req := <-s.request: @@ -137,35 +148,50 @@ func (s *Syncer) run() { } } - case <-ticker.C: - if target == nil { + case ev := <-syncCh: + if ev.Type == downloader.SyncStarted { + log.Debug("Synchronization started") continue } + if ev.Type == downloader.SyncFailed { + log.Debug("Synchronization failed", "err", ev.Err) + continue + } + + head := s.backend.BlockChain().CurrentHeader() + if head != nil { + // Set the finalized and safe markers relative to the current head. + // The finalized marker is set two epochs behind the target, + // and the safe marker is set one epoch behind the target. + if header := s.backend.BlockChain().GetHeaderByNumber(head.Number.Uint64() - params.EpochLength*2); header != nil { + if final := s.backend.BlockChain().CurrentFinalBlock(); final == nil || final.Number.Cmp(header.Number) < 0 { + s.backend.BlockChain().SetFinalized(header) + } + } + if header := s.backend.BlockChain().GetHeaderByNumber(head.Number.Uint64() - params.EpochLength); header != nil { + if safe := s.backend.BlockChain().CurrentSafeBlock(); safe == nil || safe.Number.Cmp(header.Number) < 0 { + s.backend.BlockChain().SetSafe(header) + } + } + } // Terminate the node if the target has been reached - if s.exitWhenSynced { - if block := s.backend.BlockChain().GetBlockByHash(target.Hash()); block != nil { - log.Info("Sync target reached", "number", block.NumberU64(), "hash", block.Hash()) - go s.stack.Close() // async since we need to close ourselves - return + if s.config.ExitWhenSynced { + var synced bool + var block *types.Header + if target != nil { + tb := s.backend.BlockChain().GetBlockByHash(target.Hash()) + synced = tb != nil + block = tb.Header() + } else { + timestamp := time.Unix(int64(ev.Latest.Time), 0) + synced = time.Since(timestamp) < 10*time.Minute + block = ev.Latest } - } - // Set the finalized and safe markers relative to the current head. - // The finalized marker is set two epochs behind the target, - // and the safe marker is set one epoch behind the target. - head := s.backend.BlockChain().CurrentHeader() - if head == nil { - continue - } - if header := s.backend.BlockChain().GetHeaderByNumber(head.Number.Uint64() - params.EpochLength*2); header != nil { - if final := s.backend.BlockChain().CurrentFinalBlock(); final == nil || final.Number.Cmp(header.Number) < 0 { - s.backend.BlockChain().SetFinalized(header) - } - } - if header := s.backend.BlockChain().GetHeaderByNumber(head.Number.Uint64() - params.EpochLength); header != nil { - if safe := s.backend.BlockChain().CurrentSafeBlock(); safe == nil || safe.Number.Cmp(header.Number) < 0 { - s.backend.BlockChain().SetSafe(header) + if synced { + log.Info("Sync target reached", "number", block.Number.Uint64(), "hash", block.Hash()) + go s.stack.Close() // async since we need to close ourselves } } @@ -179,10 +205,10 @@ func (s *Syncer) run() { func (s *Syncer) Start() error { s.wg.Add(1) go s.run() - if s.target == (common.Hash{}) { + if s.config.TargetBlock == (common.Hash{}) { return nil } - return s.Sync(s.target) + return s.Sync(s.config.TargetBlock) } // Stop terminates the synchronization service and stop all background activities. diff --git a/node/node.go b/node/node.go index 01318881d4..56ecd7d522 100644 --- a/node/node.go +++ b/node/node.go @@ -35,7 +35,6 @@ import ( "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb/memorydb" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rpc" @@ -44,7 +43,6 @@ import ( // Node is a container on which services can be registered. type Node struct { - eventmux *event.TypeMux config *Config accman *accounts.Manager log log.Logger @@ -108,7 +106,6 @@ func New(conf *Config) (*Node, error) { node := &Node{ config: conf, inprocHandler: server, - eventmux: new(event.TypeMux), log: conf.Logger, stop: make(chan struct{}), server: &p2p.Server{Config: conf.P2P}, @@ -692,12 +689,6 @@ func (n *Node) WSAuthEndpoint() string { return "ws://" + n.wsAuth.listenAddr() + n.wsAuth.wsConfig.prefix } -// EventMux retrieves the event multiplexer used by all the network services in -// the current protocol stack. -func (n *Node) EventMux() *event.TypeMux { - return n.eventmux -} - // OpenDatabaseWithOptions opens an existing database with the given name (or creates one if no // previous can be found) from within the node's instance directory. If the node has no // data directory, an in-memory database is returned.