From 252dd57159b3369dfdac529522e8dfd5af1604d2 Mon Sep 17 00:00:00 2001 From: healthykim Date: Fri, 20 Mar 2026 21:12:20 +0900 Subject: [PATCH] fix error from kurtosis test --- core/txpool/blobpool/blobpool.go | 156 ++++++++++++++++--------------- core/txpool/blobpool/lookup.go | 14 +-- core/txpool/subpool.go | 5 +- eth/fetcher/blob_fetcher.go | 83 ++++++++++------ eth/fetcher/blob_fetcher_test.go | 150 ++++++++++++++++++++++------- eth/fetcher/tx_fetcher.go | 3 + eth/handler.go | 4 +- eth/handler_test.go | 20 ++-- eth/protocols/eth/broadcast.go | 6 +- 9 files changed, 284 insertions(+), 157 deletions(-) diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index 8dd499250b..8aa92f851d 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -127,9 +127,10 @@ type blobTxMeta struct { announced bool // Whether the tx has been announced to listeners - id uint64 // Storage ID in the pool's persistent store - storageSize uint32 // Byte size in the pool's persistent store - size uint64 // RLP-encoded size of transaction including the attached blob + id uint64 // Storage ID in the pool's persistent store + storageSize uint32 // Byte size in the pool's persistent store + size uint64 // RLP-encoded size of transaction including the attached blob + sizeWithoutBlob uint64 // RLP-encoded size of transaction without blob data (for ETH/71) custody *types.CustodyBitmap @@ -152,25 +153,26 @@ type blobTxMeta struct { // newBlobTxMeta retrieves the indexed metadata fields from a pooled blob transaction // and assembles a helper struct to track in memory. func newBlobTxMeta(id uint64, size uint64, storageSize uint32, pooledTx *pooledBlobTx) *blobTxMeta { - if pooledTx.Sidecar == nil { - // This should never happen, as the pool only admits blob transactions with a sidecar - panic("missing blob tx sidecar") + var version byte + if pooledTx.Sidecar != nil { + version = pooledTx.Sidecar.Version } meta := &blobTxMeta{ - hash: pooledTx.Transaction.Hash(), - vhashes: pooledTx.Transaction.BlobHashes(), - version: pooledTx.Sidecar.Version, - id: id, - storageSize: storageSize, - size: size, - nonce: pooledTx.Transaction.Nonce(), - costCap: uint256.MustFromBig(pooledTx.Transaction.Cost()), - execTipCap: uint256.MustFromBig(pooledTx.Transaction.GasTipCap()), - execFeeCap: uint256.MustFromBig(pooledTx.Transaction.GasFeeCap()), - blobFeeCap: uint256.MustFromBig(pooledTx.Transaction.BlobGasFeeCap()), - execGas: pooledTx.Transaction.Gas(), - blobGas: pooledTx.Transaction.BlobGas(), - custody: &pooledTx.Sidecar.Custody, + hash: pooledTx.Transaction.Hash(), + vhashes: pooledTx.Transaction.BlobHashes(), + version: version, + id: id, + storageSize: storageSize, + size: size, + sizeWithoutBlob: pooledTx.SizeWithoutBlob, + nonce: pooledTx.Transaction.Nonce(), + costCap: uint256.MustFromBig(pooledTx.Transaction.Cost()), + execTipCap: uint256.MustFromBig(pooledTx.Transaction.GasTipCap()), + execFeeCap: uint256.MustFromBig(pooledTx.Transaction.GasFeeCap()), + blobFeeCap: uint256.MustFromBig(pooledTx.Transaction.BlobGasFeeCap()), + execGas: pooledTx.Transaction.Gas(), + blobGas: pooledTx.Transaction.BlobGas(), + custody: &pooledTx.Sidecar.Custody, } meta.basefeeJumps = dynamicFeeJumps(meta.execFeeCap) meta.blobfeeJumps = dynamicBlobFeeJumps(meta.blobFeeCap) @@ -179,9 +181,10 @@ func newBlobTxMeta(id uint64, size uint64, storageSize uint32, pooledTx *pooledB } type pooledBlobTx struct { - Transaction *types.Transaction - Sidecar *types.BlobTxCellSidecar - Size uint64 // original transaction size (including blobs) + Transaction *types.Transaction + Sidecar *types.BlobTxCellSidecar + Size uint64 // original transaction size (including blobs) + SizeWithoutBlob uint64 // transaction size with commitments/proofs but without blob data } // newPooledBlobTx creates pooledBlobTx struct. @@ -194,9 +197,10 @@ func newPooledBlobTx(tx *types.Transaction) (*pooledBlobTx, error) { return nil, err } return &pooledBlobTx{ - Transaction: tx.WithoutBlobTxSidecar(), - Sidecar: sidecar, - Size: tx.Size(), + Transaction: tx.WithoutBlobTxSidecar(), + Sidecar: sidecar, + Size: tx.Size(), + SizeWithoutBlob: tx.WithoutBlob().Size(), }, nil } @@ -1369,30 +1373,6 @@ func (p *BlobPool) checkDelegationLimit(tx *types.Transaction) error { return txpool.ErrInflightTxLimitReached } -// ValidateCells validates cells against transaction commitments and proofs. -func (p *BlobPool) ValidateCells(txs []common.Hash, cells [][]kzg4844.Cell, custody *types.CustodyBitmap) []error { - errs := make([]error, len(txs)) - - for i, tx := range txs { - if _, ok := p.queue[tx]; !ok { - errs[i] = fmt.Errorf("transaction %s not found", tx) - continue - } - sidecar := p.queue[tx].BlobTxSidecar() - cellProofs := make([]kzg4844.Proof, 0) - for _, proofIdx := range custody.Indices() { - // should store all proofs - for blobIdx := range len(sidecar.Commitments) { - idx := blobIdx*kzg4844.CellProofsPerBlob + int(proofIdx) - cellProofs = append(cellProofs, sidecar.Proofs[idx]) - } - } - - errs[i] = kzg4844.VerifyCells(cells[i], sidecar.Commitments, cellProofs, custody.Indices()) - } - return errs -} - // validateTx checks whether a transaction is valid according to the consensus // rules and adheres to some heuristic limits of the local node (price and size). // This function assumes the static validation has been performed already and @@ -1441,8 +1421,7 @@ func (p *BlobPool) validateTx(tx *types.Transaction, buffer bool) error { next := p.state.GetNonce(addr) for nonce, replacement := range replacements { - if len(p.index[addr]) > int(nonce-next) { - // replacement + if nonce >= next && len(p.index[addr]) > int(nonce-next) { originalCost := p.index[addr][nonce-next].costCap replacementCost := replacement.costCap @@ -1464,8 +1443,9 @@ func (p *BlobPool) validateTx(tx *types.Transaction, buffer bool) error { if p.replacementQueue[addr] != nil && p.replacementQueue[addr][nonce] != nil { return p.replacementQueue[addr][nonce].costCap.ToBig() } - if uint64(len(p.indexQueue[addr])) > nonce-next-uint64(len(p.index[addr])) { - return p.indexQueue[addr][nonce-next-uint64(len(p.index[addr]))].costCap.ToBig() + pooledCount := uint64(len(p.index[addr])) + if nonce >= next+pooledCount && uint64(len(p.indexQueue[addr])) > nonce-next-pooledCount { + return p.indexQueue[addr][nonce-next-pooledCount].costCap.ToBig() } } if uint64(len(p.index[addr])) > nonce-next { @@ -1500,10 +1480,12 @@ func (p *BlobPool) validateTx(tx *types.Transaction, buffer bool) error { } } } else if buffer { - offset := nonce - next - uint64(len(p.index[from])) - if uint64(len(p.indexQueue[from])) > offset && offset > 0 { - // buffer tx replacement - prev = p.indexQueue[from][nonce-next-uint64(len(p.index[from]))] + pooledCount := uint64(len(p.index[from])) + if nonce >= next+pooledCount { + offset := nonce - next - pooledCount + if uint64(len(p.indexQueue[from])) > offset && offset > 0 { + prev = p.indexQueue[from][offset] + } } } if prev == nil { @@ -1557,6 +1539,13 @@ func (p *BlobPool) Has(hash common.Hash) bool { return poolHas || gapped } +func (p *BlobPool) HasPayload(hash common.Hash) bool { + p.lock.RLock() + defer p.lock.RUnlock() + + return p.lookup.exists(hash) || len(p.cellQueue[hash]) != 0 +} + // getRLP returns the raw RLP-encoded pooledBlobTx data from the store. func (p *BlobPool) getRLP(hash common.Hash) []byte { // Track the amount of time waiting to retrieve a fully resolved blob tx from @@ -1642,13 +1631,14 @@ func (p *BlobPool) GetMetadata(hash common.Hash) *txpool.TxMetadata { p.lock.RLock() defer p.lock.RUnlock() - size, ok := p.lookup.sizeOfTx(hash) + meta, ok := p.lookup.txIndex[hash] if !ok { return nil } return &txpool.TxMetadata{ - Type: types.BlobTxType, - Size: size, + Type: types.BlobTxType, + Size: meta.size, + SizeWithoutBlob: meta.sizeWithoutBlob, } } @@ -1771,8 +1761,8 @@ func (p *BlobPool) Add(txs []*types.Transaction, sync bool) []error { if errs[i] = p.ValidateTxBasics(tx); errs[i] != nil { continue } - if len(tx.BlobTxSidecar().Blobs) != 0 { - // from user: convert to pooledBlobTx and add + sc := tx.BlobTxSidecar() + if sc != nil && len(sc.Blobs) != 0 { pooledTx, err := newPooledBlobTx(tx) if err != nil { errs[i] = err @@ -1780,7 +1770,6 @@ func (p *BlobPool) Add(txs []*types.Transaction, sync bool) []error { } errs[i] = p.add(pooledTx) } else { - // from p2p, buffer until the corresponding cells arrive errs[i] = p.addBuffer(tx) } } @@ -1795,7 +1784,8 @@ func (p *BlobPool) addBuffer(tx *types.Transaction) (err error) { sidecar := tx.BlobTxSidecar() var cellSidecar types.BlobTxCellSidecar - if len(cells) >= kzg4844.DataPerBlob { + blobCount := len(sidecar.Commitments) + if len(cells) >= kzg4844.DataPerBlob*blobCount { blob, err := kzg4844.RecoverBlobs(cells, p.custodyQueue[tx.Hash()].Indices()) if err != nil { return err @@ -1832,13 +1822,25 @@ func (p *BlobPool) addBuffer(tx *types.Transaction) (err error) { if err := p.validateTx(tx, true); err != nil { return err } + // Store the original tx in queue (with BlobTxSidecar intact — Blobs may be nil + // from ETH/71 but commitments/proofs are preserved for cell validation later). p.queue[tx.Hash()] = tx from, _ := types.Sender(p.signer, tx) + // Build a partial pooledBlobTx for metadata tracking. + var cellSidecar *types.BlobTxCellSidecar + if sidecar := tx.BlobTxSidecar(); sidecar != nil { + cellSidecar = &types.BlobTxCellSidecar{ + Version: sidecar.Version, + Commitments: sidecar.Commitments, + Proofs: sidecar.Proofs, + } + } next := p.state.GetNonce(from) nonce := tx.Nonce() pooledCount := uint64(len(p.index[from])) - meta := newBlobTxMeta(0, tx.Size(), 0, &pooledBlobTx{Transaction: tx, Size: tx.Size()}) + //todo this is strange + meta := newBlobTxMeta(0, tx.Size(), 0, &pooledBlobTx{Transaction: tx, Sidecar: cellSidecar, Size: tx.Size()}) if nonce < next+pooledCount { // Pooled transaction replacements are stored in replacementQueue for expenditure validation @@ -1944,7 +1946,6 @@ func (p *BlobPool) addLocked(pooledTx *pooledBlobTx, checkGapped bool) (err erro Config: p.chain.Config(), MaxBlobCount: maxBlobsPerTx, }); err != nil { - log.Trace("Sidecar validation failed", "hash", tx.Hash(), "err", err) return err } // If the address is not yet known, request exclusivity to track the account @@ -2551,12 +2552,13 @@ func (p *BlobPool) GetCells(hash common.Hash, mask types.CustodyBitmap) ([]kzg48 } tx := pooledTx.Transaction sidecar := pooledTx.Sidecar + // Return cells in blob-major order: [blob0_cell0, blob0_cell1, ..., blob1_cell0, ...] + cellsPerBlob := sidecar.Custody.OneCount() cells := make([]kzg4844.Cell, 0, mask.OneCount()*len(tx.BlobHashes())) - for cellIdx, custodyIdx := range sidecar.Custody.Indices() { - if mask.IsSet(custodyIdx) { - for blobIdx := 0; blobIdx < len(tx.BlobHashes()); blobIdx++ { - idx := blobIdx*sidecar.Custody.OneCount() + cellIdx - cells = append(cells, sidecar.Cells[idx]) + for blobIdx := 0; blobIdx < len(tx.BlobHashes()); blobIdx++ { + for cellIdx, custodyIdx := range sidecar.Custody.Indices() { + if mask.IsSet(custodyIdx) { + cells = append(cells, sidecar.Cells[blobIdx*cellsPerBlob+cellIdx]) } } } @@ -2581,7 +2583,8 @@ func (p *BlobPool) AddPayload(txs []common.Hash, cells [][]kzg4844.Cell, custody sidecar := p.queue[hash].BlobTxSidecar() var cellSidecar types.BlobTxCellSidecar - if len(cells[i]) >= kzg4844.DataPerBlob { + blobCount := len(sidecar.Commitments) + if len(cells[i]) >= kzg4844.DataPerBlob*blobCount { blob, err := kzg4844.RecoverBlobs(cells[i], custody.Indices()) if err != nil { errs[i] = err @@ -2610,7 +2613,6 @@ func (p *BlobPool) AddPayload(txs []common.Hash, cells [][]kzg4844.Cell, custody } errs[i] = p.addLocked(&pooledBlobTx{Transaction: p.queue[hash].WithoutBlobTxSidecar(), Sidecar: &cellSidecar, Size: p.queue[hash].Size()}, true) - // todo nonce gap // clean up queues tx := p.queue[hash] @@ -2628,7 +2630,11 @@ func (p *BlobPool) AddPayload(txs []common.Hash, cells [][]kzg4844.Cell, custody } // plain tx - offset := int(nonce - next - uint64(len(p.index[from]))) + pooledCount := uint64(len(p.index[from])) + if nonce < next+pooledCount { + continue + } + offset := int(nonce - next - pooledCount) if offset > 0 && offset < len(p.indexQueue[from]) { removed := p.indexQueue[from][offset] p.indexQueue[from] = append(p.indexQueue[from][:offset], p.indexQueue[from][offset+1:]...) diff --git a/core/txpool/blobpool/lookup.go b/core/txpool/blobpool/lookup.go index e105d47706..39cb2c69b9 100644 --- a/core/txpool/blobpool/lookup.go +++ b/core/txpool/blobpool/lookup.go @@ -22,9 +22,10 @@ import ( ) type txMetadata struct { - id uint64 // the billy id of transction - size uint64 // the RLP encoded size of transaction (blobs are included) - custody types.CustodyBitmap + id uint64 // the billy id of transction + size uint64 // the RLP encoded size of transaction (blobs are included) + sizeWithoutBlob uint64 // the RLP encoded size without blob data (for ETH/71 announcements) + custody types.CustodyBitmap } // lookup maps blob versioned hashes to transaction hashes that include them, @@ -93,9 +94,10 @@ func (l *lookup) track(tx *blobTxMeta) { } // Map the transaction hash to the datastore id and RLP-encoded transaction size l.txIndex[tx.hash] = &txMetadata{ - id: tx.id, - size: tx.size, - custody: *tx.custody, + id: tx.id, + size: tx.size, + sizeWithoutBlob: tx.sizeWithoutBlob, + custody: *tx.custody, } } diff --git a/core/txpool/subpool.go b/core/txpool/subpool.go index dfd0ccead7..59d60cb651 100644 --- a/core/txpool/subpool.go +++ b/core/txpool/subpool.go @@ -86,8 +86,9 @@ type PendingFilter struct { // TxMetadata denotes the metadata of a transaction. type TxMetadata struct { - Type uint8 // The type of the transaction - Size uint64 // The length of the 'rlp encoding' of a transaction + Type uint8 // The type of the transaction + Size uint64 // The length of the 'rlp encoding' of a transaction (including blobs) + SizeWithoutBlob uint64 // The length without blob data (for ETH/71 announcements) } // SubPool represents a specialized transaction pool that lives on its own (e.g. diff --git a/eth/fetcher/blob_fetcher.go b/eth/fetcher/blob_fetcher.go index 097bc5aa66..98d0ff24c4 100644 --- a/eth/fetcher/blob_fetcher.go +++ b/eth/fetcher/blob_fetcher.go @@ -40,15 +40,16 @@ type random interface { // according to the custody cell indices provided by the consensus client // connected to this execution client. +// todo var blobFetchTimeout = 5 * time.Second +var blobAvailabilityTimeout = 2 * time.Second -// todo tuning const ( availabilityThreshold = 2 maxPayloadRetrievals = 128 maxPayloadAnnounces = 4096 + fetchProbability = 15 MAX_CELLS_PER_PARTIAL_REQUEST = 8 - blobAvailabilityTimeout = 500 * time.Millisecond ) type blobTxAnnounce struct { @@ -76,9 +77,10 @@ type cellWithSeq struct { } type fetchStatus struct { - fetching *types.CustodyBitmap // To avoid fetching cells which had already been fetched / currently being fetched - fetched []uint64 // To sort cells - cells []kzg4844.Cell + fetching *types.CustodyBitmap // To avoid fetching cells which had already been fetched / currently being fetched + fetched []uint64 // Custody indices that have been fetched (per-blob, same for all blobs) + blobCells [][]kzg4844.Cell // Per-blob cell accumulator, indexed by blob + blobCount int // Number of blobs in this tx (set on first delivery) } // BlobFetcher is responsible for managing type 3 transactions based on peer announcements. @@ -121,8 +123,8 @@ type BlobFetcher struct { alternates map[common.Hash]map[string]*types.CustodyBitmap // In-flight transaction alternate origins (in case the peer is dropped) // Callbacks - validateCells func([]common.Hash, [][]kzg4844.Cell, *types.CustodyBitmap) []error - addPayload func([]common.Hash, [][]kzg4844.Cell, *types.CustodyBitmap) []error + hasPayload func(common.Hash) bool + addPayload func([]common.Hash, [][]kzg4844.Cell, *types.CustodyBitmap) []error //todo: peer disconnection is strange here fetchPayloads func(string, []common.Hash, *types.CustodyBitmap) error dropPeer func(string) @@ -133,7 +135,7 @@ type BlobFetcher struct { } func NewBlobFetcher( - validateCells func([]common.Hash, [][]kzg4844.Cell, *types.CustodyBitmap) []error, + hasPayload func(common.Hash) bool, addPayload func([]common.Hash, [][]kzg4844.Cell, *types.CustodyBitmap) []error, fetchPayloads func(string, []common.Hash, *types.CustodyBitmap) error, dropPeer func(string), custody *types.CustodyBitmap, rand random) *BlobFetcher { @@ -151,7 +153,7 @@ func NewBlobFetcher( fetches: make(map[common.Hash]*fetchStatus), requests: make(map[string][]*cellRequest), alternates: make(map[common.Hash]map[string]*types.CustodyBitmap), - validateCells: validateCells, + hasPayload: hasPayload, addPayload: addPayload, fetchPayloads: fetchPayloads, dropPeer: dropPeer, @@ -165,7 +167,15 @@ func NewBlobFetcher( // Notify is called when a Type 3 transaction is observed on the network. (TransactionPacket / NewPooledTransactionHashesPacket) func (f *BlobFetcher) Notify(peer string, txs []common.Hash, cells types.CustodyBitmap) error { blobAnnounceInMeter.Mark(int64(len(txs))) - blobAnnounce := &blobTxAnnounce{origin: peer, txs: txs, cells: cells} + anns := make([]common.Hash, 0) + for _, tx := range txs { + if f.hasPayload(tx) { + continue + } + anns = append(anns, tx) + } + + blobAnnounce := &blobTxAnnounce{origin: peer, txs: anns, cells: cells} select { case f.notify <- blobAnnounce: return nil @@ -261,7 +271,7 @@ func (f *BlobFetcher) loop() { } else { randomValue = f.rand.Intn(100) } - if randomValue < 15 { + if randomValue < fetchProbability { f.full[hash] = struct{}{} } else { f.partial[hash] = struct{}{} @@ -418,9 +428,6 @@ func (f *BlobFetcher) loop() { f.rescheduleTimeout(timeoutTimer, timeoutTrigger) case delivery := <-f.cleanup: // Remove from announce - addedHashes := make([]common.Hash, 0) - addedCells := make([][]kzg4844.Cell, 0) - var requestId int var request *cellRequest for _, hash := range delivery.txs { @@ -446,9 +453,24 @@ func (f *BlobFetcher) loop() { // Unexpected hash, ignore continue } - // Update fetch status - f.fetches[hash].fetched = append(f.fetches[hash].fetched, delivery.cellBitmap.Indices()...) - f.fetches[hash].cells = append(f.fetches[hash].cells, delivery.cells[i]...) + // delivery.cells[i] contains cells for all blobs + // in blob-major order: [blob0_cell0, ..., blob0_cellN, blob1_cell0, ...]. + indices := delivery.cellBitmap.Indices() + cellsPerBlob := len(indices) + if cellsPerBlob > 0 { + status := f.fetches[hash] + blobCount := len(delivery.cells[i]) / cellsPerBlob + // Initialize per-blob accumulators on first delivery + if status.blobCount == 0 { + status.blobCount = blobCount + status.blobCells = make([][]kzg4844.Cell, blobCount) + } + for b := 0; b < blobCount; b++ { + offset := b * cellsPerBlob + status.blobCells[b] = append(status.blobCells[b], delivery.cells[i][offset:offset+cellsPerBlob]...) + } + status.fetched = append(status.fetched, indices...) + } // Update announces of this peer delete(f.announces[delivery.origin], hash) @@ -476,12 +498,26 @@ func (f *BlobFetcher) loop() { if completed { blobFetcherFetchTime.Update(int64(time.Duration(f.clock.Now() - request.time))) - addedHashes = append(addedHashes, hash) fetchStatus := f.fetches[hash] - sort.Slice(fetchStatus.cells, func(i, j int) bool { - return fetchStatus.fetched[i] < fetchStatus.fetched[j] + + // Sort each blob's cells by ascending custody index. + // RecoverBlobs expects cells[k] to correspond to custodyIndices[k], + // and custodyIndices come from CustodyBitmap.Indices() which is always sorted. + perm := make([]int, len(fetchStatus.fetched)) + for i := range perm { + perm[i] = i + } + slices.SortFunc(perm, func(a, b int) int { + return int(fetchStatus.fetched[a]) - int(fetchStatus.fetched[b]) }) - addedCells = append(addedCells, fetchStatus.cells) + var assembled []kzg4844.Cell + for _, blobCells := range fetchStatus.blobCells { + for _, p := range perm { + assembled = append(assembled, blobCells[p]) + } + } + collectedCustody := types.NewCustodyBitmap(fetchStatus.fetched) + f.addPayload([]common.Hash{hash}, [][]kzg4844.Cell{assembled}, &collectedCustody) // remove announces from other peers for peer, txset := range f.announces { @@ -494,11 +530,7 @@ func (f *BlobFetcher) loop() { delete(f.fetches, hash) } } - // Update mempool status for arrived hashes blobRequestDoneMeter.Mark(int64(len(delivery.txs))) - if len(addedHashes) > 0 { - f.addPayload(addedHashes, addedCells, delivery.cellBitmap) - } // Remove the request f.requests[delivery.origin][requestId] = f.requests[delivery.origin][len(f.requests[delivery.origin])-1] @@ -690,7 +722,6 @@ func (f *BlobFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{} f.fetches[hash] = &fetchStatus{ fetching: unfetched, fetched: make([]uint64, 0), - cells: make([]kzg4844.Cell, 0), } } else { f.fetches[hash].fetching = f.fetches[hash].fetching.Union(unfetched) diff --git a/eth/fetcher/blob_fetcher_test.go b/eth/fetcher/blob_fetcher_test.go index 02b42eda3b..c6e4400772 100644 --- a/eth/fetcher/blob_fetcher_test.go +++ b/eth/fetcher/blob_fetcher_test.go @@ -17,9 +17,9 @@ package fetcher import ( + "fmt" "slices" "testing" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/mclock" @@ -64,11 +64,6 @@ func selectCells(cells []kzg4844.Cell, custody *types.CustodyBitmap) []kzg4844.C return result } -const ( - testBlobAvailabilityTimeout = 500 * time.Millisecond - testBlobFetchTimeout = 5 * time.Second -) - var ( testBlobTxHashes = []common.Hash{ {0x01}, {0x02}, {0x03}, {0x04}, {0x05}, {0x06}, {0x07}, {0x08}, @@ -153,16 +148,14 @@ func TestBlobFetcherFullFetch(t *testing.T) { testBlobFetcher(t, blobFetcherTest{ init: func() *BlobFetcher { return NewBlobFetcher( - func(txs []common.Hash, _ [][]kzg4844.Cell, _ *types.CustodyBitmap) []error { - return make([]error, len(txs)) - }, + func(common.Hash) bool { return false }, func(txs []common.Hash, _ [][]kzg4844.Cell, _ *types.CustodyBitmap) []error { return make([]error, len(txs)) }, func(string, []common.Hash, *types.CustodyBitmap) error { return nil }, func(string) {}, &custody, - &mockRand{value: 5}, // to force full requests (5 < 15) + &mockRand{value: 5}, // Force full requests (5 < fetchProbability) ) }, steps: []interface{}{ @@ -244,16 +237,14 @@ func TestBlobFetcherPartialFetch(t *testing.T) { testBlobFetcher(t, blobFetcherTest{ init: func() *BlobFetcher { return NewBlobFetcher( - func(txs []common.Hash, _ [][]kzg4844.Cell, _ *types.CustodyBitmap) []error { - return make([]error, len(txs)) - }, + func(common.Hash) bool { return false }, func(txs []common.Hash, _ [][]kzg4844.Cell, _ *types.CustodyBitmap) []error { return make([]error, len(txs)) }, func(string, []common.Hash, *types.CustodyBitmap) error { return nil }, func(string) {}, &custody, - &mockRand{value: 20}, // Force partial requests (20 >= 15) + &mockRand{value: 60}, // Force partial requests (20 >= 15) ) }, steps: []interface{}{ @@ -339,9 +330,7 @@ func TestBlobFetcherFullDelivery(t *testing.T) { testBlobFetcher(t, blobFetcherTest{ init: func() *BlobFetcher { return NewBlobFetcher( - func(txs []common.Hash, _ [][]kzg4844.Cell, _ *types.CustodyBitmap) []error { - return make([]error, len(txs)) - }, + func(common.Hash) bool { return false }, func(txs []common.Hash, _ [][]kzg4844.Cell, _ *types.CustodyBitmap) []error { return make([]error, len(txs)) }, @@ -387,16 +376,14 @@ func TestBlobFetcherPartialDelivery(t *testing.T) { testBlobFetcher(t, blobFetcherTest{ init: func() *BlobFetcher { return NewBlobFetcher( - func(txs []common.Hash, _ [][]kzg4844.Cell, _ *types.CustodyBitmap) []error { - return make([]error, len(txs)) - }, + func(common.Hash) bool { return false }, func(txs []common.Hash, _ [][]kzg4844.Cell, _ *types.CustodyBitmap) []error { return make([]error, len(txs)) }, func(string, []common.Hash, *types.CustodyBitmap) error { return nil }, func(string) {}, &custody, - &mockRand{value: 20}, + &mockRand{value: 60}, ) }, steps: []interface{}{ @@ -523,16 +510,14 @@ func TestBlobFetcherAvailabilityTimeout(t *testing.T) { testBlobFetcher(t, blobFetcherTest{ init: func() *BlobFetcher { return NewBlobFetcher( - func(txs []common.Hash, _ [][]kzg4844.Cell, _ *types.CustodyBitmap) []error { - return make([]error, len(txs)) - }, + func(common.Hash) bool { return false }, func(txs []common.Hash, _ [][]kzg4844.Cell, _ *types.CustodyBitmap) []error { return make([]error, len(txs)) }, func(string, []common.Hash, *types.CustodyBitmap) error { return nil }, func(string) {}, &custody, - &mockRand{value: 20}, + &mockRand{value: 60}, ) }, steps: []interface{}{ @@ -543,7 +528,7 @@ func TestBlobFetcherAvailabilityTimeout(t *testing.T) { isBlobScheduled{announces: nil, fetching: nil}, // Run clock for timeout - doWait{time: testBlobAvailabilityTimeout, step: true}, + doWait{time: blobAvailabilityTimeout, step: true}, // After timeout, waitlist should be empty isWaitingAvailability{}, @@ -557,9 +542,7 @@ func TestBlobFetcherPeerDrop(t *testing.T) { testBlobFetcher(t, blobFetcherTest{ init: func() *BlobFetcher { return NewBlobFetcher( - func(txs []common.Hash, _ [][]kzg4844.Cell, _ *types.CustodyBitmap) []error { - return make([]error, len(txs)) - }, + func(common.Hash) bool { return false }, func(txs []common.Hash, _ [][]kzg4844.Cell, _ *types.CustodyBitmap) []error { return make([]error, len(txs)) }, @@ -634,9 +617,7 @@ func TestBlobFetcherFetchTimeout(t *testing.T) { testBlobFetcher(t, blobFetcherTest{ init: func() *BlobFetcher { return NewBlobFetcher( - func(txs []common.Hash, _ [][]kzg4844.Cell, _ *types.CustodyBitmap) []error { - return make([]error, len(txs)) - }, + func(common.Hash) bool { return false }, func(txs []common.Hash, _ [][]kzg4844.Cell, _ *types.CustodyBitmap) []error { return make([]error, len(txs)) }, @@ -680,7 +661,7 @@ func TestBlobFetcherFetchTimeout(t *testing.T) { }, // Wait for fetch timeout -> should reschedule to peer B - doWait{time: testBlobFetchTimeout, step: true}, + doWait{time: blobFetchTimeout, step: true}, isBlobScheduled{ announces: map[string][]blobAnnounce{ "B": {{hash: testBlobTxHashes[0], custody: halfCustody}}, @@ -699,7 +680,7 @@ func TestBlobFetcherFetchTimeout(t *testing.T) { }, // Wait for timeout -> should drop transaction - doWait{time: testBlobFetchTimeout, step: true}, + doWait{time: blobFetchTimeout, step: true}, isBlobScheduled{announces: nil, fetching: nil}, isFetching{hashes: nil}, }, @@ -997,3 +978,104 @@ func testBlobFetcher(t *testing.T, tt blobFetcherTest) { } } } + +// selectMultiBlobCells extracts cells from a multi-blob sidecar for a given +// custody mask, returning them in blob-major order. +func selectMultiBlobCells(sc *types.BlobTxCellSidecar, mask types.CustodyBitmap) []kzg4844.Cell { + var result []kzg4844.Cell + cellsPerBlob := sc.Custody.OneCount() + blobCount := len(sc.Cells) / cellsPerBlob + for b := 0; b < blobCount; b++ { + for _, idx := range mask.Indices() { + result = append(result, sc.Cells[b*cellsPerBlob+int(idx)]) + } + } + return result +} + +// TestMultiBlobDeliveryVerification tests that cells delivered in two partial +// deliveries for a multi-blob tx are correctly assembled and pass KZG cell +// proof verification via the addPayload callback. +func TestMultiBlobDeliveryVerification(t *testing.T) { + sidecar := testBlobSidecars[2] // 3 blobs + + var verifyErr error + testBlobFetcher(t, blobFetcherTest{ + init: func() *BlobFetcher { + return NewBlobFetcher( + func(common.Hash) bool { return false }, + func(txs []common.Hash, cells [][]kzg4844.Cell, cst *types.CustodyBitmap) []error { + // Verify delivered cells pass KZG cell proof verification + // Debug: compare with expected cells + expectedCells := selectMultiBlobCells(sidecar, custody) + for ci, c := range cells { + if len(c) != len(expectedCells) { + verifyErr = fmt.Errorf("cell count mismatch: have %d, want %d", len(c), len(expectedCells)) + return make([]error, len(txs)) + } + for j := range c { + if c[j] != expectedCells[j] { + verifyErr = fmt.Errorf("tx %d cell %d mismatch (custody=%v)", ci, j, cst.Indices()) + return make([]error, len(txs)) + } + } + } + for _, c := range cells { + cs := &types.BlobTxCellSidecar{ + Version: sidecar.Version, + Cells: c, + Commitments: sidecar.Commitments, + Proofs: sidecar.Proofs, + Custody: *cst, + } + indices := cs.Custody.Indices() + var cellProofs []kzg4844.Proof + for blobIdx := range len(cs.Commitments) { + for _, proofIdx := range indices { + cellProofs = append(cellProofs, cs.Proofs[blobIdx*kzg4844.CellProofsPerBlob+int(proofIdx)]) + } + } + verifyErr = kzg4844.VerifyCells(cs.Cells, cs.Commitments, cellProofs, indices) + } + return make([]error, len(txs)) + }, + func(string, []common.Hash, *types.CustodyBitmap) error { return nil }, + func(string) {}, + &custody, + &mockRand{value: 60}, // Force partial requests (60 >= fetchProbability) + ) + }, + steps: []interface{}{ + // Two full-custody peers → passes availability, promotes to announces + doBlobNotify{peer: "A", hashes: []common.Hash{testBlobTxHashes[0]}, custody: fullCustody}, + doBlobNotify{peer: "B", hashes: []common.Hash{testBlobTxHashes[0]}, custody: fullCustody}, + + // Two partial peers with front/back custody + doBlobNotify{peer: "D", hashes: []common.Hash{testBlobTxHashes[0]}, custody: backCustody}, + doBlobNotify{peer: "C", hashes: []common.Hash{testBlobTxHashes[0]}, custody: frontCustody}, + + // Drop A and B so C and D get scheduled for fetch + doDrop("A"), + doDrop("B"), + + // Deliver back cells from D → completes fetch and triggers addPayload + doBlobEnqueue{ + peer: "D", + hashes: []common.Hash{testBlobTxHashes[0]}, + cells: [][]kzg4844.Cell{selectMultiBlobCells(sidecar, *backCustody.Intersection(&custody))}, + custody: *backCustody.Intersection(&custody), + }, + // Deliver front cells from C + doBlobEnqueue{ + peer: "C", + hashes: []common.Hash{testBlobTxHashes[0]}, + cells: [][]kzg4844.Cell{selectMultiBlobCells(sidecar, *frontCustody.Intersection(&custody))}, + custody: *frontCustody.Intersection(&custody), + }, + isCompleted{testBlobTxHashes[0]}, + }, + }) + if verifyErr != nil { + t.Fatalf("KZG cell verification failed after multi-blob delivery: %v", verifyErr) + } +} diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index 271c9ddec2..f4e9271302 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -254,6 +254,9 @@ func (f *TxFetcher) Notify(peer string, kinds []byte, sizes []uint32, hashes []c for i, hash := range hashes { err := f.validateMeta(hash, kinds[i]) if errors.Is(err, txpool.ErrAlreadyKnown) { + if kinds[i] == types.BlobTxType { + blobFetchHashes = append(blobFetchHashes, hash) + } duplicate++ continue } diff --git a/eth/handler.go b/eth/handler.go index 4dbb764d92..4ece05d013 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -103,7 +103,7 @@ type txPool interface { type blobPool interface { Has(hash common.Hash) bool GetCells(hash common.Hash, mask types.CustodyBitmap) ([]kzg4844.Cell, error) - ValidateCells([]common.Hash, [][]kzg4844.Cell, *types.CustodyBitmap) []error + HasPayload(hash common.Hash) bool AddPayload([]common.Hash, [][]kzg4844.Cell, *types.CustodyBitmap) []error GetCustody(hash common.Hash) *types.CustodyBitmap } @@ -214,7 +214,7 @@ func newHandler(config *handlerConfig) (*handler, error) { } return p.RequestPayload(hashes, cells) } - h.blobFetcher = fetcher.NewBlobFetcher(h.blobpool.ValidateCells, h.blobpool.AddPayload, fetchPayloads, h.removePeer, &config.Custody, nil) + h.blobFetcher = fetcher.NewBlobFetcher(h.blobpool.HasPayload, h.blobpool.AddPayload, fetchPayloads, h.removePeer, &config.Custody, nil) return h, nil } diff --git a/eth/handler_test.go b/eth/handler_test.go index 8839e15019..384acffb61 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -83,6 +83,15 @@ func (p *testTxPool) Has(hash common.Hash) bool { return p.txPool[hash] != nil } +// Has returns an indicator whether txpool has a transaction +// cached with the given hash. +func (p *testTxPool) HasPayload(hash common.Hash) bool { + p.lock.Lock() + defer p.lock.Unlock() + + return p.cellPool[hash] != nil +} + // Get retrieves the transaction from local txpool with given // tx hash. func (p *testTxPool) Get(hash common.Hash) *types.Transaction { @@ -223,17 +232,6 @@ func (p *testTxPool) AddPayload(txs []common.Hash, cells [][]kzg4844.Cell, custo return nil } -func (p *testTxPool) ValidateCells(txs []common.Hash, cells [][]kzg4844.Cell, custody *types.CustodyBitmap) []error { - p.lock.Lock() - defer p.lock.Unlock() - - errors := make([]error, len(txs)) - for i, tx := range txs { - errors[i] = kzg4844.VerifyCells(cells[i], p.txPool[tx].BlobTxSidecar().Commitments, p.txPool[tx].BlobTxSidecar().Proofs, custody.Indices()) - } - return errors -} - // FilterType should check whether the pool supports the given type of transactions. func (p *testTxPool) FilterType(kind byte) bool { switch kind { diff --git a/eth/protocols/eth/broadcast.go b/eth/protocols/eth/broadcast.go index 8944b0d7f7..0ecee5d2ba 100644 --- a/eth/protocols/eth/broadcast.go +++ b/eth/protocols/eth/broadcast.go @@ -133,7 +133,11 @@ func (p *Peer) announceTransactions() { } pending = append(pending, queue[count]) pendingTypes = append(pendingTypes, meta.Type) - pendingSizes = append(pendingSizes, uint32(meta.Size)) + if p.version >= ETH71 && meta.SizeWithoutBlob > 0 { + pendingSizes = append(pendingSizes, uint32(meta.SizeWithoutBlob)) + } else { + pendingSizes = append(pendingSizes, uint32(meta.Size)) + } size += common.HashLength processed[count] = true