diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index 9cbb05881f..2f44991d21 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -152,7 +152,7 @@ type blobTxMeta struct { // newBlobTxMeta retrieves the indexed metadata fields from a pooled blob transaction // and assembles a helper struct to track in memory. -func newBlobTxMeta(id uint64, size uint64, storageSize uint32, pooledTx *pooledBlobTx) *blobTxMeta { +func newBlobTxMeta(id uint64, size uint64, storageSize uint32, pooledTx *PooledBlobTx) *blobTxMeta { var version byte if pooledTx.Sidecar != nil { version = pooledTx.Sidecar.Version @@ -180,7 +180,7 @@ func newBlobTxMeta(id uint64, size uint64, storageSize uint32, pooledTx *pooledB return meta } -type pooledBlobTx struct { +type PooledBlobTx struct { Transaction *types.Transaction Sidecar *types.BlobTxCellSidecar Size uint64 // original transaction size (including blobs) @@ -188,7 +188,7 @@ type pooledBlobTx struct { } // newPooledBlobTx creates pooledBlobTx struct. -func newPooledBlobTx(tx *types.Transaction) (*pooledBlobTx, error) { +func newPooledBlobTx(tx *types.Transaction) (*PooledBlobTx, error) { if tx.BlobTxSidecar() == nil { return nil, errors.New("missing blob sidecar") } @@ -196,7 +196,7 @@ func newPooledBlobTx(tx *types.Transaction) (*pooledBlobTx, error) { if err != nil { return nil, err } - return &pooledBlobTx{ + return &PooledBlobTx{ Transaction: tx.WithoutBlobTxSidecar(), Sidecar: sidecar, Size: tx.Size(), @@ -205,7 +205,7 @@ func newPooledBlobTx(tx *types.Transaction) (*pooledBlobTx, error) { } // convert recovers blobs from cell sidecar and returns a full transaction with blob sidecar. -func (ptx *pooledBlobTx) convert() (*types.Transaction, error) { +func (ptx *PooledBlobTx) convert() (*types.Transaction, error) { if ptx.Sidecar == nil { return nil, errors.New("cell sidecar missing") } @@ -409,17 +409,9 @@ type BlobPool struct { stored uint64 // Useful data size of all transactions on disk limbo *limbo // Persistent data store for the non-finalized blobs - gapped map[common.Address][]*pooledBlobTx // Transactions that are currently gapped (nonce too high) + gapped map[common.Address][]*PooledBlobTx // Transactions that are currently gapped (nonce too high) gappedSource map[common.Hash]common.Address // Source of gapped transactions to allow rechecking on inclusion - queue map[common.Hash]*types.Transaction // buffer - indexQueue map[common.Address][]*blobTxMeta // tx hashes in queue per address, sorted by nonce - spentQueue map[common.Address]*uint256.Int // Expenditure tracking for accounts, only for buffered txs - replacementQueue map[common.Address]map[uint64]*blobTxMeta // Replacement queue for pooled transactions - - cellQueue map[common.Hash][]kzg4844.Cell // cell buffer - custodyQueue map[common.Hash]*types.CustodyBitmap - signer types.Signer // Transaction signer to use for sender recovery chain BlockChain // Chain object to access the state through @@ -446,21 +438,15 @@ func New(config Config, chain BlockChain, hasPendingAuth func(common.Address) bo // Create the transaction pool with its initial settings return &BlobPool{ - config: config, - hasPendingAuth: hasPendingAuth, - signer: types.LatestSigner(chain.Config()), - chain: chain, - lookup: newLookup(), - index: make(map[common.Address][]*blobTxMeta), - spent: make(map[common.Address]*uint256.Int), - gapped: make(map[common.Address][]*pooledBlobTx), - gappedSource: make(map[common.Hash]common.Address), - queue: make(map[common.Hash]*types.Transaction), - indexQueue: make(map[common.Address][]*blobTxMeta), - spentQueue: make(map[common.Address]*uint256.Int), - cellQueue: make(map[common.Hash][]kzg4844.Cell), - custodyQueue: make(map[common.Hash]*types.CustodyBitmap), - replacementQueue: make(map[common.Address]map[uint64]*blobTxMeta), + config: config, + hasPendingAuth: hasPendingAuth, + signer: types.LatestSigner(chain.Config()), + chain: chain, + lookup: newLookup(), + index: make(map[common.Address][]*blobTxMeta), + spent: make(map[common.Address]*uint256.Int), + gapped: make(map[common.Address][]*PooledBlobTx), + gappedSource: make(map[common.Hash]common.Address), } } @@ -474,7 +460,6 @@ func (p *BlobPool) FilterType(kind byte) bool { return kind == types.BlobTxType } -// Init sets the gas price needed to keep a transaction in the pool and the chain // head to allow balance / nonce checks. The transaction journal will be loaded // from disk and filtered based on the provided starting settings. func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reserver) error { @@ -659,7 +644,7 @@ func (p *BlobPool) Close() error { // If a pooledBlobTx is found, it is indexed directly and nil is returned. func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) (*types.Transaction, error) { tx := new(types.Transaction) - pooledTx := new(pooledBlobTx) + pooledTx := new(PooledBlobTx) if err := rlp.DecodeBytes(blob, pooledTx); err != nil { // This path is impossible unless the disk data representation changes @@ -975,7 +960,7 @@ func (p *BlobPool) offload(addr common.Address, nonce uint64, id uint64, inclusi log.Error("Blobs missing for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err) return } - var pooledTx pooledBlobTx + var pooledTx PooledBlobTx if err = rlp.DecodeBytes(data, &pooledTx); err != nil { log.Error("Blobs corrupted for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err) return @@ -1063,7 +1048,7 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) { log.Error("Blobs missing for announcable transaction", "from", addr, "nonce", meta.nonce, "id", meta.id, "err", err) continue } - var pooledTx pooledBlobTx + var pooledTx PooledBlobTx if err = rlp.DecodeBytes(data, &pooledTx); err != nil { log.Error("Blobs corrupted for announcable transaction", "from", addr, "nonce", meta.nonce, "id", meta.id, "err", err) continue @@ -1377,77 +1362,31 @@ func (p *BlobPool) checkDelegationLimit(tx *types.Transaction) error { // rules and adheres to some heuristic limits of the local node (price and size). // This function assumes the static validation has been performed already and // only runs the stateful checks with lock protection. -// If buffer field is set to true, consider txs in the queue as well. -// This is to prevent fetching cells of invalid transactions, which would be expensive. -func (p *BlobPool) validateTx(tx *types.Transaction, buffer bool) error { +func (p *BlobPool) validateTx(tx *types.Transaction) error { if err := p.ValidateTxBasics(tx); err != nil { return err } - // Ensure the transaction adheres to the stateful pool filters (nonce, balance) stateOpts := &txpool.ValidationOptionsWithState{ State: p.state, FirstNonceGap: func(addr common.Address) uint64 { - // Nonce gaps are permitted in the blob pool, but only as part of the - // in-memory 'gapped' buffer. We expose the gap here to validateTx, - // then handle the error by adding to the buffer. The first gap will - // be the next nonce shifted by however many transactions we already - // have pooled. - result := p.state.GetNonce(addr) + uint64(len(p.index[addr])) - if buffer { - return result + uint64(len(p.indexQueue[addr])) - } - return result + return p.state.GetNonce(addr) + uint64(len(p.index[addr])) }, UsedAndLeftSlots: func(addr common.Address) (int, int) { have := len(p.index[addr]) - if buffer { - have += len(p.indexQueue[addr]) - } if have >= maxTxsPerAccount { return have, 0 } return have, maxTxsPerAccount - have }, ExistingExpenditure: func(addr common.Address) *big.Int { - result := new(big.Int) if spent := p.spent[addr]; spent != nil { - result.Add(result, spent.ToBig()) + return spent.ToBig() } - - // calculate expenditure after replacements - if buffer { - if replacements := p.replacementQueue[addr]; replacements != nil { - next := p.state.GetNonce(addr) - - for nonce, replacement := range replacements { - if nonce >= next && len(p.index[addr]) > int(nonce-next) { - originalCost := p.index[addr][nonce-next].costCap - replacementCost := replacement.costCap - - result.Add(result, new(uint256.Int).Sub(replacementCost, originalCost).ToBig()) - } - } - } - - if spentQueue := p.spentQueue[addr]; spentQueue != nil { - result.Add(result, spentQueue.ToBig()) - } - } - - return result + return new(big.Int) }, ExistingCost: func(addr common.Address, nonce uint64) *big.Int { next := p.state.GetNonce(addr) - if buffer { - if p.replacementQueue[addr] != nil && p.replacementQueue[addr][nonce] != nil { - return p.replacementQueue[addr][nonce].costCap.ToBig() - } - pooledCount := uint64(len(p.index[addr])) - if nonce >= next+pooledCount && uint64(len(p.indexQueue[addr])) > nonce-next-pooledCount { - return p.indexQueue[addr][nonce-next-pooledCount].costCap.ToBig() - } - } if uint64(len(p.index[addr])) > nonce-next { return p.index[addr][int(nonce-next)].costCap.ToBig() } @@ -1460,33 +1399,15 @@ func (p *BlobPool) validateTx(tx *types.Transaction, buffer bool) error { if err := p.checkDelegationLimit(tx); err != nil { return err } - // If the transaction replaces an existing one, ensure that price bumps are - // adhered to. var ( - from, _ = types.Sender(p.signer, tx) // already validated above + from, _ = types.Sender(p.signer, tx) next = p.state.GetNonce(from) ) var prev *blobTxMeta nonce := tx.Nonce() if nonce < next+uint64(len(p.index[from])) { - // pooled tx prev = p.index[from][nonce-next] - - // check replacement if it is buffer tx validation - if buffer && p.replacementQueue[from] != nil { - if replacement := p.replacementQueue[from][nonce]; replacement != nil { - prev = replacement - } - } - } else if buffer { - pooledCount := uint64(len(p.index[from])) - if nonce >= next+pooledCount { - offset := nonce - next - pooledCount - if uint64(len(p.indexQueue[from])) > offset && offset > 0 { - prev = p.indexQueue[from][offset] - } - } } if prev == nil { return nil @@ -1530,7 +1451,7 @@ func (p *BlobPool) Has(hash common.Hash) bool { p.lock.RLock() defer p.lock.RUnlock() - if p.lookup.exists(hash) || p.queue[hash] != nil { + if p.lookup.exists(hash) { return true } @@ -1543,7 +1464,7 @@ func (p *BlobPool) HasPayload(hash common.Hash) bool { p.lock.RLock() defer p.lock.RUnlock() - return p.lookup.exists(hash) || len(p.cellQueue[hash]) != 0 + return p.lookup.exists(hash) } // getRLP returns the raw RLP-encoded pooledBlobTx data from the store. @@ -1587,7 +1508,7 @@ func (p *BlobPool) Get(hash common.Hash, includeBlob bool) *types.Transaction { if len(data) == 0 { return nil } - var pooledTx pooledBlobTx + var pooledTx PooledBlobTx if err := rlp.DecodeBytes(data, &pooledTx); err != nil { log.Error("Blobs corrupted for traced transaction", "hash", hash, "err", err) return nil @@ -1613,7 +1534,7 @@ func (p *BlobPool) GetRLP(hash common.Hash, includeBlob bool) []byte { if len(data) == 0 { return nil } - var pooledTx pooledBlobTx + var pooledTx PooledBlobTx if err := rlp.DecodeBytes(data, &pooledTx); err != nil { log.Error("Failed to decode transaction in blobpool", "hash", hash, "err", err) return nil @@ -1701,7 +1622,7 @@ func (p *BlobPool) GetBlobs(vhashes []common.Hash, version byte) ([]*kzg4844.Blo } // Decode the blob transaction - var pooledTx pooledBlobTx + var pooledTx PooledBlobTx if err := rlp.DecodeBytes(data, &pooledTx); err != nil { log.Error("Blobs corrupted for traced transaction", "id", txID, "err", err) continue @@ -1780,7 +1701,7 @@ func (p *BlobPool) GetBlobCells(vhashes []common.Hash, mask types.CustodyBitmap) if err != nil { continue } - var pooledTx pooledBlobTx + var pooledTx PooledBlobTx if err := rlp.DecodeBytes(data, &pooledTx); err != nil { continue } @@ -1851,127 +1772,19 @@ func (p *BlobPool) Add(txs []*types.Transaction, sync bool) []error { if errs[i] = p.ValidateTxBasics(tx); errs[i] != nil { continue } - sc := tx.BlobTxSidecar() - if sc != nil && len(sc.Blobs) != 0 { - pooledTx, err := newPooledBlobTx(tx) - if err != nil { - errs[i] = err - continue - } - errs[i] = p.add(pooledTx) - } else { - errs[i] = p.addBuffer(tx) + pooledTx, err := newPooledBlobTx(tx) + if err != nil { + errs[i] = err + continue } + errs[i] = p.AddPooledTx(pooledTx) } return errs } -func (p *BlobPool) addBuffer(tx *types.Transaction) (err error) { - p.lock.Lock() - defer p.lock.Unlock() - - if cells, ok := p.cellQueue[tx.Hash()]; ok { - sidecar := tx.BlobTxSidecar() - - var cellSidecar types.BlobTxCellSidecar - blobCount := len(sidecar.Commitments) - if len(cells) >= kzg4844.DataPerBlob*blobCount { - blob, err := kzg4844.RecoverBlobs(cells, p.custodyQueue[tx.Hash()].Indices()) - if err != nil { - return err - } - extendedCells, err := kzg4844.ComputeCells(blob) - if err != nil { - return err - } - cellSidecar = types.BlobTxCellSidecar{ - Version: sidecar.Version, - Cells: extendedCells, - Commitments: sidecar.Commitments, - Proofs: sidecar.Proofs, - Custody: *types.CustodyBitmapAll, - } - } else { - cellSidecar = types.BlobTxCellSidecar{ - Version: sidecar.Version, - Cells: cells, - Commitments: sidecar.Commitments, - Proofs: sidecar.Proofs, - Custody: *p.custodyQueue[tx.Hash()], - } - } - - err := p.addLocked(&pooledBlobTx{Transaction: tx.WithoutBlobTxSidecar(), Sidecar: &cellSidecar, Size: tx.Size()}, true) - if err == nil { - delete(p.cellQueue, tx.Hash()) - delete(p.custodyQueue, tx.Hash()) - } - return err - } - - if err := p.validateTx(tx, true); err != nil { - return err - } - // Store the original tx in queue (with BlobTxSidecar intact — Blobs may be nil - // from ETH/71 but commitments/proofs are preserved for cell validation later). - p.queue[tx.Hash()] = tx - from, _ := types.Sender(p.signer, tx) - - // Build a partial pooledBlobTx for metadata tracking. - var cellSidecar *types.BlobTxCellSidecar - if sidecar := tx.BlobTxSidecar(); sidecar != nil { - cellSidecar = &types.BlobTxCellSidecar{ - Version: sidecar.Version, - Commitments: sidecar.Commitments, - Proofs: sidecar.Proofs, - } - } - next := p.state.GetNonce(from) - nonce := tx.Nonce() - pooledCount := uint64(len(p.index[from])) - //todo this is strange - meta := newBlobTxMeta(0, tx.Size(), 0, &pooledBlobTx{Transaction: tx, Sidecar: cellSidecar, Size: tx.Size()}) - - if nonce < next+pooledCount { - // Pooled transaction replacements are stored in replacementQueue for expenditure validation - // for future transactions from the same account. This overestimates expenditure considering - // that replacement transaction payload fetch may fail and the tx can be dropped. - // However, this conservative approach prevents transactions that passed validation when - // entering the buffer from failing expenditure validation due to transaction replacements. - if p.replacementQueue[from] == nil { - p.replacementQueue[from] = make(map[uint64]*blobTxMeta) - } - if existingReplacement := p.replacementQueue[from][nonce]; existingReplacement != nil { - delete(p.queue, existingReplacement.hash) - } - p.replacementQueue[from][nonce] = meta - } else { - if p.spentQueue[from] == nil { - p.spentQueue[from] = new(uint256.Int) - } - bufferOffset := int(nonce - (next + pooledCount)) - if len(p.indexQueue[from]) > bufferOffset { - // Replace buffer transaction - prev := p.indexQueue[from][bufferOffset] - - delete(p.queue, prev.hash) - - p.indexQueue[from][bufferOffset] = meta - p.spentQueue[from] = new(uint256.Int).Sub(p.spentQueue[from], prev.costCap) - p.spentQueue[from] = new(uint256.Int).Add(p.spentQueue[from], meta.costCap) - - dropReplacedMeter.Mark(1) - } else { - p.indexQueue[from] = append(p.indexQueue[from], meta) - p.spentQueue[from] = new(uint256.Int).Add(p.spentQueue[from], meta.costCap) - } - } - return nil -} - // add inserts a new blob transaction into the pool if it passes validation (both // consensus validity and pool restrictions). -func (p *BlobPool) add(pooledTx *pooledBlobTx) (err error) { +func (p *BlobPool) AddPooledTx(pooledTx *PooledBlobTx) (err error) { // The blob pool blocks on adding a transaction. This is because blob txs are // only even pulled from the network, so this method will act as the overload // protection for fetches. @@ -1990,12 +1803,12 @@ func (p *BlobPool) add(pooledTx *pooledBlobTx) (err error) { // addLocked inserts a new blob transaction into the pool if it passes validation (both // consensus validity and pool restrictions). It must be called with the pool lock held. // Only for internal use. -func (p *BlobPool) addLocked(pooledTx *pooledBlobTx, checkGapped bool) (err error) { +func (p *BlobPool) addLocked(pooledTx *PooledBlobTx, checkGapped bool) (err error) { tx := pooledTx.Transaction cellSidecar := pooledTx.Sidecar // Ensure the transaction is valid from all perspectives - if err := p.validateTx(tx, false); err != nil { + if err := p.validateTx(tx); err != nil { log.Trace("Transaction validation failed", "hash", tx.Hash(), "err", err) switch { case errors.Is(err, txpool.ErrUnderpriced): @@ -2550,9 +2363,6 @@ func (p *BlobPool) Status(hash common.Hash) txpool.TxStatus { if p.lookup.exists(hash) { return txpool.TxStatusPending } - if _, ok := p.queue[hash]; ok { - return txpool.TxStatusQueued - } if _, gapped := p.gappedSource[hash]; gapped { return txpool.TxStatusQueued } @@ -2607,7 +2417,7 @@ func (p *BlobPool) Clear() { // Reset counters and the gapped buffer p.stored = 0 - p.gapped = make(map[common.Address][]*pooledBlobTx) + p.gapped = make(map[common.Address][]*PooledBlobTx) p.gappedSource = make(map[common.Hash]common.Address) var ( @@ -2640,7 +2450,7 @@ func (p *BlobPool) GetCells(hash common.Hash, mask types.CustodyBitmap) ([]kzg48 return nil, errors.New("tracked blob transaction missing from store") } // Decode the blob transaction - var pooledTx pooledBlobTx + var pooledTx PooledBlobTx if err := rlp.DecodeBytes(data, &pooledTx); err != nil { return nil, errors.New("blobs corrupted for traced transaction") } @@ -2661,79 +2471,3 @@ func (p *BlobPool) GetCells(hash common.Hash, mask types.CustodyBitmap) ([]kzg48 } return cells, nil } - -// AddPayload adds cell payloads for blob transactions. -func (p *BlobPool) AddPayload(txs []common.Hash, cells [][]kzg4844.Cell, custody *types.CustodyBitmap) []error { - p.lock.Lock() - defer p.lock.Unlock() - errs := make([]error, len(txs)) - for i, hash := range txs { - if _, ok := p.queue[hash]; !ok { - p.cellQueue[hash] = cells[i] - p.custodyQueue[hash] = custody - continue - } - - sidecar := p.queue[hash].BlobTxSidecar() - - var cellSidecar types.BlobTxCellSidecar - blobCount := len(sidecar.Commitments) - if len(cells[i]) >= kzg4844.DataPerBlob*blobCount { - blob, err := kzg4844.RecoverBlobs(cells[i], custody.Indices()) - if err != nil { - errs[i] = err - continue - } - extendedCells, err := kzg4844.ComputeCells(blob) - if err != nil { - errs[i] = err - continue - } - cellSidecar = types.BlobTxCellSidecar{ - Version: sidecar.Version, - Cells: extendedCells, - Commitments: sidecar.Commitments, - Proofs: sidecar.Proofs, - Custody: *types.CustodyBitmapAll, - } - } else { - cellSidecar = types.BlobTxCellSidecar{ - Version: sidecar.Version, - Cells: cells[i], - Commitments: sidecar.Commitments, - Proofs: sidecar.Proofs, - Custody: *custody, - } - } - - errs[i] = p.addLocked(&pooledBlobTx{Transaction: p.queue[hash].WithoutBlobTxSidecar(), Sidecar: &cellSidecar, Size: p.queue[hash].Size()}, true) - - // clean up queues - tx := p.queue[hash] - delete(p.queue, hash) - from, _ := types.Sender(p.signer, tx) - nonce := tx.Nonce() - next := p.state.GetNonce(from) - - if p.replacementQueue[from] != nil { - delete(p.replacementQueue[from], nonce) - if len(p.replacementQueue[from]) == 0 { - delete(p.replacementQueue, from) - } - continue - } - - // plain tx - pooledCount := uint64(len(p.index[from])) - if nonce < next+pooledCount { - continue - } - offset := int(nonce - next - pooledCount) - if offset > 0 && offset < len(p.indexQueue[from]) { - removed := p.indexQueue[from][offset] - p.indexQueue[from] = append(p.indexQueue[from][:offset], p.indexQueue[from][offset+1:]...) - p.spentQueue[from] = new(uint256.Int).Sub(p.spentQueue[from], removed.costCap) - } - } - return errs -} diff --git a/core/txpool/blobpool/blobpool_test.go b/core/txpool/blobpool/blobpool_test.go index 98abd96bc4..35e9958bf3 100644 --- a/core/txpool/blobpool/blobpool_test.go +++ b/core/txpool/blobpool/blobpool_test.go @@ -2122,7 +2122,7 @@ func benchmarkPoolPending(b *testing.B, datacap uint64) { } statedb.AddBalance(addr, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified) pooledTx, _ := newPooledBlobTx(tx) - pool.add(pooledTx) + pool.AddPooledTx(pooledTx) } statedb.Commit(0, true, false) defer pool.Close() diff --git a/core/txpool/blobpool/buffer.go b/core/txpool/blobpool/buffer.go new file mode 100644 index 0000000000..30dba84ae8 --- /dev/null +++ b/core/txpool/blobpool/buffer.go @@ -0,0 +1,262 @@ +// Copyright 2026 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package blobpool + +import ( + "cmp" + "fmt" + "slices" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto/kzg4844" + "github.com/ethereum/go-ethereum/log" +) + +const ( + bufferLifetime = 2 * time.Minute +) + +// PeerDelivery holds cells delivered by a single peer, in blob-major order. +type PeerDelivery struct { + Cells []kzg4844.Cell + Indices []uint64 +} + +type txEntry struct { + tx *types.Transaction + peer string + added time.Time +} + +type cellEntry struct { + deliveries map[string]*PeerDelivery + custody *types.CustodyBitmap + added time.Time +} + +type BlobBuffer struct { + txs map[common.Hash]*txEntry + cells map[common.Hash]*cellEntry + + addToPool func(*PooledBlobTx) error + dropPeer func(string) +} + +func NewBlobBuffer(addToPool func(*PooledBlobTx) error, dropPeer func(string)) *BlobBuffer { + return &BlobBuffer{ + txs: make(map[common.Hash]*txEntry), + cells: make(map[common.Hash]*cellEntry), + addToPool: addToPool, + dropPeer: dropPeer, + } +} + +// AddTx buffers a blob transaction (without blobs) from an ETH/71 peer. +// If cells are already buffered, verification and pool insertion are attempted. +func (b *BlobBuffer) AddTx(tx *types.Transaction, peer string) error { + b.evict() + + hash := tx.Hash() + sidecar := tx.BlobTxSidecar() + if sidecar == nil { + return fmt.Errorf("blob transaction without sidecar") + } + // vhash check + if err := sidecar.ValidateBlobCommitmentHashes(tx.BlobHashes()); err != nil { + log.Warn("Commitment hash mismatch, dropping peer", "peer", peer, "err", err) + b.dropPeer(peer) + return err + } + // proof count check + if len(sidecar.Proofs) < len(sidecar.Commitments)*kzg4844.CellProofsPerBlob { + b.dropPeer(peer) + return fmt.Errorf("insufficient proofs in sidecar") + } + // todo: I also considered performing additional validation for the metrics of the + // tx_fetcher. This could be used to avoid sending GetCells requests when the + // nonce is too low or the transaction is underpriced. However, doing so would + // require taking buffered transactions into account as well, and would require + // allowing the buffer to be part of the fetcher’s scheduling logic. + // Therefore, I will leave this as a TODO for now. + + if entry, ok := b.cells[hash]; ok { + return b.add(hash, tx, entry) + } + b.txs[hash] = &txEntry{tx: tx, peer: peer, added: time.Now()} + return nil +} + +// AddCells buffers per-peer cell deliveries from the blob fetcher. +// If the transaction is already buffered, verification and pool insertion are attempted. +func (b *BlobBuffer) AddCells(hash common.Hash, deliveries map[string]*PeerDelivery, custody *types.CustodyBitmap) error { + b.evict() + b.cells[hash] = &cellEntry{ + deliveries: deliveries, + custody: custody, + added: time.Now(), + } + + if txe, ok := b.txs[hash]; ok { + return b.add(hash, txe.tx, b.cells[hash]) + } + return nil +} + +// add verifies cells per-peer, sorts them, and adds to the pool. +func (b *BlobBuffer) add(hash common.Hash, tx *types.Transaction, cells *cellEntry) error { + sidecar := tx.BlobTxSidecar() + + // Per-peer cell verification + if badPeers := b.verifyCells(cells, sidecar); len(badPeers) > 0 { + b.dropPeers(badPeers) + delete(b.cells, hash) + delete(b.txs, hash) + return fmt.Errorf("cell verification failed") + } + blobCount := len(tx.BlobHashes()) + sorted, custody := sortCells(cells, blobCount) + + cellSidecar := &types.BlobTxCellSidecar{ + Version: sidecar.Version, + Cells: sorted, + Commitments: sidecar.Commitments, + Proofs: sidecar.Proofs, + Custody: *custody, + } + pooledTx := &PooledBlobTx{ + Transaction: tx.WithoutBlobTxSidecar(), + Sidecar: cellSidecar, + Size: tx.Size(), + SizeWithoutBlob: tx.WithoutBlob().Size(), + } + err := b.addToPool(pooledTx) + delete(b.cells, hash) + delete(b.txs, hash) + return err +} + +func (b *BlobBuffer) HasTx(hash common.Hash) bool { + _, ok := b.txs[hash] + return ok +} + +func (b *BlobBuffer) HasCells(hash common.Hash) bool { + _, ok := b.cells[hash] + return ok +} + +func (b *BlobBuffer) dropPeers(peers []string) { + if b.dropPeer == nil { + return + } + for _, p := range peers { + b.dropPeer(p) + } +} + +func (b *BlobBuffer) evict() { + now := time.Now() + for hash, entry := range b.txs { + if now.Sub(entry.added) > bufferLifetime { + delete(b.txs, hash) + } + } + for hash, entry := range b.cells { + if now.Sub(entry.added) > bufferLifetime { + delete(b.cells, hash) + } + } +} + +// verifyCells verifies each peer's cells against the sidecar. +// Returns the list of peers whose cells failed verification. +func (b *BlobBuffer) verifyCells(entry *cellEntry, sidecar *types.BlobTxSidecar) []string { + var badPeers []string + for peer, delivery := range entry.deliveries { + if err := verifyPeerCells(delivery, sidecar); err != nil { + log.Debug("Cell verification failed", "peer", peer, "err", err) + badPeers = append(badPeers, peer) + } + } + return badPeers +} + +// verifyPeerCells verifies a single peer's cells against the sidecar proofs. +// delivery.Cells is blob-major: [blob0_cell0..blob0_cellN, blob1_cell0..blob1_cellN, ...] +func verifyPeerCells(delivery *PeerDelivery, sidecar *types.BlobTxSidecar) error { + cellsPerBlob := len(delivery.Indices) + blobCount := len(delivery.Cells) / cellsPerBlob + if blobCount == 0 || blobCount != len(sidecar.Commitments) { + return fmt.Errorf("blob count mismatch: delivery %d, commitments %d", blobCount, len(sidecar.Commitments)) + } + // Extract proofs corresponding to this peer's cell indices + var proofs []kzg4844.Proof + for blobIdx := 0; blobIdx < blobCount; blobIdx++ { + for _, cellIdx := range delivery.Indices { + proofIdx := blobIdx*kzg4844.CellProofsPerBlob + int(cellIdx) + if proofIdx >= len(sidecar.Proofs) { + return fmt.Errorf("proof index out of range: %d", proofIdx) + } + proofs = append(proofs, sidecar.Proofs[proofIdx]) + } + } + return kzg4844.VerifyCells(delivery.Cells, sidecar.Commitments, proofs, delivery.Indices) +} + +// sortCells merges all per-peer deliveries into a single flat cell array +// sorted by custody index. +// +// e.g. +// peer A: cells = [blob0_cell5, blob0_cell3, blob1_cell5, blob1_cell3] +// peer B: cells = [blob0_cell1, blob0_cell7, blob1_cell1, blob1_cell7] +// -> [blob0_cell1, blob0_cell3, blob0_cell5, blob0_cell7, blob1_cell1, blob1_cell3, blob1_cell5, blob1_cell7] +func sortCells(entry *cellEntry, blobCount int) ([]kzg4844.Cell, *types.CustodyBitmap) { + // indices per delivery + var indices []uint64 + + // 1. compose per blob cells + blob := make([][]kzg4844.Cell, blobCount) + for _, d := range entry.deliveries { + n := len(d.Indices) + indices = append(indices, d.Indices...) + for b := range blobCount { + blob[b] = append(blob[b], d.Cells[b*n:(b+1)*n]...) + } + } + + // 2. sort + perm := make([]int, len(indices)) + for i := range perm { + perm[i] = i + } + // perm represents the position of cells in sorted array + slices.SortFunc(perm, func(a, b int) int { + return cmp.Compare(indices[a], indices[b]) + }) + // reorder cells + var res []kzg4844.Cell + for b := range blobCount { + for _, p := range perm { + res = append(res, blob[b][p]) + } + } + + custody := types.NewCustodyBitmap(indices) + return res, &custody +} diff --git a/core/txpool/blobpool/buffer_test.go b/core/txpool/blobpool/buffer_test.go new file mode 100644 index 0000000000..74680c530a --- /dev/null +++ b/core/txpool/blobpool/buffer_test.go @@ -0,0 +1,254 @@ +package blobpool + +import ( + "crypto/ecdsa" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/crypto/kzg4844" + "github.com/ethereum/go-ethereum/params" + "github.com/holiman/uint256" +) + +// makeV1Tx creates a V1 blob transaction with cell proofs, then strips blobs +// (simulating what ETH/71 peers send). +func makeV1Tx(t *testing.T, nonce uint64, blobCount int, blobOffset int, key *ecdsa.PrivateKey) *types.Transaction { + t.Helper() + tx := makeMultiBlobTx(nonce, 1, 1, 1, blobCount, blobOffset, key, types.BlobSidecarVersion1) + return tx.WithoutBlob() +} + +// makePeerDelivery creates a PeerDelivery for given cell indices from a set of blobs. +func makePeerDelivery(t *testing.T, blobOffset, blobCount int, indices []uint64) *PeerDelivery { + t.Helper() + var allCells []kzg4844.Cell + for i := 0; i < blobCount; i++ { + cells, err := kzg4844.ComputeCells([]kzg4844.Blob{*testBlobs[blobOffset+i]}) + if err != nil { + t.Fatal(err) + } + allCells = append(allCells, cells...) + } + var deliveryCells []kzg4844.Cell + for b := 0; b < blobCount; b++ { + for _, idx := range indices { + deliveryCells = append(deliveryCells, allCells[b*kzg4844.CellsPerBlob+int(idx)]) + } + } + return &PeerDelivery{Cells: deliveryCells, Indices: indices} +} + +func newTestBuffer(t *testing.T) *BlobBuffer { + t.Helper() + return NewBlobBuffer( + func(ptx *PooledBlobTx) error { return nil }, + func(peer string) {}, + ) +} + +func TestSortCells(t *testing.T) { + blobCount := 2 + blobOffset := 0 + + peerA := makePeerDelivery(t, blobOffset, blobCount, []uint64{5, 3}) + peerB := makePeerDelivery(t, blobOffset, blobCount, []uint64{1, 7}) + + custody := types.NewCustodyBitmap([]uint64{1, 3, 5, 7}) + entry := &cellEntry{ + deliveries: map[string]*PeerDelivery{ + "peerA": peerA, + "peerB": peerB, + }, + custody: &custody, + } + sorted, resultCustody := sortCells(entry, blobCount) + + resultIndices := resultCustody.Indices() + if len(resultIndices) != 4 { + t.Fatalf("expected 4 indices, got %d", len(resultIndices)) + } + for i, expected := range []uint64{1, 3, 5, 7} { + if resultIndices[i] != expected { + t.Errorf("index %d: expected %d, got %d", i, expected, resultIndices[i]) + } + } + + expected := makePeerDelivery(t, blobOffset, blobCount, []uint64{1, 3, 5, 7}) + if len(sorted) != len(expected.Cells) { + t.Fatalf("sorted length %d != expected %d", len(sorted), len(expected.Cells)) + } + for i := range sorted { + if sorted[i] != expected.Cells[i] { + t.Errorf("cell %d mismatch", i) + } + } +} + +func TestAddTxThenCells(t *testing.T) { + key, _ := crypto.GenerateKey() + blobCount := 2 + buf := newTestBuffer(t) + + tx := makeV1Tx(t, 0, blobCount, 0, key) + hash := tx.Hash() + + if err := buf.AddTx(tx, "peerA"); err != nil { + t.Fatal(err) + } + if !buf.HasTx(hash) { + t.Fatal("tx should be buffered") + } + + dataIndices := make([]uint64, kzg4844.DataPerBlob) + for i := range dataIndices { + dataIndices[i] = uint64(i) + } + delivery := makePeerDelivery(t, 0, blobCount, dataIndices) + custody := types.NewCustodyBitmap(dataIndices) + + if err := buf.AddCells(hash, map[string]*PeerDelivery{"peerB": delivery}, &custody); err != nil { + t.Fatal(err) + } + if buf.HasTx(hash) || buf.HasCells(hash) { + t.Fatal("buffer should be empty after add") + } +} + +func TestAddCellsThenTx(t *testing.T) { + key, _ := crypto.GenerateKey() + blobCount := 2 + buf := newTestBuffer(t) + + tx := makeV1Tx(t, 0, blobCount, 0, key) + hash := tx.Hash() + + dataIndices := make([]uint64, kzg4844.DataPerBlob) + for i := range dataIndices { + dataIndices[i] = uint64(i) + } + delivery := makePeerDelivery(t, 0, blobCount, dataIndices) + custody := types.NewCustodyBitmap(dataIndices) + + if err := buf.AddCells(hash, map[string]*PeerDelivery{"peerB": delivery}, &custody); err != nil { + t.Fatal(err) + } + if !buf.HasCells(hash) { + t.Fatal("cells should be buffered") + } + + if err := buf.AddTx(tx, "peerA"); err != nil { + t.Fatal(err) + } + if buf.HasTx(hash) || buf.HasCells(hash) { + t.Fatal("buffer should be empty after add") + } +} + +func TestMultiPeerDelivery(t *testing.T) { + key, _ := crypto.GenerateKey() + blobCount := 2 + buf := newTestBuffer(t) + + tx := makeV1Tx(t, 0, blobCount, 0, key) + hash := tx.Hash() + buf.AddTx(tx, "peerA") + + indicesA := []uint64{0, 2, 4, 6} + indicesB := []uint64{1, 3, 5, 7} + deliveryA := makePeerDelivery(t, 0, blobCount, indicesA) + deliveryB := makePeerDelivery(t, 0, blobCount, indicesB) + + allIndices := append(indicesA, indicesB...) + custody := types.NewCustodyBitmap(allIndices) + + if err := buf.AddCells(hash, map[string]*PeerDelivery{ + "peerB": deliveryA, + "peerC": deliveryB, + }, &custody); err != nil { + t.Fatal(err) + } + if buf.HasTx(hash) || buf.HasCells(hash) { + t.Fatal("buffer should be empty after add") + } +} + +func TestBadCell(t *testing.T) { + key, _ := crypto.GenerateKey() + blobCount := 1 + + var dropped []string + buf := NewBlobBuffer( + func(ptx *PooledBlobTx) error { return nil }, + func(peer string) { dropped = append(dropped, peer) }, + ) + + tx := makeV1Tx(t, 0, blobCount, 0, key) + hash := tx.Hash() + buf.AddTx(tx, "peerA") + + goodDelivery := makePeerDelivery(t, 0, blobCount, []uint64{0, 1, 2, 3}) + badDelivery := makePeerDelivery(t, 0, blobCount, []uint64{4, 5, 6, 7}) + for i := range badDelivery.Cells { + for j := range badDelivery.Cells[i] { + badDelivery.Cells[i][j] ^= 0xFF + } + } + + allIndices := []uint64{0, 1, 2, 3, 4, 5, 6, 7} + custody := types.NewCustodyBitmap(allIndices) + + err := buf.AddCells(hash, map[string]*PeerDelivery{ + "peerB": goodDelivery, + "peerC": badDelivery, + }, &custody) + if err == nil { + t.Fatal("expected error from bad cells") + } + + if len(dropped) != 1 || dropped[0] != "peerC" { + t.Fatalf("only peerC should have been dropped, got: %v", dropped) + } + if buf.HasTx(hash) || buf.HasCells(hash) { + t.Fatal("buffer should be empty after bad cell drop") + } +} + +func TestBadTx(t *testing.T) { + key, _ := crypto.GenerateKey() + + var dropped []string + buf := NewBlobBuffer( + func(ptx *PooledBlobTx) error { return nil }, + func(peer string) { dropped = append(dropped, peer) }, + ) + + blobtx := &types.BlobTx{ + ChainID: uint256.MustFromBig(params.MainnetChainConfig.ChainID), + Nonce: 0, + GasTipCap: uint256.NewInt(1), + GasFeeCap: uint256.NewInt(1), + Gas: 21000, + BlobFeeCap: uint256.NewInt(1), + BlobHashes: []common.Hash{testBlobVHashes[0]}, + Value: uint256.NewInt(100), + Sidecar: types.NewBlobTxSidecar(types.BlobSidecarVersion1, + nil, + []kzg4844.Commitment{testBlobCommits[1]}, + testBlobCellProofs[1], + ), + } + tx := types.MustSignNewTx(key, types.LatestSigner(params.MainnetChainConfig), blobtx) + + err := buf.AddTx(tx, "peerA") + if err == nil { + t.Fatal("expected error from commitment mismatch") + } + if len(dropped) != 1 || dropped[0] != "peerA" { + t.Fatalf("only peerA should have been dropped, got: %v", dropped) + } + if buf.HasTx(tx.Hash()) { + t.Fatal("tx should not be buffered") + } +} diff --git a/core/txpool/blobpool/limbo.go b/core/txpool/blobpool/limbo.go index 90cd9d4a9d..8b711f744b 100644 --- a/core/txpool/blobpool/limbo.go +++ b/core/txpool/blobpool/limbo.go @@ -33,7 +33,7 @@ import ( type limboBlob struct { TxHash common.Hash // Owner transaction's hash to support resurrecting reorged txs Block uint64 // Block in which the blob transaction was included - Tx *pooledBlobTx + Tx *PooledBlobTx } // limbo is a light, indexed database to temporarily store recently included @@ -146,7 +146,7 @@ func (l *limbo) finalize(final *types.Header) { // push stores a new blob transaction into the limbo, waiting until finality for // it to be automatically evicted. -func (l *limbo) push(tx *pooledBlobTx, block uint64) error { +func (l *limbo) push(tx *PooledBlobTx, block uint64) error { // If the blobs are already tracked by the limbo, consider it a programming // error. There's not much to do against it, but be loud. if _, ok := l.index[tx.Transaction.Hash()]; ok { @@ -163,7 +163,7 @@ func (l *limbo) push(tx *pooledBlobTx, block uint64) error { // pull retrieves a previously pushed set of blobs back from the limbo, removing // it at the same time. This method should be used when a previously included blob // transaction gets reorged out. -func (l *limbo) pull(tx common.Hash) (*pooledBlobTx, error) { +func (l *limbo) pull(tx common.Hash) (*PooledBlobTx, error) { // If the blobs are not tracked by the limbo, there's not much to do. This // can happen for example if a blob transaction is mined without pushing it // into the network first. @@ -240,7 +240,7 @@ func (l *limbo) getAndDrop(id uint64) (*limboBlob, error) { // setAndIndex assembles a limbo blob database entry and stores it, also updating // the in-memory indices. -func (l *limbo) setAndIndex(tx *pooledBlobTx, block uint64) error { +func (l *limbo) setAndIndex(tx *PooledBlobTx, block uint64) error { txhash := tx.Transaction.Hash() item := &limboBlob{ TxHash: txhash, diff --git a/eth/fetcher/blob_fetcher.go b/eth/fetcher/blob_fetcher.go index f928a99517..b917ad5c9f 100644 --- a/eth/fetcher/blob_fetcher.go +++ b/eth/fetcher/blob_fetcher.go @@ -76,11 +76,17 @@ type cellWithSeq struct { cells *types.CustodyBitmap } +// PeerCellDelivery holds cells delivered by a single peer. +type PeerCellDelivery struct { + Cells []kzg4844.Cell // blob-major order as received + Indices []uint64 // custody indices provided by this peer +} + type fetchStatus struct { - fetching *types.CustodyBitmap // To avoid fetching cells which had already been fetched / currently being fetched - fetched []uint64 // Custody indices that have been fetched (per-blob, same for all blobs) - blobCells [][]kzg4844.Cell // Per-blob cell accumulator, indexed by blob - blobCount int // Number of blobs in this tx (set on first delivery) + fetching *types.CustodyBitmap // To avoid fetching cells which had already been fetched / currently being fetched + fetched []uint64 // Custody indices that have been fetched (per-blob, same for all blobs) + deliveries map[string]*PeerCellDelivery // Per-peer cell deliveries + blobCount int // Number of blobs in this tx (set on first delivery) } // BlobFetcher is responsible for managing type 3 transactions based on peer announcements. @@ -124,7 +130,7 @@ type BlobFetcher struct { // Callbacks hasPayload func(common.Hash) bool - addPayload func([]common.Hash, [][]kzg4844.Cell, *types.CustodyBitmap) []error //todo: peer disconnection is strange here + addCells func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error fetchPayloads func(string, []common.Hash, *types.CustodyBitmap) error dropPeer func(string) @@ -136,7 +142,7 @@ type BlobFetcher struct { func NewBlobFetcher( hasPayload func(common.Hash) bool, - addPayload func([]common.Hash, [][]kzg4844.Cell, *types.CustodyBitmap) []error, + addCells func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error, fetchPayloads func(string, []common.Hash, *types.CustodyBitmap) error, dropPeer func(string), custody *types.CustodyBitmap, rand random) *BlobFetcher { return &BlobFetcher{ @@ -154,7 +160,7 @@ func NewBlobFetcher( requests: make(map[string][]*cellRequest), alternates: make(map[common.Hash]map[string]*types.CustodyBitmap), hasPayload: hasPayload, - addPayload: addPayload, + addCells: addCells, fetchPayloads: fetchPayloads, dropPeer: dropPeer, custody: custody, @@ -470,21 +476,18 @@ func (f *BlobFetcher) loop() { // Unexpected hash, ignore continue } - // delivery.cells[i] contains cells for all blobs - // in blob-major order: [blob0_cell0, ..., blob0_cellN, blob1_cell0, ...]. indices := delivery.cellBitmap.Indices() cellsPerBlob := len(indices) if cellsPerBlob > 0 { status := f.fetches[hash] blobCount := len(delivery.cells[i]) / cellsPerBlob - // Initialize per-blob accumulators on first delivery if status.blobCount == 0 { status.blobCount = blobCount - status.blobCells = make([][]kzg4844.Cell, blobCount) + status.deliveries = make(map[string]*PeerCellDelivery) } - for b := 0; b < blobCount; b++ { - offset := b * cellsPerBlob - status.blobCells[b] = append(status.blobCells[b], delivery.cells[i][offset:offset+cellsPerBlob]...) + status.deliveries[delivery.origin] = &PeerCellDelivery{ + Cells: delivery.cells[i], + Indices: indices, } status.fetched = append(status.fetched, indices...) } @@ -515,28 +518,10 @@ func (f *BlobFetcher) loop() { if completed { blobFetcherFetchTime.Update(int64(time.Duration(f.clock.Now() - request.time))) - fetchStatus := f.fetches[hash] + status := f.fetches[hash] + collectedCustody := types.NewCustodyBitmap(status.fetched) + f.addCells(hash, status.deliveries, &collectedCustody) - // Sort each blob's cells by ascending custody index. - // RecoverBlobs expects cells[k] to correspond to custodyIndices[k], - // and custodyIndices come from CustodyBitmap.Indices() which is always sorted. - perm := make([]int, len(fetchStatus.fetched)) - for i := range perm { - perm[i] = i - } - slices.SortFunc(perm, func(a, b int) int { - return int(fetchStatus.fetched[a]) - int(fetchStatus.fetched[b]) - }) - var assembled []kzg4844.Cell - for _, blobCells := range fetchStatus.blobCells { - for _, p := range perm { - assembled = append(assembled, blobCells[p]) - } - } - collectedCustody := types.NewCustodyBitmap(fetchStatus.fetched) - f.addPayload([]common.Hash{hash}, [][]kzg4844.Cell{assembled}, &collectedCustody) - - // remove announces from other peers for peer, txset := range f.announces { delete(txset, hash) if len(txset) == 0 { diff --git a/eth/fetcher/blob_fetcher_test.go b/eth/fetcher/blob_fetcher_test.go index de11b30da3..589957bf23 100644 --- a/eth/fetcher/blob_fetcher_test.go +++ b/eth/fetcher/blob_fetcher_test.go @@ -17,7 +17,6 @@ package fetcher import ( - "fmt" "slices" "testing" @@ -149,8 +148,8 @@ func TestBlobFetcherFullFetch(t *testing.T) { init: func() *BlobFetcher { return NewBlobFetcher( func(common.Hash) bool { return false }, - func(txs []common.Hash, _ [][]kzg4844.Cell, _ *types.CustodyBitmap) []error { - return make([]error, len(txs)) + func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { + return nil }, func(string, []common.Hash, *types.CustodyBitmap) error { return nil }, func(string) {}, @@ -238,8 +237,8 @@ func TestBlobFetcherPartialFetch(t *testing.T) { init: func() *BlobFetcher { return NewBlobFetcher( func(common.Hash) bool { return false }, - func(txs []common.Hash, _ [][]kzg4844.Cell, _ *types.CustodyBitmap) []error { - return make([]error, len(txs)) + func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { + return nil }, func(string, []common.Hash, *types.CustodyBitmap) error { return nil }, func(string) {}, @@ -331,8 +330,8 @@ func TestBlobFetcherFullDelivery(t *testing.T) { init: func() *BlobFetcher { return NewBlobFetcher( func(common.Hash) bool { return false }, - func(txs []common.Hash, _ [][]kzg4844.Cell, _ *types.CustodyBitmap) []error { - return make([]error, len(txs)) + func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { + return nil }, func(string, []common.Hash, *types.CustodyBitmap) error { return nil }, func(string) {}, @@ -377,8 +376,8 @@ func TestBlobFetcherPartialDelivery(t *testing.T) { init: func() *BlobFetcher { return NewBlobFetcher( func(common.Hash) bool { return false }, - func(txs []common.Hash, _ [][]kzg4844.Cell, _ *types.CustodyBitmap) []error { - return make([]error, len(txs)) + func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { + return nil }, func(string, []common.Hash, *types.CustodyBitmap) error { return nil }, func(string) {}, @@ -511,8 +510,8 @@ func TestBlobFetcherAvailabilityTimeout(t *testing.T) { init: func() *BlobFetcher { return NewBlobFetcher( func(common.Hash) bool { return false }, - func(txs []common.Hash, _ [][]kzg4844.Cell, _ *types.CustodyBitmap) []error { - return make([]error, len(txs)) + func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { + return nil }, func(string, []common.Hash, *types.CustodyBitmap) error { return nil }, func(string) {}, @@ -551,8 +550,8 @@ func TestBlobFetcherPeerDrop(t *testing.T) { init: func() *BlobFetcher { return NewBlobFetcher( func(common.Hash) bool { return false }, - func(txs []common.Hash, _ [][]kzg4844.Cell, _ *types.CustodyBitmap) []error { - return make([]error, len(txs)) + func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { + return nil }, func(string, []common.Hash, *types.CustodyBitmap) error { return nil }, func(string) {}, @@ -626,8 +625,8 @@ func TestBlobFetcherFetchTimeout(t *testing.T) { init: func() *BlobFetcher { return NewBlobFetcher( func(common.Hash) bool { return false }, - func(txs []common.Hash, _ [][]kzg4844.Cell, _ *types.CustodyBitmap) []error { - return make([]error, len(txs)) + func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { + return nil }, func(string, []common.Hash, *types.CustodyBitmap) error { return nil }, func(string) {}, @@ -1012,40 +1011,21 @@ func TestMultiBlobDeliveryVerification(t *testing.T) { init: func() *BlobFetcher { return NewBlobFetcher( func(common.Hash) bool { return false }, - func(txs []common.Hash, cells [][]kzg4844.Cell, cst *types.CustodyBitmap) []error { - // Verify delivered cells pass KZG cell proof verification - // Debug: compare with expected cells - expectedCells := selectMultiBlobCells(sidecar, custody) - for ci, c := range cells { - if len(c) != len(expectedCells) { - verifyErr = fmt.Errorf("cell count mismatch: have %d, want %d", len(c), len(expectedCells)) - return make([]error, len(txs)) - } - for j := range c { - if c[j] != expectedCells[j] { - verifyErr = fmt.Errorf("tx %d cell %d mismatch (custody=%v)", ci, j, cst.Indices()) - return make([]error, len(txs)) - } - } - } - for _, c := range cells { - cs := &types.BlobTxCellSidecar{ - Version: sidecar.Version, - Cells: c, - Commitments: sidecar.Commitments, - Proofs: sidecar.Proofs, - Custody: *cst, - } - indices := cs.Custody.Indices() + func(hash common.Hash, deliveries map[string]*PeerCellDelivery, cst *types.CustodyBitmap) error { + // Verify each peer's delivered cells pass KZG cell proof verification + for _, d := range deliveries { var cellProofs []kzg4844.Proof - for blobIdx := range len(cs.Commitments) { - for _, proofIdx := range indices { - cellProofs = append(cellProofs, cs.Proofs[blobIdx*kzg4844.CellProofsPerBlob+int(proofIdx)]) + for blobIdx := 0; blobIdx < len(sidecar.Commitments); blobIdx++ { + for _, idx := range d.Indices { + cellProofs = append(cellProofs, sidecar.Proofs[blobIdx*kzg4844.CellProofsPerBlob+int(idx)]) } } - verifyErr = kzg4844.VerifyCells(cs.Cells, cs.Commitments, cellProofs, indices) + verifyErr = kzg4844.VerifyCells(d.Cells, sidecar.Commitments, cellProofs, d.Indices) + if verifyErr != nil { + return verifyErr + } } - return make([]error, len(txs)) + return nil }, func(string, []common.Hash, *types.CustodyBitmap) error { return nil }, func(string) {}, diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index f4e9271302..6e2ea24426 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -180,10 +180,10 @@ type TxFetcher struct { alternates map[common.Hash]map[string]struct{} // In-flight transaction alternate origins if retrieval fails // Callbacks - validateMeta func(common.Hash, byte) error // Validate a tx metadata based on the local txpool - addTxs func([]*types.Transaction) []error // Insert a batch of transactions into local txpool - fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer - dropPeer func(string) // Drops a peer in case of announcement violation + validateMeta func(common.Hash, byte) error // Validate a tx metadata based on the local txpool + addTxs func(string, []*types.Transaction) []error // Insert a batch of transactions into local txpool + fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer + dropPeer func(string) // Drops a peer in case of announcement violation step chan struct{} // Notification channel when the fetcher loop iterates clock mclock.Clock // Monotonic clock or simulated clock for tests @@ -194,7 +194,7 @@ type TxFetcher struct { // NewTxFetcher creates a transaction fetcher to retrieve transaction // based on hash announcements. // Chain can be nil to disable on-chain checks. -func NewTxFetcher(chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string)) *TxFetcher { +func NewTxFetcher(chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func(string, []*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string)) *TxFetcher { return NewTxFetcherForTests(chain, validateMeta, addTxs, fetchTxs, dropPeer, mclock.System{}, time.Now, nil) } @@ -202,7 +202,7 @@ func NewTxFetcher(chain *core.BlockChain, validateMeta func(common.Hash, byte) e // a simulated version and the internal randomness with a deterministic one. // Chain can be nil to disable on-chain checks. func NewTxFetcherForTests( - chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), + chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func(string, []*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), clock mclock.Clock, realTime func() time.Time, rand *mrand.Rand) *TxFetcher { return &TxFetcher{ notify: make(chan *txAnnounce), @@ -352,7 +352,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) ) batch := txs[i:end] - for j, err := range f.addTxs(batch) { + for j, err := range f.addTxs(peer, batch) { // Track the transaction hash if the price is too low for us. // Avoid re-request this transaction when we receive another // announcement. diff --git a/eth/fetcher/tx_fetcher_test.go b/eth/fetcher/tx_fetcher_test.go index de8413142a..4e8ea14000 100644 --- a/eth/fetcher/tx_fetcher_test.go +++ b/eth/fetcher/tx_fetcher_test.go @@ -93,7 +93,7 @@ func newTestTxFetcher() *TxFetcher { return NewTxFetcher( nil, func(common.Hash, byte) error { return nil }, - func(txs []*types.Transaction) []error { + func(_ string, txs []*types.Transaction) []error { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, @@ -1172,7 +1172,7 @@ func TestTransactionFetcherUnderpricedDedup(t *testing.T) { testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { f := newTestTxFetcher() - f.addTxs = func(txs []*types.Transaction) []error { + f.addTxs = func(_ string, txs []*types.Transaction) []error { errs := make([]error, len(txs)) for i := 0; i < len(errs); i++ { if i%3 == 0 { @@ -1270,7 +1270,7 @@ func TestTransactionFetcherUnderpricedDoSProtection(t *testing.T) { testTransactionFetcher(t, txFetcherTest{ init: func() *TxFetcher { f := newTestTxFetcher() - f.addTxs = func(txs []*types.Transaction) []error { + f.addTxs = func(_ string, txs []*types.Transaction) []error { errs := make([]error, len(txs)) for i := 0; i < len(errs); i++ { errs[i] = txpool.ErrUnderpriced @@ -1787,7 +1787,7 @@ func TestTransactionProtocolViolation(t *testing.T) { testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { f := newTestTxFetcher() - f.addTxs = func(txs []*types.Transaction) []error { + f.addTxs = func(_ string, txs []*types.Transaction) []error { var errs []error for range txs { errs = append(errs, txpool.ErrKZGVerificationError) @@ -2194,7 +2194,7 @@ func TestTransactionForgotten(t *testing.T) { fetcher := NewTxFetcherForTests( nil, func(common.Hash, byte) error { return nil }, - func(txs []*types.Transaction) []error { + func(_ string, txs []*types.Transaction) []error { errs := make([]error, len(txs)) for i := 0; i < len(errs); i++ { errs[i] = txpool.ErrUnderpriced diff --git a/eth/handler.go b/eth/handler.go index c584e7a78b..34ce49102d 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -32,6 +32,7 @@ import ( "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/txpool" + "github.com/ethereum/go-ethereum/core/txpool/blobpool" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto/kzg4844" "github.com/ethereum/go-ethereum/eth/downloader" @@ -104,8 +105,8 @@ type blobPool interface { Has(hash common.Hash) bool GetCells(hash common.Hash, mask types.CustodyBitmap) ([]kzg4844.Cell, error) HasPayload(hash common.Hash) bool - AddPayload([]common.Hash, [][]kzg4844.Cell, *types.CustodyBitmap) []error GetCustody(hash common.Hash) *types.CustodyBitmap + AddPooledTx(pooledTx *blobpool.PooledBlobTx) error } // handlerConfig is the collection of initialization parameters to create a full @@ -138,6 +139,7 @@ type handler struct { downloader *downloader.Downloader txFetcher *fetcher.TxFetcher blobFetcher *fetcher.BlobFetcher + blobBuffer *blobpool.BlobBuffer peers *peerSet txBroadcastKey [16]byte @@ -192,11 +194,34 @@ func newHandler(config *handlerConfig) (*handler, error) { } return p.RequestTxs(hashes) } - addTxs := func(txs []*types.Transaction) []error { - return h.txpool.Add(txs, false) + // Construct the blob buffer for assembling blob txs from separate tx and cell deliveries + h.blobBuffer = blobpool.NewBlobBuffer(h.blobpool.AddPooledTx, h.removePeer) + + addTxs := func(peer string, txs []*types.Transaction) []error { + errs := make([]error, len(txs)) + p := h.peers.peer(peer) + isETH71 := p != nil && p.Version() >= eth.ETH71 + + var poolTxs []*types.Transaction + var index []int + for i, tx := range txs { + if isETH71 && tx.Type() == types.BlobTxType { + errs[i] = h.blobBuffer.AddTx(tx, peer) + } else { + poolTxs = append(poolTxs, tx) + index = append(index, i) + } + } + if len(poolTxs) > 0 { + poolErrs := h.txpool.Add(poolTxs, false) + for j, idx := range index { + errs[idx] = poolErrs[j] + } + } + return errs } validateMeta := func(tx common.Hash, kind byte) error { - if h.txpool.Has(tx) { + if h.txpool.Has(tx) || h.blobBuffer.HasTx(tx) { return txpool.ErrAlreadyKnown } if !h.txpool.FilterType(kind) { @@ -214,7 +239,17 @@ func newHandler(config *handlerConfig) (*handler, error) { } return p.RequestPayload(hashes, cells) } - h.blobFetcher = fetcher.NewBlobFetcher(h.blobpool.HasPayload, h.blobpool.AddPayload, fetchPayloads, h.removePeer, &config.Custody, nil) + hasPayload := func(hash common.Hash) bool { + return h.blobpool.HasPayload(hash) || h.blobBuffer.HasCells(hash) + } + addCells := func(hash common.Hash, deliveries map[string]*fetcher.PeerCellDelivery, custody *types.CustodyBitmap) error { + converted := make(map[string]*blobpool.PeerDelivery, len(deliveries)) + for peer, d := range deliveries { + converted[peer] = &blobpool.PeerDelivery{Cells: d.Cells, Indices: d.Indices} + } + return h.blobBuffer.AddCells(hash, converted, custody) + } + h.blobFetcher = fetcher.NewBlobFetcher(hasPayload, addCells, fetchPayloads, h.removePeer, &config.Custody, nil) return h, nil } diff --git a/eth/handler_test.go b/eth/handler_test.go index 0bb994cb13..75b8d12668 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -30,6 +30,7 @@ import ( "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/txpool" + "github.com/ethereum/go-ethereum/core/txpool/blobpool" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto/kzg4844" @@ -222,13 +223,12 @@ func (p *testTxPool) AddCells(hash common.Hash, cells []kzg4844.Cell, mask types p.custody[hash] = mask } -func (p *testTxPool) AddPayload(txs []common.Hash, cells [][]kzg4844.Cell, custody *types.CustodyBitmap) []error { +func (p *testTxPool) AddPooledTx(pooledTx *blobpool.PooledBlobTx) error { p.lock.Lock() defer p.lock.Unlock() - - for i, tx := range txs { - p.cellPool[tx] = cells[i] - } + hash := pooledTx.Transaction.Hash() + p.cellPool[hash] = pooledTx.Sidecar.Cells + p.txPool[hash] = pooledTx.Transaction return nil } diff --git a/tests/fuzzers/txfetcher/txfetcher_fuzzer.go b/tests/fuzzers/txfetcher/txfetcher_fuzzer.go index 872573d40f..1e15b991fa 100644 --- a/tests/fuzzers/txfetcher/txfetcher_fuzzer.go +++ b/tests/fuzzers/txfetcher/txfetcher_fuzzer.go @@ -80,7 +80,7 @@ func fuzz(input []byte) int { f := fetcher.NewTxFetcherForTests( nil, func(common.Hash, byte) error { return nil }, - func(txs []*types.Transaction) []error { + func(_ string, txs []*types.Transaction) []error { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil },