add buffer.go and proper peer dropping mechanism

This commit is contained in:
healthykim 2026-04-02 21:48:56 +09:00
parent 64982f03ae
commit daee525741
12 changed files with 663 additions and 413 deletions

View file

@ -152,7 +152,7 @@ type blobTxMeta struct {
// newBlobTxMeta retrieves the indexed metadata fields from a pooled blob transaction
// and assembles a helper struct to track in memory.
func newBlobTxMeta(id uint64, size uint64, storageSize uint32, pooledTx *pooledBlobTx) *blobTxMeta {
func newBlobTxMeta(id uint64, size uint64, storageSize uint32, pooledTx *PooledBlobTx) *blobTxMeta {
var version byte
if pooledTx.Sidecar != nil {
version = pooledTx.Sidecar.Version
@ -180,7 +180,7 @@ func newBlobTxMeta(id uint64, size uint64, storageSize uint32, pooledTx *pooledB
return meta
}
type pooledBlobTx struct {
type PooledBlobTx struct {
Transaction *types.Transaction
Sidecar *types.BlobTxCellSidecar
Size uint64 // original transaction size (including blobs)
@ -188,7 +188,7 @@ type pooledBlobTx struct {
}
// newPooledBlobTx creates pooledBlobTx struct.
func newPooledBlobTx(tx *types.Transaction) (*pooledBlobTx, error) {
func newPooledBlobTx(tx *types.Transaction) (*PooledBlobTx, error) {
if tx.BlobTxSidecar() == nil {
return nil, errors.New("missing blob sidecar")
}
@ -196,7 +196,7 @@ func newPooledBlobTx(tx *types.Transaction) (*pooledBlobTx, error) {
if err != nil {
return nil, err
}
return &pooledBlobTx{
return &PooledBlobTx{
Transaction: tx.WithoutBlobTxSidecar(),
Sidecar: sidecar,
Size: tx.Size(),
@ -205,7 +205,7 @@ func newPooledBlobTx(tx *types.Transaction) (*pooledBlobTx, error) {
}
// convert recovers blobs from cell sidecar and returns a full transaction with blob sidecar.
func (ptx *pooledBlobTx) convert() (*types.Transaction, error) {
func (ptx *PooledBlobTx) convert() (*types.Transaction, error) {
if ptx.Sidecar == nil {
return nil, errors.New("cell sidecar missing")
}
@ -409,17 +409,9 @@ type BlobPool struct {
stored uint64 // Useful data size of all transactions on disk
limbo *limbo // Persistent data store for the non-finalized blobs
gapped map[common.Address][]*pooledBlobTx // Transactions that are currently gapped (nonce too high)
gapped map[common.Address][]*PooledBlobTx // Transactions that are currently gapped (nonce too high)
gappedSource map[common.Hash]common.Address // Source of gapped transactions to allow rechecking on inclusion
queue map[common.Hash]*types.Transaction // buffer
indexQueue map[common.Address][]*blobTxMeta // tx hashes in queue per address, sorted by nonce
spentQueue map[common.Address]*uint256.Int // Expenditure tracking for accounts, only for buffered txs
replacementQueue map[common.Address]map[uint64]*blobTxMeta // Replacement queue for pooled transactions
cellQueue map[common.Hash][]kzg4844.Cell // cell buffer
custodyQueue map[common.Hash]*types.CustodyBitmap
signer types.Signer // Transaction signer to use for sender recovery
chain BlockChain // Chain object to access the state through
@ -446,21 +438,15 @@ func New(config Config, chain BlockChain, hasPendingAuth func(common.Address) bo
// Create the transaction pool with its initial settings
return &BlobPool{
config: config,
hasPendingAuth: hasPendingAuth,
signer: types.LatestSigner(chain.Config()),
chain: chain,
lookup: newLookup(),
index: make(map[common.Address][]*blobTxMeta),
spent: make(map[common.Address]*uint256.Int),
gapped: make(map[common.Address][]*pooledBlobTx),
gappedSource: make(map[common.Hash]common.Address),
queue: make(map[common.Hash]*types.Transaction),
indexQueue: make(map[common.Address][]*blobTxMeta),
spentQueue: make(map[common.Address]*uint256.Int),
cellQueue: make(map[common.Hash][]kzg4844.Cell),
custodyQueue: make(map[common.Hash]*types.CustodyBitmap),
replacementQueue: make(map[common.Address]map[uint64]*blobTxMeta),
config: config,
hasPendingAuth: hasPendingAuth,
signer: types.LatestSigner(chain.Config()),
chain: chain,
lookup: newLookup(),
index: make(map[common.Address][]*blobTxMeta),
spent: make(map[common.Address]*uint256.Int),
gapped: make(map[common.Address][]*PooledBlobTx),
gappedSource: make(map[common.Hash]common.Address),
}
}
@ -474,7 +460,6 @@ func (p *BlobPool) FilterType(kind byte) bool {
return kind == types.BlobTxType
}
// Init sets the gas price needed to keep a transaction in the pool and the chain
// head to allow balance / nonce checks. The transaction journal will be loaded
// from disk and filtered based on the provided starting settings.
func (p *BlobPool) Init(gasTip uint64, head *types.Header, reserver txpool.Reserver) error {
@ -659,7 +644,7 @@ func (p *BlobPool) Close() error {
// If a pooledBlobTx is found, it is indexed directly and nil is returned.
func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) (*types.Transaction, error) {
tx := new(types.Transaction)
pooledTx := new(pooledBlobTx)
pooledTx := new(PooledBlobTx)
if err := rlp.DecodeBytes(blob, pooledTx); err != nil {
// This path is impossible unless the disk data representation changes
@ -975,7 +960,7 @@ func (p *BlobPool) offload(addr common.Address, nonce uint64, id uint64, inclusi
log.Error("Blobs missing for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err)
return
}
var pooledTx pooledBlobTx
var pooledTx PooledBlobTx
if err = rlp.DecodeBytes(data, &pooledTx); err != nil {
log.Error("Blobs corrupted for included transaction", "from", addr, "nonce", nonce, "id", id, "err", err)
return
@ -1063,7 +1048,7 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) {
log.Error("Blobs missing for announcable transaction", "from", addr, "nonce", meta.nonce, "id", meta.id, "err", err)
continue
}
var pooledTx pooledBlobTx
var pooledTx PooledBlobTx
if err = rlp.DecodeBytes(data, &pooledTx); err != nil {
log.Error("Blobs corrupted for announcable transaction", "from", addr, "nonce", meta.nonce, "id", meta.id, "err", err)
continue
@ -1377,77 +1362,31 @@ func (p *BlobPool) checkDelegationLimit(tx *types.Transaction) error {
// rules and adheres to some heuristic limits of the local node (price and size).
// This function assumes the static validation has been performed already and
// only runs the stateful checks with lock protection.
// If buffer field is set to true, consider txs in the queue as well.
// This is to prevent fetching cells of invalid transactions, which would be expensive.
func (p *BlobPool) validateTx(tx *types.Transaction, buffer bool) error {
func (p *BlobPool) validateTx(tx *types.Transaction) error {
if err := p.ValidateTxBasics(tx); err != nil {
return err
}
// Ensure the transaction adheres to the stateful pool filters (nonce, balance)
stateOpts := &txpool.ValidationOptionsWithState{
State: p.state,
FirstNonceGap: func(addr common.Address) uint64 {
// Nonce gaps are permitted in the blob pool, but only as part of the
// in-memory 'gapped' buffer. We expose the gap here to validateTx,
// then handle the error by adding to the buffer. The first gap will
// be the next nonce shifted by however many transactions we already
// have pooled.
result := p.state.GetNonce(addr) + uint64(len(p.index[addr]))
if buffer {
return result + uint64(len(p.indexQueue[addr]))
}
return result
return p.state.GetNonce(addr) + uint64(len(p.index[addr]))
},
UsedAndLeftSlots: func(addr common.Address) (int, int) {
have := len(p.index[addr])
if buffer {
have += len(p.indexQueue[addr])
}
if have >= maxTxsPerAccount {
return have, 0
}
return have, maxTxsPerAccount - have
},
ExistingExpenditure: func(addr common.Address) *big.Int {
result := new(big.Int)
if spent := p.spent[addr]; spent != nil {
result.Add(result, spent.ToBig())
return spent.ToBig()
}
// calculate expenditure after replacements
if buffer {
if replacements := p.replacementQueue[addr]; replacements != nil {
next := p.state.GetNonce(addr)
for nonce, replacement := range replacements {
if nonce >= next && len(p.index[addr]) > int(nonce-next) {
originalCost := p.index[addr][nonce-next].costCap
replacementCost := replacement.costCap
result.Add(result, new(uint256.Int).Sub(replacementCost, originalCost).ToBig())
}
}
}
if spentQueue := p.spentQueue[addr]; spentQueue != nil {
result.Add(result, spentQueue.ToBig())
}
}
return result
return new(big.Int)
},
ExistingCost: func(addr common.Address, nonce uint64) *big.Int {
next := p.state.GetNonce(addr)
if buffer {
if p.replacementQueue[addr] != nil && p.replacementQueue[addr][nonce] != nil {
return p.replacementQueue[addr][nonce].costCap.ToBig()
}
pooledCount := uint64(len(p.index[addr]))
if nonce >= next+pooledCount && uint64(len(p.indexQueue[addr])) > nonce-next-pooledCount {
return p.indexQueue[addr][nonce-next-pooledCount].costCap.ToBig()
}
}
if uint64(len(p.index[addr])) > nonce-next {
return p.index[addr][int(nonce-next)].costCap.ToBig()
}
@ -1460,33 +1399,15 @@ func (p *BlobPool) validateTx(tx *types.Transaction, buffer bool) error {
if err := p.checkDelegationLimit(tx); err != nil {
return err
}
// If the transaction replaces an existing one, ensure that price bumps are
// adhered to.
var (
from, _ = types.Sender(p.signer, tx) // already validated above
from, _ = types.Sender(p.signer, tx)
next = p.state.GetNonce(from)
)
var prev *blobTxMeta
nonce := tx.Nonce()
if nonce < next+uint64(len(p.index[from])) {
// pooled tx
prev = p.index[from][nonce-next]
// check replacement if it is buffer tx validation
if buffer && p.replacementQueue[from] != nil {
if replacement := p.replacementQueue[from][nonce]; replacement != nil {
prev = replacement
}
}
} else if buffer {
pooledCount := uint64(len(p.index[from]))
if nonce >= next+pooledCount {
offset := nonce - next - pooledCount
if uint64(len(p.indexQueue[from])) > offset && offset > 0 {
prev = p.indexQueue[from][offset]
}
}
}
if prev == nil {
return nil
@ -1530,7 +1451,7 @@ func (p *BlobPool) Has(hash common.Hash) bool {
p.lock.RLock()
defer p.lock.RUnlock()
if p.lookup.exists(hash) || p.queue[hash] != nil {
if p.lookup.exists(hash) {
return true
}
@ -1543,7 +1464,7 @@ func (p *BlobPool) HasPayload(hash common.Hash) bool {
p.lock.RLock()
defer p.lock.RUnlock()
return p.lookup.exists(hash) || len(p.cellQueue[hash]) != 0
return p.lookup.exists(hash)
}
// getRLP returns the raw RLP-encoded pooledBlobTx data from the store.
@ -1587,7 +1508,7 @@ func (p *BlobPool) Get(hash common.Hash, includeBlob bool) *types.Transaction {
if len(data) == 0 {
return nil
}
var pooledTx pooledBlobTx
var pooledTx PooledBlobTx
if err := rlp.DecodeBytes(data, &pooledTx); err != nil {
log.Error("Blobs corrupted for traced transaction", "hash", hash, "err", err)
return nil
@ -1613,7 +1534,7 @@ func (p *BlobPool) GetRLP(hash common.Hash, includeBlob bool) []byte {
if len(data) == 0 {
return nil
}
var pooledTx pooledBlobTx
var pooledTx PooledBlobTx
if err := rlp.DecodeBytes(data, &pooledTx); err != nil {
log.Error("Failed to decode transaction in blobpool", "hash", hash, "err", err)
return nil
@ -1701,7 +1622,7 @@ func (p *BlobPool) GetBlobs(vhashes []common.Hash, version byte) ([]*kzg4844.Blo
}
// Decode the blob transaction
var pooledTx pooledBlobTx
var pooledTx PooledBlobTx
if err := rlp.DecodeBytes(data, &pooledTx); err != nil {
log.Error("Blobs corrupted for traced transaction", "id", txID, "err", err)
continue
@ -1780,7 +1701,7 @@ func (p *BlobPool) GetBlobCells(vhashes []common.Hash, mask types.CustodyBitmap)
if err != nil {
continue
}
var pooledTx pooledBlobTx
var pooledTx PooledBlobTx
if err := rlp.DecodeBytes(data, &pooledTx); err != nil {
continue
}
@ -1851,127 +1772,19 @@ func (p *BlobPool) Add(txs []*types.Transaction, sync bool) []error {
if errs[i] = p.ValidateTxBasics(tx); errs[i] != nil {
continue
}
sc := tx.BlobTxSidecar()
if sc != nil && len(sc.Blobs) != 0 {
pooledTx, err := newPooledBlobTx(tx)
if err != nil {
errs[i] = err
continue
}
errs[i] = p.add(pooledTx)
} else {
errs[i] = p.addBuffer(tx)
pooledTx, err := newPooledBlobTx(tx)
if err != nil {
errs[i] = err
continue
}
errs[i] = p.AddPooledTx(pooledTx)
}
return errs
}
func (p *BlobPool) addBuffer(tx *types.Transaction) (err error) {
p.lock.Lock()
defer p.lock.Unlock()
if cells, ok := p.cellQueue[tx.Hash()]; ok {
sidecar := tx.BlobTxSidecar()
var cellSidecar types.BlobTxCellSidecar
blobCount := len(sidecar.Commitments)
if len(cells) >= kzg4844.DataPerBlob*blobCount {
blob, err := kzg4844.RecoverBlobs(cells, p.custodyQueue[tx.Hash()].Indices())
if err != nil {
return err
}
extendedCells, err := kzg4844.ComputeCells(blob)
if err != nil {
return err
}
cellSidecar = types.BlobTxCellSidecar{
Version: sidecar.Version,
Cells: extendedCells,
Commitments: sidecar.Commitments,
Proofs: sidecar.Proofs,
Custody: *types.CustodyBitmapAll,
}
} else {
cellSidecar = types.BlobTxCellSidecar{
Version: sidecar.Version,
Cells: cells,
Commitments: sidecar.Commitments,
Proofs: sidecar.Proofs,
Custody: *p.custodyQueue[tx.Hash()],
}
}
err := p.addLocked(&pooledBlobTx{Transaction: tx.WithoutBlobTxSidecar(), Sidecar: &cellSidecar, Size: tx.Size()}, true)
if err == nil {
delete(p.cellQueue, tx.Hash())
delete(p.custodyQueue, tx.Hash())
}
return err
}
if err := p.validateTx(tx, true); err != nil {
return err
}
// Store the original tx in queue (with BlobTxSidecar intact — Blobs may be nil
// from ETH/71 but commitments/proofs are preserved for cell validation later).
p.queue[tx.Hash()] = tx
from, _ := types.Sender(p.signer, tx)
// Build a partial pooledBlobTx for metadata tracking.
var cellSidecar *types.BlobTxCellSidecar
if sidecar := tx.BlobTxSidecar(); sidecar != nil {
cellSidecar = &types.BlobTxCellSidecar{
Version: sidecar.Version,
Commitments: sidecar.Commitments,
Proofs: sidecar.Proofs,
}
}
next := p.state.GetNonce(from)
nonce := tx.Nonce()
pooledCount := uint64(len(p.index[from]))
//todo this is strange
meta := newBlobTxMeta(0, tx.Size(), 0, &pooledBlobTx{Transaction: tx, Sidecar: cellSidecar, Size: tx.Size()})
if nonce < next+pooledCount {
// Pooled transaction replacements are stored in replacementQueue for expenditure validation
// for future transactions from the same account. This overestimates expenditure considering
// that replacement transaction payload fetch may fail and the tx can be dropped.
// However, this conservative approach prevents transactions that passed validation when
// entering the buffer from failing expenditure validation due to transaction replacements.
if p.replacementQueue[from] == nil {
p.replacementQueue[from] = make(map[uint64]*blobTxMeta)
}
if existingReplacement := p.replacementQueue[from][nonce]; existingReplacement != nil {
delete(p.queue, existingReplacement.hash)
}
p.replacementQueue[from][nonce] = meta
} else {
if p.spentQueue[from] == nil {
p.spentQueue[from] = new(uint256.Int)
}
bufferOffset := int(nonce - (next + pooledCount))
if len(p.indexQueue[from]) > bufferOffset {
// Replace buffer transaction
prev := p.indexQueue[from][bufferOffset]
delete(p.queue, prev.hash)
p.indexQueue[from][bufferOffset] = meta
p.spentQueue[from] = new(uint256.Int).Sub(p.spentQueue[from], prev.costCap)
p.spentQueue[from] = new(uint256.Int).Add(p.spentQueue[from], meta.costCap)
dropReplacedMeter.Mark(1)
} else {
p.indexQueue[from] = append(p.indexQueue[from], meta)
p.spentQueue[from] = new(uint256.Int).Add(p.spentQueue[from], meta.costCap)
}
}
return nil
}
// add inserts a new blob transaction into the pool if it passes validation (both
// consensus validity and pool restrictions).
func (p *BlobPool) add(pooledTx *pooledBlobTx) (err error) {
func (p *BlobPool) AddPooledTx(pooledTx *PooledBlobTx) (err error) {
// The blob pool blocks on adding a transaction. This is because blob txs are
// only even pulled from the network, so this method will act as the overload
// protection for fetches.
@ -1990,12 +1803,12 @@ func (p *BlobPool) add(pooledTx *pooledBlobTx) (err error) {
// addLocked inserts a new blob transaction into the pool if it passes validation (both
// consensus validity and pool restrictions). It must be called with the pool lock held.
// Only for internal use.
func (p *BlobPool) addLocked(pooledTx *pooledBlobTx, checkGapped bool) (err error) {
func (p *BlobPool) addLocked(pooledTx *PooledBlobTx, checkGapped bool) (err error) {
tx := pooledTx.Transaction
cellSidecar := pooledTx.Sidecar
// Ensure the transaction is valid from all perspectives
if err := p.validateTx(tx, false); err != nil {
if err := p.validateTx(tx); err != nil {
log.Trace("Transaction validation failed", "hash", tx.Hash(), "err", err)
switch {
case errors.Is(err, txpool.ErrUnderpriced):
@ -2550,9 +2363,6 @@ func (p *BlobPool) Status(hash common.Hash) txpool.TxStatus {
if p.lookup.exists(hash) {
return txpool.TxStatusPending
}
if _, ok := p.queue[hash]; ok {
return txpool.TxStatusQueued
}
if _, gapped := p.gappedSource[hash]; gapped {
return txpool.TxStatusQueued
}
@ -2607,7 +2417,7 @@ func (p *BlobPool) Clear() {
// Reset counters and the gapped buffer
p.stored = 0
p.gapped = make(map[common.Address][]*pooledBlobTx)
p.gapped = make(map[common.Address][]*PooledBlobTx)
p.gappedSource = make(map[common.Hash]common.Address)
var (
@ -2640,7 +2450,7 @@ func (p *BlobPool) GetCells(hash common.Hash, mask types.CustodyBitmap) ([]kzg48
return nil, errors.New("tracked blob transaction missing from store")
}
// Decode the blob transaction
var pooledTx pooledBlobTx
var pooledTx PooledBlobTx
if err := rlp.DecodeBytes(data, &pooledTx); err != nil {
return nil, errors.New("blobs corrupted for traced transaction")
}
@ -2661,79 +2471,3 @@ func (p *BlobPool) GetCells(hash common.Hash, mask types.CustodyBitmap) ([]kzg48
}
return cells, nil
}
// AddPayload adds cell payloads for blob transactions.
func (p *BlobPool) AddPayload(txs []common.Hash, cells [][]kzg4844.Cell, custody *types.CustodyBitmap) []error {
p.lock.Lock()
defer p.lock.Unlock()
errs := make([]error, len(txs))
for i, hash := range txs {
if _, ok := p.queue[hash]; !ok {
p.cellQueue[hash] = cells[i]
p.custodyQueue[hash] = custody
continue
}
sidecar := p.queue[hash].BlobTxSidecar()
var cellSidecar types.BlobTxCellSidecar
blobCount := len(sidecar.Commitments)
if len(cells[i]) >= kzg4844.DataPerBlob*blobCount {
blob, err := kzg4844.RecoverBlobs(cells[i], custody.Indices())
if err != nil {
errs[i] = err
continue
}
extendedCells, err := kzg4844.ComputeCells(blob)
if err != nil {
errs[i] = err
continue
}
cellSidecar = types.BlobTxCellSidecar{
Version: sidecar.Version,
Cells: extendedCells,
Commitments: sidecar.Commitments,
Proofs: sidecar.Proofs,
Custody: *types.CustodyBitmapAll,
}
} else {
cellSidecar = types.BlobTxCellSidecar{
Version: sidecar.Version,
Cells: cells[i],
Commitments: sidecar.Commitments,
Proofs: sidecar.Proofs,
Custody: *custody,
}
}
errs[i] = p.addLocked(&pooledBlobTx{Transaction: p.queue[hash].WithoutBlobTxSidecar(), Sidecar: &cellSidecar, Size: p.queue[hash].Size()}, true)
// clean up queues
tx := p.queue[hash]
delete(p.queue, hash)
from, _ := types.Sender(p.signer, tx)
nonce := tx.Nonce()
next := p.state.GetNonce(from)
if p.replacementQueue[from] != nil {
delete(p.replacementQueue[from], nonce)
if len(p.replacementQueue[from]) == 0 {
delete(p.replacementQueue, from)
}
continue
}
// plain tx
pooledCount := uint64(len(p.index[from]))
if nonce < next+pooledCount {
continue
}
offset := int(nonce - next - pooledCount)
if offset > 0 && offset < len(p.indexQueue[from]) {
removed := p.indexQueue[from][offset]
p.indexQueue[from] = append(p.indexQueue[from][:offset], p.indexQueue[from][offset+1:]...)
p.spentQueue[from] = new(uint256.Int).Sub(p.spentQueue[from], removed.costCap)
}
}
return errs
}

View file

@ -2122,7 +2122,7 @@ func benchmarkPoolPending(b *testing.B, datacap uint64) {
}
statedb.AddBalance(addr, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified)
pooledTx, _ := newPooledBlobTx(tx)
pool.add(pooledTx)
pool.AddPooledTx(pooledTx)
}
statedb.Commit(0, true, false)
defer pool.Close()

View file

@ -0,0 +1,262 @@
// Copyright 2026 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package blobpool
import (
"cmp"
"fmt"
"slices"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/log"
)
const (
bufferLifetime = 2 * time.Minute
)
// PeerDelivery holds cells delivered by a single peer, in blob-major order.
type PeerDelivery struct {
Cells []kzg4844.Cell
Indices []uint64
}
type txEntry struct {
tx *types.Transaction
peer string
added time.Time
}
type cellEntry struct {
deliveries map[string]*PeerDelivery
custody *types.CustodyBitmap
added time.Time
}
type BlobBuffer struct {
txs map[common.Hash]*txEntry
cells map[common.Hash]*cellEntry
addToPool func(*PooledBlobTx) error
dropPeer func(string)
}
func NewBlobBuffer(addToPool func(*PooledBlobTx) error, dropPeer func(string)) *BlobBuffer {
return &BlobBuffer{
txs: make(map[common.Hash]*txEntry),
cells: make(map[common.Hash]*cellEntry),
addToPool: addToPool,
dropPeer: dropPeer,
}
}
// AddTx buffers a blob transaction (without blobs) from an ETH/71 peer.
// If cells are already buffered, verification and pool insertion are attempted.
func (b *BlobBuffer) AddTx(tx *types.Transaction, peer string) error {
b.evict()
hash := tx.Hash()
sidecar := tx.BlobTxSidecar()
if sidecar == nil {
return fmt.Errorf("blob transaction without sidecar")
}
// vhash check
if err := sidecar.ValidateBlobCommitmentHashes(tx.BlobHashes()); err != nil {
log.Warn("Commitment hash mismatch, dropping peer", "peer", peer, "err", err)
b.dropPeer(peer)
return err
}
// proof count check
if len(sidecar.Proofs) < len(sidecar.Commitments)*kzg4844.CellProofsPerBlob {
b.dropPeer(peer)
return fmt.Errorf("insufficient proofs in sidecar")
}
// todo: I also considered performing additional validation for the metrics of the
// tx_fetcher. This could be used to avoid sending GetCells requests when the
// nonce is too low or the transaction is underpriced. However, doing so would
// require taking buffered transactions into account as well, and would require
// allowing the buffer to be part of the fetchers scheduling logic.
// Therefore, I will leave this as a TODO for now.
if entry, ok := b.cells[hash]; ok {
return b.add(hash, tx, entry)
}
b.txs[hash] = &txEntry{tx: tx, peer: peer, added: time.Now()}
return nil
}
// AddCells buffers per-peer cell deliveries from the blob fetcher.
// If the transaction is already buffered, verification and pool insertion are attempted.
func (b *BlobBuffer) AddCells(hash common.Hash, deliveries map[string]*PeerDelivery, custody *types.CustodyBitmap) error {
b.evict()
b.cells[hash] = &cellEntry{
deliveries: deliveries,
custody: custody,
added: time.Now(),
}
if txe, ok := b.txs[hash]; ok {
return b.add(hash, txe.tx, b.cells[hash])
}
return nil
}
// add verifies cells per-peer, sorts them, and adds to the pool.
func (b *BlobBuffer) add(hash common.Hash, tx *types.Transaction, cells *cellEntry) error {
sidecar := tx.BlobTxSidecar()
// Per-peer cell verification
if badPeers := b.verifyCells(cells, sidecar); len(badPeers) > 0 {
b.dropPeers(badPeers)
delete(b.cells, hash)
delete(b.txs, hash)
return fmt.Errorf("cell verification failed")
}
blobCount := len(tx.BlobHashes())
sorted, custody := sortCells(cells, blobCount)
cellSidecar := &types.BlobTxCellSidecar{
Version: sidecar.Version,
Cells: sorted,
Commitments: sidecar.Commitments,
Proofs: sidecar.Proofs,
Custody: *custody,
}
pooledTx := &PooledBlobTx{
Transaction: tx.WithoutBlobTxSidecar(),
Sidecar: cellSidecar,
Size: tx.Size(),
SizeWithoutBlob: tx.WithoutBlob().Size(),
}
err := b.addToPool(pooledTx)
delete(b.cells, hash)
delete(b.txs, hash)
return err
}
func (b *BlobBuffer) HasTx(hash common.Hash) bool {
_, ok := b.txs[hash]
return ok
}
func (b *BlobBuffer) HasCells(hash common.Hash) bool {
_, ok := b.cells[hash]
return ok
}
func (b *BlobBuffer) dropPeers(peers []string) {
if b.dropPeer == nil {
return
}
for _, p := range peers {
b.dropPeer(p)
}
}
func (b *BlobBuffer) evict() {
now := time.Now()
for hash, entry := range b.txs {
if now.Sub(entry.added) > bufferLifetime {
delete(b.txs, hash)
}
}
for hash, entry := range b.cells {
if now.Sub(entry.added) > bufferLifetime {
delete(b.cells, hash)
}
}
}
// verifyCells verifies each peer's cells against the sidecar.
// Returns the list of peers whose cells failed verification.
func (b *BlobBuffer) verifyCells(entry *cellEntry, sidecar *types.BlobTxSidecar) []string {
var badPeers []string
for peer, delivery := range entry.deliveries {
if err := verifyPeerCells(delivery, sidecar); err != nil {
log.Debug("Cell verification failed", "peer", peer, "err", err)
badPeers = append(badPeers, peer)
}
}
return badPeers
}
// verifyPeerCells verifies a single peer's cells against the sidecar proofs.
// delivery.Cells is blob-major: [blob0_cell0..blob0_cellN, blob1_cell0..blob1_cellN, ...]
func verifyPeerCells(delivery *PeerDelivery, sidecar *types.BlobTxSidecar) error {
cellsPerBlob := len(delivery.Indices)
blobCount := len(delivery.Cells) / cellsPerBlob
if blobCount == 0 || blobCount != len(sidecar.Commitments) {
return fmt.Errorf("blob count mismatch: delivery %d, commitments %d", blobCount, len(sidecar.Commitments))
}
// Extract proofs corresponding to this peer's cell indices
var proofs []kzg4844.Proof
for blobIdx := 0; blobIdx < blobCount; blobIdx++ {
for _, cellIdx := range delivery.Indices {
proofIdx := blobIdx*kzg4844.CellProofsPerBlob + int(cellIdx)
if proofIdx >= len(sidecar.Proofs) {
return fmt.Errorf("proof index out of range: %d", proofIdx)
}
proofs = append(proofs, sidecar.Proofs[proofIdx])
}
}
return kzg4844.VerifyCells(delivery.Cells, sidecar.Commitments, proofs, delivery.Indices)
}
// sortCells merges all per-peer deliveries into a single flat cell array
// sorted by custody index.
//
// e.g.
// peer A: cells = [blob0_cell5, blob0_cell3, blob1_cell5, blob1_cell3]
// peer B: cells = [blob0_cell1, blob0_cell7, blob1_cell1, blob1_cell7]
// -> [blob0_cell1, blob0_cell3, blob0_cell5, blob0_cell7, blob1_cell1, blob1_cell3, blob1_cell5, blob1_cell7]
func sortCells(entry *cellEntry, blobCount int) ([]kzg4844.Cell, *types.CustodyBitmap) {
// indices per delivery
var indices []uint64
// 1. compose per blob cells
blob := make([][]kzg4844.Cell, blobCount)
for _, d := range entry.deliveries {
n := len(d.Indices)
indices = append(indices, d.Indices...)
for b := range blobCount {
blob[b] = append(blob[b], d.Cells[b*n:(b+1)*n]...)
}
}
// 2. sort
perm := make([]int, len(indices))
for i := range perm {
perm[i] = i
}
// perm represents the position of cells in sorted array
slices.SortFunc(perm, func(a, b int) int {
return cmp.Compare(indices[a], indices[b])
})
// reorder cells
var res []kzg4844.Cell
for b := range blobCount {
for _, p := range perm {
res = append(res, blob[b][p])
}
}
custody := types.NewCustodyBitmap(indices)
return res, &custody
}

View file

@ -0,0 +1,254 @@
package blobpool
import (
"crypto/ecdsa"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/params"
"github.com/holiman/uint256"
)
// makeV1Tx creates a V1 blob transaction with cell proofs, then strips blobs
// (simulating what ETH/71 peers send).
func makeV1Tx(t *testing.T, nonce uint64, blobCount int, blobOffset int, key *ecdsa.PrivateKey) *types.Transaction {
t.Helper()
tx := makeMultiBlobTx(nonce, 1, 1, 1, blobCount, blobOffset, key, types.BlobSidecarVersion1)
return tx.WithoutBlob()
}
// makePeerDelivery creates a PeerDelivery for given cell indices from a set of blobs.
func makePeerDelivery(t *testing.T, blobOffset, blobCount int, indices []uint64) *PeerDelivery {
t.Helper()
var allCells []kzg4844.Cell
for i := 0; i < blobCount; i++ {
cells, err := kzg4844.ComputeCells([]kzg4844.Blob{*testBlobs[blobOffset+i]})
if err != nil {
t.Fatal(err)
}
allCells = append(allCells, cells...)
}
var deliveryCells []kzg4844.Cell
for b := 0; b < blobCount; b++ {
for _, idx := range indices {
deliveryCells = append(deliveryCells, allCells[b*kzg4844.CellsPerBlob+int(idx)])
}
}
return &PeerDelivery{Cells: deliveryCells, Indices: indices}
}
func newTestBuffer(t *testing.T) *BlobBuffer {
t.Helper()
return NewBlobBuffer(
func(ptx *PooledBlobTx) error { return nil },
func(peer string) {},
)
}
func TestSortCells(t *testing.T) {
blobCount := 2
blobOffset := 0
peerA := makePeerDelivery(t, blobOffset, blobCount, []uint64{5, 3})
peerB := makePeerDelivery(t, blobOffset, blobCount, []uint64{1, 7})
custody := types.NewCustodyBitmap([]uint64{1, 3, 5, 7})
entry := &cellEntry{
deliveries: map[string]*PeerDelivery{
"peerA": peerA,
"peerB": peerB,
},
custody: &custody,
}
sorted, resultCustody := sortCells(entry, blobCount)
resultIndices := resultCustody.Indices()
if len(resultIndices) != 4 {
t.Fatalf("expected 4 indices, got %d", len(resultIndices))
}
for i, expected := range []uint64{1, 3, 5, 7} {
if resultIndices[i] != expected {
t.Errorf("index %d: expected %d, got %d", i, expected, resultIndices[i])
}
}
expected := makePeerDelivery(t, blobOffset, blobCount, []uint64{1, 3, 5, 7})
if len(sorted) != len(expected.Cells) {
t.Fatalf("sorted length %d != expected %d", len(sorted), len(expected.Cells))
}
for i := range sorted {
if sorted[i] != expected.Cells[i] {
t.Errorf("cell %d mismatch", i)
}
}
}
func TestAddTxThenCells(t *testing.T) {
key, _ := crypto.GenerateKey()
blobCount := 2
buf := newTestBuffer(t)
tx := makeV1Tx(t, 0, blobCount, 0, key)
hash := tx.Hash()
if err := buf.AddTx(tx, "peerA"); err != nil {
t.Fatal(err)
}
if !buf.HasTx(hash) {
t.Fatal("tx should be buffered")
}
dataIndices := make([]uint64, kzg4844.DataPerBlob)
for i := range dataIndices {
dataIndices[i] = uint64(i)
}
delivery := makePeerDelivery(t, 0, blobCount, dataIndices)
custody := types.NewCustodyBitmap(dataIndices)
if err := buf.AddCells(hash, map[string]*PeerDelivery{"peerB": delivery}, &custody); err != nil {
t.Fatal(err)
}
if buf.HasTx(hash) || buf.HasCells(hash) {
t.Fatal("buffer should be empty after add")
}
}
func TestAddCellsThenTx(t *testing.T) {
key, _ := crypto.GenerateKey()
blobCount := 2
buf := newTestBuffer(t)
tx := makeV1Tx(t, 0, blobCount, 0, key)
hash := tx.Hash()
dataIndices := make([]uint64, kzg4844.DataPerBlob)
for i := range dataIndices {
dataIndices[i] = uint64(i)
}
delivery := makePeerDelivery(t, 0, blobCount, dataIndices)
custody := types.NewCustodyBitmap(dataIndices)
if err := buf.AddCells(hash, map[string]*PeerDelivery{"peerB": delivery}, &custody); err != nil {
t.Fatal(err)
}
if !buf.HasCells(hash) {
t.Fatal("cells should be buffered")
}
if err := buf.AddTx(tx, "peerA"); err != nil {
t.Fatal(err)
}
if buf.HasTx(hash) || buf.HasCells(hash) {
t.Fatal("buffer should be empty after add")
}
}
func TestMultiPeerDelivery(t *testing.T) {
key, _ := crypto.GenerateKey()
blobCount := 2
buf := newTestBuffer(t)
tx := makeV1Tx(t, 0, blobCount, 0, key)
hash := tx.Hash()
buf.AddTx(tx, "peerA")
indicesA := []uint64{0, 2, 4, 6}
indicesB := []uint64{1, 3, 5, 7}
deliveryA := makePeerDelivery(t, 0, blobCount, indicesA)
deliveryB := makePeerDelivery(t, 0, blobCount, indicesB)
allIndices := append(indicesA, indicesB...)
custody := types.NewCustodyBitmap(allIndices)
if err := buf.AddCells(hash, map[string]*PeerDelivery{
"peerB": deliveryA,
"peerC": deliveryB,
}, &custody); err != nil {
t.Fatal(err)
}
if buf.HasTx(hash) || buf.HasCells(hash) {
t.Fatal("buffer should be empty after add")
}
}
func TestBadCell(t *testing.T) {
key, _ := crypto.GenerateKey()
blobCount := 1
var dropped []string
buf := NewBlobBuffer(
func(ptx *PooledBlobTx) error { return nil },
func(peer string) { dropped = append(dropped, peer) },
)
tx := makeV1Tx(t, 0, blobCount, 0, key)
hash := tx.Hash()
buf.AddTx(tx, "peerA")
goodDelivery := makePeerDelivery(t, 0, blobCount, []uint64{0, 1, 2, 3})
badDelivery := makePeerDelivery(t, 0, blobCount, []uint64{4, 5, 6, 7})
for i := range badDelivery.Cells {
for j := range badDelivery.Cells[i] {
badDelivery.Cells[i][j] ^= 0xFF
}
}
allIndices := []uint64{0, 1, 2, 3, 4, 5, 6, 7}
custody := types.NewCustodyBitmap(allIndices)
err := buf.AddCells(hash, map[string]*PeerDelivery{
"peerB": goodDelivery,
"peerC": badDelivery,
}, &custody)
if err == nil {
t.Fatal("expected error from bad cells")
}
if len(dropped) != 1 || dropped[0] != "peerC" {
t.Fatalf("only peerC should have been dropped, got: %v", dropped)
}
if buf.HasTx(hash) || buf.HasCells(hash) {
t.Fatal("buffer should be empty after bad cell drop")
}
}
func TestBadTx(t *testing.T) {
key, _ := crypto.GenerateKey()
var dropped []string
buf := NewBlobBuffer(
func(ptx *PooledBlobTx) error { return nil },
func(peer string) { dropped = append(dropped, peer) },
)
blobtx := &types.BlobTx{
ChainID: uint256.MustFromBig(params.MainnetChainConfig.ChainID),
Nonce: 0,
GasTipCap: uint256.NewInt(1),
GasFeeCap: uint256.NewInt(1),
Gas: 21000,
BlobFeeCap: uint256.NewInt(1),
BlobHashes: []common.Hash{testBlobVHashes[0]},
Value: uint256.NewInt(100),
Sidecar: types.NewBlobTxSidecar(types.BlobSidecarVersion1,
nil,
[]kzg4844.Commitment{testBlobCommits[1]},
testBlobCellProofs[1],
),
}
tx := types.MustSignNewTx(key, types.LatestSigner(params.MainnetChainConfig), blobtx)
err := buf.AddTx(tx, "peerA")
if err == nil {
t.Fatal("expected error from commitment mismatch")
}
if len(dropped) != 1 || dropped[0] != "peerA" {
t.Fatalf("only peerA should have been dropped, got: %v", dropped)
}
if buf.HasTx(tx.Hash()) {
t.Fatal("tx should not be buffered")
}
}

View file

@ -33,7 +33,7 @@ import (
type limboBlob struct {
TxHash common.Hash // Owner transaction's hash to support resurrecting reorged txs
Block uint64 // Block in which the blob transaction was included
Tx *pooledBlobTx
Tx *PooledBlobTx
}
// limbo is a light, indexed database to temporarily store recently included
@ -146,7 +146,7 @@ func (l *limbo) finalize(final *types.Header) {
// push stores a new blob transaction into the limbo, waiting until finality for
// it to be automatically evicted.
func (l *limbo) push(tx *pooledBlobTx, block uint64) error {
func (l *limbo) push(tx *PooledBlobTx, block uint64) error {
// If the blobs are already tracked by the limbo, consider it a programming
// error. There's not much to do against it, but be loud.
if _, ok := l.index[tx.Transaction.Hash()]; ok {
@ -163,7 +163,7 @@ func (l *limbo) push(tx *pooledBlobTx, block uint64) error {
// pull retrieves a previously pushed set of blobs back from the limbo, removing
// it at the same time. This method should be used when a previously included blob
// transaction gets reorged out.
func (l *limbo) pull(tx common.Hash) (*pooledBlobTx, error) {
func (l *limbo) pull(tx common.Hash) (*PooledBlobTx, error) {
// If the blobs are not tracked by the limbo, there's not much to do. This
// can happen for example if a blob transaction is mined without pushing it
// into the network first.
@ -240,7 +240,7 @@ func (l *limbo) getAndDrop(id uint64) (*limboBlob, error) {
// setAndIndex assembles a limbo blob database entry and stores it, also updating
// the in-memory indices.
func (l *limbo) setAndIndex(tx *pooledBlobTx, block uint64) error {
func (l *limbo) setAndIndex(tx *PooledBlobTx, block uint64) error {
txhash := tx.Transaction.Hash()
item := &limboBlob{
TxHash: txhash,

View file

@ -76,11 +76,17 @@ type cellWithSeq struct {
cells *types.CustodyBitmap
}
// PeerCellDelivery holds cells delivered by a single peer.
type PeerCellDelivery struct {
Cells []kzg4844.Cell // blob-major order as received
Indices []uint64 // custody indices provided by this peer
}
type fetchStatus struct {
fetching *types.CustodyBitmap // To avoid fetching cells which had already been fetched / currently being fetched
fetched []uint64 // Custody indices that have been fetched (per-blob, same for all blobs)
blobCells [][]kzg4844.Cell // Per-blob cell accumulator, indexed by blob
blobCount int // Number of blobs in this tx (set on first delivery)
fetching *types.CustodyBitmap // To avoid fetching cells which had already been fetched / currently being fetched
fetched []uint64 // Custody indices that have been fetched (per-blob, same for all blobs)
deliveries map[string]*PeerCellDelivery // Per-peer cell deliveries
blobCount int // Number of blobs in this tx (set on first delivery)
}
// BlobFetcher is responsible for managing type 3 transactions based on peer announcements.
@ -124,7 +130,7 @@ type BlobFetcher struct {
// Callbacks
hasPayload func(common.Hash) bool
addPayload func([]common.Hash, [][]kzg4844.Cell, *types.CustodyBitmap) []error //todo: peer disconnection is strange here
addCells func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error
fetchPayloads func(string, []common.Hash, *types.CustodyBitmap) error
dropPeer func(string)
@ -136,7 +142,7 @@ type BlobFetcher struct {
func NewBlobFetcher(
hasPayload func(common.Hash) bool,
addPayload func([]common.Hash, [][]kzg4844.Cell, *types.CustodyBitmap) []error,
addCells func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error,
fetchPayloads func(string, []common.Hash, *types.CustodyBitmap) error, dropPeer func(string),
custody *types.CustodyBitmap, rand random) *BlobFetcher {
return &BlobFetcher{
@ -154,7 +160,7 @@ func NewBlobFetcher(
requests: make(map[string][]*cellRequest),
alternates: make(map[common.Hash]map[string]*types.CustodyBitmap),
hasPayload: hasPayload,
addPayload: addPayload,
addCells: addCells,
fetchPayloads: fetchPayloads,
dropPeer: dropPeer,
custody: custody,
@ -470,21 +476,18 @@ func (f *BlobFetcher) loop() {
// Unexpected hash, ignore
continue
}
// delivery.cells[i] contains cells for all blobs
// in blob-major order: [blob0_cell0, ..., blob0_cellN, blob1_cell0, ...].
indices := delivery.cellBitmap.Indices()
cellsPerBlob := len(indices)
if cellsPerBlob > 0 {
status := f.fetches[hash]
blobCount := len(delivery.cells[i]) / cellsPerBlob
// Initialize per-blob accumulators on first delivery
if status.blobCount == 0 {
status.blobCount = blobCount
status.blobCells = make([][]kzg4844.Cell, blobCount)
status.deliveries = make(map[string]*PeerCellDelivery)
}
for b := 0; b < blobCount; b++ {
offset := b * cellsPerBlob
status.blobCells[b] = append(status.blobCells[b], delivery.cells[i][offset:offset+cellsPerBlob]...)
status.deliveries[delivery.origin] = &PeerCellDelivery{
Cells: delivery.cells[i],
Indices: indices,
}
status.fetched = append(status.fetched, indices...)
}
@ -515,28 +518,10 @@ func (f *BlobFetcher) loop() {
if completed {
blobFetcherFetchTime.Update(int64(time.Duration(f.clock.Now() - request.time)))
fetchStatus := f.fetches[hash]
status := f.fetches[hash]
collectedCustody := types.NewCustodyBitmap(status.fetched)
f.addCells(hash, status.deliveries, &collectedCustody)
// Sort each blob's cells by ascending custody index.
// RecoverBlobs expects cells[k] to correspond to custodyIndices[k],
// and custodyIndices come from CustodyBitmap.Indices() which is always sorted.
perm := make([]int, len(fetchStatus.fetched))
for i := range perm {
perm[i] = i
}
slices.SortFunc(perm, func(a, b int) int {
return int(fetchStatus.fetched[a]) - int(fetchStatus.fetched[b])
})
var assembled []kzg4844.Cell
for _, blobCells := range fetchStatus.blobCells {
for _, p := range perm {
assembled = append(assembled, blobCells[p])
}
}
collectedCustody := types.NewCustodyBitmap(fetchStatus.fetched)
f.addPayload([]common.Hash{hash}, [][]kzg4844.Cell{assembled}, &collectedCustody)
// remove announces from other peers
for peer, txset := range f.announces {
delete(txset, hash)
if len(txset) == 0 {

View file

@ -17,7 +17,6 @@
package fetcher
import (
"fmt"
"slices"
"testing"
@ -149,8 +148,8 @@ func TestBlobFetcherFullFetch(t *testing.T) {
init: func() *BlobFetcher {
return NewBlobFetcher(
func(common.Hash) bool { return false },
func(txs []common.Hash, _ [][]kzg4844.Cell, _ *types.CustodyBitmap) []error {
return make([]error, len(txs))
func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error {
return nil
},
func(string, []common.Hash, *types.CustodyBitmap) error { return nil },
func(string) {},
@ -238,8 +237,8 @@ func TestBlobFetcherPartialFetch(t *testing.T) {
init: func() *BlobFetcher {
return NewBlobFetcher(
func(common.Hash) bool { return false },
func(txs []common.Hash, _ [][]kzg4844.Cell, _ *types.CustodyBitmap) []error {
return make([]error, len(txs))
func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error {
return nil
},
func(string, []common.Hash, *types.CustodyBitmap) error { return nil },
func(string) {},
@ -331,8 +330,8 @@ func TestBlobFetcherFullDelivery(t *testing.T) {
init: func() *BlobFetcher {
return NewBlobFetcher(
func(common.Hash) bool { return false },
func(txs []common.Hash, _ [][]kzg4844.Cell, _ *types.CustodyBitmap) []error {
return make([]error, len(txs))
func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error {
return nil
},
func(string, []common.Hash, *types.CustodyBitmap) error { return nil },
func(string) {},
@ -377,8 +376,8 @@ func TestBlobFetcherPartialDelivery(t *testing.T) {
init: func() *BlobFetcher {
return NewBlobFetcher(
func(common.Hash) bool { return false },
func(txs []common.Hash, _ [][]kzg4844.Cell, _ *types.CustodyBitmap) []error {
return make([]error, len(txs))
func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error {
return nil
},
func(string, []common.Hash, *types.CustodyBitmap) error { return nil },
func(string) {},
@ -511,8 +510,8 @@ func TestBlobFetcherAvailabilityTimeout(t *testing.T) {
init: func() *BlobFetcher {
return NewBlobFetcher(
func(common.Hash) bool { return false },
func(txs []common.Hash, _ [][]kzg4844.Cell, _ *types.CustodyBitmap) []error {
return make([]error, len(txs))
func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error {
return nil
},
func(string, []common.Hash, *types.CustodyBitmap) error { return nil },
func(string) {},
@ -551,8 +550,8 @@ func TestBlobFetcherPeerDrop(t *testing.T) {
init: func() *BlobFetcher {
return NewBlobFetcher(
func(common.Hash) bool { return false },
func(txs []common.Hash, _ [][]kzg4844.Cell, _ *types.CustodyBitmap) []error {
return make([]error, len(txs))
func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error {
return nil
},
func(string, []common.Hash, *types.CustodyBitmap) error { return nil },
func(string) {},
@ -626,8 +625,8 @@ func TestBlobFetcherFetchTimeout(t *testing.T) {
init: func() *BlobFetcher {
return NewBlobFetcher(
func(common.Hash) bool { return false },
func(txs []common.Hash, _ [][]kzg4844.Cell, _ *types.CustodyBitmap) []error {
return make([]error, len(txs))
func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error {
return nil
},
func(string, []common.Hash, *types.CustodyBitmap) error { return nil },
func(string) {},
@ -1012,40 +1011,21 @@ func TestMultiBlobDeliveryVerification(t *testing.T) {
init: func() *BlobFetcher {
return NewBlobFetcher(
func(common.Hash) bool { return false },
func(txs []common.Hash, cells [][]kzg4844.Cell, cst *types.CustodyBitmap) []error {
// Verify delivered cells pass KZG cell proof verification
// Debug: compare with expected cells
expectedCells := selectMultiBlobCells(sidecar, custody)
for ci, c := range cells {
if len(c) != len(expectedCells) {
verifyErr = fmt.Errorf("cell count mismatch: have %d, want %d", len(c), len(expectedCells))
return make([]error, len(txs))
}
for j := range c {
if c[j] != expectedCells[j] {
verifyErr = fmt.Errorf("tx %d cell %d mismatch (custody=%v)", ci, j, cst.Indices())
return make([]error, len(txs))
}
}
}
for _, c := range cells {
cs := &types.BlobTxCellSidecar{
Version: sidecar.Version,
Cells: c,
Commitments: sidecar.Commitments,
Proofs: sidecar.Proofs,
Custody: *cst,
}
indices := cs.Custody.Indices()
func(hash common.Hash, deliveries map[string]*PeerCellDelivery, cst *types.CustodyBitmap) error {
// Verify each peer's delivered cells pass KZG cell proof verification
for _, d := range deliveries {
var cellProofs []kzg4844.Proof
for blobIdx := range len(cs.Commitments) {
for _, proofIdx := range indices {
cellProofs = append(cellProofs, cs.Proofs[blobIdx*kzg4844.CellProofsPerBlob+int(proofIdx)])
for blobIdx := 0; blobIdx < len(sidecar.Commitments); blobIdx++ {
for _, idx := range d.Indices {
cellProofs = append(cellProofs, sidecar.Proofs[blobIdx*kzg4844.CellProofsPerBlob+int(idx)])
}
}
verifyErr = kzg4844.VerifyCells(cs.Cells, cs.Commitments, cellProofs, indices)
verifyErr = kzg4844.VerifyCells(d.Cells, sidecar.Commitments, cellProofs, d.Indices)
if verifyErr != nil {
return verifyErr
}
}
return make([]error, len(txs))
return nil
},
func(string, []common.Hash, *types.CustodyBitmap) error { return nil },
func(string) {},

View file

@ -180,10 +180,10 @@ type TxFetcher struct {
alternates map[common.Hash]map[string]struct{} // In-flight transaction alternate origins if retrieval fails
// Callbacks
validateMeta func(common.Hash, byte) error // Validate a tx metadata based on the local txpool
addTxs func([]*types.Transaction) []error // Insert a batch of transactions into local txpool
fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer
dropPeer func(string) // Drops a peer in case of announcement violation
validateMeta func(common.Hash, byte) error // Validate a tx metadata based on the local txpool
addTxs func(string, []*types.Transaction) []error // Insert a batch of transactions into local txpool
fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer
dropPeer func(string) // Drops a peer in case of announcement violation
step chan struct{} // Notification channel when the fetcher loop iterates
clock mclock.Clock // Monotonic clock or simulated clock for tests
@ -194,7 +194,7 @@ type TxFetcher struct {
// NewTxFetcher creates a transaction fetcher to retrieve transaction
// based on hash announcements.
// Chain can be nil to disable on-chain checks.
func NewTxFetcher(chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string)) *TxFetcher {
func NewTxFetcher(chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func(string, []*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string)) *TxFetcher {
return NewTxFetcherForTests(chain, validateMeta, addTxs, fetchTxs, dropPeer, mclock.System{}, time.Now, nil)
}
@ -202,7 +202,7 @@ func NewTxFetcher(chain *core.BlockChain, validateMeta func(common.Hash, byte) e
// a simulated version and the internal randomness with a deterministic one.
// Chain can be nil to disable on-chain checks.
func NewTxFetcherForTests(
chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string),
chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func(string, []*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string),
clock mclock.Clock, realTime func() time.Time, rand *mrand.Rand) *TxFetcher {
return &TxFetcher{
notify: make(chan *txAnnounce),
@ -352,7 +352,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
)
batch := txs[i:end]
for j, err := range f.addTxs(batch) {
for j, err := range f.addTxs(peer, batch) {
// Track the transaction hash if the price is too low for us.
// Avoid re-request this transaction when we receive another
// announcement.

View file

@ -93,7 +93,7 @@ func newTestTxFetcher() *TxFetcher {
return NewTxFetcher(
nil,
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
func(_ string, txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
@ -1172,7 +1172,7 @@ func TestTransactionFetcherUnderpricedDedup(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
f := newTestTxFetcher()
f.addTxs = func(txs []*types.Transaction) []error {
f.addTxs = func(_ string, txs []*types.Transaction) []error {
errs := make([]error, len(txs))
for i := 0; i < len(errs); i++ {
if i%3 == 0 {
@ -1270,7 +1270,7 @@ func TestTransactionFetcherUnderpricedDoSProtection(t *testing.T) {
testTransactionFetcher(t, txFetcherTest{
init: func() *TxFetcher {
f := newTestTxFetcher()
f.addTxs = func(txs []*types.Transaction) []error {
f.addTxs = func(_ string, txs []*types.Transaction) []error {
errs := make([]error, len(txs))
for i := 0; i < len(errs); i++ {
errs[i] = txpool.ErrUnderpriced
@ -1787,7 +1787,7 @@ func TestTransactionProtocolViolation(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
f := newTestTxFetcher()
f.addTxs = func(txs []*types.Transaction) []error {
f.addTxs = func(_ string, txs []*types.Transaction) []error {
var errs []error
for range txs {
errs = append(errs, txpool.ErrKZGVerificationError)
@ -2194,7 +2194,7 @@ func TestTransactionForgotten(t *testing.T) {
fetcher := NewTxFetcherForTests(
nil,
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
func(_ string, txs []*types.Transaction) []error {
errs := make([]error, len(txs))
for i := 0; i < len(errs); i++ {
errs[i] = txpool.ErrUnderpriced

View file

@ -32,6 +32,7 @@ import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool/blobpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/eth/downloader"
@ -104,8 +105,8 @@ type blobPool interface {
Has(hash common.Hash) bool
GetCells(hash common.Hash, mask types.CustodyBitmap) ([]kzg4844.Cell, error)
HasPayload(hash common.Hash) bool
AddPayload([]common.Hash, [][]kzg4844.Cell, *types.CustodyBitmap) []error
GetCustody(hash common.Hash) *types.CustodyBitmap
AddPooledTx(pooledTx *blobpool.PooledBlobTx) error
}
// handlerConfig is the collection of initialization parameters to create a full
@ -138,6 +139,7 @@ type handler struct {
downloader *downloader.Downloader
txFetcher *fetcher.TxFetcher
blobFetcher *fetcher.BlobFetcher
blobBuffer *blobpool.BlobBuffer
peers *peerSet
txBroadcastKey [16]byte
@ -192,11 +194,34 @@ func newHandler(config *handlerConfig) (*handler, error) {
}
return p.RequestTxs(hashes)
}
addTxs := func(txs []*types.Transaction) []error {
return h.txpool.Add(txs, false)
// Construct the blob buffer for assembling blob txs from separate tx and cell deliveries
h.blobBuffer = blobpool.NewBlobBuffer(h.blobpool.AddPooledTx, h.removePeer)
addTxs := func(peer string, txs []*types.Transaction) []error {
errs := make([]error, len(txs))
p := h.peers.peer(peer)
isETH71 := p != nil && p.Version() >= eth.ETH71
var poolTxs []*types.Transaction
var index []int
for i, tx := range txs {
if isETH71 && tx.Type() == types.BlobTxType {
errs[i] = h.blobBuffer.AddTx(tx, peer)
} else {
poolTxs = append(poolTxs, tx)
index = append(index, i)
}
}
if len(poolTxs) > 0 {
poolErrs := h.txpool.Add(poolTxs, false)
for j, idx := range index {
errs[idx] = poolErrs[j]
}
}
return errs
}
validateMeta := func(tx common.Hash, kind byte) error {
if h.txpool.Has(tx) {
if h.txpool.Has(tx) || h.blobBuffer.HasTx(tx) {
return txpool.ErrAlreadyKnown
}
if !h.txpool.FilterType(kind) {
@ -214,7 +239,17 @@ func newHandler(config *handlerConfig) (*handler, error) {
}
return p.RequestPayload(hashes, cells)
}
h.blobFetcher = fetcher.NewBlobFetcher(h.blobpool.HasPayload, h.blobpool.AddPayload, fetchPayloads, h.removePeer, &config.Custody, nil)
hasPayload := func(hash common.Hash) bool {
return h.blobpool.HasPayload(hash) || h.blobBuffer.HasCells(hash)
}
addCells := func(hash common.Hash, deliveries map[string]*fetcher.PeerCellDelivery, custody *types.CustodyBitmap) error {
converted := make(map[string]*blobpool.PeerDelivery, len(deliveries))
for peer, d := range deliveries {
converted[peer] = &blobpool.PeerDelivery{Cells: d.Cells, Indices: d.Indices}
}
return h.blobBuffer.AddCells(hash, converted, custody)
}
h.blobFetcher = fetcher.NewBlobFetcher(hasPayload, addCells, fetchPayloads, h.removePeer, &config.Custody, nil)
return h, nil
}

View file

@ -30,6 +30,7 @@ import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool/blobpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
@ -222,13 +223,12 @@ func (p *testTxPool) AddCells(hash common.Hash, cells []kzg4844.Cell, mask types
p.custody[hash] = mask
}
func (p *testTxPool) AddPayload(txs []common.Hash, cells [][]kzg4844.Cell, custody *types.CustodyBitmap) []error {
func (p *testTxPool) AddPooledTx(pooledTx *blobpool.PooledBlobTx) error {
p.lock.Lock()
defer p.lock.Unlock()
for i, tx := range txs {
p.cellPool[tx] = cells[i]
}
hash := pooledTx.Transaction.Hash()
p.cellPool[hash] = pooledTx.Sidecar.Cells
p.txPool[hash] = pooledTx.Transaction
return nil
}

View file

@ -80,7 +80,7 @@ func fuzz(input []byte) int {
f := fetcher.NewTxFetcherForTests(
nil,
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
func(_ string, txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },