From 469846166a628e31e1c0bedba7934dd3c3784421 Mon Sep 17 00:00:00 2001 From: apetro2 Date: Fri, 6 Mar 2026 22:39:02 +0800 Subject: [PATCH 1/5] core/txpool: guard nil head subscription --- core/txpool/txpool.go | 4 +- core/txpool/txpool_test.go | 76 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+), 1 deletion(-) create mode 100644 core/txpool/txpool_test.go diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 25647e0cce..861ccceacb 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -152,7 +152,9 @@ func (p *TxPool) loop(head *types.Header) { newHeadCh = make(chan core.ChainHeadEvent) newHeadSub = p.chain.SubscribeChainHeadEvent(newHeadCh) ) - defer newHeadSub.Unsubscribe() + if newHeadSub != nil { + defer newHeadSub.Unsubscribe() + } // Track the previous and current head to feed to an idle reset var ( diff --git a/core/txpool/txpool_test.go b/core/txpool/txpool_test.go new file mode 100644 index 0000000000..8125d0714d --- /dev/null +++ b/core/txpool/txpool_test.go @@ -0,0 +1,76 @@ +// Copyright 2026 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package txpool + +import ( + "errors" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/params" +) + +type nilHeadSubChain struct{} + +func (nilHeadSubChain) Config() *params.ChainConfig { return params.TestChainConfig } + +func (nilHeadSubChain) CurrentBlock() *types.Header { return &types.Header{} } + +func (nilHeadSubChain) SubscribeChainHeadEvent(chan<- core.ChainHeadEvent) event.Subscription { + return nil +} + +func (nilHeadSubChain) StateAt(common.Hash) (*state.StateDB, error) { + return nil, errors.New("not implemented") +} + +func TestTxPoolLoopNilHeadSubscription(t *testing.T) { + t.Parallel() + + pool := &TxPool{ + chain: nilHeadSubChain{}, + quit: make(chan chan error), + term: make(chan struct{}), + sync: make(chan chan error), + } + go pool.loop(nil) + + errc := make(chan error, 1) + select { + case pool.quit <- errc: + case <-time.After(time.Second): + t.Fatal("timed out waiting for txpool loop to accept quit signal") + } + select { + case err := <-errc: + if err != nil { + t.Fatalf("unexpected close error: %v", err) + } + case <-time.After(time.Second): + t.Fatal("timed out waiting for txpool loop to stop") + } + select { + case <-pool.term: + case <-time.After(time.Second): + t.Fatal("timed out waiting for txpool loop termination") + } +} From 7f64995ee33c65a2750b6b8c1986b141b7a52fda Mon Sep 17 00:00:00 2001 From: apetro2 Date: Thu, 19 Mar 2026 00:24:39 +0800 Subject: [PATCH 2/5] core/txpool: guard nil head subscription --- core/txpool/txpool_test.go | 34 +++++++++++----------------------- 1 file changed, 11 insertions(+), 23 deletions(-) diff --git a/core/txpool/txpool_test.go b/core/txpool/txpool_test.go index 8125d0714d..89055abcd4 100644 --- a/core/txpool/txpool_test.go +++ b/core/txpool/txpool_test.go @@ -17,7 +17,6 @@ package txpool import ( - "errors" "testing" "time" @@ -33,41 +32,30 @@ type nilHeadSubChain struct{} func (nilHeadSubChain) Config() *params.ChainConfig { return params.TestChainConfig } -func (nilHeadSubChain) CurrentBlock() *types.Header { return &types.Header{} } +func (nilHeadSubChain) CurrentBlock() *types.Header { return &types.Header{Root: types.EmptyRootHash} } func (nilHeadSubChain) SubscribeChainHeadEvent(chan<- core.ChainHeadEvent) event.Subscription { return nil } func (nilHeadSubChain) StateAt(common.Hash) (*state.StateDB, error) { - return nil, errors.New("not implemented") + return state.New(types.EmptyRootHash, state.NewDatabaseForTesting()) } -func TestTxPoolLoopNilHeadSubscription(t *testing.T) { +func TestTxPoolCloseNilHeadSubscription(t *testing.T) { t.Parallel() - pool := &TxPool{ - chain: nilHeadSubChain{}, - quit: make(chan chan error), - term: make(chan struct{}), - sync: make(chan chan error), + // TxPool.BlockChain exists to allow mocked chains in tests. A mock that + // opts out of head notifications may return a nil subscription. + pool, err := New(0, nilHeadSubChain{}, nil) + if err != nil { + t.Fatalf("failed to create txpool: %v", err) } - go pool.loop(nil) - errc := make(chan error, 1) - select { - case pool.quit <- errc: - case <-time.After(time.Second): - t.Fatal("timed out waiting for txpool loop to accept quit signal") - } - select { - case err := <-errc: - if err != nil { - t.Fatalf("unexpected close error: %v", err) - } - case <-time.After(time.Second): - t.Fatal("timed out waiting for txpool loop to stop") + if err := pool.Close(); err != nil { + t.Fatalf("unexpected close error: %v", err) } + select { case <-pool.term: case <-time.After(time.Second): From 27245764f37860486c0eff8787d65b1a72592945 Mon Sep 17 00:00:00 2001 From: apetro2 Date: Sun, 22 Mar 2026 10:44:30 +0800 Subject: [PATCH 3/5] core/txpool: subscribe in constructor --- core/txpool/txpool.go | 16 ++++++---------- core/txpool/txpool_test.go | 20 ++++++++++++++++++++ 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 861ccceacb..0f66a5abef 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -68,6 +68,7 @@ type TxPool struct { stateLock sync.RWMutex // The lock for protecting state instance state *state.StateDB // Current state at the blockchain head + headCh chan core.ChainHeadEvent subs event.SubscriptionScope // Subscription scope to unsubscribe all on shutdown quit chan chan error // Quit channel to tear down the head updater @@ -98,10 +99,14 @@ func New(gasTip uint64, chain BlockChain, subpools []SubPool) (*TxPool, error) { subpools: subpools, chain: chain, state: statedb, + headCh: make(chan core.ChainHeadEvent), quit: make(chan chan error), term: make(chan struct{}), sync: make(chan chan error), } + if sub := chain.SubscribeChainHeadEvent(pool.headCh); sub != nil { + pool.subs.Track(sub) + } reserver := NewReservationTracker() for i, subpool := range subpools { if err := subpool.Init(gasTip, head, reserver.NewHandle(i)); err != nil { @@ -147,15 +152,6 @@ func (p *TxPool) loop(head *types.Header) { // Close the termination marker when the pool stops defer close(p.term) - // Subscribe to chain head events to trigger subpool resets - var ( - newHeadCh = make(chan core.ChainHeadEvent) - newHeadSub = p.chain.SubscribeChainHeadEvent(newHeadCh) - ) - if newHeadSub != nil { - defer newHeadSub.Unsubscribe() - } - // Track the previous and current head to feed to an idle reset var ( oldHead = head @@ -221,7 +217,7 @@ func (p *TxPool) loop(head *types.Header) { } // Wait for the next chain head event or a previous reset finish select { - case event := <-newHeadCh: + case event := <-p.headCh: // Chain moved forward, store the head for later consumption newHead = event.Header diff --git a/core/txpool/txpool_test.go b/core/txpool/txpool_test.go index 89055abcd4..f28b6b18ad 100644 --- a/core/txpool/txpool_test.go +++ b/core/txpool/txpool_test.go @@ -29,6 +29,7 @@ import ( ) type nilHeadSubChain struct{} +type trackedHeadSubChain struct{ nilHeadSubChain } func (nilHeadSubChain) Config() *params.ChainConfig { return params.TestChainConfig } @@ -42,6 +43,10 @@ func (nilHeadSubChain) StateAt(common.Hash) (*state.StateDB, error) { return state.New(types.EmptyRootHash, state.NewDatabaseForTesting()) } +func (trackedHeadSubChain) SubscribeChainHeadEvent(chan<- core.ChainHeadEvent) event.Subscription { + return event.NewSubscription(func(<-chan struct{}) error { return nil }) +} + func TestTxPoolCloseNilHeadSubscription(t *testing.T) { t.Parallel() @@ -62,3 +67,18 @@ func TestTxPoolCloseNilHeadSubscription(t *testing.T) { t.Fatal("timed out waiting for txpool loop termination") } } + +func TestTxPoolNewTracksHeadSubscription(t *testing.T) { + t.Parallel() + + pool, err := New(0, trackedHeadSubChain{}, nil) + if err != nil { + t.Fatalf("failed to create txpool: %v", err) + } + if count := pool.subs.Count(); count != 1 { + t.Fatalf("unexpected subscription count: have %d want %d", count, 1) + } + if err := pool.Close(); err != nil { + t.Fatalf("unexpected close error: %v", err) + } +} From 55dae4fd09babea853c28fbf1896faaf74f28f9e Mon Sep 17 00:00:00 2001 From: apetro2 Date: Wed, 25 Mar 2026 21:24:50 +0800 Subject: [PATCH 4/5] core/txpool: guard nil head subscription --- core/txpool/txpool.go | 11 ++- core/txpool/txpool_test.go | 144 ++++++++++++++++++++++++++++++++----- 2 files changed, 134 insertions(+), 21 deletions(-) diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 0f66a5abef..2ce5db326f 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -69,6 +69,7 @@ type TxPool struct { stateLock sync.RWMutex // The lock for protecting state instance state *state.StateDB // Current state at the blockchain head headCh chan core.ChainHeadEvent + headSub event.Subscription subs event.SubscriptionScope // Subscription scope to unsubscribe all on shutdown quit chan chan error // Quit channel to tear down the head updater @@ -104,15 +105,16 @@ func New(gasTip uint64, chain BlockChain, subpools []SubPool) (*TxPool, error) { term: make(chan struct{}), sync: make(chan chan error), } - if sub := chain.SubscribeChainHeadEvent(pool.headCh); sub != nil { - pool.subs.Track(sub) - } + pool.headSub = chain.SubscribeChainHeadEvent(pool.headCh) reserver := NewReservationTracker() for i, subpool := range subpools { if err := subpool.Init(gasTip, head, reserver.NewHandle(i)); err != nil { for j := i - 1; j >= 0; j-- { subpools[j].Close() } + if pool.headSub != nil { + pool.headSub.Unsubscribe() + } return nil, err } } @@ -124,6 +126,9 @@ func New(gasTip uint64, chain BlockChain, subpools []SubPool) (*TxPool, error) { func (p *TxPool) Close() error { var errs []error + if p.headSub != nil { + p.headSub.Unsubscribe() + } // Terminate the reset loop and wait for it to finish errc := make(chan error) p.quit <- errc diff --git a/core/txpool/txpool_test.go b/core/txpool/txpool_test.go index f28b6b18ad..f320c894c6 100644 --- a/core/txpool/txpool_test.go +++ b/core/txpool/txpool_test.go @@ -17,6 +17,9 @@ package txpool import ( + "errors" + "math/big" + "sync" "testing" "time" @@ -29,7 +32,6 @@ import ( ) type nilHeadSubChain struct{} -type trackedHeadSubChain struct{ nilHeadSubChain } func (nilHeadSubChain) Config() *params.ChainConfig { return params.TestChainConfig } @@ -43,8 +45,129 @@ func (nilHeadSubChain) StateAt(common.Hash) (*state.StateDB, error) { return state.New(types.EmptyRootHash, state.NewDatabaseForTesting()) } -func (trackedHeadSubChain) SubscribeChainHeadEvent(chan<- core.ChainHeadEvent) event.Subscription { - return event.NewSubscription(func(<-chan struct{}) error { return nil }) +type trackedHeadSubChain struct { + nilHeadSubChain + sub *subscriptionSpy +} + +func (c *trackedHeadSubChain) SubscribeChainHeadEvent(chan<- core.ChainHeadEvent) event.Subscription { + c.sub = newSubscriptionSpy() + return c.sub +} + +type subscriptionSpy struct { + err chan error + mu sync.Mutex + once sync.Once + closed bool +} + +func newSubscriptionSpy() *subscriptionSpy { + return &subscriptionSpy{err: make(chan error)} +} + +func (s *subscriptionSpy) Unsubscribe() { + s.once.Do(func() { + s.mu.Lock() + s.closed = true + s.mu.Unlock() + close(s.err) + }) +} + +func (s *subscriptionSpy) Err() <-chan error { + return s.err +} + +func (s *subscriptionSpy) isClosed() bool { + s.mu.Lock() + defer s.mu.Unlock() + + return s.closed +} + +type failingSubPool struct{} + +func (failingSubPool) Filter(*types.Transaction) bool { return false } + +func (failingSubPool) FilterType(byte) bool { return false } + +func (failingSubPool) Init(uint64, *types.Header, Reserver) error { + return errors.New("boom") +} + +func (failingSubPool) Close() error { return nil } + +func (failingSubPool) Reset(*types.Header, *types.Header) {} + +func (failingSubPool) SetGasTip(*big.Int) {} + +func (failingSubPool) Has(common.Hash) bool { return false } + +func (failingSubPool) Get(common.Hash) *types.Transaction { return nil } + +func (failingSubPool) GetRLP(common.Hash) []byte { return nil } + +func (failingSubPool) GetMetadata(common.Hash) *TxMetadata { return nil } + +func (failingSubPool) ValidateTxBasics(*types.Transaction) error { return nil } + +func (failingSubPool) Add([]*types.Transaction, bool) []error { return nil } + +func (failingSubPool) Pending(PendingFilter) map[common.Address][]*LazyTransaction { return nil } + +func (failingSubPool) SubscribeTransactions(chan<- core.NewTxsEvent, bool) event.Subscription { + return nil +} + +func (failingSubPool) Nonce(common.Address) uint64 { return 0 } + +func (failingSubPool) Stats() (int, int) { return 0, 0 } + +func (failingSubPool) Content() (map[common.Address][]*types.Transaction, map[common.Address][]*types.Transaction) { + return nil, nil +} + +func (failingSubPool) ContentFrom(common.Address) ([]*types.Transaction, []*types.Transaction) { + return nil, nil +} + +func (failingSubPool) Status(common.Hash) TxStatus { return TxStatusUnknown } + +func (failingSubPool) Clear() {} + +func TestTxPoolCloseUnsubscribesHeadSubscription(t *testing.T) { + t.Parallel() + + chain := &trackedHeadSubChain{} + pool, err := New(0, chain, nil) + if err != nil { + t.Fatalf("failed to create txpool: %v", err) + } + if chain.sub == nil { + t.Fatal("expected head subscription") + } + if err := pool.Close(); err != nil { + t.Fatalf("unexpected close error: %v", err) + } + if !chain.sub.isClosed() { + t.Fatal("expected head subscription to be unsubscribed on close") + } +} + +func TestTxPoolNewUnsubscribesHeadSubscriptionOnInitFailure(t *testing.T) { + t.Parallel() + + chain := &trackedHeadSubChain{} + if _, err := New(0, chain, []SubPool{failingSubPool{}}); err == nil { + t.Fatal("expected init failure") + } + if chain.sub == nil { + t.Fatal("expected head subscription") + } + if !chain.sub.isClosed() { + t.Fatal("expected head subscription to be unsubscribed on init failure") + } } func TestTxPoolCloseNilHeadSubscription(t *testing.T) { @@ -67,18 +190,3 @@ func TestTxPoolCloseNilHeadSubscription(t *testing.T) { t.Fatal("timed out waiting for txpool loop termination") } } - -func TestTxPoolNewTracksHeadSubscription(t *testing.T) { - t.Parallel() - - pool, err := New(0, trackedHeadSubChain{}, nil) - if err != nil { - t.Fatalf("failed to create txpool: %v", err) - } - if count := pool.subs.Count(); count != 1 { - t.Fatalf("unexpected subscription count: have %d want %d", count, 1) - } - if err := pool.Close(); err != nil { - t.Fatalf("unexpected close error: %v", err) - } -} From 6d81fe2dcce891bfb7a8d2abe05172962a436b22 Mon Sep 17 00:00:00 2001 From: apetro2 Date: Thu, 26 Mar 2026 22:05:05 +0800 Subject: [PATCH 5/5] core/txpool: fix test stub after rebase --- core/txpool/txpool_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/txpool/txpool_test.go b/core/txpool/txpool_test.go index f320c894c6..1ed67caff3 100644 --- a/core/txpool/txpool_test.go +++ b/core/txpool/txpool_test.go @@ -114,7 +114,9 @@ func (failingSubPool) ValidateTxBasics(*types.Transaction) error { return nil } func (failingSubPool) Add([]*types.Transaction, bool) []error { return nil } -func (failingSubPool) Pending(PendingFilter) map[common.Address][]*LazyTransaction { return nil } +func (failingSubPool) Pending(PendingFilter) (map[common.Address][]*LazyTransaction, int) { + return nil, 0 +} func (failingSubPool) SubscribeTransactions(chan<- core.NewTxsEvent, bool) event.Subscription { return nil