diff --git a/core/txindexer.go b/core/txindexer.go index b2a94a6ead..ceff84d736 100644 --- a/core/txindexer.go +++ b/core/txindexer.go @@ -23,6 +23,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" ) @@ -64,6 +65,9 @@ type txIndexer struct { db ethdb.Database term chan chan struct{} closed chan struct{} + + headCh chan ChainHeadEvent + headSub event.Subscription } // newTxIndexer initializes the transaction indexer. @@ -75,7 +79,9 @@ func newTxIndexer(limit uint64, chain *BlockChain) *txIndexer { db: chain.db, term: make(chan chan struct{}), closed: make(chan struct{}), + headCh: make(chan ChainHeadEvent), } + indexer.headSub = chain.SubscribeChainHeadEvent(indexer.headCh) indexer.head.Store(indexer.resolveHead()) indexer.tail.Store(rawdb.ReadTxIndexTail(chain.db)) @@ -228,15 +234,14 @@ func (indexer *txIndexer) resolveHead() uint64 { // on the received chain event. func (indexer *txIndexer) loop(chain *BlockChain) { defer close(indexer.closed) + defer indexer.headSub.Unsubscribe() // Listening to chain events and manipulate the transaction indexes. var ( stop chan struct{} // Non-nil if background routine is active done chan struct{} // Non-nil if background routine is active - headCh = make(chan ChainHeadEvent) - sub = chain.SubscribeChainHeadEvent(headCh) + headCh = indexer.headCh ) - defer sub.Unsubscribe() // Validate the transaction indexes and repair if necessary head := indexer.head.Load() diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 9c78748422..cb425e5809 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -76,6 +76,9 @@ type TxPool struct { quit chan chan error // Quit channel to tear down the head updater term chan struct{} // Termination channel to detect a closed pool + newHeadCh chan core.ChainHeadEvent + newHeadSub event.Subscription + sync chan chan error // Testing / simulator channel to block until internal reset is done } @@ -98,13 +101,15 @@ func New(gasTip uint64, chain BlockChain, subpools []SubPool) (*TxPool, error) { return nil, err } pool := &TxPool{ - subpools: subpools, - chain: chain, - state: statedb, - quit: make(chan chan error), - term: make(chan struct{}), - sync: make(chan chan error), + subpools: subpools, + chain: chain, + state: statedb, + quit: make(chan chan error), + term: make(chan struct{}), + sync: make(chan chan error), + newHeadCh: make(chan core.ChainHeadEvent), } + pool.newHeadSub = chain.SubscribeChainHeadEvent(pool.newHeadCh) reserver := NewReservationTracker() for i, subpool := range subpools { if err := subpool.Init(gasTip, head, reserver.NewHandle(i)); err != nil { @@ -150,12 +155,8 @@ func (p *TxPool) loop(head *types.Header) { // Close the termination marker when the pool stops defer close(p.term) - // Subscribe to chain head events to trigger subpool resets - var ( - newHeadCh = make(chan core.ChainHeadEvent) - newHeadSub = p.chain.SubscribeChainHeadEvent(newHeadCh) - ) - defer newHeadSub.Unsubscribe() + newHeadCh := p.newHeadCh + defer p.newHeadSub.Unsubscribe() // Track the previous and current head to feed to an idle reset var ( diff --git a/eth/backend.go b/eth/backend.go index af8b04bda6..2f10351b9c 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -49,6 +49,7 @@ import ( "github.com/ethereum/go-ethereum/eth/protocols/snap" "github.com/ethereum/go-ethereum/eth/tracers" "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/internal/ethapi" "github.com/ethereum/go-ethereum/internal/shutdowncheck" "github.com/ethereum/go-ethereum/internal/version" @@ -110,6 +111,13 @@ type Ethereum struct { filterMaps *filtermaps.FilterMaps closeFilterMaps chan chan struct{} + // Chain event subscriptions driving updateFilterMapsHeads. The + // subscriptions are registered and consumed in Start. + fmHeadEventCh chan core.ChainEvent + fmHeadSub event.Subscription + fmBlockProcCh chan bool + fmBlockProcSub event.Subscription + APIBackend *EthAPIBackend miner *miner.Miner @@ -199,6 +207,8 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { p2pServer: stack.Server(), discmix: enode.NewFairMix(discmixTimeout), shutdownTracker: shutdowncheck.NewShutdownTracker(chainDb), + fmHeadEventCh: make(chan core.ChainEvent, 10), + fmBlockProcCh: make(chan bool, 10), } bcVersion := rawdb.ReadDatabaseVersion(chainDb) var dbVer = "" @@ -459,6 +469,10 @@ func (s *Ethereum) Start() error { // Start the connection manager s.dropper.Start(s.p2pServer, func() bool { return !s.Synced() }) + // Subscribe to chain events for the filterMaps head updater. + s.fmHeadSub = s.blockchain.SubscribeChainEvent(s.fmHeadEventCh) + s.fmBlockProcSub = s.blockchain.SubscribeBlockProcessingEvent(s.fmBlockProcCh) + // start log indexer s.filterMaps.Start() go s.updateFilterMapsHeads() @@ -473,13 +487,11 @@ func (s *Ethereum) newChainView(head *types.Header) *filtermaps.ChainView { } func (s *Ethereum) updateFilterMapsHeads() { - headEventCh := make(chan core.ChainEvent, 10) - blockProcCh := make(chan bool, 10) - sub := s.blockchain.SubscribeChainEvent(headEventCh) - sub2 := s.blockchain.SubscribeBlockProcessingEvent(blockProcCh) + headEventCh := s.fmHeadEventCh + blockProcCh := s.fmBlockProcCh defer func() { - sub.Unsubscribe() - sub2.Unsubscribe() + s.fmHeadSub.Unsubscribe() + s.fmBlockProcSub.Unsubscribe() for { select { case <-headEventCh: