diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index 8e07639fed..4737c50129 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -688,8 +688,9 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 ) if gapped || filled { var ( - ids []uint64 - nonces []uint64 + ids []uint64 + deleteID []uint64 + nonces []uint64 ) for i := 0; i < len(txs); i++ { ids = append(ids, txs[i].id) @@ -699,8 +700,8 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 p.lookup.untrack(txs[i]) // Included transactions blobs need to be moved to the limbo - if filled && inclusions != nil { - p.offload(addr, txs[i], inclusions) + if !(filled && inclusions != nil && p.offload(addr, txs[i], inclusions)) { + deleteID = append(deleteID, txs[i].id) } } delete(p.index, addr) @@ -717,7 +718,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 log.Trace("Dropping filled blob transactions", "from", addr, "filled", nonces, "ids", ids) dropFilledMeter.Mark(int64(len(ids))) } - for _, id := range ids { + for _, id := range deleteID { if err := p.store.Delete(id); err != nil { log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err) } @@ -728,8 +729,9 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 // anything below the current state if txs[0].nonce < next { var ( - ids []uint64 - nonces []uint64 + ids []uint64 + deleteID []uint64 + nonces []uint64 ) for len(txs) > 0 && txs[0].nonce < next { ids = append(ids, txs[0].id) @@ -740,15 +742,15 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 p.lookup.untrack(txs[0]) // Included transactions blobs need to be moved to the limbo - if inclusions != nil { - p.offload(addr, txs[0], inclusions) + if !(inclusions != nil && p.offload(addr, txs[0], inclusions)) { + deleteID = append(deleteID, txs[0].id) } txs = 
txs[1:] } log.Trace("Dropping overlapped blob transactions", "from", addr, "overlapped", nonces, "ids", ids, "left", len(txs)) dropOverlappedMeter.Mark(int64(len(ids))) - for _, id := range ids { + for _, id := range deleteID { if err := p.store.Delete(id); err != nil { log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err) } @@ -920,16 +922,29 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 // any of it since there's no clear error case. Some errors may be due to coding // issues, others caused by signers mining MEV stuff or swapping transactions. In // all cases, the pool needs to continue operating. -func (p *BlobPool) offload(addr common.Address, meta *blobTxMeta, inclusions map[common.Hash]uint64) { +func (p *BlobPool) offload(addr common.Address, meta *blobTxMeta, inclusions map[common.Hash]uint64) bool { block, ok := inclusions[meta.hash] if !ok { log.Warn("Blob transaction swapped out by signer", "from", addr, "nonce", meta.nonce, "id", meta.id) - return + return false } - if err := p.limbo.push(meta, block); err != nil { + raw, err := p.store.Get(meta.id) + if err != nil { + log.Error("Blobs missing for included transaction", "from", addr, "nonce", meta.nonce, "id", meta.id, "err", err) + return false + } + if err := p.limbo.push(raw, meta, block); err != nil { log.Warn("Failed to offload blob tx into limbo", "err", err) - return + return false } + if err := p.store.Delete(meta.id); err != nil { + log.Error("Failed to delete blob transaction", "from", addr, "id", meta.id, "err", err) + if rollbackErr := p.limbo.drop(meta.hash); rollbackErr != nil { + log.Error("Failed to rollback limboed blob", "from", addr, "nonce", meta.nonce, "id", meta.id, "err", rollbackErr) + } + return false + } + return true } // Reset implements txpool.SubPool, allowing the blob pool's internal state to be @@ -976,12 +991,7 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) { } // Flush out any blobs from 
limbo that are older than the latest finality if p.chain.Config().IsCancun(newHead.Number, newHead.Time) { - // Delete all limboed transactions up to the finalized block. - p.limbo.finalize(p.chain.CurrentFinalBlock(), func(id uint64, txHash common.Hash) { - if err := p.store.Delete(id); err != nil { - log.Error("Failed to delete blob transaction", "hash", txHash, "id", id, "err", err) - } - }) + p.limbo.finalize(p.chain.CurrentFinalBlock()) } // Reset the price heap for the new set of basefee/blobfee pairs var ( @@ -1166,11 +1176,71 @@ func (p *BlobPool) reorg(oldHead, newHead *types.Header) (map[common.Address][]* func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error { // Retrieve the associated blob from the limbo. Without the blobs, we cannot // add the transaction back into the pool as it is not mineable. - meta, err := p.limbo.pull(txhash) + item, err := p.limbo.pull(txhash) if err != nil { log.Error("Blobs unavailable, dropping reorged tx", "err", err) return err } + var ( + meta = item.TxMeta + raw = item.Raw + tx = item.Tx + ) + switch { + case len(raw) > 0: + case tx != nil: + raw, err = rlp.EncodeToBytes(tx) + if err != nil { + log.Error("Failed to encode transaction for reinjection", "hash", tx.Hash(), "err", err) + return err + } + case meta != nil: + log.Error("Blobs unavailable for metadata-only limbo entry", "hash", meta.hash) + return errors.New("missing blob payload") + default: + log.Error("invalid limbo entry") + } + head := p.head.Load() + isOsaka := p.chain.Config().IsOsaka(head.Number, head.Time) + if tx == nil && (isOsaka || meta == nil) { + tx = new(types.Transaction) + if err := rlp.DecodeBytes(raw, tx); err != nil { + log.Error("Failed to decode transaction for reinjection", "hash", txhash, "err", err) + return err + } + } + // Converts reorged-out legacy blob transactions to the new format to prevent + // them from becoming stuck in the pool until eviction.
+ // + // Performance note: Conversion takes ~140ms (Mac M1 Pro). Since a maximum of + // 9 legacy blob transactions are allowed in a block pre-Osaka, an adversary + // could theoretically halt a Geth node for ~1.2s by reorging per block. However, + // this attack is financially inefficient to execute. + if isOsaka && tx != nil && tx.BlobTxSidecar().Version == types.BlobSidecarVersion0 { + if err := tx.BlobTxSidecar().ToV1(); err != nil { + log.Error("Failed to convert the legacy sidecar", "err", err) + return err + } + raw, err = rlp.EncodeToBytes(tx) + if err != nil { + log.Error("Failed to encode transaction for reinjection", "hash", txhash, "err", err) + return err + } + log.Info("Reinjecting legacy sidecar", "hash", txhash, "raw", raw) + meta = nil // Force metadata regeneration after sidecar upgrade. + } + id, err := p.store.Put(raw) + if err != nil { + log.Error("Failed to store transaction for reinjection", "hash", txhash, "err", err) + return err + } + if meta == nil { + meta = newBlobTxMeta(id, tx.Size(), p.store.Size(id), tx) + } else { + meta.id = id + meta.storageSize = p.store.Size(id) + meta.size = uint64(len(raw)) + } if _, ok := p.index[addr]; !ok { if err := p.reserver.Hold(addr); err != nil { log.Warn("Failed to reserve account for blob pool", "tx", meta.hash, "from", addr, "err", err) @@ -2029,10 +2099,15 @@ func (p *BlobPool) updateLimboMetrics() { datareal += slotDataused + slotDatagaps slotused += shelf.FilledSlots - metrics.GetOrRegisterGauge(fmt.Sprintf(limboShelfDatausedGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(slotDataused)) - metrics.GetOrRegisterGauge(fmt.Sprintf(limboShelfDatagapsGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(slotDatagaps)) - metrics.GetOrRegisterGauge(fmt.Sprintf(limboShelfSlotusedGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(shelf.FilledSlots)) - metrics.GetOrRegisterGauge(fmt.Sprintf(limboShelfSlotgapsGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(shelf.GappedSlots)) + // Skip
per-shelf metrics for the 1KB compatibility shelf (used for legacy + // metadata-only entries). shelf.SlotSize/blobSize would be 0 for that + // shelf, producing a misleading gauge name. + if blobCount := shelf.SlotSize / blobSize; blobCount > 0 { + metrics.GetOrRegisterGauge(fmt.Sprintf(limboShelfDatausedGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(slotDataused)) + metrics.GetOrRegisterGauge(fmt.Sprintf(limboShelfDatagapsGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(slotDatagaps)) + metrics.GetOrRegisterGauge(fmt.Sprintf(limboShelfSlotusedGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(shelf.FilledSlots)) + metrics.GetOrRegisterGauge(fmt.Sprintf(limboShelfSlotgapsGaugeName, shelf.SlotSize/blobSize), nil).Update(int64(shelf.GappedSlots)) + } } limboDatausedGauge.Update(int64(dataused)) limboDatarealGauge.Update(int64(datareal)) diff --git a/core/txpool/blobpool/blobpool_test.go b/core/txpool/blobpool/blobpool_test.go index 7c57755401..45c6d9871d 100644 --- a/core/txpool/blobpool/blobpool_test.go +++ b/core/txpool/blobpool/blobpool_test.go @@ -2138,3 +2138,87 @@ func benchmarkPoolPending(b *testing.B, datacap uint64) { } } } + +func TestBlobpoolReinjectRestoresPayloadAcrossRestart(t *testing.T) { + storage := t.TempDir() + + key, _ := crypto.GenerateKey() + addr := crypto.PubkeyToAddress(key.PublicKey) + tx := makeMultiBlobTx(0, 1, 1000, 100, 1, 0, key, types.BlobSidecarVersion0) + + statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting()) + statedb.AddBalance(addr, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified) + statedb.Commit(0, true, false) + + chain := &testBlockChain{ + config: params.MainnetChainConfig, + basefee: uint256.NewInt(1050), + blobfee: uint256.NewInt(105), + statedb: statedb, + } + currentHead := chain.CurrentBlock() + + pool := New(Config{Datadir: storage}, chain, nil) + if err := pool.Init(1, currentHead, newReserver()); err != nil { + t.Fatalf("failed to create blob pool: %v", err) +
} + if errs := pool.Add([]*types.Transaction{tx}, true); errs[0] != nil { + t.Fatalf("failed to add tx to pool: %v", errs[0]) + } + wantRLP := pool.getRLP(tx.Hash()) + if len(wantRLP) == 0 { + t.Fatalf("missing blob tx payload before offload") + } + + includeHead := &types.Header{ + Number: big.NewInt(int64(currentHead.Number.Uint64() + 1)), + Difficulty: common.Big0, + BaseFee: currentHead.BaseFee, + } + chain.blocks = map[uint64]*types.Block{ + includeHead.Number.Uint64(): types.NewBlockWithHeader(includeHead).WithBody(types.Body{ + Transactions: []*types.Transaction{tx}, + }), + } + chain.statedb.SetNonce(addr, tx.Nonce()+1, tracing.NonceChangeUnspecified) + pool.Reset(currentHead, includeHead) + + if got := pool.Get(tx.Hash()); got != nil { + t.Fatalf("got non-nil blob tx after offload") + } + if err := pool.Close(); err != nil { + t.Fatalf("failed to close pool: %v", err) + } + + pool = New(Config{Datadir: storage}, chain, nil) + if err := pool.Init(1, includeHead, newReserver()); err != nil { + t.Fatalf("failed to create blob pool: %v", err) + } + if got := pool.Get(tx.Hash()); got != nil { + t.Fatalf("got non-nil blob tx after offload") + } + + chain.statedb.SetNonce(addr, tx.Nonce(), tracing.NonceChangeUnspecified) + pool.Reset(includeHead, currentHead) + + got := pool.Get(tx.Hash()) + if got == nil { + t.Fatalf("got nil blob tx after offload") + } + if !bytes.Equal(pool.GetRLP(tx.Hash()), wantRLP) { + t.Fatalf("got blob tx after offload") + } + blobs, _, proofs, err := pool.GetBlobs(tx.BlobHashes(), types.BlobSidecarVersion0) + if err != nil { + t.Fatalf("failed to get blobs: %v", err) + } + if len(blobs) != 1 || blobs[0] == nil { + t.Fatalf("missing blob after reinjection") + } + if len(proofs) != 1 || len(proofs[0]) != 1 { + t.Fatalf("missing proof after reinjection") + } + verifyBlobRetrievals(t, pool) + + pool.Close() +} diff --git a/core/txpool/blobpool/limbo.go b/core/txpool/blobpool/limbo.go index 4f1078c394..f75e1a867e 100644 --- 
a/core/txpool/blobpool/limbo.go +++ b/core/txpool/blobpool/limbo.go @@ -18,6 +18,7 @@ package blobpool import ( "errors" + "sort" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -35,6 +36,7 @@ type limboBlob struct { Block uint64 // Block in which the blob transaction was included Tx *types.Transaction `rlp:"nil"` // Optional full blob transaction (old storage style) TxMeta *blobTxMeta // the blob transaction metadata. + Raw []byte // Canonical raw blob transaction payload for reinjection id uint64 // the billy id of limboBlob } @@ -47,15 +49,12 @@ type limbo struct { } // newLimbo opens and indexes a set of limboed blob transactions. -func newLimbo(config *params.ChainConfig, datadir string) (*limbo, error) { +func newLimbo(_ *params.ChainConfig, datadir string) (*limbo, error) { l := &limbo{ index: make(map[common.Hash]*limboBlob), } - // The limbo won't store full blobs, just store the metadata, so use a fixed size 1KB bytes is big enough. - slotter := func() (size uint32, done bool) { - return 1024, true - } + slotter := newLimboSlotter() // Index all limboed blobs on disk and delete anything unprocessable var fails []uint64 @@ -83,6 +82,42 @@ func newLimbo(config *params.ChainConfig, datadir string) (*limbo, error) { return l, nil } +// newLimboSlotter returns a shelf layout that can read the legacy limbo formats +// and also store full blob transaction payloads for reinjection. +func newLimboSlotter() billy.SlotSizeFn { + var ( + sizes []uint32 + seen = make(map[uint32]struct{}) + ) + addSlotter := func(slotter billy.SlotSizeFn) { + for { + size, done := slotter() + if _, ok := seen[size]; !ok { + seen[size] = struct{}{} + sizes = append(sizes, size) + } + if done { + break + } + } + } + // Preserve compatibility with the metadata-only limbo format introduced on + // this branch while also supporting the legacy and restored full payloads. 
+ sizes = append(sizes, 1024) + seen[1024] = struct{}{} + + addSlotter(newSlotter(params.BlobTxMaxBlobs)) + addSlotter(newSlotterEIP7594(params.BlobTxMaxBlobs)) + + sort.Slice(sizes, func(i, j int) bool { return sizes[i] < sizes[j] }) + var idx int + return func() (size uint32, done bool) { + size = sizes[idx] + idx++ + return size, idx == len(sizes) + } +} + // Close closes down the underlying persistent store. func (l *limbo) Close() error { return l.store.Close() @@ -113,13 +148,16 @@ func (l *limbo) parseBlob(id uint64, data []byte) error { } // finalize evicts all blobs belonging to a recently finalized block or older. -func (l *limbo) finalize(final *types.Header, fn func(id uint64, txHash common.Hash)) { +func (l *limbo) finalize(final *types.Header) { // Just in case there's no final block yet (network not yet merged, weird // restart, sethead, etc), fail gracefully. if final == nil { log.Warn("Nil finalized block cannot evict old blobs") return } + // Note: deleting keys from a map during range is explicitly safe in Go. + // A key deleted before the iteration reaches it is simply not produced, so + // index removals done by drop (presumably; verify) cannot disturb the loop. for _, item := range l.index { if item.Block > final.Number.Uint64() { continue @@ -127,23 +165,19 @@ func (l *limbo) finalize(final *types.Header, fn func(id uint64, txHash common.H if err := l.drop(item.TxHash); err != nil { log.Error("Failed to drop finalized blob", "block", item.Block, "id", item.id, "err", err) } - if fn != nil { - meta := item.TxMeta - fn(meta.id, meta.hash) - } } } // push stores a new blob transaction into the limbo, waiting until finality for // it to be automatically evicted. -func (l *limbo) push(meta *blobTxMeta, block uint64) error { +func (l *limbo) push(raw []byte, meta *blobTxMeta, block uint64) error { // If the blobs are already tracked by the limbo, consider it a programming // error. There's not much to do against it, but be loud.
if _, ok := l.index[meta.hash]; ok { log.Error("Limbo cannot push already tracked blobs", "tx", meta.hash) return errors.New("already tracked blob transaction") } - if err := l.setAndIndex(meta, block); err != nil { + if err := l.setAndIndex(raw, nil, meta, block); err != nil { log.Error("Failed to set and index limboed blobs", "tx", meta.hash, "err", err) return err } @@ -153,7 +187,7 @@ func (l *limbo) push(meta *blobTxMeta, block uint64) error { // pull retrieves a previously pushed set of blobs back from the limbo, removing // it at the same time. This method should be used when a previously included blob // transaction gets reorged out. -func (l *limbo) pull(tx common.Hash) (*blobTxMeta, error) { +func (l *limbo) pull(tx common.Hash) (*limboBlob, error) { // If the blobs are not tracked by the limbo, there's not much to do. This // can happen for example if a blob transaction is mined without pushing it // into the network first. @@ -165,7 +199,7 @@ func (l *limbo) pull(tx common.Hash) (*blobTxMeta, error) { if err := l.drop(item.TxHash); err != nil { return nil, err } - return item.TxMeta, nil + return item, nil } // update changes the block number under which a blob transaction is tracked. This @@ -194,7 +228,7 @@ func (l *limbo) update(txhash common.Hash, block uint64) { log.Error("Failed to drop old limboed metadata", "tx", txhash, "err", err) return } - if err := l.setAndIndex(item.TxMeta, block); err != nil { + if err := l.setAndIndex(item.Raw, item.Tx, item.TxMeta, block); err != nil { log.Error("Failed to set and index limboed blobs", "tx", txhash, "err", err) return } @@ -217,13 +251,21 @@ func (l *limbo) drop(txhash common.Hash) error { // setAndIndex assembles a limbo blob database entry and stores it, also updating // the in-memory indices. 
-func (l *limbo) setAndIndex(meta *blobTxMeta, block uint64) error { - txhash := meta.hash +func (l *limbo) setAndIndex(raw []byte, tx *types.Transaction, meta *blobTxMeta, block uint64) error { + var txhash common.Hash + switch { + case meta != nil: + txhash = meta.hash + case tx != nil: + txhash = tx.Hash() + default: + return errors.New("missing limbo payload") + } item := &limboBlob{ TxHash: txhash, Block: block, + Raw: raw, TxMeta: meta, - Tx: nil, // The tx already stored in the blob database, not here. } data, err := rlp.EncodeToBytes(item) if err != nil {