core, core/txpool, eth: move subscriptions to constructor (#35048)

Closes https://github.com/ethereum/go-ethereum/issues/20554
It makes it easier to reason about the lifecycle.
This commit is contained in:
Marius van der Wijden 2026-06-01 02:13:59 +02:00 committed by GitHub
parent 046a10e8a7
commit b71f750916
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 39 additions and 21 deletions

View file

@ -23,6 +23,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
)
@ -64,6 +65,9 @@ type txIndexer struct {
db ethdb.Database
term chan chan struct{}
closed chan struct{}
headCh chan ChainHeadEvent
headSub event.Subscription
}
// newTxIndexer initializes the transaction indexer.
@ -75,7 +79,9 @@ func newTxIndexer(limit uint64, chain *BlockChain) *txIndexer {
db: chain.db,
term: make(chan chan struct{}),
closed: make(chan struct{}),
headCh: make(chan ChainHeadEvent),
}
indexer.headSub = chain.SubscribeChainHeadEvent(indexer.headCh)
indexer.head.Store(indexer.resolveHead())
indexer.tail.Store(rawdb.ReadTxIndexTail(chain.db))
@ -228,15 +234,14 @@ func (indexer *txIndexer) resolveHead() uint64 {
// on the received chain event.
func (indexer *txIndexer) loop(chain *BlockChain) {
defer close(indexer.closed)
defer indexer.headSub.Unsubscribe()
// Listening to chain events and manipulate the transaction indexes.
var (
stop chan struct{} // Non-nil if background routine is active
done chan struct{} // Non-nil if background routine is active
headCh = make(chan ChainHeadEvent)
sub = chain.SubscribeChainHeadEvent(headCh)
headCh = indexer.headCh
)
defer sub.Unsubscribe()
// Validate the transaction indexes and repair if necessary
head := indexer.head.Load()

View file

@ -76,6 +76,9 @@ type TxPool struct {
quit chan chan error // Quit channel to tear down the head updater
term chan struct{} // Termination channel to detect a closed pool
newHeadCh chan core.ChainHeadEvent
newHeadSub event.Subscription
sync chan chan error // Testing / simulator channel to block until internal reset is done
}
@ -98,13 +101,15 @@ func New(gasTip uint64, chain BlockChain, subpools []SubPool) (*TxPool, error) {
return nil, err
}
pool := &TxPool{
subpools: subpools,
chain: chain,
state: statedb,
quit: make(chan chan error),
term: make(chan struct{}),
sync: make(chan chan error),
subpools: subpools,
chain: chain,
state: statedb,
quit: make(chan chan error),
term: make(chan struct{}),
sync: make(chan chan error),
newHeadCh: make(chan core.ChainHeadEvent),
}
pool.newHeadSub = chain.SubscribeChainHeadEvent(pool.newHeadCh)
reserver := NewReservationTracker()
for i, subpool := range subpools {
if err := subpool.Init(gasTip, head, reserver.NewHandle(i)); err != nil {
@ -150,12 +155,8 @@ func (p *TxPool) loop(head *types.Header) {
// Close the termination marker when the pool stops
defer close(p.term)
// Subscribe to chain head events to trigger subpool resets
var (
newHeadCh = make(chan core.ChainHeadEvent)
newHeadSub = p.chain.SubscribeChainHeadEvent(newHeadCh)
)
defer newHeadSub.Unsubscribe()
newHeadCh := p.newHeadCh
defer p.newHeadSub.Unsubscribe()
// Track the previous and current head to feed to an idle reset
var (

View file

@ -49,6 +49,7 @@ import (
"github.com/ethereum/go-ethereum/eth/protocols/snap"
"github.com/ethereum/go-ethereum/eth/tracers"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/internal/shutdowncheck"
"github.com/ethereum/go-ethereum/internal/version"
@ -110,6 +111,13 @@ type Ethereum struct {
filterMaps *filtermaps.FilterMaps
closeFilterMaps chan chan struct{}
// Chain event subscriptions driving updateFilterMapsHeads. The
// subscriptions are registered and consumed in Start.
fmHeadEventCh chan core.ChainEvent
fmHeadSub event.Subscription
fmBlockProcCh chan bool
fmBlockProcSub event.Subscription
APIBackend *EthAPIBackend
miner *miner.Miner
@ -199,6 +207,8 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
p2pServer: stack.Server(),
discmix: enode.NewFairMix(discmixTimeout),
shutdownTracker: shutdowncheck.NewShutdownTracker(chainDb),
fmHeadEventCh: make(chan core.ChainEvent, 10),
fmBlockProcCh: make(chan bool, 10),
}
bcVersion := rawdb.ReadDatabaseVersion(chainDb)
var dbVer = "<nil>"
@ -459,6 +469,10 @@ func (s *Ethereum) Start() error {
// Start the connection manager
s.dropper.Start(s.p2pServer, func() bool { return !s.Synced() })
// Subscribe to chain events for the filterMaps head updater.
s.fmHeadSub = s.blockchain.SubscribeChainEvent(s.fmHeadEventCh)
s.fmBlockProcSub = s.blockchain.SubscribeBlockProcessingEvent(s.fmBlockProcCh)
// start log indexer
s.filterMaps.Start()
go s.updateFilterMapsHeads()
@ -473,13 +487,11 @@ func (s *Ethereum) newChainView(head *types.Header) *filtermaps.ChainView {
}
func (s *Ethereum) updateFilterMapsHeads() {
headEventCh := make(chan core.ChainEvent, 10)
blockProcCh := make(chan bool, 10)
sub := s.blockchain.SubscribeChainEvent(headEventCh)
sub2 := s.blockchain.SubscribeBlockProcessingEvent(blockProcCh)
headEventCh := s.fmHeadEventCh
blockProcCh := s.fmBlockProcCh
defer func() {
sub.Unsubscribe()
sub2.Unsubscribe()
s.fmHeadSub.Unsubscribe()
s.fmBlockProcSub.Unsubscribe()
for {
select {
case <-headEventCh: