From 27245764f37860486c0eff8787d65b1a72592945 Mon Sep 17 00:00:00 2001 From: apetro2 Date: Sun, 22 Mar 2026 10:44:30 +0800 Subject: [PATCH] core/txpool: subscribe in constructor --- core/txpool/txpool.go | 16 ++++++---------- core/txpool/txpool_test.go | 20 ++++++++++++++++++++ 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 861ccceacb..0f66a5abef 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -68,6 +68,7 @@ type TxPool struct { stateLock sync.RWMutex // The lock for protecting state instance state *state.StateDB // Current state at the blockchain head + headCh chan core.ChainHeadEvent subs event.SubscriptionScope // Subscription scope to unsubscribe all on shutdown quit chan chan error // Quit channel to tear down the head updater @@ -98,10 +99,14 @@ func New(gasTip uint64, chain BlockChain, subpools []SubPool) (*TxPool, error) { subpools: subpools, chain: chain, state: statedb, + headCh: make(chan core.ChainHeadEvent), quit: make(chan chan error), term: make(chan struct{}), sync: make(chan chan error), } + if sub := chain.SubscribeChainHeadEvent(pool.headCh); sub != nil { + pool.subs.Track(sub) + } reserver := NewReservationTracker() for i, subpool := range subpools { if err := subpool.Init(gasTip, head, reserver.NewHandle(i)); err != nil { @@ -147,15 +152,6 @@ func (p *TxPool) loop(head *types.Header) { // Close the termination marker when the pool stops defer close(p.term) - // Subscribe to chain head events to trigger subpool resets - var ( - newHeadCh = make(chan core.ChainHeadEvent) - newHeadSub = p.chain.SubscribeChainHeadEvent(newHeadCh) - ) - if newHeadSub != nil { - defer newHeadSub.Unsubscribe() - } - // Track the previous and current head to feed to an idle reset var ( oldHead = head @@ -221,7 +217,7 @@ func (p *TxPool) loop(head *types.Header) { } // Wait for the next chain head event or a previous reset finish select { - case event := <-newHeadCh: + case event := <-p.headCh: // Chain moved forward, store the head for later consumption newHead = event.Header diff --git a/core/txpool/txpool_test.go b/core/txpool/txpool_test.go index 89055abcd4..f28b6b18ad 100644 --- a/core/txpool/txpool_test.go +++ b/core/txpool/txpool_test.go @@ -29,6 +29,7 @@ import ( ) type nilHeadSubChain struct{} +type trackedHeadSubChain struct{ nilHeadSubChain } func (nilHeadSubChain) Config() *params.ChainConfig { return params.TestChainConfig } @@ -42,6 +43,10 @@ func (nilHeadSubChain) StateAt(common.Hash) (*state.StateDB, error) { return state.New(types.EmptyRootHash, state.NewDatabaseForTesting()) } +func (trackedHeadSubChain) SubscribeChainHeadEvent(chan<- core.ChainHeadEvent) event.Subscription { + return event.NewSubscription(func(<-chan struct{}) error { return nil }) +} + func TestTxPoolCloseNilHeadSubscription(t *testing.T) { t.Parallel() @@ -62,3 +67,18 @@ func TestTxPoolCloseNilHeadSubscription(t *testing.T) { t.Fatal("timed out waiting for txpool loop termination") } } + +func TestTxPoolNewTracksHeadSubscription(t *testing.T) { + t.Parallel() + + pool, err := New(0, trackedHeadSubChain{}, nil) + if err != nil { + t.Fatalf("failed to create txpool: %v", err) + } + if count := pool.subs.Count(); count != 1 { + t.Fatalf("unexpected subscription count: have %d want %d", count, 1) + } + if err := pool.Close(); err != nil { + t.Fatalf("unexpected close error: %v", err) + } +}