From 4b219b2c88f5b68f9916cd3249958801692bab04 Mon Sep 17 00:00:00 2001 From: jsvisa Date: Wed, 17 Sep 2025 23:14:02 +0800 Subject: [PATCH] eth: replace event.TypeMux with event.FeedOf --- eth/backend.go | 2 +- eth/downloader/api.go | 19 +++++++------------ eth/downloader/downloader.go | 15 +++++++++++++++ eth/downloader/events.go | 16 ++++++++++++++++ 4 files changed, 39 insertions(+), 13 deletions(-) diff --git a/eth/backend.go b/eth/backend.go index 08a3c70c9d..37fa1df239 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -404,7 +404,7 @@ func (s *Ethereum) APIs() []rpc.API { Service: NewMinerAPI(s), }, { Namespace: "eth", - Service: downloader.NewDownloaderAPI(s.handler.downloader, s.blockchain, s.eventMux), + Service: downloader.NewDownloaderAPI(s.handler.downloader, s.blockchain), }, { Namespace: "admin", Service: NewAdminAPI(s), diff --git a/eth/downloader/api.go b/eth/downloader/api.go index 1fea35775e..7a2c028753 100644 --- a/eth/downloader/api.go +++ b/eth/downloader/api.go @@ -23,7 +23,6 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/rpc" ) @@ -33,20 +32,18 @@ import ( type DownloaderAPI struct { d *Downloader chain *core.BlockChain - mux *event.TypeMux installSyncSubscription chan chan interface{} uninstallSyncSubscription chan *uninstallSyncSubscriptionRequest } // NewDownloaderAPI creates a new DownloaderAPI. The API has an internal event loop that -// listens for events from the downloader through the global event mux. In case it receives one of +// listens for events from the downloader through the event feed. In case it receives one of // these events it broadcasts it to all syncing subscriptions that are installed through the // installSyncSubscription channel. -func NewDownloaderAPI(d *Downloader, chain *core.BlockChain, m *event.TypeMux) *DownloaderAPI { +func NewDownloaderAPI(d *Downloader, chain *core.BlockChain) *DownloaderAPI { api := &DownloaderAPI{ d: d, chain: chain, - mux: m, installSyncSubscription: make(chan chan interface{}), uninstallSyncSubscription: make(chan *uninstallSyncSubscriptionRequest), } @@ -66,7 +63,8 @@ func NewDownloaderAPI(d *Downloader, chain *core.BlockChain, m *event.TypeMux) * // receive is {false}. func (api *DownloaderAPI) eventLoop() { var ( - sub = api.mux.Subscribe(StartEvent{}) + events = make(chan SyncEvent, 16) + sub = api.d.SubscribeSyncEvents(events) syncSubscriptions = make(map[chan interface{}]struct{}) checkInterval = time.Second * 60 checkTimer = time.NewTimer(checkInterval) @@ -90,6 +88,7 @@ func (api *DownloaderAPI) eventLoop() { } ) defer checkTimer.Stop() + defer sub.Unsubscribe() for { select { @@ -101,12 +100,8 @@ func (api *DownloaderAPI) eventLoop() { case u := <-api.uninstallSyncSubscription: delete(syncSubscriptions, u.c) close(u.uninstalled) - case event := <-sub.Chan(): - if event == nil { - return - } - switch event.Data.(type) { - case StartEvent: + case ev := <-events: + if ev.Type == SyncStarted { started = true } case <-checkTimer.C: diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 1de0933842..04941bbb6b 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -101,6 +101,10 @@ type Downloader struct { moder *syncModer // Sync mode management, deliver the appropriate sync mode choice for each cycle mux *event.TypeMux // Event multiplexer to announce sync operation events + // New event feed for downloader events (alongside the existing TypeMux) + feed event.FeedOf[SyncEvent] + scope event.SubscriptionScope + queue *queue // Scheduler for selecting the hashes to download peers *peerSet // Set of active peers from which download can proceed @@ -427,17 +431,25 @@ func (d *Downloader) ConfigSyncMode() SyncMode { return d.moder.get(false) } +// SubscribeSyncEvents creates a subscription for downloader sync events +func (d *Downloader) SubscribeSyncEvents(ch chan<- SyncEvent) event.Subscription { + return d.scope.Track(d.feed.Subscribe(ch)) +} + // syncToHead starts a block synchronization based on the hash chain from // the specified head hash. func (d *Downloader) syncToHead() (err error) { d.mux.Post(StartEvent{}) + d.feed.Send(SyncEvent{Type: SyncStarted}) defer func() { // reset on error if err != nil { d.mux.Post(FailedEvent{err}) + d.feed.Send(SyncEvent{Type: SyncFailed, Err: err}) } else { latest := d.blockchain.CurrentHeader() d.mux.Post(DoneEvent{latest}) + d.feed.Send(SyncEvent{Type: SyncCompleted, Latest: latest}) } }() mode := d.getMode() @@ -662,6 +674,9 @@ func (d *Downloader) Cancel() { // Terminate interrupts the downloader, canceling all pending operations. // The downloader cannot be reused after calling Terminate. func (d *Downloader) Terminate() { + // Unsubscribe all subscriptions registered from downloader + d.scope.Close() + // Close the termination channel (make sure double close is allowed) d.quitLock.Lock() select { diff --git a/eth/downloader/events.go b/eth/downloader/events.go index 25255a3a72..0281dfb24f 100644 --- a/eth/downloader/events.go +++ b/eth/downloader/events.go @@ -23,3 +23,19 @@ type DoneEvent struct { } type StartEvent struct{} type FailedEvent struct{ Err error } + +// SyncEventType represents the type of sync event +type SyncEventType int + +const ( + SyncStarted SyncEventType = iota + SyncFailed + SyncCompleted +) + +// SyncEvent represents a downloader synchronization event +type SyncEvent struct { + Type SyncEventType + Err error // Set when Type is SyncFailed + Latest *types.Header // Set when Type is SyncCompleted +}