upgrade the blobpool

This commit is contained in:
q 2026-04-20 22:03:41 +08:00
parent 2ff529aebd
commit 6ad047e6b4
3 changed files with 244 additions and 43 deletions

View file

@ -688,8 +688,9 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
)
if gapped || filled {
var (
ids []uint64
nonces []uint64
ids []uint64
deleteID []uint64
nonces []uint64
)
for i := 0; i < len(txs); i++ {
ids = append(ids, txs[i].id)
@ -699,8 +700,8 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
p.lookup.untrack(txs[i])
// Included transactions blobs need to be moved to the limbo
if filled && inclusions != nil {
p.offload(addr, txs[i], inclusions)
if !(filled && inclusions != nil && p.offload(addr, txs[i], inclusions)) {
deleteID = append(deleteID, txs[i].id)
}
}
delete(p.index, addr)
@ -717,7 +718,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
log.Trace("Dropping filled blob transactions", "from", addr, "filled", nonces, "ids", ids)
dropFilledMeter.Mark(int64(len(ids)))
}
for _, id := range ids {
for _, id := range deleteID {
if err := p.store.Delete(id); err != nil {
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
}
@ -728,8 +729,9 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
// anything below the current state
if txs[0].nonce < next {
var (
ids []uint64
nonces []uint64
ids []uint64
deleteID []uint64
nonces []uint64
)
for len(txs) > 0 && txs[0].nonce < next {
ids = append(ids, txs[0].id)
@ -740,15 +742,15 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
p.lookup.untrack(txs[0])
// Included transactions blobs need to be moved to the limbo
if inclusions != nil {
p.offload(addr, txs[0], inclusions)
if !(inclusions != nil && p.offload(addr, txs[0], inclusions)) {
deleteID = append(deleteID, txs[0].id)
}
txs = txs[1:]
}
log.Trace("Dropping overlapped blob transactions", "from", addr, "overlapped", nonces, "ids", ids, "left", len(txs))
dropOverlappedMeter.Mark(int64(len(ids)))
for _, id := range ids {
for _, id := range deleteID {
if err := p.store.Delete(id); err != nil {
log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err)
}
@ -920,16 +922,29 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6
// any of it since there's no clear error case. Some errors may be due to coding
// issues, others caused by signers mining MEV stuff or swapping transactions. In
// all cases, the pool needs to continue operating.
func (p *BlobPool) offload(addr common.Address, meta *blobTxMeta, inclusions map[common.Hash]uint64) {
func (p *BlobPool) offload(addr common.Address, meta *blobTxMeta, inclusions map[common.Hash]uint64) bool {
block, ok := inclusions[meta.hash]
if !ok {
log.Warn("Blob transaction swapped out by signer", "from", addr, "nonce", meta.nonce, "id", meta.id)
return
return false
}
if err := p.limbo.push(meta, block); err != nil {
raw, err := p.store.Get(meta.id)
if err != nil {
log.Error("Blobs missing for included transaction", "from", addr, "nonce", meta.nonce, "id", meta.id, "err", err)
return false
}
if err := p.limbo.push(raw, meta, block); err != nil {
log.Warn("Failed to offload blob tx into limbo", "err", err)
return
return false
}
if err := p.store.Delete(meta.id); err != nil {
log.Error("Failed to delete blob transaction", "from", addr, "id", meta.id, "err", err)
if rollbackErr := p.limbo.drop(meta.hash); rollbackErr != nil {
log.Error("Failed to rollback limboed blob", "from", addr, "nonce", meta.nonce, "id", meta.id, "err", rollbackErr)
}
return false
}
return true
}
// Reset implements txpool.SubPool, allowing the blob pool's internal state to be
@ -976,12 +991,7 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) {
}
// Flush out any blobs from limbo that are older than the latest finality
if p.chain.Config().IsCancun(newHead.Number, newHead.Time) {
// Delete all limboed transactions up to the finalized block.
p.limbo.finalize(p.chain.CurrentFinalBlock(), func(id uint64, txHash common.Hash) {
if err := p.store.Delete(id); err != nil {
log.Error("Failed to delete blob transaction", "hash", txHash, "id", id, "err", err)
}
})
p.limbo.finalize(p.chain.CurrentFinalBlock())
}
// Reset the price heap for the new set of basefee/blobfee pairs
var (
@ -1166,11 +1176,71 @@ func (p *BlobPool) reorg(oldHead, newHead *types.Header) (map[common.Address][]*
func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error {
// Retrieve the associated blob from the limbo. Without the blobs, we cannot
// add the transaction back into the pool as it is not mineable.
meta, err := p.limbo.pull(txhash)
item, err := p.limbo.pull(txhash)
if err != nil {
log.Error("Blobs unavailable, dropping reorged tx", "err", err)
return err
}
var (
meta = item.TxMeta
raw = item.Raw
tx = item.Tx
)
switch {
case len(raw) > 0:
case tx != nil:
raw, err = rlp.EncodeToBytes(tx)
if err != nil {
log.Error("Failed to encode transaction for reinjection", "hash", tx.Hash(), "err", err)
return err
}
case meta != nil:
log.Error("Blobs unavailable for metadata-only limbo entry", "hash", meta.hash)
return errors.New("missing blob payload")
default:
log.Error("invalid limbo entry")
}
head := p.head.Load()
isOsaka := p.chain.Config().IsOsaka(head.Number, head.Time)
if tx == nil && (isOsaka || meta == nil) {
tx = new(types.Transaction)
if err := rlp.DecodeBytes(raw, tx); err != nil {
log.Error("Failed to decode transaction for reinjection", "hash", txhash, "err", err)
return err
}
}
// Converts reorged-out legacy blob transactions to the new format to prevent
// them from becoming stuct in the pool until eviction.
//
// Performance note: Conversion takes ~140ms (Mac M1 Pro). Since a maximum of
// 9 legacy blob transactions are allowed in a block pre-Osaka, an adversary
// could theoretically halt a Geth node for ~1.2s by reorging per block. However,
// this attack if financially inefficient to execute.
if isOsaka && tx != nil && tx.BlobTxSidecar().Version == types.BlobSidecarVersion0 {
if err := tx.BlobTxSidecar().ToV1(); err != nil {
log.Error("Failed to convert the legacy sidecar", "err", err)
return err
}
raw, err = rlp.EncodeToBytes(tx)
if err != nil {
log.Error("Failed to encode transaction for reinjection", "hash", txhash, "err", err)
return err
}
log.Info("Reinjecting legacy sidecar", "hash", txhash, "raw", raw)
meta = nil // Force metadata regeneration after sidecar upgrade.
}
id, err := p.store.Put(raw)
if err != nil {
log.Error("Failed to store transaction for reinjection", "hash", txhash, "err", err)
return err
}
if meta == nil {
meta = newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx)
} else {
meta.id = id
meta.storageSize = p.store.Size(id)
meta.size = uint64(len(raw))
}
if _, ok := p.index[addr]; !ok {
if err := p.reserver.Hold(addr); err != nil {
log.Warn("Failed to reserve account for blob pool", "tx", meta.hash, "from", addr, "err", err)
@ -2029,10 +2099,15 @@ func (p *BlobPool) updateLimboMetrics() {
datareal += slotDataused + slotDatagaps
slotused += shelf.FilledSlots
metrics.GetOrRegisterGauge(fmt.Sprintf(limboShelfDatausedGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(slotDataused))
metrics.GetOrRegisterGauge(fmt.Sprintf(limboShelfDatagapsGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(slotDatagaps))
metrics.GetOrRegisterGauge(fmt.Sprintf(limboShelfSlotusedGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(shelf.FilledSlots))
metrics.GetOrRegisterGauge(fmt.Sprintf(limboShelfSlotgapsGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(shelf.GappedSlots))
// Skip per-shelf metrics for the 1KB compatibility shelf (used for legacy
//metadata-only entries). shelf.SlotSize/blobSize would be 0 for that
// shelf, producing a misleading gauge name.
if blobCount := shelf.SlotSize / blobSize; blobCount > 0 {
metrics.GetOrRegisterGauge(fmt.Sprintf(limboShelfDatausedGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(slotDataused))
metrics.GetOrRegisterGauge(fmt.Sprintf(limboShelfDatagapsGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(slotDatagaps))
metrics.GetOrRegisterGauge(fmt.Sprintf(limboShelfSlotusedGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(shelf.FilledSlots))
metrics.GetOrRegisterGauge(fmt.Sprintf(limboShelfSlotgapsGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(shelf.GappedSlots))
}
}
limboDatausedGauge.Update(int64(dataused))
limboDatarealGauge.Update(int64(datareal))

View file

@ -2138,3 +2138,87 @@ func benchmarkPoolPending(b *testing.B, datacap uint64) {
}
}
}
func TestBlobpoolReinjectRestoresPayloadAcrossRestart(t *testing.T) {
storage := t.TempDir()
key, _ := crypto.GenerateKey()
addr := crypto.PubkeyToAddress(key.PublicKey)
tx := makeMultiBlobTx(0, 1, 1000, 100, 1, 0, key, types.BlobSidecarVersion0)
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting())
statedb.AddBalance(addr, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified)
statedb.Commit(0, true, false)
chain := &testBlockChain{
config: params.MainnetChainConfig,
basefee: uint256.NewInt(1050),
blobfee: uint256.NewInt(105),
statedb: statedb,
}
currentHead := chain.CurrentBlock()
pool := New(Config{Datadir: storage}, chain, nil)
if err := pool.Init(1, currentHead, newReserver()); err != nil {
t.Fatalf("failed to create blob pool: %v", err)
}
if errs := pool.Add([]*types.Transaction{tx}, true); errs[0] != nil {
t.Fatalf("failed to add tx to pool: %v", errs[0])
}
wantRLP := pool.getRLP(tx.Hash())
if len(wantRLP) == 0 {
t.Fatalf("missing blob tx payload before offload")
}
includeHead := &types.Header{
Number: big.NewInt(int64(currentHead.Number.Uint64() + 1)),
Difficulty: common.Big0,
BaseFee: currentHead.BaseFee,
}
chain.blocks = map[uint64]*types.Block{
includeHead.Number.Uint64(): types.NewBlockWithHeader(includeHead).WithBody(types.Body{
Transactions: []*types.Transaction{tx},
}),
}
chain.statedb.SetNonce(addr, tx.Nonce()+1, tracing.NonceChangeUnspecified)
pool.Reset(currentHead, includeHead)
if got := pool.Get(tx.Hash()); got != nil {
t.Fatalf("got non-nil blob tx after offload")
}
if err := pool.Close(); err != nil {
t.Fatalf("failed to close pool: %v", err)
}
pool = New(Config{Datadir: storage}, chain, nil)
if err := pool.Init(1, includeHead, newReserver()); err != nil {
t.Fatalf("failed to create blob pool: %v", err)
}
if got := pool.Get(tx.Hash()); got != nil {
t.Fatalf("got non-nil blob tx after offload")
}
chain.statedb.SetNonce(addr, tx.Nonce(), tracing.NonceChangeUnspecified)
pool.Reset(includeHead, currentHead)
got := pool.Get(tx.Hash())
if got == nil {
t.Fatalf("got nil blob tx after offload")
}
if !bytes.Equal(pool.GetRLP(tx.Hash()), wantRLP) {
t.Fatalf("got blob tx after offload")
}
blobs, _, proofs, err := pool.GetBlobs(tx.BlobHashes(), types.BlobSidecarVersion0)
if err != nil {
t.Fatalf("failed to get blobs: %v", err)
}
if len(blobs) != 1 || blobs[0] == nil {
t.Fatalf("missing blob after reinjection")
}
if len(proofs) != 1 || len(proofs[0]) != 1 {
t.Fatalf("missing proof after reinjection")
}
verifyBlobRetrievals(t, pool)
pool.Close()
}

View file

@ -18,6 +18,7 @@ package blobpool
import (
"errors"
"sort"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
@ -35,6 +36,7 @@ type limboBlob struct {
Block uint64 // Block in which the blob transaction was included
Tx *types.Transaction `rlp:"nil"` // Optional full blob transaction (old storage style)
TxMeta *blobTxMeta // the blob transaction metadata.
Raw []byte // Canonical raw blob transaction payload for reinjection
id uint64 // the billy id of limboBlob
}
@ -47,15 +49,12 @@ type limbo struct {
}
// newLimbo opens and indexes a set of limboed blob transactions.
func newLimbo(config *params.ChainConfig, datadir string) (*limbo, error) {
func newLimbo(_ *params.ChainConfig, datadir string) (*limbo, error) {
l := &limbo{
index: make(map[common.Hash]*limboBlob),
}
// The limbo won't store full blobs, just store the metadata, so use a fixed size 1KB bytes is big enough.
slotter := func() (size uint32, done bool) {
return 1024, true
}
slotter := newLimboSlotter()
// Index all limboed blobs on disk and delete anything unprocessable
var fails []uint64
@ -83,6 +82,42 @@ func newLimbo(config *params.ChainConfig, datadir string) (*limbo, error) {
return l, nil
}
// newLimboSlotter returns a shelf layout that can read the legacy limbo formats
// and also store full blob transaction payloads for reinjection.
func newLimboSlotter() billy.SlotSizeFn {
var (
sizes []uint32
seen = make(map[uint32]struct{})
)
addSlotter := func(slotter billy.SlotSizeFn) {
for {
size, done := slotter()
if _, ok := seen[size]; !ok {
seen[size] = struct{}{}
sizes = append(sizes, size)
}
if done {
break
}
}
}
// Preserve compatibility with the metadata-only limbo format introduced on
// this branch while also supporting the legacy and restored full payloads.
sizes = append(sizes, 1024)
seen[1024] = struct{}{}
addSlotter(newSlotter(params.BlobTxMaxBlobs))
addSlotter(newSlotterEIP7594(params.BlobTxMaxBlobs))
sort.Slice(sizes, func(i, j int) bool { return sizes[i] < sizes[j] })
var idx int
return func() (size uint32, done bool) {
size = sizes[idx]
idx++
return size, idx == len(sizes)
}
}
// Close closes down the underlying persistent store.
func (l *limbo) Close() error {
return l.store.Close()
@ -113,13 +148,16 @@ func (l *limbo) parseBlob(id uint64, data []byte) error {
}
// finalize evicts all blobs belonging to a recently finalized block or older.
func (l *limbo) finalize(final *types.Header, fn func(id uint64, txHash common.Hash)) {
func (l *limbo) finalize(final *types.Header) {
// Just in case there's no final block yet (network not yet merged, weird
// restart, sethead, etc), fail gracefully.
if final == nil {
log.Warn("Nil finalized block cannot evict old blobs")
return
}
// Note: deleting keys from a map during range is explicitly safe in Go.
// Any key deleted mid-iteration may or may not be visited; entries missed
// here will be cleaned up on the next finalize call.
for _, item := range l.index {
if item.Block > final.Number.Uint64() {
continue
@ -127,23 +165,19 @@ func (l *limbo) finalize(final *types.Header, fn func(id uint64, txHash common.H
if err := l.drop(item.TxHash); err != nil {
log.Error("Failed to drop finalized blob", "block", item.Block, "id", item.id, "err", err)
}
if fn != nil {
meta := item.TxMeta
fn(meta.id, meta.hash)
}
}
}
// push stores a new blob transaction into the limbo, waiting until finality for
// it to be automatically evicted.
func (l *limbo) push(meta *blobTxMeta, block uint64) error {
func (l *limbo) push(raw []byte, meta *blobTxMeta, block uint64) error {
// If the blobs are already tracked by the limbo, consider it a programming
// error. There's not much to do against it, but be loud.
if _, ok := l.index[meta.hash]; ok {
log.Error("Limbo cannot push already tracked blobs", "tx", meta.hash)
return errors.New("already tracked blob transaction")
}
if err := l.setAndIndex(meta, block); err != nil {
if err := l.setAndIndex(raw, nil, meta, block); err != nil {
log.Error("Failed to set and index limboed blobs", "tx", meta.hash, "err", err)
return err
}
@ -153,7 +187,7 @@ func (l *limbo) push(meta *blobTxMeta, block uint64) error {
// pull retrieves a previously pushed set of blobs back from the limbo, removing
// it at the same time. This method should be used when a previously included blob
// transaction gets reorged out.
func (l *limbo) pull(tx common.Hash) (*blobTxMeta, error) {
func (l *limbo) pull(tx common.Hash) (*limboBlob, error) {
// If the blobs are not tracked by the limbo, there's not much to do. This
// can happen for example if a blob transaction is mined without pushing it
// into the network first.
@ -165,7 +199,7 @@ func (l *limbo) pull(tx common.Hash) (*blobTxMeta, error) {
if err := l.drop(item.TxHash); err != nil {
return nil, err
}
return item.TxMeta, nil
return item, nil
}
// update changes the block number under which a blob transaction is tracked. This
@ -194,7 +228,7 @@ func (l *limbo) update(txhash common.Hash, block uint64) {
log.Error("Failed to drop old limboed metadata", "tx", txhash, "err", err)
return
}
if err := l.setAndIndex(item.TxMeta, block); err != nil {
if err := l.setAndIndex(item.Raw, item.Tx, item.TxMeta, block); err != nil {
log.Error("Failed to set and index limboed blobs", "tx", txhash, "err", err)
return
}
@ -217,13 +251,21 @@ func (l *limbo) drop(txhash common.Hash) error {
// setAndIndex assembles a limbo blob database entry and stores it, also updating
// the in-memory indices.
func (l *limbo) setAndIndex(meta *blobTxMeta, block uint64) error {
txhash := meta.hash
func (l *limbo) setAndIndex(raw []byte, tx *types.Transaction, meta *blobTxMeta, block uint64) error {
var txhash common.Hash
switch {
case meta != nil:
txhash = meta.hash
case tx != nil:
txhash = tx.Hash()
default:
return errors.New("missing limbo payload")
}
item := &limboBlob{
TxHash: txhash,
Block: block,
Raw: raw,
TxMeta: meta,
Tx: nil, // The tx already stored in the blob database, not here.
}
data, err := rlp.EncodeToBytes(item)
if err != nil {