From 2e739fce584a3cc8d84bf4c4604cd81c07294e2b Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Tue, 8 Apr 2025 21:46:27 +0800 Subject: [PATCH] core/txpool: add 7702 protection to blobpool (#31526) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This pull request introduces two constraints in the blobPool: (a) If the sender has a pending authorization or delegation, only one in-flight executable transaction can be cached. (b) If the authority address in a SetCode transaction is already reserved by the blobPool, the transaction will be rejected. These constraints mitigate an attack where an attacker spams the pool with numerous blob transactions, evicts other transactions, and then cancels all pending blob transactions by draining the sender’s funds if they have a delegation. Note, because there is no exclusive lock held between different subpools when processing transactions, it's totally possible the SetCode transaction and blob transactions with conflict sender and authorities are accepted simultaneously. I think it's acceptable as it's very hard to be exploited. --------- Co-authored-by: lightclient --- core/txpool/blobpool/blobpool.go | 66 ++++++++--- core/txpool/blobpool/blobpool_test.go | 60 +++------- core/txpool/errors.go | 4 + core/txpool/legacypool/legacypool.go | 60 ++++++---- core/txpool/legacypool/legacypool2_test.go | 8 +- core/txpool/legacypool/legacypool_test.go | 82 +++++--------- core/txpool/reserver.go | 124 +++++++++++++++++++++ core/txpool/subpool.go | 6 +- core/txpool/txpool.go | 77 ++----------- eth/backend.go | 10 +- eth/protocols/eth/handler_test.go | 2 +- 11 files changed, 286 insertions(+), 213 deletions(-) create mode 100644 core/txpool/reserver.go diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index 5a20c3ce5a..b1966905a8 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -298,8 +298,9 @@ func newBlobTxMeta(id uint64, size uint64, storageSize uint32, tx *types.Transac // minimums will need to be done only starting at the swapped in/out nonce // and leading up to the first no-change. type BlobPool struct { - config Config // Pool configuration - reserve txpool.AddressReserver // Address reserver to ensure exclusivity across subpools + config Config // Pool configuration + reserver *txpool.Reserver // Address reserver to ensure exclusivity across subpools + hasPendingAuth func(common.Address) bool // Determine whether the specified address has a pending 7702-auth store billy.Database // Persistent data store for the tx metadata and blobs stored uint64 // Useful data size of all transactions on disk @@ -329,13 +330,14 @@ type BlobPool struct { // New creates a new blob transaction pool to gather, sort and filter inbound // blob transactions from the network. -func New(config Config, chain BlockChain) *BlobPool { +func New(config Config, chain BlockChain, hasPendingAuth func(common.Address) bool) *BlobPool { // Sanitize the input to ensure no vulnerable gas prices are set config = (&config).sanitize() // Create the transaction pool with its initial settings return &BlobPool{ config: config, + hasPendingAuth: hasPendingAuth, signer: types.LatestSigner(chain.Config()), chain: chain, lookup: newLookup(), @@ -353,8 +355,8 @@ func (p *BlobPool) Filter(tx *types.Transaction) bool { // Init sets the gas price needed to keep a transaction in the pool and the chain // head to allow balance / nonce checks. The transaction journal will be loaded // from disk and filtered based on the provided starting settings. -func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserve txpool.AddressReserver) error { - p.reserve = reserve +func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver *txpool.Reserver) error { + p.reserver = reserver var ( queuedir string @@ -499,7 +501,7 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error { return err } if _, ok := p.index[sender]; !ok { - if err := p.reserve(sender, true); err != nil { + if err := p.reserver.Hold(sender); err != nil { return err } p.index[sender] = []*blobTxMeta{} @@ -554,7 +556,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 if inclusions != nil { // only during reorgs will the heap be initialized heap.Remove(p.evict, p.evict.index[addr]) } - p.reserve(addr, false) + p.reserver.Release(addr) if gapped { log.Warn("Dropping dangling blob transactions", "from", addr, "missing", next, "drop", nonces, "ids", ids) @@ -707,7 +709,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 if inclusions != nil { // only during reorgs will the heap be initialized heap.Remove(p.evict, p.evict.index[addr]) } - p.reserve(addr, false) + p.reserver.Release(addr) } else { p.index[addr] = txs } @@ -1006,7 +1008,7 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error { // Update the indices and metrics meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx) if _, ok := p.index[addr]; !ok { - if err := p.reserve(addr, true); err != nil { + if err := p.reserver.Hold(addr); err != nil { log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err) return err } @@ -1066,7 +1068,7 @@ func (p *BlobPool) SetGasTip(tip *big.Int) { delete(p.spent, addr) heap.Remove(p.evict, p.evict.index[addr]) - p.reserve(addr, false) + p.reserver.Release(addr) } // Clear out the transactions from the data store log.Warn("Dropping underpriced blob transaction", "from", addr, "rejected", tx.nonce, "tip", tx.execTipCap, "want", tip, "drop", nonces, "ids", ids) @@ -1101,6 +1103,39 @@ func (p *BlobPool) ValidateTxBasics(tx *types.Transaction) error { return txpool.ValidateTransaction(tx, p.head, p.signer, opts) } +// checkDelegationLimit determines if the tx sender is delegated or has a +// pending delegation, and if so, ensures they have at most one in-flight +// **executable** transaction, e.g. disallow stacked and gapped transactions +// from the account. +func (p *BlobPool) checkDelegationLimit(tx *types.Transaction) error { + from, _ := types.Sender(p.signer, tx) // validated + + // Short circuit if the sender has neither delegation nor pending delegation. + if p.state.GetCodeHash(from) == types.EmptyCodeHash { + // Because there is no exclusive lock held between different subpools + // when processing transactions, a blob transaction may be accepted + // while other SetCode transactions with pending authorities from the + // same address are also accepted simultaneously. + // + // This scenario is considered acceptable, as the rule primarily ensures + // that attackers cannot easily and endlessly stack blob transactions + // with a delegated or pending delegated sender. + if p.hasPendingAuth == nil || !p.hasPendingAuth(from) { + return nil + } + } + // Allow a single in-flight pending transaction. + pending := p.index[from] + if len(pending) == 0 { + return nil + } + // If account already has a pending transaction, allow replacement only. + if len(pending) == 1 && pending[0].nonce == tx.Nonce() { + return nil + } + return txpool.ErrInflightTxLimitReached +} + // validateTx checks whether a transaction is valid according to the consensus // rules and adheres to some heuristic limits of the local node (price and size). func (p *BlobPool) validateTx(tx *types.Transaction) error { @@ -1141,6 +1176,9 @@ func (p *BlobPool) validateTx(tx *types.Transaction) error { if err := txpool.ValidateTransactionWithState(tx, p.signer, stateOpts); err != nil { return err } + if err := p.checkDelegationLimit(tx); err != nil { + return err + } // If the transaction replaces an existing one, ensure that price bumps are // adhered to. var ( @@ -1369,7 +1407,7 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) { // only by this subpool until all transactions are evicted from, _ := types.Sender(p.signer, tx) // already validated above if _, ok := p.index[from]; !ok { - if err := p.reserve(from, true); err != nil { + if err := p.reserver.Hold(from); err != nil { addNonExclusiveMeter.Mark(1) return err } @@ -1381,7 +1419,7 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) { // by a return statement before running deferred methods. Take care with // removing or subscoping err as it will break this clause. if err != nil { - p.reserve(from, false) + p.reserver.Release(from) } }() } @@ -1513,7 +1551,7 @@ func (p *BlobPool) drop() { if last { delete(p.index, from) delete(p.spent, from) - p.reserve(from, false) + p.reserver.Release(from) } else { txs[len(txs)-1] = nil txs = txs[:len(txs)-1] @@ -1789,7 +1827,7 @@ func (p *BlobPool) Clear() { // can't happen until Clear releases the reservation lock. Clear cannot // acquire the subpool lock until the transaction addition is completed. for acct := range p.index { - p.reserve(acct, false) + p.reserver.Release(acct) } p.lookup = newLookup() p.index = make(map[common.Address][]*blobTxMeta) diff --git a/core/txpool/blobpool/blobpool_test.go b/core/txpool/blobpool/blobpool_test.go index b7c6cfa51e..4dfba3b52b 100644 --- a/core/txpool/blobpool/blobpool_test.go +++ b/core/txpool/blobpool/blobpool_test.go @@ -26,7 +26,6 @@ import ( "math/big" "os" "path/filepath" - "sync" "testing" "github.com/ethereum/go-ethereum/common" @@ -168,33 +167,6 @@ func (bc *testBlockChain) StateAt(common.Hash) (*state.StateDB, error) { return bc.statedb, nil } -// makeAddressReserver is a utility method to sanity check that accounts are -// properly reserved by the blobpool (no duplicate reserves or unreserves). -func makeAddressReserver() txpool.AddressReserver { - var ( - reserved = make(map[common.Address]struct{}) - lock sync.Mutex - ) - return func(addr common.Address, reserve bool) error { - lock.Lock() - defer lock.Unlock() - - _, exists := reserved[addr] - if reserve { - if exists { - panic("already reserved") - } - reserved[addr] = struct{}{} - return nil - } - if !exists { - panic("not reserved") - } - delete(reserved, addr) - return nil - } -} - // makeTx is a utility method to construct a random blob transaction and sign it // with a valid key, only setting the interesting fields from the perspective of // the blob pool. @@ -433,6 +405,10 @@ func verifyBlobRetrievals(t *testing.T, pool *BlobPool) { } } +func newReserver() *txpool.Reserver { + return txpool.NewReservationTracker().NewHandle(42) +} + // Tests that transactions can be loaded from disk on startup and that they are // correctly discarded if invalid. // @@ -699,8 +675,8 @@ func TestOpenDrops(t *testing.T) { blobfee: uint256.NewInt(params.BlobTxMinBlobGasprice), statedb: statedb, } - pool := New(Config{Datadir: storage}, chain) - if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver()); err != nil { + pool := New(Config{Datadir: storage}, chain, nil) + if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil { t.Fatalf("failed to create blob pool: %v", err) } defer pool.Close() @@ -817,8 +793,8 @@ func TestOpenIndex(t *testing.T) { blobfee: uint256.NewInt(params.BlobTxMinBlobGasprice), statedb: statedb, } - pool := New(Config{Datadir: storage}, chain) - if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver()); err != nil { + pool := New(Config{Datadir: storage}, chain, nil) + if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil { t.Fatalf("failed to create blob pool: %v", err) } defer pool.Close() @@ -918,8 +894,8 @@ func TestOpenHeap(t *testing.T) { blobfee: uint256.NewInt(105), statedb: statedb, } - pool := New(Config{Datadir: storage}, chain) - if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver()); err != nil { + pool := New(Config{Datadir: storage}, chain, nil) + if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil { t.Fatalf("failed to create blob pool: %v", err) } defer pool.Close() @@ -997,8 +973,8 @@ func TestOpenCap(t *testing.T) { blobfee: uint256.NewInt(105), statedb: statedb, } - pool := New(Config{Datadir: storage, Datacap: datacap}, chain) - if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver()); err != nil { + pool := New(Config{Datadir: storage, Datacap: datacap}, chain, nil) + if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil { t.Fatalf("failed to create blob pool: %v", err) } // Verify that enough transactions have been dropped to get the pool's size @@ -1098,8 +1074,8 @@ func TestChangingSlotterSize(t *testing.T) { blobfee: uint256.NewInt(105), statedb: statedb, } - pool := New(Config{Datadir: storage}, chain) - if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver()); err != nil { + pool := New(Config{Datadir: storage}, chain, nil) + if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil { t.Fatalf("failed to create blob pool: %v", err) } @@ -1541,8 +1517,8 @@ func TestAdd(t *testing.T) { blobfee: uint256.NewInt(105), statedb: statedb, } - pool := New(Config{Datadir: storage}, chain) - if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver()); err != nil { + pool := New(Config{Datadir: storage}, chain, nil) + if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil { t.Fatalf("test %d: failed to create blob pool: %v", i, err) } verifyPoolInternals(t, pool) @@ -1638,10 +1614,10 @@ func benchmarkPoolPending(b *testing.B, datacap uint64) { blobfee: uint256.NewInt(blobfee), statedb: statedb, } - pool = New(Config{Datadir: ""}, chain) + pool = New(Config{Datadir: ""}, chain, nil) ) - if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver()); err != nil { + if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil { b.Fatalf("failed to create blob pool: %v", err) } // Make the pool not use disk (just drop everything). This test never reads diff --git a/core/txpool/errors.go b/core/txpool/errors.go index c38644857e..02f5703b6c 100644 --- a/core/txpool/errors.go +++ b/core/txpool/errors.go @@ -56,4 +56,8 @@ var ( // input transaction of non-blob type when a blob transaction from this sender // remains pending (and vice-versa). ErrAlreadyReserved = errors.New("address already reserved") + + // ErrInflightTxLimitReached is returned when the maximum number of in-flight + // transactions is reached for specific accounts. + ErrInflightTxLimitReached = errors.New("in-flight transaction limit reached for delegated accounts") ) diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 9066f3e16b..278ad0791f 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -63,10 +63,6 @@ var ( // another remote transaction. ErrTxPoolOverflow = errors.New("txpool is full") - // ErrInflightTxLimitReached is returned when the maximum number of in-flight - // transactions is reached for specific accounts. - ErrInflightTxLimitReached = errors.New("in-flight transaction limit reached for delegated accounts") - // ErrOutOfOrderTxFromDelegated is returned when the transaction with gapped // nonce received from the accounts with delegation or pending delegation. ErrOutOfOrderTxFromDelegated = errors.New("gapped-nonce tx from delegated accounts") @@ -241,8 +237,8 @@ type LegacyPool struct { currentHead atomic.Pointer[types.Header] // Current head of the blockchain currentState *state.StateDB // Current state in the blockchain head pendingNonces *noncer // Pending state tracking virtual nonces + reserver *txpool.Reserver // Address reserver to ensure exclusivity across subpools - reserve txpool.AddressReserver // Address reserver to ensure exclusivity across subpools pending map[common.Address]*list // All currently processable transactions queue map[common.Address]*list // Queued but non-processable transactions beats map[common.Address]time.Time // Last heartbeat from each known account @@ -306,9 +302,9 @@ func (pool *LegacyPool) Filter(tx *types.Transaction) bool { // Init sets the gas price needed to keep a transaction in the pool and the chain // head to allow balance / nonce checks. The internal // goroutines will be spun up and the pool deemed operational afterwards. -func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.AddressReserver) error { +func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserver *txpool.Reserver) error { // Set the address reserver to request exclusive access to pooled accounts - pool.reserve = reserve + pool.reserver = reserver // Set the basic pool parameters pool.gasTip.Store(uint256.NewInt(gasTip)) @@ -618,7 +614,7 @@ func (pool *LegacyPool) checkDelegationLimit(tx *types.Transaction) error { from, _ := types.Sender(pool.signer, tx) // validated // Short circuit if the sender has neither delegation nor pending delegation. - if pool.currentState.GetCodeHash(from) == types.EmptyCodeHash && pool.all.delegationTxsCount(from) == 0 { + if pool.currentState.GetCodeHash(from) == types.EmptyCodeHash && !pool.all.hasAuth(from) { return nil } pending := pool.pending[from] @@ -633,7 +629,7 @@ func (pool *LegacyPool) checkDelegationLimit(tx *types.Transaction) error { if pending.Contains(tx.Nonce()) { return nil } - return ErrInflightTxLimitReached + return txpool.ErrInflightTxLimitReached } // validateAuth verifies that the transaction complies with code authorization @@ -644,12 +640,24 @@ func (pool *LegacyPool) validateAuth(tx *types.Transaction) error { if err := pool.checkDelegationLimit(tx); err != nil { return err } - // Authorities cannot conflict with any pending or queued transactions. + // Authorities must not conflict with any pending or queued transactions, + // nor with addresses that have already been reserved. if auths := tx.SetCodeAuthorities(); len(auths) > 0 { for _, auth := range auths { if pool.pending[auth] != nil || pool.queue[auth] != nil { return ErrAuthorityReserved } + // Because there is no exclusive lock held between different subpools + // when processing transactions, the SetCode transaction may be accepted + // while other transactions with the same sender address are also + // accepted simultaneously in the other pools. + // + // This scenario is considered acceptable, as the rule primarily ensures + // that attackers cannot easily stack a SetCode transaction when the sender + // is reserved by other pools. + if pool.reserver.Has(auth) { + return ErrAuthorityReserved + } } } return nil @@ -683,7 +691,7 @@ func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) { _, hasQueued = pool.queue[from] ) if !hasPending && !hasQueued { - if err := pool.reserve(from, true); err != nil { + if err := pool.reserver.Hold(from); err != nil { return false, err } defer func() { @@ -694,7 +702,7 @@ func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) { // by a return statement before running deferred methods. Take care with // removing or subscoping err as it will break this clause. if err != nil { - pool.reserve(from, false) + pool.reserver.Release(from) } }() } @@ -1087,7 +1095,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo _, hasQueued = pool.queue[addr] ) if !hasPending && !hasQueued { - pool.reserve(addr, false) + pool.reserver.Release(addr) } }() } @@ -1467,7 +1475,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T delete(pool.queue, addr) delete(pool.beats, addr) if _, ok := pool.pending[addr]; !ok { - pool.reserve(addr, false) + pool.reserver.Release(addr) } } } @@ -1653,7 +1661,7 @@ func (pool *LegacyPool) demoteUnexecutables() { if list.Empty() { delete(pool.pending, addr) if _, ok := pool.queue[addr]; !ok { - pool.reserve(addr, false) + pool.reserver.Release(addr) } } } @@ -1862,11 +1870,13 @@ func (t *lookup) removeAuthorities(tx *types.Transaction) { } } -// delegationTxsCount returns the number of pending authorizations for the specified address. -func (t *lookup) delegationTxsCount(addr common.Address) int { +// hasAuth returns a flag indicating whether there are pending authorizations +// from the specified address. +func (t *lookup) hasAuth(addr common.Address) bool { t.lock.RLock() defer t.lock.RUnlock() - return len(t.auths[addr]) + + return len(t.auths[addr]) > 0 } // numSlots calculates the number of slots needed for a single transaction. @@ -1880,8 +1890,8 @@ func (pool *LegacyPool) Clear() { pool.mu.Lock() defer pool.mu.Unlock() - // unreserve each tracked account. Ideally, we could just clear the - // reservation map in the parent txpool context. However, if we clear in + // unreserve each tracked account. Ideally, we could just clear the + // reservation map in the parent txpool context. However, if we clear in // parent context, to avoid exposing the subpool lock, we have to lock the // reservations and then lock each subpool. // @@ -1892,11 +1902,11 @@ func (pool *LegacyPool) Clear() { // * TxPool.Clear attempts to lock subpool mutex // // The transaction addition may attempt to reserve the sender addr which - // can't happen until Clear releases the reservation lock. Clear cannot + // can't happen until Clear releases the reservation lock. Clear cannot // acquire the subpool lock until the transaction addition is completed. for _, tx := range pool.all.txs { senderAddr, _ := types.Sender(pool.signer, tx) - pool.reserve(senderAddr, false) + pool.reserver.Release(senderAddr) } pool.all = newLookup() pool.priced = newPricedList(pool.all) @@ -1904,3 +1914,9 @@ func (pool *LegacyPool) Clear() { pool.queue = make(map[common.Address]*list) pool.pendingNonces = newNoncer(pool.currentState) } + +// HasPendingAuth returns a flag indicating whether there are pending +// authorizations from the specific address cached in the pool. +func (pool *LegacyPool) HasPendingAuth(addr common.Address) bool { + return pool.all.hasAuth(addr) +} diff --git a/core/txpool/legacypool/legacypool2_test.go b/core/txpool/legacypool/legacypool2_test.go index d55e85d74f..3f210e3d1b 100644 --- a/core/txpool/legacypool/legacypool2_test.go +++ b/core/txpool/legacypool/legacypool2_test.go @@ -86,7 +86,7 @@ func TestTransactionFutureAttack(t *testing.T) { config.GlobalQueue = 100 config.GlobalSlots = 100 pool := New(config, blockchain) - pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(config.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() fillPool(t, pool) pending, _ := pool.Stats() @@ -120,7 +120,7 @@ func TestTransactionFuture1559(t *testing.T) { statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting()) blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() // Create a number of test accounts, fund them and make transactions @@ -153,7 +153,7 @@ func TestTransactionZAttack(t *testing.T) { statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting()) blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() // Create a number of test accounts, fund them and make transactions fillPool(t, pool) @@ -224,7 +224,7 @@ func BenchmarkFutureAttack(b *testing.B) { config.GlobalQueue = 100 config.GlobalSlots = 100 pool := New(config, blockchain) - pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() fillPool(b, pool) diff --git a/core/txpool/legacypool/legacypool_test.go b/core/txpool/legacypool/legacypool_test.go index 3f269bd69e..c47a655204 100644 --- a/core/txpool/legacypool/legacypool_test.go +++ b/core/txpool/legacypool/legacypool_test.go @@ -24,7 +24,6 @@ import ( "math/big" "math/rand" "slices" - "sync" "sync/atomic" "testing" "time" @@ -168,42 +167,21 @@ func pricedSetCodeTxWithAuth(nonce uint64, gaslimit uint64, gasFee, tip *uint256 }) } -func makeAddressReserver() txpool.AddressReserver { - var ( - reserved = make(map[common.Address]struct{}) - lock sync.Mutex - ) - return func(addr common.Address, reserve bool) error { - lock.Lock() - defer lock.Unlock() - - _, exists := reserved[addr] - if reserve { - if exists { - panic("already reserved") - } - reserved[addr] = struct{}{} - return nil - } - if !exists { - panic("not reserved") - } - delete(reserved, addr) - return nil - } -} - func setupPool() (*LegacyPool, *ecdsa.PrivateKey) { return setupPoolWithConfig(params.TestChainConfig) } +func newReserver() *txpool.Reserver { + return txpool.NewReservationTracker().NewHandle(42) +} + func setupPoolWithConfig(config *params.ChainConfig) (*LegacyPool, *ecdsa.PrivateKey) { statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting()) blockchain := newTestBlockChain(config, 10000000, statedb, new(event.Feed)) key, _ := crypto.GenerateKey() pool := New(testTxPoolConfig, blockchain) - if err := pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()); err != nil { + if err := pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver()); err != nil { panic(err) } // wait for the pool to initialize @@ -336,7 +314,7 @@ func TestStateChangeDuringReset(t *testing.T) { tx1 := transaction(1, 100000, key) pool := New(testTxPoolConfig, blockchain) - pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() nonce := pool.Nonce(address) @@ -753,7 +731,7 @@ func TestPostponing(t *testing.T) { blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() // Create two test accounts to produce different gap profiles with @@ -963,7 +941,7 @@ func TestQueueGlobalLimiting(t *testing.T) { config.GlobalQueue = config.AccountQueue*3 - 1 // reduce the queue limits to shorten test time (-1 to make it non divisible) pool := New(config, blockchain) - pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() // Create a number of test accounts and fund them (last one will be the local) @@ -1015,7 +993,7 @@ func TestQueueTimeLimiting(t *testing.T) { config.Lifetime = time.Second pool := New(config, blockchain) - pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(config.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() // Create a test account to ensure remotes expire @@ -1176,7 +1154,7 @@ func TestPendingGlobalLimiting(t *testing.T) { config.GlobalSlots = config.AccountSlots * 10 pool := New(config, blockchain) - pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(config.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() // Create a number of test accounts and fund them @@ -1275,7 +1253,7 @@ func TestCapClearsFromAll(t *testing.T) { config.GlobalSlots = 8 pool := New(config, blockchain) - pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(config.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() // Create a number of test accounts and fund them @@ -1308,7 +1286,7 @@ func TestPendingMinimumAllowance(t *testing.T) { config.GlobalSlots = 1 pool := New(config, blockchain) - pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(config.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() // Create a number of test accounts and fund them @@ -1352,7 +1330,7 @@ func TestRepricing(t *testing.T) { blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() // Keep track of transaction events to ensure all executables get announced @@ -1457,7 +1435,7 @@ func TestMinGasPriceEnforced(t *testing.T) { txPoolConfig := DefaultConfig txPoolConfig.NoLocals = true pool := New(txPoolConfig, blockchain) - pool.Init(txPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(txPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() key, _ := crypto.GenerateKey() @@ -1606,7 +1584,7 @@ func TestUnderpricing(t *testing.T) { config.GlobalQueue = 2 pool := New(config, blockchain) - pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(config.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() // Keep track of transaction events to ensure all executables get announced @@ -1696,7 +1674,7 @@ func TestStableUnderpricing(t *testing.T) { config.GlobalQueue = 0 pool := New(config, blockchain) - pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(config.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() // Keep track of transaction events to ensure all executables get announced @@ -1899,7 +1877,7 @@ func TestDeduplication(t *testing.T) { blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() // Create a test account to add transactions with @@ -1966,7 +1944,7 @@ func TestReplacement(t *testing.T) { blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() // Keep track of transaction events to ensure all executables get announced @@ -2157,7 +2135,7 @@ func TestStatusCheck(t *testing.T) { blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() // Create the test accounts to check various transaction statuses with @@ -2230,7 +2208,7 @@ func TestSetCodeTransactions(t *testing.T) { blockchain := newTestBlockChain(params.MergedTestChainConfig, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() // Create the test accounts @@ -2271,12 +2249,12 @@ func TestSetCodeTransactions(t *testing.T) { if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(1), keyA)); err != nil { t.Fatalf("%s: failed to add remote transaction: %v", name, err) } - if err := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(1), keyA)); !errors.Is(err, ErrInflightTxLimitReached) { - t.Fatalf("%s: error mismatch: want %v, have %v", name, ErrInflightTxLimitReached, err) + if err := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(1), keyA)); !errors.Is(err, txpool.ErrInflightTxLimitReached) { + t.Fatalf("%s: error mismatch: want %v, have %v", name, txpool.ErrInflightTxLimitReached, err) } // Check gapped transaction again. - if err := pool.addRemoteSync(pricedTransaction(2, 100000, big.NewInt(1), keyA)); !errors.Is(err, ErrInflightTxLimitReached) { - t.Fatalf("%s: error mismatch: want %v, have %v", name, ErrInflightTxLimitReached, err) + if err := pool.addRemoteSync(pricedTransaction(2, 100000, big.NewInt(1), keyA)); !errors.Is(err, txpool.ErrInflightTxLimitReached) { + t.Fatalf("%s: error mismatch: want %v, have %v", name, txpool.ErrInflightTxLimitReached, err) } // Replace by fee. if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(10), keyA)); err != nil { @@ -2310,8 +2288,8 @@ func TestSetCodeTransactions(t *testing.T) { t.Fatalf("%s: failed to add with pending delegatio: %v", name, err) } // Also check gapped transaction is rejected. - if err := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(1), keyC)); !errors.Is(err, ErrInflightTxLimitReached) { - t.Fatalf("%s: error mismatch: want %v, have %v", name, ErrInflightTxLimitReached, err) + if err := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(1), keyC)); !errors.Is(err, txpool.ErrInflightTxLimitReached) { + t.Fatalf("%s: error mismatch: want %v, have %v", name, txpool.ErrInflightTxLimitReached, err) } }, }, @@ -2386,7 +2364,7 @@ func TestSetCodeTransactions(t *testing.T) { if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(1000), keyC)); err != nil { t.Fatalf("%s: failed to added single pooled for account with pending delegation: %v", name, err) } - if err, want := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(1000), keyC)), ErrInflightTxLimitReached; !errors.Is(err, want) { + if err, want := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(1000), keyC)), txpool.ErrInflightTxLimitReached; !errors.Is(err, want) { t.Fatalf("%s: error mismatch: want %v, have %v", name, want, err) } }, @@ -2454,7 +2432,7 @@ func TestSetCodeTransactionsReorg(t *testing.T) { blockchain := newTestBlockChain(params.MergedTestChainConfig, 1000000, statedb, new(event.Feed)) pool := New(testTxPoolConfig, blockchain) - pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()) + pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver()) defer pool.Close() // Create the test accounts @@ -2489,8 +2467,8 @@ func TestSetCodeTransactionsReorg(t *testing.T) { t.Fatalf("failed to add with remote setcode transaction: %v", err) } // Try to add a transactions in - if err := pool.addRemoteSync(pricedTransaction(2, 100000, big.NewInt(1000), keyA)); !errors.Is(err, ErrInflightTxLimitReached) { - t.Fatalf("unexpected error %v, expecting %v", err, ErrInflightTxLimitReached) + if err := pool.addRemoteSync(pricedTransaction(2, 100000, big.NewInt(1000), keyA)); !errors.Is(err, txpool.ErrInflightTxLimitReached) { + t.Fatalf("unexpected error %v, expecting %v", err, txpool.ErrInflightTxLimitReached) } // Simulate the chain moving blockchain.statedb.SetNonce(addrA, 2, tracing.NonceChangeAuthorization) diff --git a/core/txpool/reserver.go b/core/txpool/reserver.go new file mode 100644 index 0000000000..76ead0f3bb --- /dev/null +++ b/core/txpool/reserver.go @@ -0,0 +1,124 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package txpool + +import ( + "errors" + "fmt" + "sync" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" +) + +var ( + // reservationsGaugeName is the prefix of a per-subpool address reservation + // metric. + // + // This is mostly a sanity metric to ensure there's no bug that would make + // some subpool hog all the reservations due to mis-accounting. + reservationsGaugeName = "txpool/reservations" +) + +// ReservationTracker is a struct shared between different subpools. It is used to reserve +// the account and ensure that one address cannot initiate transactions, authorizations, +// and other state-changing behaviors in different pools at the same time. +type ReservationTracker struct { + accounts map[common.Address]int + lock sync.RWMutex +} + +// NewReservationTracker initializes the account reservation tracker. +func NewReservationTracker() *ReservationTracker { + return &ReservationTracker{ + accounts: make(map[common.Address]int), + } +} + +// NewHandle creates a named handle on the ReservationTracker. The handle +// identifies the subpool so ownership of reservations can be determined. +func (r *ReservationTracker) NewHandle(id int) *Reserver { + return &Reserver{r, id} +} + +// Reserver is a named handle on ReservationTracker. It is held by subpools to +// make reservations for accounts it is tracking. The id is used to determine +// which pool owns an address and disallows non-owners to hold or release +// addresses it doesn't own. +type Reserver struct { + tracker *ReservationTracker + id int +} + +// Hold attempts to reserve the specified account address for the given pool. +// Returns an error if the account is already reserved. +func (h *Reserver) Hold(addr common.Address) error { + h.tracker.lock.Lock() + defer h.tracker.lock.Unlock() + + // Double reservations are forbidden even from the same pool to + // avoid subtle bugs in the long term. + owner, exists := h.tracker.accounts[addr] + if exists { + if owner == h.id { + log.Error("pool attempted to reserve already-owned address", "address", addr) + return nil // Ignore fault to give the pool a chance to recover while the bug gets fixed + } + return ErrAlreadyReserved + } + h.tracker.accounts[addr] = h.id + if metrics.Enabled() { + m := fmt.Sprintf("%s/%d", reservationsGaugeName, h.id) + metrics.GetOrRegisterGauge(m, nil).Inc(1) + } + return nil +} + +// Release attempts to release the reservation for the specified account. +// Returns an error if the address is not reserved or is reserved by another pool. +func (h *Reserver) Release(addr common.Address) error { + h.tracker.lock.Lock() + defer h.tracker.lock.Unlock() + + // Ensure subpools only attempt to unreserve their own owned addresses, + // otherwise flag as a programming error. + owner, exists := h.tracker.accounts[addr] + if !exists { + log.Error("pool attempted to unreserve non-reserved address", "address", addr) + return errors.New("address not reserved") + } + if owner != h.id { + log.Error("pool attempted to unreserve non-owned address", "address", addr) + return errors.New("address not owned") + } + delete(h.tracker.accounts, addr) + if metrics.Enabled() { + m := fmt.Sprintf("%s/%d", reservationsGaugeName, h.id) + metrics.GetOrRegisterGauge(m, nil).Dec(1) + } + return nil +} + +// Has returns a flag indicating if the address has been reserved or not. +func (h *Reserver) Has(address common.Address) bool { + h.tracker.lock.RLock() + defer h.tracker.lock.RUnlock() + + _, exists := h.tracker.accounts[address] + return exists +} diff --git a/core/txpool/subpool.go b/core/txpool/subpool.go index f5cb852d8f..2cb5103875 100644 --- a/core/txpool/subpool.go +++ b/core/txpool/subpool.go @@ -67,10 +67,6 @@ type LazyResolver interface { Get(hash common.Hash) *types.Transaction } -// AddressReserver is passed by the main transaction pool to subpools, so they -// may request (and relinquish) exclusive access to certain addresses. -type AddressReserver func(addr common.Address, reserve bool) error - // PendingFilter is a collection of filter rules to allow retrieving a subset // of transactions for announcement or mining. // @@ -109,7 +105,7 @@ type SubPool interface { // These should not be passed as a constructor argument - nor should the pools // start by themselves - in order to keep multiple subpools in lockstep with // one another. - Init(gasTip uint64, head *types.Header, reserve AddressReserver) error + Init(gasTip uint64, head *types.Header, reserver *Reserver) error // Close terminates any background processing threads and releases any held // resources. diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 083aac92c6..47d83e03d4 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -29,7 +29,6 @@ import ( "github.com/ethereum/go-ethereum/crypto/kzg4844" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" ) @@ -43,15 +42,6 @@ const ( TxStatusIncluded ) -var ( - // reservationsGaugeName is the prefix of a per-subpool address reservation - // metric. - // - // This is mostly a sanity metric to ensure there's no bug that would make - // some subpool hog all the reservations due to mis-accounting. - reservationsGaugeName = "txpool/reservations" -) - // BlockChain defines the minimal set of methods needed to back a tx pool with // a chain. Exists to allow mocking the live chain out of tests. type BlockChain interface { @@ -81,9 +71,6 @@ type TxPool struct { stateLock sync.RWMutex // The lock for protecting state instance state *state.StateDB // Current state at the blockchain head - reservations map[common.Address]SubPool // Map with the account to pool reservations - reserveLock sync.Mutex // Lock protecting the account reservations - subs event.SubscriptionScope // Subscription scope to unsubscribe all on shutdown quit chan chan error // Quit channel to tear down the head updater term chan struct{} // Termination channel to detect a closed pool @@ -110,17 +97,17 @@ func New(gasTip uint64, chain BlockChain, subpools []SubPool) (*TxPool, error) { return nil, err } pool := &TxPool{ - subpools: subpools, - chain: chain, - signer: types.LatestSigner(chain.Config()), - state: statedb, - reservations: make(map[common.Address]SubPool), - quit: make(chan chan error), - term: make(chan struct{}), - sync: make(chan chan error), + subpools: subpools, + chain: chain, + signer: types.LatestSigner(chain.Config()), + state: statedb, + quit: make(chan chan error), + term: make(chan struct{}), + sync: make(chan chan error), } + reserver := NewReservationTracker() for i, subpool := range subpools { - if err := subpool.Init(gasTip, head, pool.reserver(i, subpool)); err != nil { + if err := subpool.Init(gasTip, head, reserver.NewHandle(i)); err != nil { for j := i - 1; j >= 0; j-- { subpools[j].Close() } @@ -131,52 +118,6 @@ func New(gasTip uint64, chain BlockChain, subpools []SubPool) (*TxPool, error) { return pool, nil } -// reserver is a method to create an address reservation callback to exclusively -// assign/deassign addresses to/from subpools. This can ensure that at any point -// in time, only a single subpool is able to manage an account, avoiding cross -// subpool eviction issues and nonce conflicts. -func (p *TxPool) reserver(id int, subpool SubPool) AddressReserver { - return func(addr common.Address, reserve bool) error { - p.reserveLock.Lock() - defer p.reserveLock.Unlock() - - owner, exists := p.reservations[addr] - if reserve { - // Double reservations are forbidden even from the same pool to - // avoid subtle bugs in the long term. - if exists { - if owner == subpool { - log.Error("pool attempted to reserve already-owned address", "address", addr) - return nil // Ignore fault to give the pool a chance to recover while the bug gets fixed - } - return ErrAlreadyReserved - } - p.reservations[addr] = subpool - if metrics.Enabled() { - m := fmt.Sprintf("%s/%d", reservationsGaugeName, id) - metrics.GetOrRegisterGauge(m, nil).Inc(1) - } - return nil - } - // Ensure subpools only attempt to unreserve their own owned addresses, - // otherwise flag as a programming error. - if !exists { - log.Error("pool attempted to unreserve non-reserved address", "address", addr) - return errors.New("address not reserved") - } - if subpool != owner { - log.Error("pool attempted to unreserve non-owned address", "address", addr) - return errors.New("address not owned") - } - delete(p.reservations, addr) - if metrics.Enabled() { - m := fmt.Sprintf("%s/%d", reservationsGaugeName, id) - metrics.GetOrRegisterGauge(m, nil).Dec(1) - } - return nil - } -} - // Close terminates the transaction pool and all its subpools. func (p *TxPool) Close() error { var errs []error diff --git a/eth/backend.go b/eth/backend.go index 79759710b6..6716a77562 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -260,16 +260,16 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { eth.filterMaps = filtermaps.NewFilterMaps(chainDb, chainView, historyCutoff, finalBlock, filtermaps.DefaultParams, fmConfig) eth.closeFilterMaps = make(chan chan struct{}) - if config.BlobPool.Datadir != "" { - config.BlobPool.Datadir = stack.ResolvePath(config.BlobPool.Datadir) - } - blobPool := blobpool.New(config.BlobPool, eth.blockchain) - if config.TxPool.Journal != "" { config.TxPool.Journal = stack.ResolvePath(config.TxPool.Journal) } legacyPool := legacypool.New(config.TxPool, eth.blockchain) + if config.BlobPool.Datadir != "" { + config.BlobPool.Datadir = stack.ResolvePath(config.BlobPool.Datadir) + } + blobPool := blobpool.New(config.BlobPool, eth.blockchain, legacyPool.HasPendingAuth) + eth.txPool, err = txpool.New(config.TxPool.PriceLimit, eth.blockchain, []txpool.SubPool{legacyPool, blobPool}) if err != nil { return nil, err diff --git a/eth/protocols/eth/handler_test.go b/eth/protocols/eth/handler_test.go index bbbd888701..fa031d9899 100644 --- a/eth/protocols/eth/handler_test.go +++ b/eth/protocols/eth/handler_test.go @@ -135,7 +135,7 @@ func newTestBackendWithGenerator(blocks int, shanghai bool, cancun bool, generat storage, _ := os.MkdirTemp("", "blobpool-") defer os.RemoveAll(storage) - blobPool := blobpool.New(blobpool.Config{Datadir: storage}, chain) + blobPool := blobpool.New(blobpool.Config{Datadir: storage}, chain, nil) legacyPool := legacypool.New(txconfig, chain) txpool, _ := txpool.New(txconfig.PriceLimit, chain, []txpool.SubPool{legacyPool, blobPool})