eth: replace event.TypeMux with event.FeedOf

This commit is contained in:
jsvisa 2025-09-17 23:14:02 +08:00 committed by Felix Lange
parent 4dc7d46155
commit 4b219b2c88
4 changed files with 39 additions and 13 deletions

View file

@ -404,7 +404,7 @@ func (s *Ethereum) APIs() []rpc.API {
Service: NewMinerAPI(s), Service: NewMinerAPI(s),
}, { }, {
Namespace: "eth", Namespace: "eth",
Service: downloader.NewDownloaderAPI(s.handler.downloader, s.blockchain, s.eventMux), Service: downloader.NewDownloaderAPI(s.handler.downloader, s.blockchain),
}, { }, {
Namespace: "admin", Namespace: "admin",
Service: NewAdminAPI(s), Service: NewAdminAPI(s),

View file

@ -23,7 +23,6 @@ import (
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
) )
@ -33,20 +32,18 @@ import (
type DownloaderAPI struct { type DownloaderAPI struct {
d *Downloader d *Downloader
chain *core.BlockChain chain *core.BlockChain
mux *event.TypeMux
installSyncSubscription chan chan interface{} installSyncSubscription chan chan interface{}
uninstallSyncSubscription chan *uninstallSyncSubscriptionRequest uninstallSyncSubscription chan *uninstallSyncSubscriptionRequest
} }
// NewDownloaderAPI creates a new DownloaderAPI. The API has an internal event loop that // NewDownloaderAPI creates a new DownloaderAPI. The API has an internal event loop that
// listens for events from the downloader through the global event mux. In case it receives one of // listens for events from the downloader through the event feed. In case it receives one of
// these events it broadcasts it to all syncing subscriptions that are installed through the // these events it broadcasts it to all syncing subscriptions that are installed through the
// installSyncSubscription channel. // installSyncSubscription channel.
func NewDownloaderAPI(d *Downloader, chain *core.BlockChain, m *event.TypeMux) *DownloaderAPI { func NewDownloaderAPI(d *Downloader, chain *core.BlockChain) *DownloaderAPI {
api := &DownloaderAPI{ api := &DownloaderAPI{
d: d, d: d,
chain: chain, chain: chain,
mux: m,
installSyncSubscription: make(chan chan interface{}), installSyncSubscription: make(chan chan interface{}),
uninstallSyncSubscription: make(chan *uninstallSyncSubscriptionRequest), uninstallSyncSubscription: make(chan *uninstallSyncSubscriptionRequest),
} }
@ -66,7 +63,8 @@ func NewDownloaderAPI(d *Downloader, chain *core.BlockChain, m *event.TypeMux) *
// receive is {false}. // receive is {false}.
func (api *DownloaderAPI) eventLoop() { func (api *DownloaderAPI) eventLoop() {
var ( var (
sub = api.mux.Subscribe(StartEvent{}) events = make(chan SyncEvent, 16)
sub = api.d.SubscribeSyncEvents(events)
syncSubscriptions = make(map[chan interface{}]struct{}) syncSubscriptions = make(map[chan interface{}]struct{})
checkInterval = time.Second * 60 checkInterval = time.Second * 60
checkTimer = time.NewTimer(checkInterval) checkTimer = time.NewTimer(checkInterval)
@ -90,6 +88,7 @@ func (api *DownloaderAPI) eventLoop() {
} }
) )
defer checkTimer.Stop() defer checkTimer.Stop()
defer sub.Unsubscribe()
for { for {
select { select {
@ -101,12 +100,8 @@ func (api *DownloaderAPI) eventLoop() {
case u := <-api.uninstallSyncSubscription: case u := <-api.uninstallSyncSubscription:
delete(syncSubscriptions, u.c) delete(syncSubscriptions, u.c)
close(u.uninstalled) close(u.uninstalled)
case event := <-sub.Chan(): case ev := <-events:
if event == nil { if ev.Type == SyncStarted {
return
}
switch event.Data.(type) {
case StartEvent:
started = true started = true
} }
case <-checkTimer.C: case <-checkTimer.C:

View file

@ -101,6 +101,10 @@ type Downloader struct {
moder *syncModer // Sync mode management, deliver the appropriate sync mode choice for each cycle moder *syncModer // Sync mode management, deliver the appropriate sync mode choice for each cycle
mux *event.TypeMux // Event multiplexer to announce sync operation events mux *event.TypeMux // Event multiplexer to announce sync operation events
// New event feed for downloader events (alongside the existing TypeMux)
feed event.FeedOf[SyncEvent]
scope event.SubscriptionScope
queue *queue // Scheduler for selecting the hashes to download queue *queue // Scheduler for selecting the hashes to download
peers *peerSet // Set of active peers from which download can proceed peers *peerSet // Set of active peers from which download can proceed
@ -427,17 +431,25 @@ func (d *Downloader) ConfigSyncMode() SyncMode {
return d.moder.get(false) return d.moder.get(false)
} }
// SubscribeSyncEvents creates a subscription for downloader sync events
func (d *Downloader) SubscribeSyncEvents(ch chan<- SyncEvent) event.Subscription {
return d.scope.Track(d.feed.Subscribe(ch))
}
// syncToHead starts a block synchronization based on the hash chain from // syncToHead starts a block synchronization based on the hash chain from
// the specified head hash. // the specified head hash.
func (d *Downloader) syncToHead() (err error) { func (d *Downloader) syncToHead() (err error) {
d.mux.Post(StartEvent{}) d.mux.Post(StartEvent{})
d.feed.Send(SyncEvent{Type: SyncStarted})
defer func() { defer func() {
// reset on error // reset on error
if err != nil { if err != nil {
d.mux.Post(FailedEvent{err}) d.mux.Post(FailedEvent{err})
d.feed.Send(SyncEvent{Type: SyncFailed, Err: err})
} else { } else {
latest := d.blockchain.CurrentHeader() latest := d.blockchain.CurrentHeader()
d.mux.Post(DoneEvent{latest}) d.mux.Post(DoneEvent{latest})
d.feed.Send(SyncEvent{Type: SyncCompleted, Latest: latest})
} }
}() }()
mode := d.getMode() mode := d.getMode()
@ -662,6 +674,9 @@ func (d *Downloader) Cancel() {
// Terminate interrupts the downloader, canceling all pending operations. // Terminate interrupts the downloader, canceling all pending operations.
// The downloader cannot be reused after calling Terminate. // The downloader cannot be reused after calling Terminate.
func (d *Downloader) Terminate() { func (d *Downloader) Terminate() {
// Unsubscribe all subscriptions registered from downloader
d.scope.Close()
// Close the termination channel (make sure double close is allowed) // Close the termination channel (make sure double close is allowed)
d.quitLock.Lock() d.quitLock.Lock()
select { select {

View file

@ -23,3 +23,19 @@ type DoneEvent struct {
} }
type StartEvent struct{} type StartEvent struct{}
type FailedEvent struct{ Err error } type FailedEvent struct{ Err error }
// SyncEventType represents the type of sync event
type SyncEventType int
const (
SyncStarted SyncEventType = iota
SyncFailed
SyncCompleted
)
// SyncEvent represents a downloader synchronization event
type SyncEvent struct {
Type SyncEventType
Err error // Set when Type is SyncFailed
Latest *types.Header // Set when Type is SyncCompleted
}