diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index 1770066a8d..12a4133b40 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -299,7 +299,7 @@ func newBlobTxMeta(id uint64, size uint64, storageSize uint32, tx *types.Transac // and leading up to the first no-change. type BlobPool struct { config Config // Pool configuration - reserver *txpool.Reserver // Address reserver to ensure exclusivity across subpools + reserver txpool.Reserver // Address reserver to ensure exclusivity across subpools hasPendingAuth func(common.Address) bool // Determine whether the specified address has a pending 7702-auth store billy.Database // Persistent data store for the tx metadata and blobs @@ -355,7 +355,7 @@ func (p *BlobPool) Filter(tx *types.Transaction) bool { // Init sets the gas price needed to keep a transaction in the pool and the chain // head to allow balance / nonce checks. The transaction journal will be loaded // from disk and filtered based on the provided starting settings. -func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver *txpool.Reserver) error { +func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reserver) error { p.reserver = reserver var ( diff --git a/core/txpool/blobpool/blobpool_test.go b/core/txpool/blobpool/blobpool_test.go index 4dfba3b52b..76d21a0c9e 100644 --- a/core/txpool/blobpool/blobpool_test.go +++ b/core/txpool/blobpool/blobpool_test.go @@ -26,6 +26,7 @@ import ( "math/big" "os" "path/filepath" + "sync" "testing" "github.com/ethereum/go-ethereum/common" @@ -167,6 +168,44 @@ func (bc *testBlockChain) StateAt(common.Hash) (*state.StateDB, error) { return bc.statedb, nil } +// reserver is a utility struct to sanity check that accounts are +// properly reserved by the blobpool (no duplicate reserves or unreserves). +type reserver struct { + accounts map[common.Address]struct{} + lock sync.RWMutex +} + +func newReserver() txpool.Reserver { + return &reserver{accounts: make(map[common.Address]struct{})} +} + +func (r *reserver) Hold(addr common.Address) error { + r.lock.Lock() + defer r.lock.Unlock() + if _, exists := r.accounts[addr]; exists { + panic("already reserved") + } + r.accounts[addr] = struct{}{} + return nil +} + +func (r *reserver) Release(addr common.Address) error { + r.lock.Lock() + defer r.lock.Unlock() + if _, exists := r.accounts[addr]; !exists { + panic("not reserved") + } + delete(r.accounts, addr) + return nil +} + +func (r *reserver) Has(address common.Address) bool { + r.lock.RLock() + defer r.lock.RUnlock() + _, exists := r.accounts[address] + return exists +} + // makeTx is a utility method to construct a random blob transaction and sign it // with a valid key, only setting the interesting fields from the perspective of // the blob pool. @@ -405,10 +444,6 @@ func verifyBlobRetrievals(t *testing.T, pool *BlobPool) { } } -func newReserver() *txpool.Reserver { - return txpool.NewReservationTracker().NewHandle(42) -} - // Tests that transactions can be loaded from disk on startup and that they are // correctly discarded if invalid. // diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 75dc4a8461..04f1a2234c 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -237,7 +237,7 @@ type LegacyPool struct { currentHead atomic.Pointer[types.Header] // Current head of the blockchain currentState *state.StateDB // Current state in the blockchain head pendingNonces *noncer // Pending state tracking virtual nonces - reserver *txpool.Reserver // Address reserver to ensure exclusivity across subpools + reserver txpool.Reserver // Address reserver to ensure exclusivity across subpools pending map[common.Address]*list // All currently processable transactions queue map[common.Address]*list // Queued but non-processable transactions @@ -302,7 +302,7 @@ func (pool *LegacyPool) Filter(tx *types.Transaction) bool { // Init sets the gas price needed to keep a transaction in the pool and the chain // head to allow balance / nonce checks. The internal // goroutines will be spun up and the pool deemed operational afterwards. -func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserver *txpool.Reserver) error { +func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reserver) error { // Set the address reserver to request exclusive access to pooled accounts pool.reserver = reserver @@ -640,11 +640,18 @@ func (pool *LegacyPool) validateAuth(tx *types.Transaction) error { if err := pool.checkDelegationLimit(tx); err != nil { return err } - // Authorities must not conflict with any pending or queued transactions, - // nor with addresses that have already been reserved. + // For symmetry, allow at most one in-flight tx for any authority with a + // pending transaction. if auths := tx.SetCodeAuthorities(); len(auths) > 0 { for _, auth := range auths { - if pool.pending[auth] != nil || pool.queue[auth] != nil { + var count int + if pending := pool.pending[auth]; pending != nil { + count += pending.Len() + } + if queue := pool.queue[auth]; queue != nil { + count += queue.Len() + } + if count > 1 { return ErrAuthorityReserved } // Because there is no exclusive lock held between different subpools @@ -1907,9 +1914,14 @@ func (pool *LegacyPool) Clear() { // The transaction addition may attempt to reserve the sender addr which // can't happen until Clear releases the reservation lock. Clear cannot // acquire the subpool lock until the transaction addition is completed. - for _, tx := range pool.all.txs { - senderAddr, _ := types.Sender(pool.signer, tx) - pool.reserver.Release(senderAddr) + + for addr := range pool.pending { + if _, ok := pool.queue[addr]; !ok { + pool.reserver.Release(addr) + } + } + for addr := range pool.queue { + pool.reserver.Release(addr) } pool.all = newLookup() pool.priced = newPricedList(pool.all) diff --git a/core/txpool/legacypool/legacypool_test.go b/core/txpool/legacypool/legacypool_test.go index c47a655204..bb1323a7d1 100644 --- a/core/txpool/legacypool/legacypool_test.go +++ b/core/txpool/legacypool/legacypool_test.go @@ -24,6 +24,7 @@ import ( "math/big" "math/rand" "slices" + "sync" "sync/atomic" "testing" "time" @@ -171,8 +172,39 @@ func setupPool() (*LegacyPool, *ecdsa.PrivateKey) { return setupPoolWithConfig(params.TestChainConfig) } -func newReserver() *txpool.Reserver { - return txpool.NewReservationTracker().NewHandle(42) +// reserver is a utility struct to sanity check that accounts are +// properly reserved by the blobpool (no duplicate reserves or unreserves). +type reserver struct { + accounts map[common.Address]struct{} + lock sync.RWMutex +} + +func newReserver() txpool.Reserver { + return &reserver{accounts: make(map[common.Address]struct{})} +} + +func (r *reserver) Hold(addr common.Address) error { + r.lock.Lock() + defer r.lock.Unlock() + if _, exists := r.accounts[addr]; exists { + panic("already reserved") + } + r.accounts[addr] = struct{}{} + return nil +} + +func (r *reserver) Release(addr common.Address) error { + r.lock.Lock() + defer r.lock.Unlock() + if _, exists := r.accounts[addr]; !exists { + panic("not reserved") + } + delete(r.accounts, addr) + return nil +} + +func (r *reserver) Has(address common.Address) bool { + return false // reserver only supports a single pool } func setupPoolWithConfig(config *params.ChainConfig) (*LegacyPool, *ecdsa.PrivateKey) { @@ -2232,9 +2264,8 @@ func TestSetCodeTransactions(t *testing.T) { }{ { // Check that only one in-flight transaction is allowed for accounts - // with delegation set. Also verify the accepted transaction can be - // replaced by fee. - name: "only-one-in-flight", + // with delegation set. + name: "accept-one-inflight-tx-of-delegated-account", pending: 1, run: func(name string) { aa := common.Address{0xaa, 0xaa} @@ -2249,6 +2280,7 @@ func TestSetCodeTransactions(t *testing.T) { if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(1), keyA)); err != nil { t.Fatalf("%s: failed to add remote transaction: %v", name, err) } + // Second and further transactions shall be rejected if err := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(1), keyA)); !errors.Is(err, txpool.ErrInflightTxLimitReached) { t.Fatalf("%s: error mismatch: want %v, have %v", name, txpool.ErrInflightTxLimitReached, err) } @@ -2260,6 +2292,70 @@ func TestSetCodeTransactions(t *testing.T) { if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(10), keyA)); err != nil { t.Fatalf("%s: failed to replace with remote transaction: %v", name, err) } + + // Reset the delegation, avoid leaking state into the other tests + statedb.SetCode(addrA, nil) + }, + }, + { + // This test is analogous to the previous one, but the delegation is pending + // instead of set. + name: "allow-one-tx-from-pooled-delegation", + pending: 2, + run: func(name string) { + // Create a pending delegation request from B. + if err := pool.addRemoteSync(setCodeTx(0, keyA, []unsignedAuth{{0, keyB}})); err != nil { + t.Fatalf("%s: failed to add with remote setcode transaction: %v", name, err) + } + // First transaction from B is accepted. + if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(1), keyB)); err != nil { + t.Fatalf("%s: failed to add remote transaction: %v", name, err) + } + // Second transaction fails due to limit. + if err := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(1), keyB)); !errors.Is(err, txpool.ErrInflightTxLimitReached) { + t.Fatalf("%s: error mismatch: want %v, have %v", name, txpool.ErrInflightTxLimitReached, err) + } + // Replace by fee for first transaction from B works. + if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(2), keyB)); err != nil { + t.Fatalf("%s: failed to add remote transaction: %v", name, err) + } + }, + }, + { + // This is the symmetric case of the previous one, where the delegation request + // is received after the transaction. The resulting state shall be the same. + name: "accept-authorization-from-sender-of-one-inflight-tx", + pending: 2, + run: func(name string) { + // The first in-flight transaction is accepted. + if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(1), keyB)); err != nil { + t.Fatalf("%s: failed to add with pending delegation: %v", name, err) + } + // Delegation is accepted. + if err := pool.addRemoteSync(setCodeTx(0, keyA, []unsignedAuth{{0, keyB}})); err != nil { + t.Fatalf("%s: failed to add remote transaction: %v", name, err) + } + // The second in-flight transaction is rejected. + if err := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(1), keyB)); !errors.Is(err, txpool.ErrInflightTxLimitReached) { + t.Fatalf("%s: error mismatch: want %v, have %v", name, txpool.ErrInflightTxLimitReached, err) + } + }, + }, + { + name: "reject-authorization-from-sender-with-more-than-one-inflight-tx", + pending: 2, + run: func(name string) { + // Submit two transactions. + if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(1), keyB)); err != nil { + t.Fatalf("%s: failed to add with pending delegation: %v", name, err) + } + if err := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(1), keyB)); err != nil { + t.Fatalf("%s: failed to add with pending delegation: %v", name, err) + } + // Delegation rejected since two txs are already in-flight. + if err := pool.addRemoteSync(setCodeTx(0, keyA, []unsignedAuth{{0, keyB}})); !errors.Is(err, ErrAuthorityReserved) { + t.Fatalf("%s: error mismatch: want %v, have %v", name, ErrAuthorityReserved, err) + } }, }, { @@ -2267,7 +2363,7 @@ func TestSetCodeTransactions(t *testing.T) { pending: 2, run: func(name string) { // Send two transactions where the first has no conflicting delegations and - // the second should be allowed despite conflicting with the authorities in 1). + // the second should be allowed despite conflicting with the authorities in the first. if err := pool.addRemoteSync(setCodeTx(0, keyA, []unsignedAuth{{1, keyC}})); err != nil { t.Fatalf("%s: failed to add with remote setcode transaction: %v", name, err) } @@ -2276,28 +2372,10 @@ func TestSetCodeTransactions(t *testing.T) { } }, }, - { - name: "allow-one-tx-from-pooled-delegation", - pending: 2, - run: func(name string) { - // Verify C cannot originate another transaction when it has a pooled delegation. - if err := pool.addRemoteSync(setCodeTx(0, keyA, []unsignedAuth{{0, keyC}})); err != nil { - t.Fatalf("%s: failed to add with remote setcode transaction: %v", name, err) - } - if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(1), keyC)); err != nil { - t.Fatalf("%s: failed to add with pending delegatio: %v", name, err) - } - // Also check gapped transaction is rejected. - if err := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(1), keyC)); !errors.Is(err, txpool.ErrInflightTxLimitReached) { - t.Fatalf("%s: error mismatch: want %v, have %v", name, txpool.ErrInflightTxLimitReached, err) - } - }, - }, { name: "replace-by-fee-setcode-tx", pending: 1, run: func(name string) { - // 4. Fee bump the setcode tx send. if err := pool.addRemoteSync(setCodeTx(0, keyB, []unsignedAuth{{1, keyC}})); err != nil { t.Fatalf("%s: failed to add with remote setcode transaction: %v", name, err) } @@ -2307,44 +2385,85 @@ func TestSetCodeTransactions(t *testing.T) { }, }, { - name: "allow-tx-from-replaced-authority", - pending: 2, + name: "allow-more-than-one-tx-from-replaced-authority", + pending: 3, run: func(name string) { - // Fee bump with a different auth list. Make sure that unlocks the authorities. + // Send transaction from A with B as an authority. if err := pool.addRemoteSync(pricedSetCodeTx(0, 250000, uint256.NewInt(10), uint256.NewInt(3), keyA, []unsignedAuth{{0, keyB}})); err != nil { t.Fatalf("%s: failed to add with remote setcode transaction: %v", name, err) } + // Replace transaction with another having C as an authority. if err := pool.addRemoteSync(pricedSetCodeTx(0, 250000, uint256.NewInt(3000), uint256.NewInt(300), keyA, []unsignedAuth{{0, keyC}})); err != nil { t.Fatalf("%s: failed to add with remote setcode transaction: %v", name, err) } - // Now send a regular tx from B. + // B should not be considred as having an in-flight delegation, so + // should allow more than one pooled transaction. if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(10), keyB)); err != nil { t.Fatalf("%s: failed to replace with remote transaction: %v", name, err) } + if err := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(10), keyB)); err != nil { + t.Fatalf("%s: failed to replace with remote transaction: %v", name, err) + } }, }, { + // This test is analogous to the previous one, but the the replaced + // transaction is self-sponsored. name: "allow-tx-from-replaced-self-sponsor-authority", - pending: 2, + pending: 3, run: func(name string) { - // + // Send transaction from A with A as an authority. if err := pool.addRemoteSync(pricedSetCodeTx(0, 250000, uint256.NewInt(10), uint256.NewInt(3), keyA, []unsignedAuth{{0, keyA}})); err != nil { t.Fatalf("%s: failed to add with remote setcode transaction: %v", name, err) } + // Replace transaction with a transaction with B as an authority. if err := pool.addRemoteSync(pricedSetCodeTx(0, 250000, uint256.NewInt(30), uint256.NewInt(30), keyA, []unsignedAuth{{0, keyB}})); err != nil { t.Fatalf("%s: failed to add with remote setcode transaction: %v", name, err) } - // Now send a regular tx from keyA. - if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(1000), keyA)); err != nil { + // The one in-flight transaction limit from A no longer applies, so we + // can stack a second transaction for the account. + if err := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(1000), keyA)); err != nil { t.Fatalf("%s: failed to replace with remote transaction: %v", name, err) } - // Make sure we can still send from keyB. + // B should still be able to send transactions. if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(1000), keyB)); err != nil { t.Fatalf("%s: failed to replace with remote transaction: %v", name, err) } + // However B still has the limitation to one in-flight transaction. + if err := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(1), keyB)); !errors.Is(err, txpool.ErrInflightTxLimitReached) { + t.Fatalf("%s: error mismatch: want %v, have %v", name, txpool.ErrInflightTxLimitReached, err) + } }, }, { + name: "replacements-respect-inflight-tx-count", + pending: 2, + run: func(name string) { + // Send transaction from A with B as an authority. + if err := pool.addRemoteSync(pricedSetCodeTx(0, 250000, uint256.NewInt(10), uint256.NewInt(3), keyA, []unsignedAuth{{0, keyB}})); err != nil { + t.Fatalf("%s: failed to add with remote setcode transaction: %v", name, err) + } + // Send two transactions from B. Only the first should be accepted due + // to in-flight limit. + if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(1), keyB)); err != nil { + t.Fatalf("%s: failed to add remote transaction: %v", name, err) + } + if err := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(1), keyB)); !errors.Is(err, txpool.ErrInflightTxLimitReached) { + t.Fatalf("%s: error mismatch: want %v, have %v", name, txpool.ErrInflightTxLimitReached, err) + } + // Replace the in-flight transaction from B. + if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(30), keyB)); err != nil { + t.Fatalf("%s: failed to replace with remote transaction: %v", name, err) + } + // Ensure the in-flight limit for B is still in place. + if err := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(1), keyB)); !errors.Is(err, txpool.ErrInflightTxLimitReached) { + t.Fatalf("%s: error mismatch: want %v, have %v", name, txpool.ErrInflightTxLimitReached, err) + } + }, + }, + { + // Since multiple authorizations can be pending simultaneously, replacing + // one of them should not break the one in-flight-transaction limit. name: "track-multiple-conflicting-delegations", pending: 3, run: func(name string) { @@ -2369,19 +2488,6 @@ func TestSetCodeTransactions(t *testing.T) { } }, }, - { - name: "reject-delegation-from-pending-account", - pending: 1, - run: func(name string) { - // Attempt to submit a delegation from an account with a pending tx. - if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(1000), keyC)); err != nil { - t.Fatalf("%s: failed to add with remote setcode transaction: %v", name, err) - } - if err, want := pool.addRemoteSync(setCodeTx(0, keyA, []unsignedAuth{{1, keyC}})), ErrAuthorityReserved; !errors.Is(err, want) { - t.Fatalf("%s: error mismatch: want %v, have %v", name, want, err) - } - }, - }, { name: "remove-hash-from-authority-tracker", pending: 10, diff --git a/core/txpool/reserver.go b/core/txpool/reserver.go index 76ead0f3bb..b6ecef9f1a 100644 --- a/core/txpool/reserver.go +++ b/core/txpool/reserver.go @@ -52,22 +52,37 @@ func NewReservationTracker() *ReservationTracker { // NewHandle creates a named handle on the ReservationTracker. The handle // identifies the subpool so ownership of reservations can be determined. -func (r *ReservationTracker) NewHandle(id int) *Reserver { - return &Reserver{r, id} +func (r *ReservationTracker) NewHandle(id int) *ReservationHandle { + return &ReservationHandle{r, id} } -// Reserver is a named handle on ReservationTracker. It is held by subpools to +// Reserver is an interface for creating and releasing owned reservations in the +// ReservationTracker struct, which is shared between subpools. +type Reserver interface { + // Hold attempts to reserve the specified account address for the given pool. + // Returns an error if the account is already reserved. + Hold(addr common.Address) error + + // Release attempts to release the reservation for the specified account. + // Returns an error if the address is not reserved or is reserved by another pool. + Release(addr common.Address) error + + // Has returns a flag indicating if the address has been reserved by a pool + // other than one with the current Reserver handle. + Has(address common.Address) bool +} + +// ReservationHandle is a named handle on ReservationTracker. It is held by subpools to // make reservations for accounts it is tracking. The id is used to determine // which pool owns an address and disallows non-owners to hold or release // addresses it doesn't own. -type Reserver struct { +type ReservationHandle struct { tracker *ReservationTracker id int } -// Hold attempts to reserve the specified account address for the given pool. -// Returns an error if the account is already reserved. -func (h *Reserver) Hold(addr common.Address) error { +// Hold implements the Reserver interface. +func (h *ReservationHandle) Hold(addr common.Address) error { h.tracker.lock.Lock() defer h.tracker.lock.Unlock() @@ -89,9 +104,8 @@ func (h *Reserver) Hold(addr common.Address) error { return nil } -// Release attempts to release the reservation for the specified account. -// Returns an error if the address is not reserved or is reserved by another pool. -func (h *Reserver) Release(addr common.Address) error { +// Release implements the Reserver interface. +func (h *ReservationHandle) Release(addr common.Address) error { h.tracker.lock.Lock() defer h.tracker.lock.Unlock() @@ -114,11 +128,11 @@ func (h *Reserver) Release(addr common.Address) error { return nil } -// Has returns a flag indicating if the address has been reserved or not. -func (h *Reserver) Has(address common.Address) bool { +// Has implements the Reserver interface. +func (h *ReservationHandle) Has(address common.Address) bool { h.tracker.lock.RLock() defer h.tracker.lock.RUnlock() - _, exists := h.tracker.accounts[address] - return exists + id, exists := h.tracker.accounts[address] + return exists && id != h.id } diff --git a/core/txpool/subpool.go b/core/txpool/subpool.go index 2cb5103875..8cfc14f164 100644 --- a/core/txpool/subpool.go +++ b/core/txpool/subpool.go @@ -105,7 +105,7 @@ type SubPool interface { // These should not be passed as a constructor argument - nor should the pools // start by themselves - in order to keep multiple subpools in lockstep with // one another. - Init(gasTip uint64, head *types.Header, reserver *Reserver) error + Init(gasTip uint64, head *types.Header, reserver Reserver) error // Close terminates any background processing threads and releases any held // resources.