This commit is contained in:
Csaba Kiraly 2026-05-11 21:55:46 -07:00 committed by GitHub
commit 674ed60b76
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 268 additions and 43 deletions

View file

@ -147,28 +147,38 @@ type blobTxMeta struct {
evictionBlobFeeJumps float64 // Worse blob fee (converted to fee jumps) across all previous nonces evictionBlobFeeJumps float64 // Worse blob fee (converted to fee jumps) across all previous nonces
} }
// newStoredBlobTxMeta retrieves the indexed metadata fields from a blob transaction,
// adds storage specific fields (ID and size), and assembles a helper struct to track in memory.
// Requires the transaction to have a sidecar (or that we introduce a special version tag for no-sidecar).
func newStoredBlobTxMeta(tx *types.Transaction, id uint64, storageSize uint32) *blobTxMeta {
meta := newBlobTxMeta(tx)
meta.id = id
meta.storageSize = storageSize
return meta
}
// newBlobTxMeta retrieves the indexed metadata fields from a blob transaction // newBlobTxMeta retrieves the indexed metadata fields from a blob transaction
// and assembles a helper struct to track in memory. // and assembles a helper struct to track in memory.
// Requires the transaction to have a sidecar (or that we introduce a special version tag for no-sidecar). // newBlobTxMeta leaves storage specific fields empty. Use newStoredBlobTxMeta
func newBlobTxMeta(id uint64, size uint64, storageSize uint32, tx *types.Transaction) *blobTxMeta { // to also populate those.
// Requires the transaction to have a sidecar.
func newBlobTxMeta(tx *types.Transaction) *blobTxMeta {
if tx.BlobTxSidecar() == nil { if tx.BlobTxSidecar() == nil {
// This should never happen, as the pool only admits blob transactions with a sidecar // This should never happen, as the pool only admits blob transactions with a sidecar
panic("missing blob tx sidecar") panic("missing blob tx sidecar")
} }
meta := &blobTxMeta{ meta := &blobTxMeta{
hash: tx.Hash(), hash: tx.Hash(),
vhashes: tx.BlobHashes(), vhashes: tx.BlobHashes(),
version: tx.BlobTxSidecar().Version, version: tx.BlobTxSidecar().Version,
id: id, size: tx.Size(),
storageSize: storageSize, nonce: tx.Nonce(),
size: size, costCap: uint256.MustFromBig(tx.Cost()),
nonce: tx.Nonce(), execTipCap: uint256.MustFromBig(tx.GasTipCap()),
costCap: uint256.MustFromBig(tx.Cost()), execFeeCap: uint256.MustFromBig(tx.GasFeeCap()),
execTipCap: uint256.MustFromBig(tx.GasTipCap()), blobFeeCap: uint256.MustFromBig(tx.BlobGasFeeCap()),
execFeeCap: uint256.MustFromBig(tx.GasFeeCap()), execGas: tx.Gas(),
blobFeeCap: uint256.MustFromBig(tx.BlobGasFeeCap()), blobGas: tx.BlobGas(),
execGas: tx.Gas(),
blobGas: tx.BlobGas(),
} }
meta.basefeeJumps = dynamicFeeJumps(meta.execFeeCap) meta.basefeeJumps = dynamicFeeJumps(meta.execFeeCap)
meta.blobfeeJumps = dynamicBlobFeeJumps(meta.blobFeeCap) meta.blobfeeJumps = dynamicBlobFeeJumps(meta.blobFeeCap)
@ -363,9 +373,10 @@ type BlobPool struct {
reserver txpool.Reserver // Address reserver to ensure exclusivity across subpools reserver txpool.Reserver // Address reserver to ensure exclusivity across subpools
hasPendingAuth func(common.Address) bool // Determine whether the specified address has a pending 7702-auth hasPendingAuth func(common.Address) bool // Determine whether the specified address has a pending 7702-auth
store billy.Database // Persistent data store for the tx metadata and blobs store billy.Database // Persistent data store for the tx metadata and blobs
stored uint64 // Useful data size of all transactions on disk slotter slotSizer // O(1) slot size calculator matching the active billy shelves
limbo *limbo // Persistent data store for the non-finalized blobs stored uint64 // Useful data size of all transactions on disk
limbo *limbo // Persistent data store for the non-finalized blobs
gapped map[common.Address][]*types.Transaction // Transactions that are currently gapped (nonce too high) gapped map[common.Address][]*types.Transaction // Transactions that are currently gapped (nonce too high)
gappedSource map[common.Hash]common.Address // Source of gapped transactions to allow rechecking on inclusion gappedSource map[common.Hash]common.Address // Source of gapped transactions to allow rechecking on inclusion
@ -459,6 +470,14 @@ func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reser
if err != nil { if err != nil {
return err return err
} }
// Build an O(1) slot size calculator from the active slotter configuration.
// We need a fresh slotter instance since tryMigrate may have consumed the
// previous one, and billy.Open below will consume this one.
if p.chain.Config().OsakaTime != nil {
p.slotter = newSlotSizer(newSlotterEIP7594(params.BlobTxMaxBlobs))
} else {
p.slotter = newSlotSizer(newSlotter(params.BlobTxMaxBlobs))
}
// Index all transactions on disk and delete anything unprocessable // Index all transactions on disk and delete anything unprocessable
var fails []uint64 var fails []uint64
index := func(id uint64, size uint32, blob []byte) { index := func(id uint64, size uint32, blob []byte) {
@ -573,7 +592,7 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error {
return errors.New("missing blob sidecar") return errors.New("missing blob sidecar")
} }
meta := newBlobTxMeta(id, tx.Size(), size, tx) meta := newStoredBlobTxMeta(tx, id, size)
if p.lookup.exists(meta.hash) { if p.lookup.exists(meta.hash) {
// This path is only possible after a crash, where deleted items are not // This path is only possible after a crash, where deleted items are not
// removed via the normal shutdown-startup procedure and thus may get // removed via the normal shutdown-startup procedure and thus may get
@ -1144,7 +1163,7 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
} }
// Update the indices and metrics // Update the indices and metrics
meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx) meta := newStoredBlobTxMeta(tx, id, p.store.Size(id))
if _, ok := p.index[addr]; !ok { if _, ok := p.index[addr]; !ok {
if err := p.reserver.Hold(addr); err != nil { if err := p.reserver.Hold(addr); err != nil {
log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err) log.Warn("Failed to reserve account for blob pool", "tx", tx.Hash(), "from", addr, "err", err)
@ -1321,6 +1340,7 @@ func (p *BlobPool) validateTx(tx *types.Transaction) error {
if err := p.checkDelegationLimit(tx); err != nil { if err := p.checkDelegationLimit(tx); err != nil {
return err return err
} }
// If the transaction replaces an existing one, ensure that price bumps are // If the transaction replaces an existing one, ensure that price bumps are
// adhered to. // adhered to.
var ( var (
@ -1622,7 +1642,9 @@ func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error
} }
return err return err
} }
// If the address is not yet known, request exclusivity to track the account // If the address is not yet known, request exclusivity to track the account
// This is a cheap check, so we do it before all the other checks.
// only by this subpool until all transactions are evicted // only by this subpool until all transactions are evicted
from, _ := types.Sender(p.signer, tx) // already validated above from, _ := types.Sender(p.signer, tx) // already validated above
if _, ok := p.index[from]; !ok { if _, ok := p.index[from]; !ok {
@ -1642,25 +1664,77 @@ func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error
} }
}() }()
} }
// Transaction permitted into the pool from a nonce and cost perspective,
// insert it into the database and update the indices // Create meta, in preparation of adding to the pool.
// Having the meta simplifies the check below for underpriced transactions.
// Note: the meta will be finalized with storage information after the transaction is stored.
meta := newBlobTxMeta(tx)
// Calculate the eviction parameters for the transaction
var (
next = p.state.GetNonce(from)
offset = int(meta.nonce - next)
)
meta.evictionExecTip = meta.execTipCap
meta.evictionExecFeeJumps = meta.basefeeJumps
meta.evictionBlobFeeJumps = meta.blobfeeJumps
if meta.nonce > next { // transaction can't be gapped, we filter for that in validateTx
prev := p.index[from][int(meta.nonce-next-1)]
if meta.evictionExecTip.Cmp(prev.evictionExecTip) > 0 {
meta.evictionExecTip = prev.evictionExecTip
}
if meta.evictionExecFeeJumps > prev.evictionExecFeeJumps {
meta.evictionExecFeeJumps = prev.evictionExecFeeJumps
}
if meta.evictionBlobFeeJumps > prev.evictionBlobFeeJumps {
meta.evictionBlobFeeJumps = prev.evictionBlobFeeJumps
}
}
// Check pool size limits before inserting the transaction. For this calculation
// we have to RLP encode the transaction to get the size.
// Note: equal priority as the current worst of the pool is still considered
// underpriced. This is to prevent constant replacement when the pool is full.
blob, err := rlp.EncodeToBytes(tx) blob, err := rlp.EncodeToBytes(tx)
if err != nil { if err != nil {
// This should never happen, but better safe than sorry.
log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err) log.Error("Failed to encode transaction for storage", "hash", tx.Hash(), "err", err)
return err return err
} }
newStorageSize, err := p.slotter.getSlotSize(uint32(len(blob)))
if err != nil {
// This should never happen, but better safe than sorry.
log.Warn("Dropping blob transaction due to size", "tx", tx.Hash(), "size", meta.size, "err", err)
return err
}
// Is this a possible replacement? If so, we need to consider the storage size
// difference instead of the full size of the new transaction.
storageSizeDiff := int64(newStorageSize)
if offset < len(p.index[from]) {
storageSizeDiff -= int64(p.index[from][offset].storageSize)
}
if storageSizeDiff > 0 && p.stored+uint64(storageSizeDiff) > p.config.Datacap {
if p.evict.Underpriced(meta) {
log.Trace("Dropping underpriced blob transaction", "tx", tx.Hash(), "feecap", tx.GasFeeCap(), "tipcap", tx.GasTipCap(), "blobfeecap", tx.BlobGasFeeCap())
return txpool.ErrUnderpriced
}
}
// Transaction permitted into the pool from a nonce and cost perspective,
// insert it into the database and update the indices
id, err := p.store.Put(blob) id, err := p.store.Put(blob)
if err != nil { if err != nil {
return err return err
} }
meta := newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx) // Finalize the meta with storage information
meta.id = id
meta.storageSize = p.store.Size(id)
var ( var (
next = p.state.GetNonce(from) newacc = false
offset = int(tx.Nonce() - next) oldEvictionExecFeeJumps float64
newacc = false oldEvictionBlobFeeJumps float64
) )
var oldEvictionExecFeeJumps, oldEvictionBlobFeeJumps float64
if txs, ok := p.index[from]; ok { if txs, ok := p.index[from]; ok {
oldEvictionExecFeeJumps = txs[len(txs)-1].evictionExecFeeJumps oldEvictionExecFeeJumps = txs[len(txs)-1].evictionExecFeeJumps
oldEvictionBlobFeeJumps = txs[len(txs)-1].evictionBlobFeeJumps oldEvictionBlobFeeJumps = txs[len(txs)-1].evictionBlobFeeJumps
@ -1693,20 +1767,13 @@ func (p *BlobPool) addLocked(tx *types.Transaction, checkGapped bool) (err error
p.lookup.track(meta) p.lookup.track(meta)
p.stored += uint64(meta.storageSize) p.stored += uint64(meta.storageSize)
} }
// Recompute the rolling eviction fields. In case of a replacement, this will // Recompute the rolling eviction fields for subsequent transactions
// recompute all subsequent fields. In case of an append, this will only do // (we've already calculated for the new/updated transaction above).
// the fresh calculation. // In case of a replacement, this will recompute all subsequent fields.
// In case of an append, this will only do the fresh calculation.
txs := p.index[from] txs := p.index[from]
for i := offset; i < len(txs); i++ { for i := offset + 1; i < len(txs); i++ {
// The first transaction will always use itself
if i == 0 {
txs[0].evictionExecTip = txs[0].execTipCap
txs[0].evictionExecFeeJumps = txs[0].basefeeJumps
txs[0].evictionBlobFeeJumps = txs[0].blobfeeJumps
continue
}
// Subsequent transactions will use a rolling calculation // Subsequent transactions will use a rolling calculation
txs[i].evictionExecTip = txs[i-1].evictionExecTip txs[i].evictionExecTip = txs[i-1].evictionExecTip
if txs[i].evictionExecTip.Cmp(txs[i].execTipCap) > 0 { if txs[i].evictionExecTip.Cmp(txs[i].execTipCap) > 0 {

View file

@ -1364,9 +1364,10 @@ func TestAdd(t *testing.T) {
} }
tests := []struct { tests := []struct {
seeds map[string]seed seeds map[string]seed
adds []addtx adds []addtx
block []addtx block []addtx
datacap uint64
}{ }{
// Transactions from new accounts should be accepted if their initial // Transactions from new accounts should be accepted if their initial
// nonce matches the expected one from the statedb. Higher or lower must // nonce matches the expected one from the statedb. Higher or lower must
@ -1720,6 +1721,66 @@ func TestAdd(t *testing.T) {
}, },
}, },
}, },
// Transactions above the Datacap should be rejected
{
seeds: map[string]seed{
"alice": {balance: 10000000},
},
datacap: 1 * (txAvgSize + blobSize + uint64(txBlobOverhead)), // only allow 1 blob
adds: []addtx{
{ // Fits in capacity
from: "alice",
tx: makeUnsignedTx(0, 1, 1, 1),
},
{ // Beyond capacity
from: "alice",
tx: makeUnsignedTx(1, 1, 1, 1),
err: txpool.ErrUnderpriced,
},
},
},
// Transactions above the Datacap should be rejected, use eviction values
{
seeds: map[string]seed{
"alice": {
balance: 2000000,
//nonce: 1,
txs: []*types.BlobTx{
makeUnsignedTxWithTestBlob(0, 2, 2, 2, 0),
makeUnsignedTxWithTestBlob(1, 2, 2, 2, 1),
},
},
"bob": {
balance: 1000000,
//nonce: 1,
txs: []*types.BlobTx{
makeUnsignedTxWithTestBlob(0, 3, 3, 3, 2),
},
},
},
datacap: 3 * (txAvgSize + blobSize + uint64(txBlobOverhead)), // only allow 2 blobs
adds: []addtx{
{ // Beyond capacity, but kicking out one from Alice should make it fit
from: "bob",
tx: makeUnsignedTxWithTestBlob(1, 3, 3, 3, 3),
},
{ // We've just kicked our nonce 1, so this is nonce too high
from: "alice",
tx: makeUnsignedTxWithTestBlob(2, 1, 2, 2, 4),
err: core.ErrNonceTooHigh,
},
{ // This should not succeed, fees are low
from: "alice",
tx: makeUnsignedTxWithTestBlob(1, 1, 1, 1, 4),
err: txpool.ErrUnderpriced,
},
{ // This should also not succeed, because of rolling fee calculation
from: "alice",
tx: makeUnsignedTxWithTestBlob(1, 1, 1, 1, 4),
err: txpool.ErrUnderpriced,
},
},
},
} }
for i, tt := range tests { for i, tt := range tests {
// Create a temporary folder for the persistent backend // Create a temporary folder for the persistent backend
@ -1760,12 +1821,17 @@ func TestAdd(t *testing.T) {
blobfee: uint256.NewInt(1), blobfee: uint256.NewInt(1),
statedb: statedb, statedb: statedb,
} }
pool := New(Config{Datadir: storage}, chain, nil) pool := New(Config{Datadir: storage, Datacap: tt.datacap}, chain, nil)
if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil { if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil {
t.Fatalf("test %d: failed to create blob pool: %v", i, err) t.Fatalf("test %d: failed to create blob pool: %v", i, err)
} }
verifyPoolInternals(t, pool) verifyPoolInternals(t, pool)
// subscibe to pool events to verify they are emitted correctly
txsCh := make(chan core.NewTxsEvent, 1)
txsSub := pool.SubscribeTransactions(txsCh, false)
defer txsSub.Unsubscribe()
// Add each transaction one by one, verifying the pool internals in between // Add each transaction one by one, verifying the pool internals in between
for j, add := range tt.adds { for j, add := range tt.adds {
signed, _ := types.SignNewTx(keys[add.from], types.LatestSigner(params.MainnetChainConfig), add.tx) signed, _ := types.SignNewTx(keys[add.from], types.LatestSigner(params.MainnetChainConfig), add.tx)
@ -1796,6 +1862,33 @@ func TestAdd(t *testing.T) {
} }
// Verify the pool internals after each addition // Verify the pool internals after each addition
verifyPoolInternals(t, pool) verifyPoolInternals(t, pool)
// verify that if the tx was added, an event was emitted
txStatus := pool.Status(signed.Hash())
select {
case ev := <-txsCh:
switch {
case add.err == nil && txStatus == txpool.TxStatusPending:
if len(ev.Txs) != 1 {
t.Errorf("test %d, tx %d: event txs length mismatch: have %d, want 1", i, j, len(ev.Txs))
}
if ev.Txs[0].Hash() != signed.Hash() {
t.Errorf("test %d, tx %d: event tx mismatch: have %v, want %v", i, j, ev.Txs[0].Hash(), signed.Hash())
}
case add.err == nil && txStatus == txpool.TxStatusQueued:
t.Errorf("test %d, tx %d: unexpected new tx event for queued tx", i, j)
case add.err != nil:
t.Errorf("test %d, tx %d: unexpected new tx event for failed tx", i, j)
default:
t.Errorf("test %d, tx %d: unexpected test result", i, j)
}
default:
switch {
case add.err == nil && txStatus == txpool.TxStatusPending:
t.Errorf("test %d, tx %d: expected new tx event, none received", i, j)
default:
// expected no event
}
}
} }
verifyPoolInternals(t, pool) verifyPoolInternals(t, pool)

View file

@ -94,6 +94,12 @@ func (h *evictHeap) Less(i, j int) bool {
lastI := txsI[len(txsI)-1] lastI := txsI[len(txsI)-1]
lastJ := txsJ[len(txsJ)-1] lastJ := txsJ[len(txsJ)-1]
return h.txPrioLt(lastI, lastJ)
}
// LessTx compares two blobTxMeta entries and returns whether the first has a lower
// eviction priority than the second.
func (h *evictHeap) txPrioLt(lastI, lastJ *blobTxMeta) bool {
prioI := evictionPriority(h.basefeeJumps, lastI.evictionExecFeeJumps, h.blobfeeJumps, lastI.evictionBlobFeeJumps) prioI := evictionPriority(h.basefeeJumps, lastI.evictionExecFeeJumps, h.blobfeeJumps, lastI.evictionBlobFeeJumps)
prioJ := evictionPriority(h.basefeeJumps, lastJ.evictionExecFeeJumps, h.blobfeeJumps, lastJ.evictionBlobFeeJumps) prioJ := evictionPriority(h.basefeeJumps, lastJ.evictionExecFeeJumps, h.blobfeeJumps, lastJ.evictionBlobFeeJumps)
if prioI == prioJ { if prioI == prioJ {
@ -117,6 +123,20 @@ func (h *evictHeap) Push(x any) {
h.addrs = append(h.addrs, x.(common.Address)) h.addrs = append(h.addrs, x.(common.Address))
} }
// Underpriced checks whether the given transaction is underpriced compared to the
// cheapest transaction in the heap.
// If a transaction has the same priority as the cheapest, it is still considered
// underpriced.
func (h *evictHeap) Underpriced(meta *blobTxMeta) bool {
if len(h.addrs) == 0 {
return false
}
cheapestTxs := h.metas[h.addrs[0]]
cheapestTx := cheapestTxs[len(cheapestTxs)-1]
return !h.txPrioLt(cheapestTx, meta)
}
// Pop implements heap.Interface, removing and returning the last element of the // Pop implements heap.Interface, removing and returning the last element of the
// heap. // heap.
// //

View file

@ -17,10 +17,55 @@
package blobpool package blobpool
import ( import (
"errors"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/holiman/billy" "github.com/holiman/billy"
) )
// slotSizer computes the storage shelf size for a given transaction size using
// O(1) arithmetic. Shelf sizes form an arithmetic sequence:
//
// base, base+step, base+2*step, ...
//
// This mirrors the progression in newSlotter and newSlotterEIP7594, but avoids
// creating and iterating a stateful closure on every lookup.
type slotSizer struct {
base uint32 // Size of the first shelf (txAvgSize)
step uint32 // Size increment per subsequent shelf
max uint32 // Largest valid shelf size
}
// newSlotSizer creates a slotSizer by consuming a slotter closure once to
// discover its base size, step size, and maximum shelf size.
func newSlotSizer(slotter billy.SlotSizeFn) slotSizer {
first, done := slotter()
if done {
return slotSizer{base: first, step: 0, max: first}
}
second, done := slotter()
step := second - first
last := second
for !done {
last, done = slotter()
}
return slotSizer{base: first, step: step, max: last}
}
// getSlotSize returns the shelf size that can store a transaction of the given
// byte size, or an error if it exceeds the largest shelf.
func (s slotSizer) getSlotSize(size uint32) (uint32, error) {
if size <= s.base {
return s.base, nil
}
// Round up to the nearest shelf: base + ⌈(size-base)/step⌉ * step
slot := s.base + ((size-s.base+s.step-1)/s.step)*s.step
if slot > s.max {
return 0, errors.New("size exceeds maximum slot size")
}
return slot, nil
}
// tryMigrate checks if the billy needs to be migrated and migrates if needed. // tryMigrate checks if the billy needs to be migrated and migrates if needed.
// Returns a slotter that can be used for the database. // Returns a slotter that can be used for the database.
func tryMigrate(config *params.ChainConfig, slotter billy.SlotSizeFn, datadir string) (billy.SlotSizeFn, error) { func tryMigrate(config *params.ChainConfig, slotter billy.SlotSizeFn, datadir string) (billy.SlotSizeFn, error) {