diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index f2e0d5f9d2..2a4fb6bb01 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -147,28 +147,38 @@ type blobTxMeta struct { evictionBlobFeeJumps float64 // Worse blob fee (converted to fee jumps) across all previous nonces } +// newStoredBlobTxMeta retrieves the indexed metadata fields from a blob transaction, +// adds storage specific fields (ID and size), and assembles a helper struct to track in memory. +// Requires the transaction to have a sidecar (or that we introduce a special version tag for no-sidecar). +func newStoredBlobTxMeta(tx *types.Transaction, id uint64, storageSize uint32) *blobTxMeta { + meta := newBlobTxMeta(tx) + meta.id = id + meta.storageSize = storageSize + return meta +} + // newBlobTxMeta retrieves the indexed metadata fields from a blob transaction // and assembles a helper struct to track in memory. -// Requires the transaction to have a sidecar (or that we introduce a special version tag for no-sidecar). -func newBlobTxMeta(id uint64, size uint64, storageSize uint32, tx *types.Transaction) *blobTxMeta { +// newBlobTxMeta leaves storage specific fields empty. Use newStoredBlobTxMeta +// to also populate those. +// Requires the transaction to have a sidecar. +func newBlobTxMeta(tx *types.Transaction) *blobTxMeta { if tx.BlobTxSidecar() == nil { // This should never happen, as the pool only admits blob transactions with a sidecar panic("missing blob tx sidecar") } meta := &blobTxMeta{ - hash: tx.Hash(), - vhashes: tx.BlobHashes(), - version: tx.BlobTxSidecar().Version, - id: id, - storageSize: storageSize, - size: size, - nonce: tx.Nonce(), - costCap: uint256.MustFromBig(tx.Cost()), - execTipCap: uint256.MustFromBig(tx.GasTipCap()), - execFeeCap: uint256.MustFromBig(tx.GasFeeCap()), - blobFeeCap: uint256.MustFromBig(tx.BlobGasFeeCap()), - execGas: tx.Gas(), - blobGas: tx.BlobGas(), + hash: tx.Hash(), + vhashes: tx.BlobHashes(), + version: tx.BlobTxSidecar().Version, + size: tx.Size(), + nonce: tx.Nonce(), + costCap: uint256.MustFromBig(tx.Cost()), + execTipCap: uint256.MustFromBig(tx.GasTipCap()), + execFeeCap: uint256.MustFromBig(tx.GasFeeCap()), + blobFeeCap: uint256.MustFromBig(tx.BlobGasFeeCap()), + execGas: tx.Gas(), + blobGas: tx.BlobGas(), } meta.basefeeJumps = dynamicFeeJumps(meta.execFeeCap) meta.blobfeeJumps = dynamicBlobFeeJumps(meta.blobFeeCap) @@ -363,9 +373,10 @@ type BlobPool struct { reserver txpool.Reserver // Address reserver to ensure exclusivity across subpools hasPendingAuth func(common.Address) bool // Determine whether the specified address has a pending 7702-auth - store billy.Database // Persistent data store for the tx metadata and blobs - stored uint64 // Useful data size of all transactions on disk - limbo *limbo // Persistent data store for the non-finalized blobs + store billy.Database // Persistent data store for the tx metadata and blobs + slotter slotSizer // O(1) slot size calculator matching the active billy shelves + stored uint64 // Useful data size of all transactions on disk + limbo *limbo // Persistent data store for the non-finalized blobs gapped map[common.Address][]*types.Transaction // Transactions that are currently gapped (nonce too high) gappedSource map[common.Hash]common.Address // Source of gapped transactions to allow rechecking on inclusion @@ -459,6 +470,14 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser if err != nil { return err } + // Build an O(1) slot size calculator from the active slotter configuration. + // We need a fresh slotter instance since tryMigrate may have consumed the + // previous one, and billy.Open below will consume this one. + if p.chain.Config().OsakaTime != nil { + p.slotter = newSlotSizer(newSlotterEIP7594(params.BlobTxMaxBlobs)) + } else { + p.slotter = newSlotSizer(newSlotter(params.BlobTxMaxBlobs)) + } // Index all transactions on disk and delete anything unprocessable var fails []uint64 index := func(id uint64, size uint32, blob []byte) { @@ -573,7 +592,7 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error { return errors.New("missing blob sidecar") } - meta := newBlobTxMeta(id, tx.Size(), size, tx) + meta := newStoredBlobTxMeta(tx, id, size) if p.lookup.exists(meta.hash) { // This path is only possible after a crash, where deleted items are not // removed via the normal shutdown-startup procedure and thus may get @@ -1144,7 +1163,7 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error { } // Update the indices and metrics - meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx) + meta := newStoredBlobTxMeta(tx, id, p.store.Size(id)) if _, ok := p.index[addr]; !ok { if err := p.reserver.Hold(addr); err != nil { log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err) @@ -1321,6 +1340,7 @@ func (p *BlobPool) validateTx(tx *types.Transaction) error { if err := p.checkDelegationLimit(tx); err != nil { return err } + // If the transaction replaces an existing one, ensure that price bumps are // adhered to. var ( @@ -1622,7 +1642,9 @@ func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error } return err } + // If the address is not yet known, request exclusivity to track the account + // This is a cheap check, so we do it before all the other checks. // only by this subpool until all transactions are evicted from, _ := types.Sender(p.signer, tx) // already validated above if _, ok := p.index[from]; !ok { @@ -1642,25 +1664,77 @@ func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error } }() } - // Transaction permitted into the pool from a nonce and cost perspective, - // insert it into the database and update the indices + + // Create meta, in preparation of adding to the pool. + // Having the meta simplifies the check below for underpriced transactions. + // Note: the meta will be finalized with storage information after the transaction is stored. + meta := newBlobTxMeta(tx) + + // Calculate the eviction parameters for the transaction + var ( + next = p.state.GetNonce(from) + offset = int(meta.nonce - next) + ) + meta.evictionExecTip = meta.execTipCap + meta.evictionExecFeeJumps = meta.basefeeJumps + meta.evictionBlobFeeJumps = meta.blobfeeJumps + if meta.nonce > next { // transaction can't be gapped, we filter for that in validateTx + prev := p.index[from][int(meta.nonce-next-1)] + if meta.evictionExecTip.Cmp(prev.evictionExecTip) > 0 { + meta.evictionExecTip = prev.evictionExecTip + } + if meta.evictionExecFeeJumps > prev.evictionExecFeeJumps { + meta.evictionExecFeeJumps = prev.evictionExecFeeJumps + } + if meta.evictionBlobFeeJumps > prev.evictionBlobFeeJumps { + meta.evictionBlobFeeJumps = prev.evictionBlobFeeJumps + } + } + + // Check pool size limits before inserting the transaction. For this calculation + // we have to RLP encode the transaction to get the size. + // Note: equal priority as the current worst of the pool is still considered + // underpriced. This is to prevent constant replacement when the pool is full. blob, err := rlp.EncodeToBytes(tx) if err != nil { + // This should never happen, but better safe than sorry. log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err) return err } + newStorageSize, err := p.slotter.getSlotSize(uint32(len(blob))) + if err != nil { + // This should never happen, but better safe than sorry. + log.Warn("Dropping blob transaction due to size", "tx", tx.Hash(), "size", meta.size, "err", err) + return err + } + // Is this a possible replacement? If so, we need to consider the storage size + // difference instead of the full size of the new transaction. + storageSizeDiff := int64(newStorageSize) + if offset < len(p.index[from]) { + storageSizeDiff -= int64(p.index[from][offset].storageSize) + } + if storageSizeDiff > 0 && p.stored+uint64(storageSizeDiff) > p.config.Datacap { + if p.evict.Underpriced(meta) { + log.Trace("Dropping underpriced blob transaction", "tx", tx.Hash(), "feecap", tx.GasFeeCap(), "tipcap", tx.GasTipCap(), "blobfeecap", tx.BlobGasFeeCap()) + return txpool.ErrUnderpriced + } + } + + // Transaction permitted into the pool from a nonce and cost perspective, + // insert it into the database and update the indices id, err := p.store.Put(blob) if err != nil { return err } - meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx) + // Finalize the meta with storage information + meta.id = id + meta.storageSize = p.store.Size(id) var ( - next = p.state.GetNonce(from) - offset = int(tx.Nonce() - next) - newacc = false + newacc = false + oldEvictionExecFeeJumps float64 + oldEvictionBlobFeeJumps float64 ) - var oldEvictionExecFeeJumps, oldEvictionBlobFeeJumps float64 if txs, ok := p.index[from]; ok { oldEvictionExecFeeJumps = txs[len(txs)-1].evictionExecFeeJumps oldEvictionBlobFeeJumps = txs[len(txs)-1].evictionBlobFeeJumps @@ -1693,20 +1767,13 @@ func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error p.lookup.track(meta) p.stored += uint64(meta.storageSize) } - // Recompute the rolling eviction fields. In case of a replacement, this will - // recompute all subsequent fields. In case of an append, this will only do - // the fresh calculation. + // Recompute the rolling eviction fields for subsequent transactions + // (we've already calculated for the new/updated transaction above). + // In case of a replacement, this will recompute all subsequent fields. + // In case of an append, this will only do the fresh calculation. txs := p.index[from] - for i := offset; i < len(txs); i++ { - // The first transaction will always use itself - if i == 0 { - txs[0].evictionExecTip = txs[0].execTipCap - txs[0].evictionExecFeeJumps = txs[0].basefeeJumps - txs[0].evictionBlobFeeJumps = txs[0].blobfeeJumps - - continue - } + for i := offset + 1; i < len(txs); i++ { // Subsequent transactions will use a rolling calculation txs[i].evictionExecTip = txs[i-1].evictionExecTip if txs[i].evictionExecTip.Cmp(txs[i].execTipCap) > 0 { diff --git a/core/txpool/blobpool/blobpool_test.go b/core/txpool/blobpool/blobpool_test.go index 7c57755401..f4a329aad3 100644 --- a/core/txpool/blobpool/blobpool_test.go +++ b/core/txpool/blobpool/blobpool_test.go @@ -1364,9 +1364,10 @@ func TestAdd(t *testing.T) { } tests := []struct { - seeds map[string]seed - adds []addtx - block []addtx + seeds map[string]seed + adds []addtx + block []addtx + datacap uint64 }{ // Transactions from new accounts should be accepted if their initial // nonce matches the expected one from the statedb. Higher or lower must @@ -1720,6 +1721,66 @@ func TestAdd(t *testing.T) { }, }, }, + // Transactions above the Datacap should be rejected + { + seeds: map[string]seed{ + "alice": {balance: 10000000}, + }, + datacap: 1 * (txAvgSize + blobSize + uint64(txBlobOverhead)), // only allow 1 blob + adds: []addtx{ + { // Fits in capacity + from: "alice", + tx: makeUnsignedTx(0, 1, 1, 1), + }, + { // Beyond capacity + from: "alice", + tx: makeUnsignedTx(1, 1, 1, 1), + err: txpool.ErrUnderpriced, + }, + }, + }, + // Transactions above the Datacap should be rejected, use eviction values + { + seeds: map[string]seed{ + "alice": { + balance: 2000000, + //nonce: 1, + txs: []*types.BlobTx{ + makeUnsignedTxWithTestBlob(0, 2, 2, 2, 0), + makeUnsignedTxWithTestBlob(1, 2, 2, 2, 1), + }, + }, + "bob": { + balance: 1000000, + //nonce: 1, + txs: []*types.BlobTx{ + makeUnsignedTxWithTestBlob(0, 3, 3, 3, 2), + }, + }, + }, + datacap: 3 * (txAvgSize + blobSize + uint64(txBlobOverhead)), // only allow 2 blobs + adds: []addtx{ + { // Beyond capacity, but kicking out one from Alice should make it fit + from: "bob", + tx: makeUnsignedTxWithTestBlob(1, 3, 3, 3, 3), + }, + { // We've just kicked our nonce 1, so this is nonce too high + from: "alice", + tx: makeUnsignedTxWithTestBlob(2, 1, 2, 2, 4), + err: core.ErrNonceTooHigh, + }, + { // This should not succeed, fees are low + from: "alice", + tx: makeUnsignedTxWithTestBlob(1, 1, 1, 1, 4), + err: txpool.ErrUnderpriced, + }, + { // This should also not succeed, because of rolling fee calculation + from: "alice", + tx: makeUnsignedTxWithTestBlob(1, 1, 1, 1, 4), + err: txpool.ErrUnderpriced, + }, + }, + }, } for i, tt := range tests { // Create a temporary folder for the persistent backend @@ -1760,12 +1821,17 @@ func TestAdd(t *testing.T) { blobfee: uint256.NewInt(1), statedb: statedb, } - pool := New(Config{Datadir: storage}, chain, nil) + pool := New(Config{Datadir: storage, Datacap: tt.datacap}, chain, nil) if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil { t.Fatalf("test %d: failed to create blob pool: %v", i, err) } verifyPoolInternals(t, pool) + // subscibe to pool events to verify they are emitted correctly + txsCh := make(chan core.NewTxsEvent, 1) + txsSub := pool.SubscribeTransactions(txsCh, false) + defer txsSub.Unsubscribe() + // Add each transaction one by one, verifying the pool internals in between for j, add := range tt.adds { signed, _ := types.SignNewTx(keys[add.from], types.LatestSigner(params.MainnetChainConfig), add.tx) @@ -1796,6 +1862,33 @@ func TestAdd(t *testing.T) { } // Verify the pool internals after each addition verifyPoolInternals(t, pool) + // verify that if the tx was added, an event was emitted + txStatus := pool.Status(signed.Hash()) + select { + case ev := <-txsCh: + switch { + case add.err == nil && txStatus == txpool.TxStatusPending: + if len(ev.Txs) != 1 { + t.Errorf("test %d, tx %d: event txs length mismatch: have %d, want 1", i, j, len(ev.Txs)) + } + if ev.Txs[0].Hash() != signed.Hash() { + t.Errorf("test %d, tx %d: event tx mismatch: have %v, want %v", i, j, ev.Txs[0].Hash(), signed.Hash()) + } + case add.err == nil && txStatus == txpool.TxStatusQueued: + t.Errorf("test %d, tx %d: unexpected new tx event for queued tx", i, j) + case add.err != nil: + t.Errorf("test %d, tx %d: unexpected new tx event for failed tx", i, j) + default: + t.Errorf("test %d, tx %d: unexpected test result", i, j) + } + default: + switch { + case add.err == nil && txStatus == txpool.TxStatusPending: + t.Errorf("test %d, tx %d: expected new tx event, none received", i, j) + default: + // expected no event + } + } } verifyPoolInternals(t, pool) diff --git a/core/txpool/blobpool/evictheap.go b/core/txpool/blobpool/evictheap.go index a46b8e9a6f..47eceb8aa1 100644 --- a/core/txpool/blobpool/evictheap.go +++ b/core/txpool/blobpool/evictheap.go @@ -94,6 +94,12 @@ func (h *evictHeap) Less(i, j int) bool { lastI := txsI[len(txsI)-1] lastJ := txsJ[len(txsJ)-1] + return h.txPrioLt(lastI, lastJ) +} + +// LessTx compares two blobTxMeta entries and returns whether the first has a lower +// eviction priority than the second. +func (h *evictHeap) txPrioLt(lastI, lastJ *blobTxMeta) bool { prioI := evictionPriority(h.basefeeJumps, lastI.evictionExecFeeJumps, h.blobfeeJumps, lastI.evictionBlobFeeJumps) prioJ := evictionPriority(h.basefeeJumps, lastJ.evictionExecFeeJumps, h.blobfeeJumps, lastJ.evictionBlobFeeJumps) if prioI == prioJ { @@ -117,6 +123,20 @@ func (h *evictHeap) Push(x any) { h.addrs = append(h.addrs, x.(common.Address)) } +// Underpriced checks whether the given transaction is underpriced compared to the +// cheapest transaction in the heap. +// If a transaction has the same priority as the cheapest, it is still considered +// underpriced. +func (h *evictHeap) Underpriced(meta *blobTxMeta) bool { + if len(h.addrs) == 0 { + return false + } + cheapestTxs := h.metas[h.addrs[0]] + cheapestTx := cheapestTxs[len(cheapestTxs)-1] + + return !h.txPrioLt(cheapestTx, meta) +} + // Pop implements heap.Interface, removing and returning the last element of the // heap. // diff --git a/core/txpool/blobpool/slotter.go b/core/txpool/blobpool/slotter.go index 3399361e55..3760b7c293 100644 --- a/core/txpool/blobpool/slotter.go +++ b/core/txpool/blobpool/slotter.go @@ -17,10 +17,55 @@ package blobpool import ( + "errors" + "github.com/ethereum/go-ethereum/params" "github.com/holiman/billy" ) +// slotSizer computes the storage shelf size for a given transaction size using +// O(1) arithmetic. Shelf sizes form an arithmetic sequence: +// +// base, base+step, base+2*step, ... +// +// This mirrors the progression in newSlotter and newSlotterEIP7594, but avoids +// creating and iterating a stateful closure on every lookup. +type slotSizer struct { + base uint32 // Size of the first shelf (txAvgSize) + step uint32 // Size increment per subsequent shelf + max uint32 // Largest valid shelf size +} + +// newSlotSizer creates a slotSizer by consuming a slotter closure once to +// discover its base size, step size, and maximum shelf size. +func newSlotSizer(slotter billy.SlotSizeFn) slotSizer { + first, done := slotter() + if done { + return slotSizer{base: first, step: 0, max: first} + } + second, done := slotter() + step := second - first + last := second + for !done { + last, done = slotter() + } + return slotSizer{base: first, step: step, max: last} +} + +// getSlotSize returns the shelf size that can store a transaction of the given +// byte size, or an error if it exceeds the largest shelf. +func (s slotSizer) getSlotSize(size uint32) (uint32, error) { + if size <= s.base { + return s.base, nil + } + // Round up to the nearest shelf: base + ⌈(size-base)/step⌉ * step + slot := s.base + ((size-s.base+s.step-1)/s.step)*s.step + if slot > s.max { + return 0, errors.New("size exceeds maximum slot size") + } + return slot, nil +} + // tryMigrate checks if the billy needs to be migrated and migrates if needed. // Returns a slotter that can be used for the database. func tryMigrate(config *params.ChainConfig, slotter billy.SlotSizeFn, datadir string) (billy.SlotSizeFn, error) {