fix(core/txpool): coordinate reset lifecycle and shutdown signaling #28837 (#2132)

Improve txpool loop synchronization around background resets.

This change:
- adds an explicit termination channel to signal pool shutdown
- tracks forced-reset intent and a waiter channel inside the reset loop
- ensures reset waiters are notified on completion or on pool termination
- allows an explicit sync request path to trigger an additional reset round when needed

Scope is limited to internal txpool concurrency control in core/txpool/txpool.go, with no protocol or RPC behavior change.
This commit is contained in:
Daniel Liu 2026-03-17 13:43:47 +08:00 committed by GitHub
parent 2137fb3e8d
commit a9be217e6e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -17,6 +17,7 @@
package txpool
import (
"errors"
"fmt"
"maps"
"math/big"
@ -57,6 +58,9 @@ type TxPool struct {
subs event.SubscriptionScope // Subscription scope to unsubscribe all on shutdown
quit chan chan error // Quit channel to tear down the head updater
term chan struct{} // Termination channel to detect a closed pool
sync chan chan error // Testing / simulator channel to block until internal reset is done
}
// New creates a new transaction pool to gather, sort and filter inbound
@ -70,6 +74,8 @@ func New(gasTip uint64, chain BlockChain, subpools []SubPool) (*TxPool, error) {
pool := &TxPool{
subpools: subpools,
quit: make(chan chan error),
term: make(chan struct{}),
sync: make(chan chan error),
}
reserver := NewReservationTracker()
for i, subpool := range subpools {
@ -113,6 +119,9 @@ func (p *TxPool) Close() error {
// outside blockchain events as well as for various reporting and transaction
// eviction events.
func (p *TxPool) loop(head *types.Header, chain BlockChain) {
// Close the termination marker when the pool stops
defer close(p.term)
// Subscribe to chain head events to trigger subpool resets
var (
newHeadCh = make(chan core.ChainHeadEvent)
@ -129,13 +138,26 @@ func (p *TxPool) loop(head *types.Header, chain BlockChain) {
var (
resetBusy = make(chan struct{}, 1) // Allow 1 reset to run concurrently
resetDone = make(chan *types.Header)
resetForced bool // Whether a forced reset was requested, only used in simulator mode
resetWaiter chan error // Channel waiting on a forced reset, only used in simulator mode
)
// Notify the live reset waiter without blocking if the txpool is closed.
defer func() {
if resetWaiter != nil {
select {
case resetWaiter <- errors.New("pool already terminated"):
default:
}
resetWaiter = nil
}
}()
var errc chan error
for errc == nil {
// Something interesting might have happened, run a reset if there is
// one needed but none is running. The resetter will run on its own
// goroutine to allow chain head events to be consumed contiguously.
if newHead != oldHead {
if newHead != oldHead || resetForced {
// Try to inject a busy marker and start a reset if successful
select {
case resetBusy <- struct{}{}:
@ -147,8 +169,17 @@ func (p *TxPool) loop(head *types.Header, chain BlockChain) {
resetDone <- newHead
}(oldHead, newHead)
// If the reset operation was explicitly requested, consider it
// being fulfilled and drop the request marker. If it was not,
// this is a noop.
resetForced = false
default:
// Reset already running, wait until it finishes
// Reset already running, wait until it finishes.
//
// Note, this will not drop any forced reset request. If a forced
// reset was requested, but we were busy, then when the currently
// running reset finishes, a new one will be spun up.
}
}
// Wait for the next chain head event or a previous reset finish
@ -162,8 +193,37 @@ func (p *TxPool) loop(head *types.Header, chain BlockChain) {
oldHead = head
<-resetBusy
// If someone is waiting for a reset to finish, notify them, unless
// the forced op is still pending. In that case, wait another round
// of resets.
if resetWaiter != nil && !resetForced {
select {
case resetWaiter <- nil:
// notification delivered
default:
// no active listener; avoid blocking the event loop
}
resetWaiter = nil
}
case errc = <-p.quit:
// Termination requested, break out on the next loop round
case syncc := <-p.sync:
// Transaction pool is running inside a simulator, and we are about
// to create a new block. Request a forced sync operation to ensure
// that any running reset operation finishes to make block imports
// deterministic. On top of that, run a new reset operation to make
// transaction insertions deterministic instead of being stuck in a
// queue waiting for a reset.
if resetWaiter != nil {
// A previous sync waiter is still pending; notify it to avoid
// leaking a goroutine waiting on the old channel.
resetWaiter <- errors.New("sync request superseded by a new request")
resetWaiter = nil
}
resetForced = true
resetWaiter = syncc
}
}
// Notify the closer of termination (no error possible for now)
@ -375,3 +435,20 @@ func (pool *TxPool) IsSigner(addr common.Address) bool {
}
return false
}
// Sync is a helper method for unit tests or simulator runs where the chain events
// are arriving in quick succession, without any time in between them to run the
// internal background reset operations. This method will run an explicit reset
// operation to ensure the pool stabilises, thus avoiding flakey behavior.
//
// Note, do not use this in production / live code. In live code, the pool is
// meant to reset on a separate thread to avoid DoS vectors.
func (p *TxPool) Sync() error {
sync := make(chan error)
select {
case p.sync <- sync:
return <-sync
case <-p.term:
return errors.New("pool already terminated")
}
}