forked from forks/go-ethereum
core/txpool: add 7702 protection to blobpool (#31526)
This pull request introduces two constraints in the blobPool: (a) If the sender has a pending authorization or delegation, only one in-flight executable transaction can be cached. (b) If the authority address in a SetCode transaction is already reserved by the blobPool, the transaction will be rejected. These constraints mitigate an attack where an attacker spams the pool with numerous blob transactions, evicts other transactions, and then cancels all pending blob transactions by draining the sender’s funds if they have a delegation. Note, because there is no exclusive lock held between different subpools when processing transactions, it's totally possible the SetCode transaction and blob transactions with conflict sender and authorities are accepted simultaneously. I think it's acceptable as it's very hard to be exploited. --------- Co-authored-by: lightclient <lightclient@protonmail.com>
This commit is contained in:
parent
ec6d104404
commit
2e739fce58
11 changed files with 286 additions and 213 deletions
|
|
@ -298,8 +298,9 @@ func newBlobTxMeta(id uint64, size uint64, storageSize uint32, tx *types.Transac
|
|||
// minimums will need to be done only starting at the swapped in/out nonce
|
||||
// and leading up to the first no-change.
|
||||
type BlobPool struct {
|
||||
config Config // Pool configuration
|
||||
reserve txpool.AddressReserver // Address reserver to ensure exclusivity across subpools
|
||||
config Config // Pool configuration
|
||||
reserver *txpool.Reserver // Address reserver to ensure exclusivity across subpools
|
||||
hasPendingAuth func(common.Address) bool // Determine whether the specified address has a pending 7702-auth
|
||||
|
||||
store billy.Database // Persistent data store for the tx metadata and blobs
|
||||
stored uint64 // Useful data size of all transactions on disk
|
||||
|
|
@ -329,13 +330,14 @@ type BlobPool struct {
|
|||
|
||||
// New creates a new blob transaction pool to gather, sort and filter inbound
|
||||
// blob transactions from the network.
|
||||
func New(config Config, chain BlockChain) *BlobPool {
|
||||
func New(config Config, chain BlockChain, hasPendingAuth func(common.Address) bool) *BlobPool {
|
||||
// Sanitize the input to ensure no vulnerable gas prices are set
|
||||
config = (&config).sanitize()
|
||||
|
||||
// Create the transaction pool with its initial settings
|
||||
return &BlobPool{
|
||||
config: config,
|
||||
hasPendingAuth: hasPendingAuth,
|
||||
signer: types.LatestSigner(chain.Config()),
|
||||
chain: chain,
|
||||
lookup: newLookup(),
|
||||
|
|
@ -353,8 +355,8 @@ func (p *BlobPool) Filter(tx *types.Transaction) bool {
|
|||
// Init sets the gas price needed to keep a transaction in the pool and the chain
|
||||
// head to allow balance / nonce checks. The transaction journal will be loaded
|
||||
// from disk and filtered based on the provided starting settings.
|
||||
func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserve txpool.AddressReserver) error {
|
||||
p.reserve = reserve
|
||||
func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver *txpool.Reserver) error {
|
||||
p.reserver = reserver
|
||||
|
||||
var (
|
||||
queuedir string
|
||||
|
|
@ -499,7 +501,7 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error {
|
|||
return err
|
||||
}
|
||||
if _, ok := p.index[sender]; !ok {
|
||||
if err := p.reserve(sender, true); err != nil {
|
||||
if err := p.reserver.Hold(sender); err != nil {
|
||||
return err
|
||||
}
|
||||
p.index[sender] = []*blobTxMeta{}
|
||||
|
|
@ -554,7 +556,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
|
|||
if inclusions != nil { // only during reorgs will the heap be initialized
|
||||
heap.Remove(p.evict, p.evict.index[addr])
|
||||
}
|
||||
p.reserve(addr, false)
|
||||
p.reserver.Release(addr)
|
||||
|
||||
if gapped {
|
||||
log.Warn("Dropping dangling blob transactions", "from", addr, "missing", next, "drop", nonces, "ids", ids)
|
||||
|
|
@ -707,7 +709,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
|
|||
if inclusions != nil { // only during reorgs will the heap be initialized
|
||||
heap.Remove(p.evict, p.evict.index[addr])
|
||||
}
|
||||
p.reserve(addr, false)
|
||||
p.reserver.Release(addr)
|
||||
} else {
|
||||
p.index[addr] = txs
|
||||
}
|
||||
|
|
@ -1006,7 +1008,7 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
|
|||
// Update the indices and metrics
|
||||
meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx)
|
||||
if _, ok := p.index[addr]; !ok {
|
||||
if err := p.reserve(addr, true); err != nil {
|
||||
if err := p.reserver.Hold(addr); err != nil {
|
||||
log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err)
|
||||
return err
|
||||
}
|
||||
|
|
@ -1066,7 +1068,7 @@ func (p *BlobPool) SetGasTip(tip *big.Int) {
|
|||
delete(p.spent, addr)
|
||||
|
||||
heap.Remove(p.evict, p.evict.index[addr])
|
||||
p.reserve(addr, false)
|
||||
p.reserver.Release(addr)
|
||||
}
|
||||
// Clear out the transactions from the data store
|
||||
log.Warn("Dropping underpriced blob transaction", "from", addr, "rejected", tx.nonce, "tip", tx.execTipCap, "want", tip, "drop", nonces, "ids", ids)
|
||||
|
|
@ -1101,6 +1103,39 @@ func (p *BlobPool) ValidateTxBasics(tx *types.Transaction) error {
|
|||
return txpool.ValidateTransaction(tx, p.head, p.signer, opts)
|
||||
}
|
||||
|
||||
// checkDelegationLimit determines if the tx sender is delegated or has a
|
||||
// pending delegation, and if so, ensures they have at most one in-flight
|
||||
// **executable** transaction, e.g. disallow stacked and gapped transactions
|
||||
// from the account.
|
||||
func (p *BlobPool) checkDelegationLimit(tx *types.Transaction) error {
|
||||
from, _ := types.Sender(p.signer, tx) // validated
|
||||
|
||||
// Short circuit if the sender has neither delegation nor pending delegation.
|
||||
if p.state.GetCodeHash(from) == types.EmptyCodeHash {
|
||||
// Because there is no exclusive lock held between different subpools
|
||||
// when processing transactions, a blob transaction may be accepted
|
||||
// while other SetCode transactions with pending authorities from the
|
||||
// same address are also accepted simultaneously.
|
||||
//
|
||||
// This scenario is considered acceptable, as the rule primarily ensures
|
||||
// that attackers cannot easily and endlessly stack blob transactions
|
||||
// with a delegated or pending delegated sender.
|
||||
if p.hasPendingAuth == nil || !p.hasPendingAuth(from) {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
// Allow a single in-flight pending transaction.
|
||||
pending := p.index[from]
|
||||
if len(pending) == 0 {
|
||||
return nil
|
||||
}
|
||||
// If account already has a pending transaction, allow replacement only.
|
||||
if len(pending) == 1 && pending[0].nonce == tx.Nonce() {
|
||||
return nil
|
||||
}
|
||||
return txpool.ErrInflightTxLimitReached
|
||||
}
|
||||
|
||||
// validateTx checks whether a transaction is valid according to the consensus
|
||||
// rules and adheres to some heuristic limits of the local node (price and size).
|
||||
func (p *BlobPool) validateTx(tx *types.Transaction) error {
|
||||
|
|
@ -1141,6 +1176,9 @@ func (p *BlobPool) validateTx(tx *types.Transaction) error {
|
|||
if err := txpool.ValidateTransactionWithState(tx, p.signer, stateOpts); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := p.checkDelegationLimit(tx); err != nil {
|
||||
return err
|
||||
}
|
||||
// If the transaction replaces an existing one, ensure that price bumps are
|
||||
// adhered to.
|
||||
var (
|
||||
|
|
@ -1369,7 +1407,7 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
|
|||
// only by this subpool until all transactions are evicted
|
||||
from, _ := types.Sender(p.signer, tx) // already validated above
|
||||
if _, ok := p.index[from]; !ok {
|
||||
if err := p.reserve(from, true); err != nil {
|
||||
if err := p.reserver.Hold(from); err != nil {
|
||||
addNonExclusiveMeter.Mark(1)
|
||||
return err
|
||||
}
|
||||
|
|
@ -1381,7 +1419,7 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) {
|
|||
// by a return statement before running deferred methods. Take care with
|
||||
// removing or subscoping err as it will break this clause.
|
||||
if err != nil {
|
||||
p.reserve(from, false)
|
||||
p.reserver.Release(from)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
|
@ -1513,7 +1551,7 @@ func (p *BlobPool) drop() {
|
|||
if last {
|
||||
delete(p.index, from)
|
||||
delete(p.spent, from)
|
||||
p.reserve(from, false)
|
||||
p.reserver.Release(from)
|
||||
} else {
|
||||
txs[len(txs)-1] = nil
|
||||
txs = txs[:len(txs)-1]
|
||||
|
|
@ -1789,7 +1827,7 @@ func (p *BlobPool) Clear() {
|
|||
// can't happen until Clear releases the reservation lock. Clear cannot
|
||||
// acquire the subpool lock until the transaction addition is completed.
|
||||
for acct := range p.index {
|
||||
p.reserve(acct, false)
|
||||
p.reserver.Release(acct)
|
||||
}
|
||||
p.lookup = newLookup()
|
||||
p.index = make(map[common.Address][]*blobTxMeta)
|
||||
|
|
|
|||
|
|
@ -26,7 +26,6 @@ import (
|
|||
"math/big"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
|
|
@ -168,33 +167,6 @@ func (bc *testBlockChain) StateAt(common.Hash) (*state.StateDB, error) {
|
|||
return bc.statedb, nil
|
||||
}
|
||||
|
||||
// makeAddressReserver is a utility method to sanity check that accounts are
|
||||
// properly reserved by the blobpool (no duplicate reserves or unreserves).
|
||||
func makeAddressReserver() txpool.AddressReserver {
|
||||
var (
|
||||
reserved = make(map[common.Address]struct{})
|
||||
lock sync.Mutex
|
||||
)
|
||||
return func(addr common.Address, reserve bool) error {
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
|
||||
_, exists := reserved[addr]
|
||||
if reserve {
|
||||
if exists {
|
||||
panic("already reserved")
|
||||
}
|
||||
reserved[addr] = struct{}{}
|
||||
return nil
|
||||
}
|
||||
if !exists {
|
||||
panic("not reserved")
|
||||
}
|
||||
delete(reserved, addr)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// makeTx is a utility method to construct a random blob transaction and sign it
|
||||
// with a valid key, only setting the interesting fields from the perspective of
|
||||
// the blob pool.
|
||||
|
|
@ -433,6 +405,10 @@ func verifyBlobRetrievals(t *testing.T, pool *BlobPool) {
|
|||
}
|
||||
}
|
||||
|
||||
func newReserver() *txpool.Reserver {
|
||||
return txpool.NewReservationTracker().NewHandle(42)
|
||||
}
|
||||
|
||||
// Tests that transactions can be loaded from disk on startup and that they are
|
||||
// correctly discarded if invalid.
|
||||
//
|
||||
|
|
@ -699,8 +675,8 @@ func TestOpenDrops(t *testing.T) {
|
|||
blobfee: uint256.NewInt(params.BlobTxMinBlobGasprice),
|
||||
statedb: statedb,
|
||||
}
|
||||
pool := New(Config{Datadir: storage}, chain)
|
||||
if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver()); err != nil {
|
||||
pool := New(Config{Datadir: storage}, chain, nil)
|
||||
if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil {
|
||||
t.Fatalf("failed to create blob pool: %v", err)
|
||||
}
|
||||
defer pool.Close()
|
||||
|
|
@ -817,8 +793,8 @@ func TestOpenIndex(t *testing.T) {
|
|||
blobfee: uint256.NewInt(params.BlobTxMinBlobGasprice),
|
||||
statedb: statedb,
|
||||
}
|
||||
pool := New(Config{Datadir: storage}, chain)
|
||||
if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver()); err != nil {
|
||||
pool := New(Config{Datadir: storage}, chain, nil)
|
||||
if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil {
|
||||
t.Fatalf("failed to create blob pool: %v", err)
|
||||
}
|
||||
defer pool.Close()
|
||||
|
|
@ -918,8 +894,8 @@ func TestOpenHeap(t *testing.T) {
|
|||
blobfee: uint256.NewInt(105),
|
||||
statedb: statedb,
|
||||
}
|
||||
pool := New(Config{Datadir: storage}, chain)
|
||||
if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver()); err != nil {
|
||||
pool := New(Config{Datadir: storage}, chain, nil)
|
||||
if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil {
|
||||
t.Fatalf("failed to create blob pool: %v", err)
|
||||
}
|
||||
defer pool.Close()
|
||||
|
|
@ -997,8 +973,8 @@ func TestOpenCap(t *testing.T) {
|
|||
blobfee: uint256.NewInt(105),
|
||||
statedb: statedb,
|
||||
}
|
||||
pool := New(Config{Datadir: storage, Datacap: datacap}, chain)
|
||||
if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver()); err != nil {
|
||||
pool := New(Config{Datadir: storage, Datacap: datacap}, chain, nil)
|
||||
if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil {
|
||||
t.Fatalf("failed to create blob pool: %v", err)
|
||||
}
|
||||
// Verify that enough transactions have been dropped to get the pool's size
|
||||
|
|
@ -1098,8 +1074,8 @@ func TestChangingSlotterSize(t *testing.T) {
|
|||
blobfee: uint256.NewInt(105),
|
||||
statedb: statedb,
|
||||
}
|
||||
pool := New(Config{Datadir: storage}, chain)
|
||||
if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver()); err != nil {
|
||||
pool := New(Config{Datadir: storage}, chain, nil)
|
||||
if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil {
|
||||
t.Fatalf("failed to create blob pool: %v", err)
|
||||
}
|
||||
|
||||
|
|
@ -1541,8 +1517,8 @@ func TestAdd(t *testing.T) {
|
|||
blobfee: uint256.NewInt(105),
|
||||
statedb: statedb,
|
||||
}
|
||||
pool := New(Config{Datadir: storage}, chain)
|
||||
if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver()); err != nil {
|
||||
pool := New(Config{Datadir: storage}, chain, nil)
|
||||
if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil {
|
||||
t.Fatalf("test %d: failed to create blob pool: %v", i, err)
|
||||
}
|
||||
verifyPoolInternals(t, pool)
|
||||
|
|
@ -1638,10 +1614,10 @@ func benchmarkPoolPending(b *testing.B, datacap uint64) {
|
|||
blobfee: uint256.NewInt(blobfee),
|
||||
statedb: statedb,
|
||||
}
|
||||
pool = New(Config{Datadir: ""}, chain)
|
||||
pool = New(Config{Datadir: ""}, chain, nil)
|
||||
)
|
||||
|
||||
if err := pool.Init(1, chain.CurrentBlock(), makeAddressReserver()); err != nil {
|
||||
if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil {
|
||||
b.Fatalf("failed to create blob pool: %v", err)
|
||||
}
|
||||
// Make the pool not use disk (just drop everything). This test never reads
|
||||
|
|
|
|||
|
|
@ -56,4 +56,8 @@ var (
|
|||
// input transaction of non-blob type when a blob transaction from this sender
|
||||
// remains pending (and vice-versa).
|
||||
ErrAlreadyReserved = errors.New("address already reserved")
|
||||
|
||||
// ErrInflightTxLimitReached is returned when the maximum number of in-flight
|
||||
// transactions is reached for specific accounts.
|
||||
ErrInflightTxLimitReached = errors.New("in-flight transaction limit reached for delegated accounts")
|
||||
)
|
||||
|
|
|
|||
|
|
@ -63,10 +63,6 @@ var (
|
|||
// another remote transaction.
|
||||
ErrTxPoolOverflow = errors.New("txpool is full")
|
||||
|
||||
// ErrInflightTxLimitReached is returned when the maximum number of in-flight
|
||||
// transactions is reached for specific accounts.
|
||||
ErrInflightTxLimitReached = errors.New("in-flight transaction limit reached for delegated accounts")
|
||||
|
||||
// ErrOutOfOrderTxFromDelegated is returned when the transaction with gapped
|
||||
// nonce received from the accounts with delegation or pending delegation.
|
||||
ErrOutOfOrderTxFromDelegated = errors.New("gapped-nonce tx from delegated accounts")
|
||||
|
|
@ -241,8 +237,8 @@ type LegacyPool struct {
|
|||
currentHead atomic.Pointer[types.Header] // Current head of the blockchain
|
||||
currentState *state.StateDB // Current state in the blockchain head
|
||||
pendingNonces *noncer // Pending state tracking virtual nonces
|
||||
reserver *txpool.Reserver // Address reserver to ensure exclusivity across subpools
|
||||
|
||||
reserve txpool.AddressReserver // Address reserver to ensure exclusivity across subpools
|
||||
pending map[common.Address]*list // All currently processable transactions
|
||||
queue map[common.Address]*list // Queued but non-processable transactions
|
||||
beats map[common.Address]time.Time // Last heartbeat from each known account
|
||||
|
|
@ -306,9 +302,9 @@ func (pool *LegacyPool) Filter(tx *types.Transaction) bool {
|
|||
// Init sets the gas price needed to keep a transaction in the pool and the chain
|
||||
// head to allow balance / nonce checks. The internal
|
||||
// goroutines will be spun up and the pool deemed operational afterwards.
|
||||
func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserve txpool.AddressReserver) error {
|
||||
func (pool *LegacyPool) Init(gasTip uint64, head *types.Header, reserver *txpool.Reserver) error {
|
||||
// Set the address reserver to request exclusive access to pooled accounts
|
||||
pool.reserve = reserve
|
||||
pool.reserver = reserver
|
||||
|
||||
// Set the basic pool parameters
|
||||
pool.gasTip.Store(uint256.NewInt(gasTip))
|
||||
|
|
@ -618,7 +614,7 @@ func (pool *LegacyPool) checkDelegationLimit(tx *types.Transaction) error {
|
|||
from, _ := types.Sender(pool.signer, tx) // validated
|
||||
|
||||
// Short circuit if the sender has neither delegation nor pending delegation.
|
||||
if pool.currentState.GetCodeHash(from) == types.EmptyCodeHash && pool.all.delegationTxsCount(from) == 0 {
|
||||
if pool.currentState.GetCodeHash(from) == types.EmptyCodeHash && !pool.all.hasAuth(from) {
|
||||
return nil
|
||||
}
|
||||
pending := pool.pending[from]
|
||||
|
|
@ -633,7 +629,7 @@ func (pool *LegacyPool) checkDelegationLimit(tx *types.Transaction) error {
|
|||
if pending.Contains(tx.Nonce()) {
|
||||
return nil
|
||||
}
|
||||
return ErrInflightTxLimitReached
|
||||
return txpool.ErrInflightTxLimitReached
|
||||
}
|
||||
|
||||
// validateAuth verifies that the transaction complies with code authorization
|
||||
|
|
@ -644,12 +640,24 @@ func (pool *LegacyPool) validateAuth(tx *types.Transaction) error {
|
|||
if err := pool.checkDelegationLimit(tx); err != nil {
|
||||
return err
|
||||
}
|
||||
// Authorities cannot conflict with any pending or queued transactions.
|
||||
// Authorities must not conflict with any pending or queued transactions,
|
||||
// nor with addresses that have already been reserved.
|
||||
if auths := tx.SetCodeAuthorities(); len(auths) > 0 {
|
||||
for _, auth := range auths {
|
||||
if pool.pending[auth] != nil || pool.queue[auth] != nil {
|
||||
return ErrAuthorityReserved
|
||||
}
|
||||
// Because there is no exclusive lock held between different subpools
|
||||
// when processing transactions, the SetCode transaction may be accepted
|
||||
// while other transactions with the same sender address are also
|
||||
// accepted simultaneously in the other pools.
|
||||
//
|
||||
// This scenario is considered acceptable, as the rule primarily ensures
|
||||
// that attackers cannot easily stack a SetCode transaction when the sender
|
||||
// is reserved by other pools.
|
||||
if pool.reserver.Has(auth) {
|
||||
return ErrAuthorityReserved
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
|
@ -683,7 +691,7 @@ func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) {
|
|||
_, hasQueued = pool.queue[from]
|
||||
)
|
||||
if !hasPending && !hasQueued {
|
||||
if err := pool.reserve(from, true); err != nil {
|
||||
if err := pool.reserver.Hold(from); err != nil {
|
||||
return false, err
|
||||
}
|
||||
defer func() {
|
||||
|
|
@ -694,7 +702,7 @@ func (pool *LegacyPool) add(tx *types.Transaction) (replaced bool, err error) {
|
|||
// by a return statement before running deferred methods. Take care with
|
||||
// removing or subscoping err as it will break this clause.
|
||||
if err != nil {
|
||||
pool.reserve(from, false)
|
||||
pool.reserver.Release(from)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
|
@ -1087,7 +1095,7 @@ func (pool *LegacyPool) removeTx(hash common.Hash, outofbound bool, unreserve bo
|
|||
_, hasQueued = pool.queue[addr]
|
||||
)
|
||||
if !hasPending && !hasQueued {
|
||||
pool.reserve(addr, false)
|
||||
pool.reserver.Release(addr)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
|
@ -1467,7 +1475,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T
|
|||
delete(pool.queue, addr)
|
||||
delete(pool.beats, addr)
|
||||
if _, ok := pool.pending[addr]; !ok {
|
||||
pool.reserve(addr, false)
|
||||
pool.reserver.Release(addr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1653,7 +1661,7 @@ func (pool *LegacyPool) demoteUnexecutables() {
|
|||
if list.Empty() {
|
||||
delete(pool.pending, addr)
|
||||
if _, ok := pool.queue[addr]; !ok {
|
||||
pool.reserve(addr, false)
|
||||
pool.reserver.Release(addr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1862,11 +1870,13 @@ func (t *lookup) removeAuthorities(tx *types.Transaction) {
|
|||
}
|
||||
}
|
||||
|
||||
// delegationTxsCount returns the number of pending authorizations for the specified address.
|
||||
func (t *lookup) delegationTxsCount(addr common.Address) int {
|
||||
// hasAuth returns a flag indicating whether there are pending authorizations
|
||||
// from the specified address.
|
||||
func (t *lookup) hasAuth(addr common.Address) bool {
|
||||
t.lock.RLock()
|
||||
defer t.lock.RUnlock()
|
||||
return len(t.auths[addr])
|
||||
|
||||
return len(t.auths[addr]) > 0
|
||||
}
|
||||
|
||||
// numSlots calculates the number of slots needed for a single transaction.
|
||||
|
|
@ -1880,8 +1890,8 @@ func (pool *LegacyPool) Clear() {
|
|||
pool.mu.Lock()
|
||||
defer pool.mu.Unlock()
|
||||
|
||||
// unreserve each tracked account. Ideally, we could just clear the
|
||||
// reservation map in the parent txpool context. However, if we clear in
|
||||
// unreserve each tracked account. Ideally, we could just clear the
|
||||
// reservation map in the parent txpool context. However, if we clear in
|
||||
// parent context, to avoid exposing the subpool lock, we have to lock the
|
||||
// reservations and then lock each subpool.
|
||||
//
|
||||
|
|
@ -1892,11 +1902,11 @@ func (pool *LegacyPool) Clear() {
|
|||
// * TxPool.Clear attempts to lock subpool mutex
|
||||
//
|
||||
// The transaction addition may attempt to reserve the sender addr which
|
||||
// can't happen until Clear releases the reservation lock. Clear cannot
|
||||
// can't happen until Clear releases the reservation lock. Clear cannot
|
||||
// acquire the subpool lock until the transaction addition is completed.
|
||||
for _, tx := range pool.all.txs {
|
||||
senderAddr, _ := types.Sender(pool.signer, tx)
|
||||
pool.reserve(senderAddr, false)
|
||||
pool.reserver.Release(senderAddr)
|
||||
}
|
||||
pool.all = newLookup()
|
||||
pool.priced = newPricedList(pool.all)
|
||||
|
|
@ -1904,3 +1914,9 @@ func (pool *LegacyPool) Clear() {
|
|||
pool.queue = make(map[common.Address]*list)
|
||||
pool.pendingNonces = newNoncer(pool.currentState)
|
||||
}
|
||||
|
||||
// HasPendingAuth returns a flag indicating whether there are pending
|
||||
// authorizations from the specific address cached in the pool.
|
||||
func (pool *LegacyPool) HasPendingAuth(addr common.Address) bool {
|
||||
return pool.all.hasAuth(addr)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -86,7 +86,7 @@ func TestTransactionFutureAttack(t *testing.T) {
|
|||
config.GlobalQueue = 100
|
||||
config.GlobalSlots = 100
|
||||
pool := New(config, blockchain)
|
||||
pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver())
|
||||
pool.Init(config.PriceLimit, blockchain.CurrentBlock(), newReserver())
|
||||
defer pool.Close()
|
||||
fillPool(t, pool)
|
||||
pending, _ := pool.Stats()
|
||||
|
|
@ -120,7 +120,7 @@ func TestTransactionFuture1559(t *testing.T) {
|
|||
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting())
|
||||
blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed))
|
||||
pool := New(testTxPoolConfig, blockchain)
|
||||
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver())
|
||||
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver())
|
||||
defer pool.Close()
|
||||
|
||||
// Create a number of test accounts, fund them and make transactions
|
||||
|
|
@ -153,7 +153,7 @@ func TestTransactionZAttack(t *testing.T) {
|
|||
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting())
|
||||
blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed))
|
||||
pool := New(testTxPoolConfig, blockchain)
|
||||
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver())
|
||||
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver())
|
||||
defer pool.Close()
|
||||
// Create a number of test accounts, fund them and make transactions
|
||||
fillPool(t, pool)
|
||||
|
|
@ -224,7 +224,7 @@ func BenchmarkFutureAttack(b *testing.B) {
|
|||
config.GlobalQueue = 100
|
||||
config.GlobalSlots = 100
|
||||
pool := New(config, blockchain)
|
||||
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver())
|
||||
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver())
|
||||
defer pool.Close()
|
||||
fillPool(b, pool)
|
||||
|
||||
|
|
|
|||
|
|
@ -24,7 +24,6 @@ import (
|
|||
"math/big"
|
||||
"math/rand"
|
||||
"slices"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
|
@ -168,42 +167,21 @@ func pricedSetCodeTxWithAuth(nonce uint64, gaslimit uint64, gasFee, tip *uint256
|
|||
})
|
||||
}
|
||||
|
||||
func makeAddressReserver() txpool.AddressReserver {
|
||||
var (
|
||||
reserved = make(map[common.Address]struct{})
|
||||
lock sync.Mutex
|
||||
)
|
||||
return func(addr common.Address, reserve bool) error {
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
|
||||
_, exists := reserved[addr]
|
||||
if reserve {
|
||||
if exists {
|
||||
panic("already reserved")
|
||||
}
|
||||
reserved[addr] = struct{}{}
|
||||
return nil
|
||||
}
|
||||
if !exists {
|
||||
panic("not reserved")
|
||||
}
|
||||
delete(reserved, addr)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func setupPool() (*LegacyPool, *ecdsa.PrivateKey) {
|
||||
return setupPoolWithConfig(params.TestChainConfig)
|
||||
}
|
||||
|
||||
func newReserver() *txpool.Reserver {
|
||||
return txpool.NewReservationTracker().NewHandle(42)
|
||||
}
|
||||
|
||||
func setupPoolWithConfig(config *params.ChainConfig) (*LegacyPool, *ecdsa.PrivateKey) {
|
||||
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting())
|
||||
blockchain := newTestBlockChain(config, 10000000, statedb, new(event.Feed))
|
||||
|
||||
key, _ := crypto.GenerateKey()
|
||||
pool := New(testTxPoolConfig, blockchain)
|
||||
if err := pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver()); err != nil {
|
||||
if err := pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver()); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
// wait for the pool to initialize
|
||||
|
|
@ -336,7 +314,7 @@ func TestStateChangeDuringReset(t *testing.T) {
|
|||
tx1 := transaction(1, 100000, key)
|
||||
|
||||
pool := New(testTxPoolConfig, blockchain)
|
||||
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver())
|
||||
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver())
|
||||
defer pool.Close()
|
||||
|
||||
nonce := pool.Nonce(address)
|
||||
|
|
@ -753,7 +731,7 @@ func TestPostponing(t *testing.T) {
|
|||
blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed))
|
||||
|
||||
pool := New(testTxPoolConfig, blockchain)
|
||||
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver())
|
||||
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver())
|
||||
defer pool.Close()
|
||||
|
||||
// Create two test accounts to produce different gap profiles with
|
||||
|
|
@ -963,7 +941,7 @@ func TestQueueGlobalLimiting(t *testing.T) {
|
|||
config.GlobalQueue = config.AccountQueue*3 - 1 // reduce the queue limits to shorten test time (-1 to make it non divisible)
|
||||
|
||||
pool := New(config, blockchain)
|
||||
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver())
|
||||
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver())
|
||||
defer pool.Close()
|
||||
|
||||
// Create a number of test accounts and fund them (last one will be the local)
|
||||
|
|
@ -1015,7 +993,7 @@ func TestQueueTimeLimiting(t *testing.T) {
|
|||
config.Lifetime = time.Second
|
||||
|
||||
pool := New(config, blockchain)
|
||||
pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver())
|
||||
pool.Init(config.PriceLimit, blockchain.CurrentBlock(), newReserver())
|
||||
defer pool.Close()
|
||||
|
||||
// Create a test account to ensure remotes expire
|
||||
|
|
@ -1176,7 +1154,7 @@ func TestPendingGlobalLimiting(t *testing.T) {
|
|||
config.GlobalSlots = config.AccountSlots * 10
|
||||
|
||||
pool := New(config, blockchain)
|
||||
pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver())
|
||||
pool.Init(config.PriceLimit, blockchain.CurrentBlock(), newReserver())
|
||||
defer pool.Close()
|
||||
|
||||
// Create a number of test accounts and fund them
|
||||
|
|
@ -1275,7 +1253,7 @@ func TestCapClearsFromAll(t *testing.T) {
|
|||
config.GlobalSlots = 8
|
||||
|
||||
pool := New(config, blockchain)
|
||||
pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver())
|
||||
pool.Init(config.PriceLimit, blockchain.CurrentBlock(), newReserver())
|
||||
defer pool.Close()
|
||||
|
||||
// Create a number of test accounts and fund them
|
||||
|
|
@ -1308,7 +1286,7 @@ func TestPendingMinimumAllowance(t *testing.T) {
|
|||
config.GlobalSlots = 1
|
||||
|
||||
pool := New(config, blockchain)
|
||||
pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver())
|
||||
pool.Init(config.PriceLimit, blockchain.CurrentBlock(), newReserver())
|
||||
defer pool.Close()
|
||||
|
||||
// Create a number of test accounts and fund them
|
||||
|
|
@ -1352,7 +1330,7 @@ func TestRepricing(t *testing.T) {
|
|||
blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed))
|
||||
|
||||
pool := New(testTxPoolConfig, blockchain)
|
||||
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver())
|
||||
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver())
|
||||
defer pool.Close()
|
||||
|
||||
// Keep track of transaction events to ensure all executables get announced
|
||||
|
|
@ -1457,7 +1435,7 @@ func TestMinGasPriceEnforced(t *testing.T) {
|
|||
txPoolConfig := DefaultConfig
|
||||
txPoolConfig.NoLocals = true
|
||||
pool := New(txPoolConfig, blockchain)
|
||||
pool.Init(txPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver())
|
||||
pool.Init(txPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver())
|
||||
defer pool.Close()
|
||||
|
||||
key, _ := crypto.GenerateKey()
|
||||
|
|
@ -1606,7 +1584,7 @@ func TestUnderpricing(t *testing.T) {
|
|||
config.GlobalQueue = 2
|
||||
|
||||
pool := New(config, blockchain)
|
||||
pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver())
|
||||
pool.Init(config.PriceLimit, blockchain.CurrentBlock(), newReserver())
|
||||
defer pool.Close()
|
||||
|
||||
// Keep track of transaction events to ensure all executables get announced
|
||||
|
|
@ -1696,7 +1674,7 @@ func TestStableUnderpricing(t *testing.T) {
|
|||
config.GlobalQueue = 0
|
||||
|
||||
pool := New(config, blockchain)
|
||||
pool.Init(config.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver())
|
||||
pool.Init(config.PriceLimit, blockchain.CurrentBlock(), newReserver())
|
||||
defer pool.Close()
|
||||
|
||||
// Keep track of transaction events to ensure all executables get announced
|
||||
|
|
@ -1899,7 +1877,7 @@ func TestDeduplication(t *testing.T) {
|
|||
blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed))
|
||||
|
||||
pool := New(testTxPoolConfig, blockchain)
|
||||
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver())
|
||||
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver())
|
||||
defer pool.Close()
|
||||
|
||||
// Create a test account to add transactions with
|
||||
|
|
@ -1966,7 +1944,7 @@ func TestReplacement(t *testing.T) {
|
|||
blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed))
|
||||
|
||||
pool := New(testTxPoolConfig, blockchain)
|
||||
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver())
|
||||
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver())
|
||||
defer pool.Close()
|
||||
|
||||
// Keep track of transaction events to ensure all executables get announced
|
||||
|
|
@ -2157,7 +2135,7 @@ func TestStatusCheck(t *testing.T) {
|
|||
blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed))
|
||||
|
||||
pool := New(testTxPoolConfig, blockchain)
|
||||
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver())
|
||||
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver())
|
||||
defer pool.Close()
|
||||
|
||||
// Create the test accounts to check various transaction statuses with
|
||||
|
|
@ -2230,7 +2208,7 @@ func TestSetCodeTransactions(t *testing.T) {
|
|||
blockchain := newTestBlockChain(params.MergedTestChainConfig, 1000000, statedb, new(event.Feed))
|
||||
|
||||
pool := New(testTxPoolConfig, blockchain)
|
||||
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver())
|
||||
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver())
|
||||
defer pool.Close()
|
||||
|
||||
// Create the test accounts
|
||||
|
|
@ -2271,12 +2249,12 @@ func TestSetCodeTransactions(t *testing.T) {
|
|||
if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(1), keyA)); err != nil {
|
||||
t.Fatalf("%s: failed to add remote transaction: %v", name, err)
|
||||
}
|
||||
if err := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(1), keyA)); !errors.Is(err, ErrInflightTxLimitReached) {
|
||||
t.Fatalf("%s: error mismatch: want %v, have %v", name, ErrInflightTxLimitReached, err)
|
||||
if err := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(1), keyA)); !errors.Is(err, txpool.ErrInflightTxLimitReached) {
|
||||
t.Fatalf("%s: error mismatch: want %v, have %v", name, txpool.ErrInflightTxLimitReached, err)
|
||||
}
|
||||
// Check gapped transaction again.
|
||||
if err := pool.addRemoteSync(pricedTransaction(2, 100000, big.NewInt(1), keyA)); !errors.Is(err, ErrInflightTxLimitReached) {
|
||||
t.Fatalf("%s: error mismatch: want %v, have %v", name, ErrInflightTxLimitReached, err)
|
||||
if err := pool.addRemoteSync(pricedTransaction(2, 100000, big.NewInt(1), keyA)); !errors.Is(err, txpool.ErrInflightTxLimitReached) {
|
||||
t.Fatalf("%s: error mismatch: want %v, have %v", name, txpool.ErrInflightTxLimitReached, err)
|
||||
}
|
||||
// Replace by fee.
|
||||
if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(10), keyA)); err != nil {
|
||||
|
|
@ -2310,8 +2288,8 @@ func TestSetCodeTransactions(t *testing.T) {
|
|||
t.Fatalf("%s: failed to add with pending delegatio: %v", name, err)
|
||||
}
|
||||
// Also check gapped transaction is rejected.
|
||||
if err := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(1), keyC)); !errors.Is(err, ErrInflightTxLimitReached) {
|
||||
t.Fatalf("%s: error mismatch: want %v, have %v", name, ErrInflightTxLimitReached, err)
|
||||
if err := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(1), keyC)); !errors.Is(err, txpool.ErrInflightTxLimitReached) {
|
||||
t.Fatalf("%s: error mismatch: want %v, have %v", name, txpool.ErrInflightTxLimitReached, err)
|
||||
}
|
||||
},
|
||||
},
|
||||
|
|
@ -2386,7 +2364,7 @@ func TestSetCodeTransactions(t *testing.T) {
|
|||
if err := pool.addRemoteSync(pricedTransaction(0, 100000, big.NewInt(1000), keyC)); err != nil {
|
||||
t.Fatalf("%s: failed to added single pooled for account with pending delegation: %v", name, err)
|
||||
}
|
||||
if err, want := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(1000), keyC)), ErrInflightTxLimitReached; !errors.Is(err, want) {
|
||||
if err, want := pool.addRemoteSync(pricedTransaction(1, 100000, big.NewInt(1000), keyC)), txpool.ErrInflightTxLimitReached; !errors.Is(err, want) {
|
||||
t.Fatalf("%s: error mismatch: want %v, have %v", name, want, err)
|
||||
}
|
||||
},
|
||||
|
|
@ -2454,7 +2432,7 @@ func TestSetCodeTransactionsReorg(t *testing.T) {
|
|||
blockchain := newTestBlockChain(params.MergedTestChainConfig, 1000000, statedb, new(event.Feed))
|
||||
|
||||
pool := New(testTxPoolConfig, blockchain)
|
||||
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), makeAddressReserver())
|
||||
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver())
|
||||
defer pool.Close()
|
||||
|
||||
// Create the test accounts
|
||||
|
|
@ -2489,8 +2467,8 @@ func TestSetCodeTransactionsReorg(t *testing.T) {
|
|||
t.Fatalf("failed to add with remote setcode transaction: %v", err)
|
||||
}
|
||||
// Try to add a transactions in
|
||||
if err := pool.addRemoteSync(pricedTransaction(2, 100000, big.NewInt(1000), keyA)); !errors.Is(err, ErrInflightTxLimitReached) {
|
||||
t.Fatalf("unexpected error %v, expecting %v", err, ErrInflightTxLimitReached)
|
||||
if err := pool.addRemoteSync(pricedTransaction(2, 100000, big.NewInt(1000), keyA)); !errors.Is(err, txpool.ErrInflightTxLimitReached) {
|
||||
t.Fatalf("unexpected error %v, expecting %v", err, txpool.ErrInflightTxLimitReached)
|
||||
}
|
||||
// Simulate the chain moving
|
||||
blockchain.statedb.SetNonce(addrA, 2, tracing.NonceChangeAuthorization)
|
||||
|
|
|
|||
124
core/txpool/reserver.go
Normal file
124
core/txpool/reserver.go
Normal file
|
|
@ -0,0 +1,124 @@
|
|||
// Copyright 2025 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package txpool
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
// reservationsGaugeName is the prefix of a per-subpool address reservation
|
||||
// metric.
|
||||
//
|
||||
// This is mostly a sanity metric to ensure there's no bug that would make
|
||||
// some subpool hog all the reservations due to mis-accounting.
|
||||
reservationsGaugeName = "txpool/reservations"
|
||||
)
|
||||
|
||||
// ReservationTracker is a struct shared between different subpools. It is used to reserve
|
||||
// the account and ensure that one address cannot initiate transactions, authorizations,
|
||||
// and other state-changing behaviors in different pools at the same time.
|
||||
type ReservationTracker struct {
|
||||
accounts map[common.Address]int
|
||||
lock sync.RWMutex
|
||||
}
|
||||
|
||||
// NewReservationTracker initializes the account reservation tracker.
|
||||
func NewReservationTracker() *ReservationTracker {
|
||||
return &ReservationTracker{
|
||||
accounts: make(map[common.Address]int),
|
||||
}
|
||||
}
|
||||
|
||||
// NewHandle creates a named handle on the ReservationTracker. The handle
|
||||
// identifies the subpool so ownership of reservations can be determined.
|
||||
func (r *ReservationTracker) NewHandle(id int) *Reserver {
|
||||
return &Reserver{r, id}
|
||||
}
|
||||
|
||||
// Reserver is a named handle on ReservationTracker. It is held by subpools to
|
||||
// make reservations for accounts it is tracking. The id is used to determine
|
||||
// which pool owns an address and disallows non-owners to hold or release
|
||||
// addresses it doesn't own.
|
||||
type Reserver struct {
|
||||
tracker *ReservationTracker
|
||||
id int
|
||||
}
|
||||
|
||||
// Hold attempts to reserve the specified account address for the given pool.
|
||||
// Returns an error if the account is already reserved.
|
||||
func (h *Reserver) Hold(addr common.Address) error {
|
||||
h.tracker.lock.Lock()
|
||||
defer h.tracker.lock.Unlock()
|
||||
|
||||
// Double reservations are forbidden even from the same pool to
|
||||
// avoid subtle bugs in the long term.
|
||||
owner, exists := h.tracker.accounts[addr]
|
||||
if exists {
|
||||
if owner == h.id {
|
||||
log.Error("pool attempted to reserve already-owned address", "address", addr)
|
||||
return nil // Ignore fault to give the pool a chance to recover while the bug gets fixed
|
||||
}
|
||||
return ErrAlreadyReserved
|
||||
}
|
||||
h.tracker.accounts[addr] = h.id
|
||||
if metrics.Enabled() {
|
||||
m := fmt.Sprintf("%s/%d", reservationsGaugeName, h.id)
|
||||
metrics.GetOrRegisterGauge(m, nil).Inc(1)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Release attempts to release the reservation for the specified account.
|
||||
// Returns an error if the address is not reserved or is reserved by another pool.
|
||||
func (h *Reserver) Release(addr common.Address) error {
|
||||
h.tracker.lock.Lock()
|
||||
defer h.tracker.lock.Unlock()
|
||||
|
||||
// Ensure subpools only attempt to unreserve their own owned addresses,
|
||||
// otherwise flag as a programming error.
|
||||
owner, exists := h.tracker.accounts[addr]
|
||||
if !exists {
|
||||
log.Error("pool attempted to unreserve non-reserved address", "address", addr)
|
||||
return errors.New("address not reserved")
|
||||
}
|
||||
if owner != h.id {
|
||||
log.Error("pool attempted to unreserve non-owned address", "address", addr)
|
||||
return errors.New("address not owned")
|
||||
}
|
||||
delete(h.tracker.accounts, addr)
|
||||
if metrics.Enabled() {
|
||||
m := fmt.Sprintf("%s/%d", reservationsGaugeName, h.id)
|
||||
metrics.GetOrRegisterGauge(m, nil).Dec(1)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Has returns a flag indicating if the address has been reserved or not.
|
||||
func (h *Reserver) Has(address common.Address) bool {
|
||||
h.tracker.lock.RLock()
|
||||
defer h.tracker.lock.RUnlock()
|
||||
|
||||
_, exists := h.tracker.accounts[address]
|
||||
return exists
|
||||
}
|
||||
|
|
@ -67,10 +67,6 @@ type LazyResolver interface {
|
|||
Get(hash common.Hash) *types.Transaction
|
||||
}
|
||||
|
||||
// AddressReserver is passed by the main transaction pool to subpools, so they
|
||||
// may request (and relinquish) exclusive access to certain addresses.
|
||||
type AddressReserver func(addr common.Address, reserve bool) error
|
||||
|
||||
// PendingFilter is a collection of filter rules to allow retrieving a subset
|
||||
// of transactions for announcement or mining.
|
||||
//
|
||||
|
|
@ -109,7 +105,7 @@ type SubPool interface {
|
|||
// These should not be passed as a constructor argument - nor should the pools
|
||||
// start by themselves - in order to keep multiple subpools in lockstep with
|
||||
// one another.
|
||||
Init(gasTip uint64, head *types.Header, reserve AddressReserver) error
|
||||
Init(gasTip uint64, head *types.Header, reserver *Reserver) error
|
||||
|
||||
// Close terminates any background processing threads and releases any held
|
||||
// resources.
|
||||
|
|
|
|||
|
|
@ -29,7 +29,6 @@ import (
|
|||
"github.com/ethereum/go-ethereum/crypto/kzg4844"
|
||||
"github.com/ethereum/go-ethereum/event"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/metrics"
|
||||
"github.com/ethereum/go-ethereum/params"
|
||||
)
|
||||
|
||||
|
|
@ -43,15 +42,6 @@ const (
|
|||
TxStatusIncluded
|
||||
)
|
||||
|
||||
var (
|
||||
// reservationsGaugeName is the prefix of a per-subpool address reservation
|
||||
// metric.
|
||||
//
|
||||
// This is mostly a sanity metric to ensure there's no bug that would make
|
||||
// some subpool hog all the reservations due to mis-accounting.
|
||||
reservationsGaugeName = "txpool/reservations"
|
||||
)
|
||||
|
||||
// BlockChain defines the minimal set of methods needed to back a tx pool with
|
||||
// a chain. Exists to allow mocking the live chain out of tests.
|
||||
type BlockChain interface {
|
||||
|
|
@ -81,9 +71,6 @@ type TxPool struct {
|
|||
stateLock sync.RWMutex // The lock for protecting state instance
|
||||
state *state.StateDB // Current state at the blockchain head
|
||||
|
||||
reservations map[common.Address]SubPool // Map with the account to pool reservations
|
||||
reserveLock sync.Mutex // Lock protecting the account reservations
|
||||
|
||||
subs event.SubscriptionScope // Subscription scope to unsubscribe all on shutdown
|
||||
quit chan chan error // Quit channel to tear down the head updater
|
||||
term chan struct{} // Termination channel to detect a closed pool
|
||||
|
|
@ -110,17 +97,17 @@ func New(gasTip uint64, chain BlockChain, subpools []SubPool) (*TxPool, error) {
|
|||
return nil, err
|
||||
}
|
||||
pool := &TxPool{
|
||||
subpools: subpools,
|
||||
chain: chain,
|
||||
signer: types.LatestSigner(chain.Config()),
|
||||
state: statedb,
|
||||
reservations: make(map[common.Address]SubPool),
|
||||
quit: make(chan chan error),
|
||||
term: make(chan struct{}),
|
||||
sync: make(chan chan error),
|
||||
subpools: subpools,
|
||||
chain: chain,
|
||||
signer: types.LatestSigner(chain.Config()),
|
||||
state: statedb,
|
||||
quit: make(chan chan error),
|
||||
term: make(chan struct{}),
|
||||
sync: make(chan chan error),
|
||||
}
|
||||
reserver := NewReservationTracker()
|
||||
for i, subpool := range subpools {
|
||||
if err := subpool.Init(gasTip, head, pool.reserver(i, subpool)); err != nil {
|
||||
if err := subpool.Init(gasTip, head, reserver.NewHandle(i)); err != nil {
|
||||
for j := i - 1; j >= 0; j-- {
|
||||
subpools[j].Close()
|
||||
}
|
||||
|
|
@ -131,52 +118,6 @@ func New(gasTip uint64, chain BlockChain, subpools []SubPool) (*TxPool, error) {
|
|||
return pool, nil
|
||||
}
|
||||
|
||||
// reserver is a method to create an address reservation callback to exclusively
|
||||
// assign/deassign addresses to/from subpools. This can ensure that at any point
|
||||
// in time, only a single subpool is able to manage an account, avoiding cross
|
||||
// subpool eviction issues and nonce conflicts.
|
||||
func (p *TxPool) reserver(id int, subpool SubPool) AddressReserver {
|
||||
return func(addr common.Address, reserve bool) error {
|
||||
p.reserveLock.Lock()
|
||||
defer p.reserveLock.Unlock()
|
||||
|
||||
owner, exists := p.reservations[addr]
|
||||
if reserve {
|
||||
// Double reservations are forbidden even from the same pool to
|
||||
// avoid subtle bugs in the long term.
|
||||
if exists {
|
||||
if owner == subpool {
|
||||
log.Error("pool attempted to reserve already-owned address", "address", addr)
|
||||
return nil // Ignore fault to give the pool a chance to recover while the bug gets fixed
|
||||
}
|
||||
return ErrAlreadyReserved
|
||||
}
|
||||
p.reservations[addr] = subpool
|
||||
if metrics.Enabled() {
|
||||
m := fmt.Sprintf("%s/%d", reservationsGaugeName, id)
|
||||
metrics.GetOrRegisterGauge(m, nil).Inc(1)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
// Ensure subpools only attempt to unreserve their own owned addresses,
|
||||
// otherwise flag as a programming error.
|
||||
if !exists {
|
||||
log.Error("pool attempted to unreserve non-reserved address", "address", addr)
|
||||
return errors.New("address not reserved")
|
||||
}
|
||||
if subpool != owner {
|
||||
log.Error("pool attempted to unreserve non-owned address", "address", addr)
|
||||
return errors.New("address not owned")
|
||||
}
|
||||
delete(p.reservations, addr)
|
||||
if metrics.Enabled() {
|
||||
m := fmt.Sprintf("%s/%d", reservationsGaugeName, id)
|
||||
metrics.GetOrRegisterGauge(m, nil).Dec(1)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Close terminates the transaction pool and all its subpools.
|
||||
func (p *TxPool) Close() error {
|
||||
var errs []error
|
||||
|
|
|
|||
|
|
@ -260,16 +260,16 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
|
|||
eth.filterMaps = filtermaps.NewFilterMaps(chainDb, chainView, historyCutoff, finalBlock, filtermaps.DefaultParams, fmConfig)
|
||||
eth.closeFilterMaps = make(chan chan struct{})
|
||||
|
||||
if config.BlobPool.Datadir != "" {
|
||||
config.BlobPool.Datadir = stack.ResolvePath(config.BlobPool.Datadir)
|
||||
}
|
||||
blobPool := blobpool.New(config.BlobPool, eth.blockchain)
|
||||
|
||||
if config.TxPool.Journal != "" {
|
||||
config.TxPool.Journal = stack.ResolvePath(config.TxPool.Journal)
|
||||
}
|
||||
legacyPool := legacypool.New(config.TxPool, eth.blockchain)
|
||||
|
||||
if config.BlobPool.Datadir != "" {
|
||||
config.BlobPool.Datadir = stack.ResolvePath(config.BlobPool.Datadir)
|
||||
}
|
||||
blobPool := blobpool.New(config.BlobPool, eth.blockchain, legacyPool.HasPendingAuth)
|
||||
|
||||
eth.txPool, err = txpool.New(config.TxPool.PriceLimit, eth.blockchain, []txpool.SubPool{legacyPool, blobPool})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
|||
|
|
@ -135,7 +135,7 @@ func newTestBackendWithGenerator(blocks int, shanghai bool, cancun bool, generat
|
|||
storage, _ := os.MkdirTemp("", "blobpool-")
|
||||
defer os.RemoveAll(storage)
|
||||
|
||||
blobPool := blobpool.New(blobpool.Config{Datadir: storage}, chain)
|
||||
blobPool := blobpool.New(blobpool.Config{Datadir: storage}, chain, nil)
|
||||
legacyPool := legacypool.New(txconfig, chain)
|
||||
txpool, _ := txpool.New(txconfig.PriceLimit, chain, []txpool.SubPool{legacyPool, blobPool})
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue