core/txpool: subscribe in constructor

This commit is contained in:
apetro2 2026-03-22 10:44:30 +08:00
parent 7f64995ee3
commit 27245764f3
2 changed files with 26 additions and 10 deletions

View file

@ -68,6 +68,7 @@ type TxPool struct {
stateLock sync.RWMutex // The lock for protecting state instance
state *state.StateDB // Current state at the blockchain head
headCh chan core.ChainHeadEvent
subs event.SubscriptionScope // Subscription scope to unsubscribe all on shutdown
quit chan chan error // Quit channel to tear down the head updater
@ -98,10 +99,14 @@ func New(gasTip uint64, chain BlockChain, subpools []SubPool) (*TxPool, error) {
subpools: subpools,
chain: chain,
state: statedb,
headCh: make(chan core.ChainHeadEvent),
quit: make(chan chan error),
term: make(chan struct{}),
sync: make(chan chan error),
}
if sub := chain.SubscribeChainHeadEvent(pool.headCh); sub != nil {
pool.subs.Track(sub)
}
reserver := NewReservationTracker()
for i, subpool := range subpools {
if err := subpool.Init(gasTip, head, reserver.NewHandle(i)); err != nil {
@ -147,15 +152,6 @@ func (p *TxPool) loop(head *types.Header) {
// Close the termination marker when the pool stops
defer close(p.term)
// Subscribe to chain head events to trigger subpool resets
var (
newHeadCh = make(chan core.ChainHeadEvent)
newHeadSub = p.chain.SubscribeChainHeadEvent(newHeadCh)
)
if newHeadSub != nil {
defer newHeadSub.Unsubscribe()
}
// Track the previous and current head to feed to an idle reset
var (
oldHead = head
@ -221,7 +217,7 @@ func (p *TxPool) loop(head *types.Header) {
}
// Wait for the next chain head event or a previous reset finish
select {
case event := <-newHeadCh:
case event := <-p.headCh:
// Chain moved forward, store the head for later consumption
newHead = event.Header

View file

@ -29,6 +29,7 @@ import (
)
type nilHeadSubChain struct{}
type trackedHeadSubChain struct{ nilHeadSubChain }
func (nilHeadSubChain) Config() *params.ChainConfig { return params.TestChainConfig }
@ -42,6 +43,10 @@ func (nilHeadSubChain) StateAt(common.Hash) (*state.StateDB, error) {
return state.New(types.EmptyRootHash, state.NewDatabaseForTesting())
}
func (trackedHeadSubChain) SubscribeChainHeadEvent(chan<- core.ChainHeadEvent) event.Subscription {
return event.NewSubscription(func(<-chan struct{}) error { return nil })
}
func TestTxPoolCloseNilHeadSubscription(t *testing.T) {
t.Parallel()
@ -62,3 +67,18 @@ func TestTxPoolCloseNilHeadSubscription(t *testing.T) {
t.Fatal("timed out waiting for txpool loop termination")
}
}
func TestTxPoolNewTracksHeadSubscription(t *testing.T) {
t.Parallel()
pool, err := New(0, trackedHeadSubChain{}, nil)
if err != nil {
t.Fatalf("failed to create txpool: %v", err)
}
if count := pool.subs.Count(); count != 1 {
t.Fatalf("unexpected subscription count: have %d want %d", count, 1)
}
if err := pool.Close(); err != nil {
t.Fatalf("unexpected close error: %v", err)
}
}