diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 9c78748422..e3763d7b39 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -71,6 +71,8 @@ type TxPool struct { stateLock sync.RWMutex // The lock for protecting state instance state *state.StateDB // Current state at the blockchain head + headCh chan core.ChainHeadEvent + headSub event.Subscription subs event.SubscriptionScope // Subscription scope to unsubscribe all on shutdown quit chan chan error // Quit channel to tear down the head updater @@ -101,16 +103,21 @@ func New(gasTip uint64, chain BlockChain, subpools []SubPool) (*TxPool, error) { subpools: subpools, chain: chain, state: statedb, + headCh: make(chan core.ChainHeadEvent), quit: make(chan chan error), term: make(chan struct{}), sync: make(chan chan error), } + pool.headSub = chain.SubscribeChainHeadEvent(pool.headCh) reserver := NewReservationTracker() for i, subpool := range subpools { if err := subpool.Init(gasTip, head, reserver.NewHandle(i)); err != nil { for j := i - 1; j >= 0; j-- { subpools[j].Close() } + if pool.headSub != nil { + pool.headSub.Unsubscribe() + } return nil, err } } @@ -122,6 +129,9 @@ func New(gasTip uint64, chain BlockChain, subpools []SubPool) (*TxPool, error) { func (p *TxPool) Close() error { var errs []error + if p.headSub != nil { + p.headSub.Unsubscribe() + } // Terminate the reset loop and wait for it to finish errc := make(chan error) p.quit <- errc @@ -150,13 +160,6 @@ func (p *TxPool) loop(head *types.Header) { // Close the termination marker when the pool stops defer close(p.term) - // Subscribe to chain head events to trigger subpool resets - var ( - newHeadCh = make(chan core.ChainHeadEvent) - newHeadSub = p.chain.SubscribeChainHeadEvent(newHeadCh) - ) - defer newHeadSub.Unsubscribe() - // Track the previous and current head to feed to an idle reset var ( oldHead = head @@ -222,7 +225,7 @@ func (p *TxPool) loop(head *types.Header) { } // Wait for the next chain head event or a previous reset finish select { - case event := <-newHeadCh: + case event := <-p.headCh: // Chain moved forward, store the head for later consumption newHead = event.Header diff --git a/core/txpool/txpool_test.go b/core/txpool/txpool_test.go new file mode 100644 index 0000000000..1ed67caff3 --- /dev/null +++ b/core/txpool/txpool_test.go @@ -0,0 +1,194 @@ +// Copyright 2026 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package txpool + +import ( + "errors" + "math/big" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/params" +) + +type nilHeadSubChain struct{} + +func (nilHeadSubChain) Config() *params.ChainConfig { return params.TestChainConfig } + +func (nilHeadSubChain) CurrentBlock() *types.Header { return &types.Header{Root: types.EmptyRootHash} } + +func (nilHeadSubChain) SubscribeChainHeadEvent(chan<- core.ChainHeadEvent) event.Subscription { + return nil +} + +func (nilHeadSubChain) StateAt(common.Hash) (*state.StateDB, error) { + return state.New(types.EmptyRootHash, state.NewDatabaseForTesting()) +} + +type trackedHeadSubChain struct { + nilHeadSubChain + sub *subscriptionSpy +} + +func (c *trackedHeadSubChain) SubscribeChainHeadEvent(chan<- core.ChainHeadEvent) event.Subscription { + c.sub = newSubscriptionSpy() + return c.sub +} + +type subscriptionSpy struct { + err chan error + mu sync.Mutex + once sync.Once + closed bool +} + +func newSubscriptionSpy() *subscriptionSpy { + return &subscriptionSpy{err: make(chan error)} +} + +func (s *subscriptionSpy) Unsubscribe() { + s.once.Do(func() { + s.mu.Lock() + s.closed = true + s.mu.Unlock() + close(s.err) + }) +} + +func (s *subscriptionSpy) Err() <-chan error { + return s.err +} + +func (s *subscriptionSpy) isClosed() bool { + s.mu.Lock() + defer s.mu.Unlock() + + return s.closed +} + +type failingSubPool struct{} + +func (failingSubPool) Filter(*types.Transaction) bool { return false } + +func (failingSubPool) FilterType(byte) bool { return false } + +func (failingSubPool) Init(uint64, *types.Header, Reserver) error { + return errors.New("boom") +} + +func (failingSubPool) Close() error { return nil } + +func (failingSubPool) Reset(*types.Header, *types.Header) {} + +func (failingSubPool) SetGasTip(*big.Int) {} + +func (failingSubPool) Has(common.Hash) bool { return false } + +func (failingSubPool) Get(common.Hash) *types.Transaction { return nil } + +func (failingSubPool) GetRLP(common.Hash) []byte { return nil } + +func (failingSubPool) GetMetadata(common.Hash) *TxMetadata { return nil } + +func (failingSubPool) ValidateTxBasics(*types.Transaction) error { return nil } + +func (failingSubPool) Add([]*types.Transaction, bool) []error { return nil } + +func (failingSubPool) Pending(PendingFilter) (map[common.Address][]*LazyTransaction, int) { + return nil, 0 +} + +func (failingSubPool) SubscribeTransactions(chan<- core.NewTxsEvent, bool) event.Subscription { + return nil +} + +func (failingSubPool) Nonce(common.Address) uint64 { return 0 } + +func (failingSubPool) Stats() (int, int) { return 0, 0 } + +func (failingSubPool) Content() (map[common.Address][]*types.Transaction, map[common.Address][]*types.Transaction) { + return nil, nil +} + +func (failingSubPool) ContentFrom(common.Address) ([]*types.Transaction, []*types.Transaction) { + return nil, nil +} + +func (failingSubPool) Status(common.Hash) TxStatus { return TxStatusUnknown } + +func (failingSubPool) Clear() {} + +func TestTxPoolCloseUnsubscribesHeadSubscription(t *testing.T) { + t.Parallel() + + chain := &trackedHeadSubChain{} + pool, err := New(0, chain, nil) + if err != nil { + t.Fatalf("failed to create txpool: %v", err) + } + if chain.sub == nil { + t.Fatal("expected head subscription") + } + if err := pool.Close(); err != nil { + t.Fatalf("unexpected close error: %v", err) + } + if !chain.sub.isClosed() { + t.Fatal("expected head subscription to be unsubscribed on close") + } +} + +func TestTxPoolNewUnsubscribesHeadSubscriptionOnInitFailure(t *testing.T) { + t.Parallel() + + chain := &trackedHeadSubChain{} + if _, err := New(0, chain, []SubPool{failingSubPool{}}); err == nil { + t.Fatal("expected init failure") + } + if chain.sub == nil { + t.Fatal("expected head subscription") + } + if !chain.sub.isClosed() { + t.Fatal("expected head subscription to be unsubscribed on init failure") + } +} + +func TestTxPoolCloseNilHeadSubscription(t *testing.T) { + t.Parallel() + + // TxPool.BlockChain exists to allow mocked chains in tests. A mock that + // opts out of head notifications may return a nil subscription. + pool, err := New(0, nilHeadSubChain{}, nil) + if err != nil { + t.Fatalf("failed to create txpool: %v", err) + } + + if err := pool.Close(); err != nil { + t.Fatalf("unexpected close error: %v", err) + } + + select { + case <-pool.term: + case <-time.After(time.Second): + t.Fatal("timed out waiting for txpool loop termination") + } +}