fix(core/txpool): allow tx and authority regardless of admission order #31373 (#2185)

This commit is contained in:
Daniel Liu 2026-03-18 14:05:00 +08:00 committed by GitHub
parent e647c71f5b
commit 508680055b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 245 additions and 89 deletions

View file

@ -251,7 +251,7 @@ type LegacyPool struct {
currentHead atomic.Pointer[types.Header] // Current head of the blockchain
currentState *state.StateDB // Current state in the blockchain head
pendingNonces *noncer // Pending state tracking virtual nonces
reserver *txpool.Reserver // Address reserver to ensure exclusivity across subpools
reserver txpool.Reserver // Address reserver to ensure exclusivity across subpools
pending map[common.Address]*list // All currently processable transactions
queue map[common.Address]*list // Queued but non-processable transactions
@ -320,7 +320,7 @@ func (pool *LegacyPool) Filter(tx *types.Transaction) bool {
// Init sets the gas price needed to keep a transaction in the pool and the chain
// head to allow balance / nonce checks. The internal
// goroutines will be spun up and the pool deemed operational afterwards.
func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserver *txpool.Reserver) error {
func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reserver) error {
// Set the address reserver to request exclusive access to pooled accounts
pool.reserver = reserver
@ -694,11 +694,18 @@ func (pool *LegacyPool) validateAuth(tx *types.Transaction) error {
if err := pool.checkDelegationLimit(tx); err != nil {
return err
}
// Authorities must not conflict with any pending or queued transactions,
// nor with addresses that have already been reserved.
// For symmetry, allow at most one in-flight tx for any authority with a
// pending transaction.
if auths := tx.SetCodeAuthorities(); len(auths) > 0 {
for _, auth := range auths {
if pool.pending[auth] != nil || pool.queue[auth] != nil {
var count int
if pending := pool.pending[auth]; pending != nil {
count += pending.Len()
}
if queue := pool.queue[auth]; queue != nil {
count += queue.Len()
}
if count > 1 {
return ErrAuthorityReserved
}
// Because there is no exclusive lock held between different subpools
@ -2022,9 +2029,13 @@ func (pool *LegacyPool) Clear() {
// The transaction addition may attempt to reserve the sender addr which
// can't happen until Clear releases the reservation lock. Clear cannot
// acquire the subpool lock until the transaction addition is completed.
for _, tx := range pool.all.txs {
senderAddr, _ := types.Sender(pool.signer, tx)
pool.reserver.Release(senderAddr)
for addr := range pool.pending {
if _, ok := pool.queue[addr]; !ok {
pool.reserver.Release(addr)
}
}
for addr := range pool.queue {
pool.reserver.Release(addr)
}
pool.all = newLookup()
pool.priced = newPricedList(pool.all)

View file

@ -24,6 +24,7 @@ import (
"math"
"math/big"
"math/rand"
"sync"
"sync/atomic"
"testing"
"time"
@ -186,8 +187,39 @@ func setupPool() (*LegacyPool, *ecdsa.PrivateKey) {
return setupPoolWithConfig(params.TestChainConfig)
}
func newReserver() *txpool.Reserver {
return txpool.NewReservationTracker().NewHandle(42)
// reserver is a utility struct to sanity check that accounts are
// properly reserved by the blobpool (no duplicate reserves or unreserves).
type reserver struct {
accounts map[common.Address]struct{}
lock sync.RWMutex
}
func newReserver() txpool.Reserver {
return &reserver{accounts: make(map[common.Address]struct{})}
}
func (r *reserver) Hold(addr common.Address) error {
r.lock.Lock()
defer r.lock.Unlock()
if _, exists := r.accounts[addr]; exists {
panic("already reserved")
}
r.accounts[addr] = struct{}{}
return nil
}
func (r *reserver) Release(addr common.Address) error {
r.lock.Lock()
defer r.lock.Unlock()
if _, exists := r.accounts[addr]; !exists {
panic("not reserved")
}
delete(r.accounts, addr)
return nil
}
func (r *reserver) Has(address common.Address) bool {
return false // reserver only supports a single pool
}
func setupPoolWithConfig(config *params.ChainConfig) (*LegacyPool, *ecdsa.PrivateKey) {
@ -2497,7 +2529,6 @@ func TestSetCodeTransactions(t *testing.T) {
minGasPrice := new(big.Int).Set(common.MinGasPrice)
minGasFee := uint256.MustFromBig(minGasPrice)
doubleGasFee := new(uint256.Int).Lsh(new(uint256.Int).Set(minGasFee), 1)
tripleGasFee := new(uint256.Int).Mul(new(uint256.Int).Set(minGasFee), uint256.NewInt(3))
legacyReplacePrice := new(big.Int).Mul(minGasPrice, big.NewInt(10))
@ -2509,9 +2540,8 @@ func TestSetCodeTransactions(t *testing.T) {
}{
{
// Check that only one in-flight transaction is allowed for accounts
// with delegation set. Also verify the accepted transaction can be
// replaced by fee.
name: "only-one-in-flight",
// with delegation set.
name: "accept-one-inflight-tx-of-delegated-account",
pending: 1,
run: func(name string) {
aa := common.Address{0xaa, 0xaa}
@ -2526,6 +2556,7 @@ func TestSetCodeTransactions(t *testing.T) {
if err := pool.addRemoteSync(pricedTransaction(0, 100000, minGasPrice, keyA)); err != nil {
t.Fatalf("%s: failed to add remote transaction: %v", name, err)
}
// Second and further transactions shall be rejected
if err := pool.addRemoteSync(pricedTransaction(1, 100000, minGasPrice, keyA)); !errors.Is(err, txpool.ErrInflightTxLimitReached) {
t.Fatalf("%s: error mismatch: want %v, have %v", name, txpool.ErrInflightTxLimitReached, err)
}
@ -2537,6 +2568,70 @@ func TestSetCodeTransactions(t *testing.T) {
if err := pool.addRemoteSync(pricedTransaction(0, 100000, legacyReplacePrice, keyA)); err != nil {
t.Fatalf("%s: failed to replace with remote transaction: %v", name, err)
}
// Reset the delegation, avoid leaking state into the other tests
statedb.SetCode(addrA, nil)
},
},
{
// This test is analogous to the previous one, but the delegation is pending
// instead of set.
name: "allow-one-tx-from-pooled-delegation",
pending: 2,
run: func(name string) {
// Create a pending delegation request from B.
if err := pool.addRemoteSync(setCodeTx(0, keyA, []unsignedAuth{{0, keyB}})); err != nil {
t.Fatalf("%s: failed to add with remote setcode transaction: %v", name, err)
}
// First transaction from B is accepted.
if err := pool.addRemoteSync(pricedTransaction(0, 100000, minGasPrice, keyB)); err != nil {
t.Fatalf("%s: failed to add remote transaction: %v", name, err)
}
// Second transaction fails due to limit.
if err := pool.addRemoteSync(pricedTransaction(1, 100000, minGasPrice, keyB)); !errors.Is(err, txpool.ErrInflightTxLimitReached) {
t.Fatalf("%s: error mismatch: want %v, have %v", name, txpool.ErrInflightTxLimitReached, err)
}
// Replace by fee for first transaction from B works.
if err := pool.addRemoteSync(pricedTransaction(0, 100000, legacyReplacePrice, keyB)); err != nil {
t.Fatalf("%s: failed to add remote transaction: %v", name, err)
}
},
},
{
// This is the symmetric case of the previous one, where the delegation request
// is received after the transaction. The resulting state shall be the same.
name: "accept-authorization-from-sender-of-one-inflight-tx",
pending: 2,
run: func(name string) {
// The first in-flight transaction is accepted.
if err := pool.addRemoteSync(pricedTransaction(0, 100000, minGasPrice, keyB)); err != nil {
t.Fatalf("%s: failed to add with pending delegation: %v", name, err)
}
// Delegation is accepted.
if err := pool.addRemoteSync(setCodeTx(0, keyA, []unsignedAuth{{0, keyB}})); err != nil {
t.Fatalf("%s: failed to add remote transaction: %v", name, err)
}
// The second in-flight transaction is rejected.
if err := pool.addRemoteSync(pricedTransaction(1, 100000, minGasPrice, keyB)); !errors.Is(err, txpool.ErrInflightTxLimitReached) {
t.Fatalf("%s: error mismatch: want %v, have %v", name, txpool.ErrInflightTxLimitReached, err)
}
},
},
{
name: "reject-authorization-from-sender-with-more-than-one-inflight-tx",
pending: 2,
run: func(name string) {
// Submit two transactions.
if err := pool.addRemoteSync(pricedTransaction(0, 100000, minGasPrice, keyB)); err != nil {
t.Fatalf("%s: failed to add with pending delegation: %v", name, err)
}
if err := pool.addRemoteSync(pricedTransaction(1, 100000, minGasPrice, keyB)); err != nil {
t.Fatalf("%s: failed to add with pending delegation: %v", name, err)
}
// Delegation rejected since two txs are already in-flight.
if err := pool.addRemoteSync(setCodeTx(0, keyA, []unsignedAuth{{0, keyB}})); !errors.Is(err, ErrAuthorityReserved) {
t.Fatalf("%s: error mismatch: want %v, have %v", name, ErrAuthorityReserved, err)
}
},
},
{
@ -2544,7 +2639,7 @@ func TestSetCodeTransactions(t *testing.T) {
pending: 2,
run: func(name string) {
// Send two transactions where the first has no conflicting delegations and
// the second should be allowed despite conflicting with the authorities in 1).
// the second should be allowed despite conflicting with the authorities in the first.
if err := pool.addRemoteSync(setCodeTx(0, keyA, []unsignedAuth{{1, keyC}})); err != nil {
t.Fatalf("%s: failed to add with remote setcode transaction: %v", name, err)
}
@ -2554,82 +2649,105 @@ func TestSetCodeTransactions(t *testing.T) {
},
},
{
name: "allow-one-tx-from-pooled-delegation",
pending: 2,
name: "replace-by-fee-setcode-tx",
pending: 1,
run: func(name string) {
// Verify C cannot originate another transaction when it has a pooled delegation.
if err := pool.addRemoteSync(setCodeTx(0, keyA, []unsignedAuth{{0, keyC}})); err != nil {
if err := pool.addRemoteSync(setCodeTx(0, keyB, []unsignedAuth{{1, keyC}})); err != nil {
t.Fatalf("%s: failed to add with remote setcode transaction: %v", name, err)
}
if err := pool.addRemoteSync(pricedTransaction(0, 100000, minGasPrice, keyC)); err != nil {
t.Fatalf("%s: failed to add with pending delegation: %v", name, err)
if err := pool.addRemoteSync(pricedSetCodeTx(0, 250000, tripleGasFee, tripleGasFee, keyB, []unsignedAuth{{0, keyC}})); err != nil {
t.Fatalf("%s: failed to add with remote setcode transaction: %v", name, err)
}
// Also check gapped transaction is rejected.
if err := pool.addRemoteSync(pricedTransaction(1, 100000, minGasPrice, keyC)); !errors.Is(err, txpool.ErrInflightTxLimitReached) {
},
},
{
name: "allow-more-than-one-tx-from-replaced-authority",
pending: 3,
run: func(name string) {
// Send transaction from A with B as an authority.
if err := pool.addRemoteSync(pricedSetCodeTx(0, 250000, minGasFee, minGasFee, keyA, []unsignedAuth{{0, keyB}})); err != nil {
t.Fatalf("%s: failed to add with remote setcode transaction: %v", name, err)
}
// Replace transaction with another having C as an authority.
if err := pool.addRemoteSync(pricedSetCodeTx(0, 250000, tripleGasFee, tripleGasFee, keyA, []unsignedAuth{{0, keyC}})); err != nil {
t.Fatalf("%s: failed to add with remote setcode transaction: %v", name, err)
}
// B should not be considred as having an in-flight delegation, so
// should allow more than one pooled transaction.
if err := pool.addRemoteSync(pricedTransaction(0, 100000, minGasPrice, keyB)); err != nil {
t.Fatalf("%s: failed to replace with remote transaction: %v", name, err)
}
if err := pool.addRemoteSync(pricedTransaction(1, 100000, minGasPrice, keyB)); err != nil {
t.Fatalf("%s: failed to replace with remote transaction: %v", name, err)
}
},
},
{
// This test is analogous to the previous one, but the the replaced
// transaction is self-sponsored.
name: "allow-tx-from-replaced-self-sponsor-authority",
pending: 3,
run: func(name string) {
// Send transaction from A with A as an authority.
if err := pool.addRemoteSync(pricedSetCodeTx(0, 250000, minGasFee, minGasFee, keyA, []unsignedAuth{{0, keyA}})); err != nil {
t.Fatalf("%s: failed to add with remote setcode transaction: %v", name, err)
}
// Replace transaction with a transaction with B as an authority.
if err := pool.addRemoteSync(pricedSetCodeTx(0, 250000, tripleGasFee, tripleGasFee, keyA, []unsignedAuth{{0, keyB}})); err != nil {
t.Fatalf("%s: failed to add with remote setcode transaction: %v", name, err)
}
// The one in-flight transaction limit from A no longer applies, so we
// can stack a second transaction for the account.
if err := pool.addRemoteSync(pricedTransaction(1, 100000, legacyReplacePrice, keyA)); err != nil {
t.Fatalf("%s: failed to replace with remote transaction: %v", name, err)
}
// B should still be able to send transactions.
if err := pool.addRemoteSync(pricedTransaction(0, 100000, legacyReplacePrice, keyB)); err != nil {
t.Fatalf("%s: failed to replace with remote transaction: %v", name, err)
}
// However B still has the limitation to one in-flight transaction.
if err := pool.addRemoteSync(pricedTransaction(1, 100000, minGasPrice, keyB)); !errors.Is(err, txpool.ErrInflightTxLimitReached) {
t.Fatalf("%s: error mismatch: want %v, have %v", name, txpool.ErrInflightTxLimitReached, err)
}
},
},
{
name: "replace-by-fee-setcode-tx",
pending: 1,
run: func(name string) {
// 4. Fee bump the setcode tx send.
if err := pool.addRemoteSync(setCodeTx(0, keyB, []unsignedAuth{{1, keyC}})); err != nil {
t.Fatalf("%s: failed to add with remote setcode transaction: %v", name, err)
}
if err := pool.addRemoteSync(pricedSetCodeTx(0, 250000, doubleGasFee, uint256.NewInt(2), keyB, []unsignedAuth{{0, keyC}})); err != nil {
t.Fatalf("%s: failed to add with remote setcode transaction: %v", name, err)
}
},
},
{
name: "allow-tx-from-replaced-authority",
name: "replacements-respect-inflight-tx-count",
pending: 2,
run: func(name string) {
// Fee bump with a different auth list. Make sure that unlocks the authorities.
if err := pool.addRemoteSync(pricedSetCodeTx(0, 250000, minGasFee, uint256.NewInt(3), keyA, []unsignedAuth{{0, keyB}})); err != nil {
// Send transaction from A with B as an authority.
if err := pool.addRemoteSync(pricedSetCodeTx(0, 250000, minGasFee, minGasFee, keyA, []unsignedAuth{{0, keyB}})); err != nil {
t.Fatalf("%s: failed to add with remote setcode transaction: %v", name, err)
}
if err := pool.addRemoteSync(pricedSetCodeTx(0, 250000, tripleGasFee, uint256.NewInt(300), keyA, []unsignedAuth{{0, keyC}})); err != nil {
t.Fatalf("%s: failed to add with remote setcode transaction: %v", name, err)
}
// Now send a regular tx from B.
if err := pool.addRemoteSync(pricedTransaction(0, 100000, minGasPrice, keyB)); err != nil {
t.Fatalf("%s: failed to replace with remote transaction: %v", name, err)
}
},
},
{
name: "allow-tx-from-replaced-self-sponsor-authority",
pending: 2,
run: func(name string) {
//
if err := pool.addRemoteSync(pricedSetCodeTx(0, 250000, minGasFee, uint256.NewInt(3), keyA, []unsignedAuth{{0, keyA}})); err != nil {
t.Fatalf("%s: failed to add with remote setcode transaction: %v", name, err)
}
if err := pool.addRemoteSync(pricedSetCodeTx(0, 250000, tripleGasFee, uint256.NewInt(30), keyA, []unsignedAuth{{0, keyB}})); err != nil {
t.Fatalf("%s: failed to add with remote setcode transaction: %v", name, err)
}
// Now send a regular tx from keyA.
if err := pool.addRemoteSync(pricedTransaction(0, 100000, new(big.Int).Set(legacyReplacePrice), keyA)); err != nil {
t.Fatalf("%s: failed to replace with remote transaction: %v", name, err)
}
// Make sure we can still send from keyB.
// Send two transactions from B. Only the first should be accepted due
// to in-flight limit.
if err := pool.addRemoteSync(pricedTransaction(0, 100000, minGasPrice, keyB)); err != nil {
t.Fatalf("%s: failed to add remote transaction: %v", name, err)
}
if err := pool.addRemoteSync(pricedTransaction(1, 100000, minGasPrice, keyB)); !errors.Is(err, txpool.ErrInflightTxLimitReached) {
t.Fatalf("%s: error mismatch: want %v, have %v", name, txpool.ErrInflightTxLimitReached, err)
}
// Replace the in-flight transaction from B.
if err := pool.addRemoteSync(pricedTransaction(0, 100000, legacyReplacePrice, keyB)); err != nil {
t.Fatalf("%s: failed to replace with remote transaction: %v", name, err)
}
// Ensure the in-flight limit for B is still in place.
if err := pool.addRemoteSync(pricedTransaction(1, 100000, minGasPrice, keyB)); !errors.Is(err, txpool.ErrInflightTxLimitReached) {
t.Fatalf("%s: error mismatch: want %v, have %v", name, txpool.ErrInflightTxLimitReached, err)
}
},
},
{
// Since multiple authorizations can be pending simultaneously, replacing
// one of them should not break the one in-flight-transaction limit.
name: "track-multiple-conflicting-delegations",
pending: 3,
run: func(name string) {
// Send two setcode txs both with C as an authority.
if err := pool.addRemoteSync(pricedSetCodeTx(0, 250000, minGasFee, uint256.NewInt(3), keyA, []unsignedAuth{{0, keyC}})); err != nil {
if err := pool.addRemoteSync(pricedSetCodeTx(0, 250000, minGasFee, minGasFee, keyA, []unsignedAuth{{0, keyC}})); err != nil {
t.Fatalf("%s: failed to add with remote setcode transaction: %v", name, err)
}
if err := pool.addRemoteSync(pricedSetCodeTx(0, 250000, tripleGasFee, uint256.NewInt(30), keyB, []unsignedAuth{{0, keyC}})); err != nil {
if err := pool.addRemoteSync(pricedSetCodeTx(0, 250000, tripleGasFee, tripleGasFee, keyB, []unsignedAuth{{0, keyC}})); err != nil {
t.Fatalf("%s: failed to add with remote setcode transaction: %v", name, err)
}
// Replace the tx from A with a non-setcode tx.
@ -2647,15 +2765,28 @@ func TestSetCodeTransactions(t *testing.T) {
},
},
{
name: "reject-delegation-from-pending-account",
pending: 1,
name: "remove-hash-from-authority-tracker",
pending: 10,
run: func(name string) {
// Attempt to submit a delegation from an account with a pending tx.
if err := pool.addRemoteSync(pricedTransaction(0, 100000, minGasPrice, keyC)); err != nil {
t.Fatalf("%s: failed to add with remote setcode transaction: %v", name, err)
var keys []*ecdsa.PrivateKey
for i := 0; i < 30; i++ {
key, _ := crypto.GenerateKey()
keys = append(keys, key)
addr := crypto.PubkeyToAddress(key.PublicKey)
testAddBalance(pool, addr, big.NewInt(params.Ether))
}
if err, want := pool.addRemoteSync(setCodeTx(0, keyA, []unsignedAuth{{1, keyC}})), ErrAuthorityReserved; !errors.Is(err, want) {
t.Fatalf("%s: error mismatch: want %v, have %v", name, want, err)
// Create a transactions with 3 unique auths so the lookup's auth map is
// filled with addresses.
for i := 0; i < 30; i += 3 {
if err := pool.addRemoteSync(pricedSetCodeTx(0, 250000, minGasFee, minGasFee, keys[i], []unsignedAuth{{0, keys[i]}, {0, keys[i+1]}, {0, keys[i+2]}})); err != nil {
t.Fatalf("%s: failed to add with remote setcode transaction: %v", name, err)
}
}
// Replace one of the transactions with a normal transaction so that the
// original hash is removed from the tracker. The hash should be
// associated with 3 different authorities.
if err := pool.addRemoteSync(pricedTransaction(0, 100000, legacyReplacePrice, keys[0])); err != nil {
t.Fatalf("%s: failed to replace with remote transaction: %v", name, err)
}
},
},

View file

@ -52,22 +52,37 @@ func NewReservationTracker() *ReservationTracker {
// NewHandle creates a named handle on the ReservationTracker. The handle
// identifies the subpool so ownership of reservations can be determined.
func (r *ReservationTracker) NewHandle(id int) *Reserver {
return &Reserver{r, id}
func (r *ReservationTracker) NewHandle(id int) *ReservationHandle {
return &ReservationHandle{r, id}
}
// Reserver is a named handle on ReservationTracker. It is held by subpools to
// Reserver is an interface for creating and releasing owned reservations in the
// ReservationTracker struct, which is shared between subpools.
type Reserver interface {
// Hold attempts to reserve the specified account address for the given pool.
// Returns an error if the account is already reserved.
Hold(addr common.Address) error
// Release attempts to release the reservation for the specified account.
// Returns an error if the address is not reserved or is reserved by another pool.
Release(addr common.Address) error
// Has returns a flag indicating if the address has been reserved by a pool
// other than one with the current Reserver handle.
Has(address common.Address) bool
}
// ReservationHandle is a named handle on ReservationTracker. It is held by subpools to
// make reservations for accounts it is tracking. The id is used to determine
// which pool owns an address and disallows non-owners to hold or release
// addresses it doesn't own.
type Reserver struct {
type ReservationHandle struct {
tracker *ReservationTracker
id int
}
// Hold attempts to reserve the specified account address for the given pool.
// Returns an error if the account is already reserved.
func (h *Reserver) Hold(addr common.Address) error {
// Hold implements the Reserver interface.
func (h *ReservationHandle) Hold(addr common.Address) error {
h.tracker.lock.Lock()
defer h.tracker.lock.Unlock()
@ -89,9 +104,8 @@ func (h *Reserver) Hold(addr common.Address) error {
return nil
}
// Release attempts to release the reservation for the specified account.
// Returns an error if the address is not reserved or is reserved by another pool.
func (h *Reserver) Release(addr common.Address) error {
// Release implements the Reserver interface.
func (h *ReservationHandle) Release(addr common.Address) error {
h.tracker.lock.Lock()
defer h.tracker.lock.Unlock()
@ -114,11 +128,11 @@ func (h *Reserver) Release(addr common.Address) error {
return nil
}
// Has returns a flag indicating if the address has been reserved or not.
func (h *Reserver) Has(address common.Address) bool {
// Has implements the Reserver interface.
func (h *ReservationHandle) Has(address common.Address) bool {
h.tracker.lock.RLock()
defer h.tracker.lock.RUnlock()
_, exists := h.tracker.accounts[address]
return exists
id, exists := h.tracker.accounts[address]
return exists && id != h.id
}

View file

@ -93,7 +93,7 @@ type SubPool interface {
// These should not be passed as a constructor argument - nor should the pools
// start by themselves - in order to keep multiple subpools in lockstep with
// one another.
Init(gasTip uint64, head *types.Header, reserver *Reserver) error
Init(gasTip uint64, head *types.Header, reserver Reserver) error
// Close terminates any background processing threads and releases any held
// resources.

View file

@ -60,7 +60,7 @@ type testSubPool struct {
func (s *testSubPool) Filter(tx *types.Transaction) bool { return true }
func (s *testSubPool) Init(gasTip uint64, head *types.Header, reserver *Reserver) error { return nil }
func (s *testSubPool) Init(gasTip uint64, head *types.Header, reserver Reserver) error { return nil }
func (s *testSubPool) Close() error { return nil }