eth,node: remove the remaining eventMux and event type

This commit is contained in:
Felix Lange 2026-04-29 15:40:38 +02:00
parent 8b640a3b68
commit 4ed7a8a4a8
6 changed files with 17 additions and 54 deletions

View file

@ -49,7 +49,6 @@ import (
"github.com/ethereum/go-ethereum/eth/protocols/snap" "github.com/ethereum/go-ethereum/eth/protocols/snap"
"github.com/ethereum/go-ethereum/eth/tracers" "github.com/ethereum/go-ethereum/eth/tracers"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/internal/ethapi" "github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/internal/shutdowncheck" "github.com/ethereum/go-ethereum/internal/shutdowncheck"
"github.com/ethereum/go-ethereum/internal/version" "github.com/ethereum/go-ethereum/internal/version"
@ -105,7 +104,6 @@ type Ethereum struct {
// DB interfaces // DB interfaces
chainDb ethdb.Database // Block chain database chainDb ethdb.Database // Block chain database
eventMux *event.TypeMux
engine consensus.Engine engine consensus.Engine
accountManager *accounts.Manager accountManager *accounts.Manager
@ -194,7 +192,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
eth := &Ethereum{ eth := &Ethereum{
config: config, config: config,
chainDb: chainDb, chainDb: chainDb,
eventMux: stack.EventMux(),
accountManager: stack.AccountManager(), accountManager: stack.AccountManager(),
engine: engine, engine: engine,
networkID: networkID, networkID: networkID,
@ -343,7 +340,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
Network: networkID, Network: networkID,
Sync: config.SyncMode, Sync: config.SyncMode,
BloomCache: uint64(cacheLimit), BloomCache: uint64(cacheLimit),
EventMux: eth.eventMux,
RequiredBlocks: config.RequiredBlocks, RequiredBlocks: config.RequiredBlocks,
}); err != nil { }); err != nil {
return nil, err return nil, err
@ -599,7 +595,6 @@ func (s *Ethereum) Stop() error {
s.shutdownTracker.Stop() s.shutdownTracker.Stop()
s.chainDb.Close() s.chainDb.Close()
s.eventMux.Stop()
return nil return nil
} }

View file

@ -99,9 +99,8 @@ type headerTask struct {
type Downloader struct { type Downloader struct {
mode atomic.Uint32 // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode mode atomic.Uint32 // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode
moder *syncModer // Sync mode management, deliver the appropriate sync mode choice for each cycle moder *syncModer // Sync mode management, deliver the appropriate sync mode choice for each cycle
mux *event.TypeMux // Event multiplexer to announce sync operation events
// New event feed for downloader events (alongside the existing TypeMux) // Event feed for downloader events
feed event.FeedOf[SyncEvent] feed event.FeedOf[SyncEvent]
scope event.SubscriptionScope scope event.SubscriptionScope
@ -233,12 +232,11 @@ type BlockChain interface {
} }
// New creates a new downloader to fetch hashes and blocks from remote peers. // New creates a new downloader to fetch hashes and blocks from remote peers.
func New(stateDb ethdb.Database, mode ethconfig.SyncMode, mux *event.TypeMux, chain BlockChain, dropPeer peerDropFn, success func()) *Downloader { func New(stateDb ethdb.Database, chain BlockChain, mode ethconfig.SyncMode, dropPeer peerDropFn, success func()) *Downloader {
cutoffNumber, cutoffHash := chain.HistoryPruningCutoff() cutoffNumber, cutoffHash := chain.HistoryPruningCutoff()
dl := &Downloader{ dl := &Downloader{
stateDB: stateDb, stateDB: stateDb,
moder: newSyncModer(mode, chain, stateDb), moder: newSyncModer(mode, chain, stateDb),
mux: mux,
queue: newQueue(blockCacheMaxItems, blockCacheInitialItems), queue: newQueue(blockCacheMaxItems, blockCacheInitialItems),
peers: newPeerSet(), peers: newPeerSet(),
blockchain: chain, blockchain: chain,
@ -439,16 +437,13 @@ func (d *Downloader) SubscribeSyncEvents(ch chan<- SyncEvent) event.Subscription
// syncToHead starts a block synchronization based on the hash chain from // syncToHead starts a block synchronization based on the hash chain from
// the specified head hash. // the specified head hash.
func (d *Downloader) syncToHead() (err error) { func (d *Downloader) syncToHead() (err error) {
d.mux.Post(StartEvent{})
d.feed.Send(SyncEvent{Type: SyncStarted}) d.feed.Send(SyncEvent{Type: SyncStarted})
defer func() { defer func() {
// reset on error // reset on error
if err != nil { if err != nil {
d.mux.Post(FailedEvent{err})
d.feed.Send(SyncEvent{Type: SyncFailed, Err: err}) d.feed.Send(SyncEvent{Type: SyncFailed, Err: err})
} else { } else {
latest := d.blockchain.CurrentHeader() latest := d.blockchain.CurrentHeader()
d.mux.Post(DoneEvent{latest})
d.feed.Send(SyncEvent{Type: SyncCompleted, Latest: latest}) d.feed.Send(SyncEvent{Type: SyncCompleted, Latest: latest})
} }
}() }()

View file

@ -32,7 +32,6 @@ import (
"github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/eth/protocols/snap" "github.com/ethereum/go-ethereum/eth/protocols/snap"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
@ -75,7 +74,7 @@ func newTesterWithNotification(t *testing.T, mode ethconfig.SyncMode, success fu
chain: chain, chain: chain,
peers: make(map[string]*downloadTesterPeer), peers: make(map[string]*downloadTesterPeer),
} }
tester.downloader = New(db, mode, new(event.TypeMux), tester.chain, tester.dropPeer, success) tester.downloader = New(db, tester.chain, mode, tester.dropPeer, success)
return tester return tester
} }

View file

@ -18,12 +18,6 @@ package downloader
import "github.com/ethereum/go-ethereum/core/types" import "github.com/ethereum/go-ethereum/core/types"
type DoneEvent struct {
Latest *types.Header
}
type StartEvent struct{}
type FailedEvent struct{ Err error }
// SyncEventType represents the type of sync event // SyncEventType represents the type of sync event
type SyncEventType int type SyncEventType int

View file

@ -107,7 +107,6 @@ type handlerConfig struct {
Network uint64 // Network identifier to advertise Network uint64 // Network identifier to advertise
Sync ethconfig.SyncMode // Whether to snap or full sync Sync ethconfig.SyncMode // Whether to snap or full sync
BloomCache uint64 // Megabytes to alloc for snap sync bloom BloomCache uint64 // Megabytes to alloc for snap sync bloom
EventMux *event.TypeMux // Legacy event mux, deprecate for `feed`
RequiredBlocks map[uint64]common.Hash // Hard coded map of required block hashes for sync challenges RequiredBlocks map[uint64]common.Hash // Hard coded map of required block hashes for sync challenges
} }
@ -126,7 +125,6 @@ type handler struct {
peers *peerSet peers *peerSet
txBroadcastKey [16]byte txBroadcastKey [16]byte
eventMux *event.TypeMux
txsCh chan core.NewTxsEvent txsCh chan core.NewTxsEvent
txsSub event.Subscription txsSub event.Subscription
blockRange *blockRangeState blockRange *blockRangeState
@ -144,14 +142,9 @@ type handler struct {
// newHandler returns a handler for all Ethereum chain management protocol. // newHandler returns a handler for all Ethereum chain management protocol.
func newHandler(config *handlerConfig) (*handler, error) { func newHandler(config *handlerConfig) (*handler, error) {
// Create the protocol manager with the base fields
if config.EventMux == nil {
config.EventMux = new(event.TypeMux) // Nicety initialization for tests
}
h := &handler{ h := &handler{
nodeID: config.NodeID, nodeID: config.NodeID,
networkID: config.Network, networkID: config.Network,
eventMux: config.EventMux,
database: config.Database, database: config.Database,
txpool: config.TxPool, txpool: config.TxPool,
chain: config.Chain, chain: config.Chain,
@ -163,7 +156,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
handlerStartCh: make(chan struct{}), handlerStartCh: make(chan struct{}),
} }
// Construct the downloader (long sync) // Construct the downloader (long sync)
h.downloader = downloader.New(config.Database, config.Sync, h.eventMux, h.chain, h.removePeer, h.enableSyncedFeatures) h.downloader = downloader.New(config.Database, h.chain, config.Sync, h.removePeer, h.enableSyncedFeatures)
// If snap sync is requested but snapshots are disabled, fail loudly // If snap sync is requested but snapshots are disabled, fail loudly
if h.downloader.ConfigSyncMode() == ethconfig.SnapSync && (config.Chain.Snapshots() == nil && config.Chain.TrieDB().Scheme() == rawdb.HashScheme) { if h.downloader.ConfigSyncMode() == ethconfig.SnapSync && (config.Chain.Snapshots() == nil && config.Chain.TrieDB().Scheme() == rawdb.HashScheme) {
@ -420,7 +413,7 @@ func (h *handler) Start(maxPeers int) {
// broadcast block range // broadcast block range
h.wg.Add(1) h.wg.Add(1)
h.blockRange = newBlockRangeState(h.chain, h.eventMux) h.blockRange = newBlockRangeState(h.chain, h.downloader)
go h.blockRangeLoop(h.blockRange) go h.blockRangeLoop(h.blockRange)
// start sync handlers // start sync handlers
@ -536,16 +529,19 @@ type blockRangeState struct {
next atomic.Pointer[eth.BlockRangeUpdatePacket] next atomic.Pointer[eth.BlockRangeUpdatePacket]
headCh chan core.ChainHeadEvent headCh chan core.ChainHeadEvent
headSub event.Subscription headSub event.Subscription
syncSub *event.TypeMuxSubscription syncCh chan downloader.SyncEvent
syncSub event.Subscription
} }
func newBlockRangeState(chain *core.BlockChain, typeMux *event.TypeMux) *blockRangeState { func newBlockRangeState(chain *core.BlockChain, dl *downloader.Downloader) *blockRangeState {
headCh := make(chan core.ChainHeadEvent, chainHeadChanSize) headCh := make(chan core.ChainHeadEvent, chainHeadChanSize)
headSub := chain.SubscribeChainHeadEvent(headCh) headSub := chain.SubscribeChainHeadEvent(headCh)
syncSub := typeMux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{}) syncCh := make(chan downloader.SyncEvent, 16)
syncSub := dl.SubscribeSyncEvents(syncCh)
st := &blockRangeState{ st := &blockRangeState{
headCh: headCh, headCh: headCh,
headSub: headSub, headSub: headSub,
syncCh: syncCh,
syncSub: syncSub, syncSub: syncSub,
} }
st.update(chain, chain.CurrentBlock()) st.update(chain, chain.CurrentBlock())
@ -561,11 +557,8 @@ func (h *handler) blockRangeLoop(st *blockRangeState) {
for { for {
select { select {
case ev := <-st.syncSub.Chan(): case ev := <-st.syncCh:
if ev == nil { if ev.Type == downloader.SyncStarted && h.downloader.ConfigSyncMode() == ethconfig.SnapSync {
continue
}
if _, ok := ev.Data.(downloader.StartEvent); ok && h.downloader.ConfigSyncMode() == ethconfig.SnapSync {
h.blockRangeWhileSnapSyncing(st) h.blockRangeWhileSnapSyncing(st)
} }
case <-st.headCh: case <-st.headCh:
@ -593,12 +586,8 @@ func (h *handler) blockRangeWhileSnapSyncing(st *blockRangeState) {
h.broadcastBlockRange(st) h.broadcastBlockRange(st)
} }
// back to processing head block updates when sync is done // back to processing head block updates when sync is done
case ev := <-st.syncSub.Chan(): case ev := <-st.syncCh:
if ev == nil { if ev.Type == downloader.SyncFailed || ev.Type == downloader.SyncCompleted {
continue
}
switch ev.Data.(type) {
case downloader.FailedEvent, downloader.DoneEvent:
return return
} }
// ignore head updates, but exit when the subscription ends // ignore head updates, but exit when the subscription ends

View file

@ -35,7 +35,6 @@ import (
"github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/memorydb" "github.com/ethereum/go-ethereum/ethdb/memorydb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
@ -44,7 +43,6 @@ import (
// Node is a container on which services can be registered. // Node is a container on which services can be registered.
type Node struct { type Node struct {
eventmux *event.TypeMux
config *Config config *Config
accman *accounts.Manager accman *accounts.Manager
log log.Logger log log.Logger
@ -108,7 +106,6 @@ func New(conf *Config) (*Node, error) {
node := &Node{ node := &Node{
config: conf, config: conf,
inprocHandler: server, inprocHandler: server,
eventmux: new(event.TypeMux),
log: conf.Logger, log: conf.Logger,
stop: make(chan struct{}), stop: make(chan struct{}),
server: &p2p.Server{Config: conf.P2P}, server: &p2p.Server{Config: conf.P2P},
@ -692,12 +689,6 @@ func (n *Node) WSAuthEndpoint() string {
return "ws://" + n.wsAuth.listenAddr() + n.wsAuth.wsConfig.prefix return "ws://" + n.wsAuth.listenAddr() + n.wsAuth.wsConfig.prefix
} }
// EventMux retrieves the event multiplexer used by all the network services in
// the current protocol stack.
func (n *Node) EventMux() *event.TypeMux {
return n.eventmux
}
// OpenDatabaseWithOptions opens an existing database with the given name (or creates one if no // OpenDatabaseWithOptions opens an existing database with the given name (or creates one if no
// previous can be found) from within the node's instance directory. If the node has no // previous can be found) from within the node's instance directory. If the node has no
// data directory, an in-memory database is returned. // data directory, an in-memory database is returned.