From a44f8868915b610d38323c4ca4c7e391945cbf65 Mon Sep 17 00:00:00 2001 From: healthykim Date: Wed, 20 May 2026 23:11:46 +0200 Subject: [PATCH] add Flush() function called in tx fetcher --- core/txpool/blobpool/buffer.go | 114 +++++++------- core/txpool/blobpool/buffer_test.go | 69 ++------- eth/fetcher/blob_fetcher.go | 2 +- eth/fetcher/blob_fetcher_test.go | 57 +++---- eth/fetcher/tx_fetcher.go | 160 +++++++++++++------- eth/fetcher/tx_fetcher_test.go | 37 +++-- eth/handler.go | 37 +---- eth/handler_eth.go | 4 +- tests/fuzzers/txfetcher/txfetcher_fuzzer.go | 19 ++- 9 files changed, 242 insertions(+), 257 deletions(-) diff --git a/core/txpool/blobpool/buffer.go b/core/txpool/blobpool/buffer.go index 6bf4ac233a..75b0c0d78e 100644 --- a/core/txpool/blobpool/buffer.go +++ b/core/txpool/blobpool/buffer.go @@ -23,6 +23,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto/kzg4844" "github.com/ethereum/go-ethereum/log" @@ -68,6 +69,8 @@ type BlobBuffer struct { addToPool func(*BlobTxForPool) error validateTx func(*types.Transaction) error dropPeer func(string) + + completed []*BlobTxForPool } func NewBlobBuffer(validateTx func(*types.Transaction) error, addToPool func(*BlobTxForPool) error, dropPeer func(string)) *BlobBuffer { @@ -80,47 +83,56 @@ func NewBlobBuffer(validateTx func(*types.Transaction) error, addToPool func(*Bl } } +// Flush adds all completed entries to the pool and returns the hashes +// and errors for any that failed add. +func (b *BlobBuffer) Flush() ([]common.Hash, []error) { + var errs []error + var txs []common.Hash + for _, ptx := range b.completed { + if err := b.addToPool(ptx); err != nil { + errs = append(errs, err) + txs = append(txs, ptx.Tx.Hash()) + } + } + + return txs, errs +} + // AddTx buffers a blob transaction (without blobs) from an ETH/72 peer. // If cells are already buffered, verification and pool insertion are attempted. -func (b *BlobBuffer) AddTx(tx *types.Transaction, peer string) error { +func (b *BlobBuffer) AddTx(txs []*types.Transaction, peer string) []error { defer b.updateMetrics()() // First remove any timed-out entries. b.evict() - hash := tx.Hash() - sidecar := tx.BlobTxSidecar() - if sidecar == nil { - return fmt.Errorf("blob transaction without sidecar") + errs := make([]error, len(txs)) + for i, tx := range txs { + hash := tx.Hash() + sidecar := tx.BlobTxSidecar() + if sidecar == nil { + errs[i] = fmt.Errorf("blob transaction without sidecar") + continue + } + // tx validation (basic w/o lock) + // error will be handled by tx fetcher + if err := b.validateTx(tx); err != nil { + errs[i] = err + continue + } + if entry, ok := b.cells[hash]; ok { + b.add(hash, tx, entry) + continue + } + blobBufferTxFirstCounter.Inc(1) + b.txs[hash] = &txEntry{tx: tx, peer: peer, added: time.Now()} } - // tx validation - if err := b.validateTx(tx); err != nil { - log.Warn("Transaction validation failed, dropping peer", "peer", peer, "err", err) - b.dropPeer(peer) - return err - } - // vhash check - if err := sidecar.ValidateBlobCommitmentHashes(tx.BlobHashes()); err != nil { - log.Warn("Commitment hash mismatch, dropping peer", "peer", peer, "err", err) - b.dropPeer(peer) - return err - } - // proof count check - if len(sidecar.Proofs) < len(sidecar.Commitments)*kzg4844.CellProofsPerBlob { - b.dropPeer(peer) - return fmt.Errorf("insufficient proofs in sidecar") - } - if entry, ok := b.cells[hash]; ok { - return b.add(hash, tx, entry) - } - blobBufferTxFirstCounter.Inc(1) - b.txs[hash] = &txEntry{tx: tx, peer: peer, added: time.Now()} - return nil + return errs } // AddCells buffers per-peer cell deliveries from the blob fetcher. // If the transaction is already buffered, verification and pool insertion are attempted. -func (b *BlobBuffer) AddCells(hash common.Hash, deliveries map[string]*PeerDelivery, custody *types.CustodyBitmap) error { +func (b *BlobBuffer) AddCells(hash common.Hash, deliveries map[string]*PeerDelivery, custody *types.CustodyBitmap) { defer b.updateMetrics()() // First remove any timed-out entries. @@ -132,15 +144,13 @@ func (b *BlobBuffer) AddCells(hash common.Hash, deliveries map[string]*PeerDeliv added: time.Now(), } if txe, ok := b.txs[hash]; ok { - return b.add(hash, txe.tx, b.cells[hash]) + b.add(hash, txe.tx, b.cells[hash]) } blobBufferCellsFirstCounter.Inc(1) - return nil } -// todo: this is very strange // add verifies cells per-peer, sorts them, and adds to the pool. -func (b *BlobBuffer) add(hash common.Hash, tx *types.Transaction, cells *cellEntry) error { +func (b *BlobBuffer) add(hash common.Hash, tx *types.Transaction, cells *cellEntry) { sidecar := tx.BlobTxSidecar() // Per-peer cell verification @@ -148,7 +158,6 @@ func (b *BlobBuffer) add(hash common.Hash, tx *types.Transaction, cells *cellEnt b.dropPeers(badPeers) delete(b.cells, hash) delete(b.txs, hash) - return fmt.Errorf("cell verification failed") } blobCount := len(tx.BlobHashes()) sorted, custody := sortCells(cells, blobCount) @@ -165,10 +174,9 @@ func (b *BlobBuffer) add(hash common.Hash, tx *types.Transaction, cells *cellEnt CellSidecar: &cellSidecar, } - err := b.addToPool(pooledTx) + b.completed = append(b.completed, pooledTx) delete(b.cells, hash) delete(b.txs, hash) - return err } func (b *BlobBuffer) HasTx(hash common.Hash) bool { @@ -221,12 +229,20 @@ func (b *BlobBuffer) updateMetrics() func() { } } -// verifyCells verifies each peer's cells against the sidecar. +// verifyCells verifies each peer's cells against the sidecar by treating each +// per-peer delivery as a mini BlobTxCellSidecar and reusing txpool.ValidateCells. // Returns the list of peers whose cells failed verification. func (b *BlobBuffer) verifyCells(entry *cellEntry, sidecar *types.BlobTxSidecar) []string { var badPeers []string for peer, delivery := range entry.deliveries { - if err := verifyPeerCells(delivery, sidecar); err != nil { + perPeer := &types.BlobTxCellSidecar{ + Version: sidecar.Version, + Cells: delivery.Cells, + Commitments: sidecar.Commitments, + Proofs: sidecar.Proofs, + Custody: types.NewCustodyBitmap(delivery.Indices), + } + if err := txpool.ValidateCells(perPeer); err != nil { log.Debug("Cell verification failed", "peer", peer, "err", err) badPeers = append(badPeers, peer) } @@ -234,28 +250,6 @@ func (b *BlobBuffer) verifyCells(entry *cellEntry, sidecar *types.BlobTxSidecar) return badPeers } -// verifyPeerCells verifies a single peer's cells against the sidecar proofs. -// delivery.Cells is blob-major: [blob0_cell0..blob0_cellN, blob1_cell0..blob1_cellN, ...] -func verifyPeerCells(delivery *PeerDelivery, sidecar *types.BlobTxSidecar) error { - cellsPerBlob := len(delivery.Indices) - blobCount := len(delivery.Cells) / cellsPerBlob - if blobCount == 0 || blobCount != len(sidecar.Commitments) { - return fmt.Errorf("blob count mismatch: delivery %d, commitments %d", blobCount, len(sidecar.Commitments)) - } - // Extract proofs corresponding to this peer's cell indices - var proofs []kzg4844.Proof - for blobIdx := 0; blobIdx < blobCount; blobIdx++ { - for _, cellIdx := range delivery.Indices { - proofIdx := blobIdx*kzg4844.CellProofsPerBlob + int(cellIdx) - if proofIdx >= len(sidecar.Proofs) { - return fmt.Errorf("proof index out of range: %d", proofIdx) - } - proofs = append(proofs, sidecar.Proofs[proofIdx]) - } - } - return kzg4844.VerifyCells(delivery.Cells, sidecar.Commitments, proofs, delivery.Indices) -} - // sortCells merges all per-peer deliveries into a single flat cell array // sorted by custody index. // diff --git a/core/txpool/blobpool/buffer_test.go b/core/txpool/blobpool/buffer_test.go index 67abde7ad4..ef2aadd3de 100644 --- a/core/txpool/blobpool/buffer_test.go +++ b/core/txpool/blobpool/buffer_test.go @@ -4,12 +4,9 @@ import ( "crypto/ecdsa" "testing" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto/kzg4844" - "github.com/ethereum/go-ethereum/params" - "github.com/holiman/uint256" ) // makeV1Tx creates a V1 blob transaction with cell proofs, then strips blobs @@ -95,7 +92,7 @@ func TestAddTxThenCells(t *testing.T) { tx := makeV1Tx(t, 0, blobCount, 0, key) hash := tx.Hash() - if err := buf.AddTx(tx, "peerA"); err != nil { + if err := buf.AddTx([]*types.Transaction{tx}, "peerA")[0]; err != nil { t.Fatal(err) } if !buf.HasTx(hash) { @@ -109,9 +106,7 @@ func TestAddTxThenCells(t *testing.T) { delivery := makePeerDelivery(t, 0, blobCount, dataIndices) custody := types.NewCustodyBitmap(dataIndices) - if err := buf.AddCells(hash, map[string]*PeerDelivery{"peerB": delivery}, &custody); err != nil { - t.Fatal(err) - } + buf.AddCells(hash, map[string]*PeerDelivery{"peerB": delivery}, &custody) if buf.HasTx(hash) || buf.HasCells(hash) { t.Fatal("buffer should be empty after add") } @@ -132,14 +127,12 @@ func TestAddCellsThenTx(t *testing.T) { delivery := makePeerDelivery(t, 0, blobCount, dataIndices) custody := types.NewCustodyBitmap(dataIndices) - if err := buf.AddCells(hash, map[string]*PeerDelivery{"peerB": delivery}, &custody); err != nil { - t.Fatal(err) - } + buf.AddCells(hash, map[string]*PeerDelivery{"peerB": delivery}, &custody) if !buf.HasCells(hash) { t.Fatal("cells should be buffered") } - if err := buf.AddTx(tx, "peerA"); err != nil { + if err := buf.AddTx([]*types.Transaction{tx}, "peerA")[0]; err != nil { t.Fatal(err) } if buf.HasTx(hash) || buf.HasCells(hash) { @@ -154,7 +147,7 @@ func TestMultiPeerDelivery(t *testing.T) { tx := makeV1Tx(t, 0, blobCount, 0, key) hash := tx.Hash() - buf.AddTx(tx, "peerA") + buf.AddTx([]*types.Transaction{tx}, "peerA") indicesA := []uint64{0, 2, 4, 6} indicesB := []uint64{1, 3, 5, 7} @@ -164,12 +157,10 @@ func TestMultiPeerDelivery(t *testing.T) { allIndices := append(indicesA, indicesB...) custody := types.NewCustodyBitmap(allIndices) - if err := buf.AddCells(hash, map[string]*PeerDelivery{ + buf.AddCells(hash, map[string]*PeerDelivery{ "peerB": deliveryA, "peerC": deliveryB, - }, &custody); err != nil { - t.Fatal(err) - } + }, &custody) if buf.HasTx(hash) || buf.HasCells(hash) { t.Fatal("buffer should be empty after add") } @@ -188,7 +179,7 @@ func TestBadCell(t *testing.T) { tx := makeV1Tx(t, 0, blobCount, 0, key) hash := tx.Hash() - buf.AddTx(tx, "peerA") + buf.AddTx([]*types.Transaction{tx}, "peerA") goodDelivery := makePeerDelivery(t, 0, blobCount, []uint64{0, 1, 2, 3}) badDelivery := makePeerDelivery(t, 0, blobCount, []uint64{4, 5, 6, 7}) @@ -201,13 +192,10 @@ func TestBadCell(t *testing.T) { allIndices := []uint64{0, 1, 2, 3, 4, 5, 6, 7} custody := types.NewCustodyBitmap(allIndices) - err := buf.AddCells(hash, map[string]*PeerDelivery{ + buf.AddCells(hash, map[string]*PeerDelivery{ "peerB": goodDelivery, "peerC": badDelivery, }, &custody) - if err == nil { - t.Fatal("expected error from bad cells") - } if len(dropped) != 1 || dropped[0] != "peerC" { t.Fatalf("only peerC should have been dropped, got: %v", dropped) @@ -216,42 +204,3 @@ func TestBadCell(t *testing.T) { t.Fatal("buffer should be empty after bad cell drop") } } - -func TestBadTx(t *testing.T) { - key, _ := crypto.GenerateKey() - - var dropped []string - buf := NewBlobBuffer( - func(tx *types.Transaction) error { return nil }, - func(ptx *BlobTxForPool) error { return nil }, - func(peer string) { dropped = append(dropped, peer) }, - ) - - blobtx := &types.BlobTx{ - ChainID: uint256.MustFromBig(params.MainnetChainConfig.ChainID), - Nonce: 0, - GasTipCap: uint256.NewInt(1), - GasFeeCap: uint256.NewInt(1), - Gas: 21000, - BlobFeeCap: uint256.NewInt(1), - BlobHashes: []common.Hash{testBlobVHashes[0]}, - Value: uint256.NewInt(100), - Sidecar: types.NewBlobTxSidecar(types.BlobSidecarVersion1, - nil, - []kzg4844.Commitment{testBlobCommits[1]}, - testBlobCellProofs[1], - ), - } - tx := types.MustSignNewTx(key, types.LatestSigner(params.MainnetChainConfig), blobtx) - - err := buf.AddTx(tx, "peerA") - if err == nil { - t.Fatal("expected error from commitment mismatch") - } - if len(dropped) != 1 || dropped[0] != "peerA" { - t.Fatalf("only peerA should have been dropped, got: %v", dropped) - } - if buf.HasTx(tx.Hash()) { - t.Fatal("tx should not be buffered") - } -} diff --git a/eth/fetcher/blob_fetcher.go b/eth/fetcher/blob_fetcher.go index 089a946b1a..6226b89820 100644 --- a/eth/fetcher/blob_fetcher.go +++ b/eth/fetcher/blob_fetcher.go @@ -91,7 +91,7 @@ type fetchStatus struct { type BlobFetcherFunctions struct { HasPayload func(common.Hash) bool - AddCells func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error + AddCells func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) FetchPayloads func(string, []common.Hash, *types.CustodyBitmap) error DropPeer func(string) } diff --git a/eth/fetcher/blob_fetcher_test.go b/eth/fetcher/blob_fetcher_test.go index e89f7602ed..84732e0d92 100644 --- a/eth/fetcher/blob_fetcher_test.go +++ b/eth/fetcher/blob_fetcher_test.go @@ -47,9 +47,14 @@ func makeTestCellSidecar(blobCount int) *types.BlobTxCellSidecar { proofs = append(proofs, cellProofs...) } - sidecar, _ := types.NewBlobTxSidecar(types.BlobSidecarVersion1, blobs, commitments, proofs).ToBlobTxCellSidecar() - - return sidecar + cells, _ := kzg4844.ComputeCells(blobs) + return &types.BlobTxCellSidecar{ + Version: types.BlobSidecarVersion1, + Cells: cells, + Commitments: commitments, + Proofs: proofs, + Custody: *types.CustodyBitmapAll, + } } func selectCells(cells []kzg4844.Cell, custody *types.CustodyBitmap) []kzg4844.Cell { @@ -149,9 +154,7 @@ func TestBlobFetcherFullFetch(t *testing.T) { return NewBlobFetcher( BlobFetcherFunctions{ HasPayload: func(common.Hash) bool { return false }, - AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { - return nil - }, + AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) {}, FetchPayloads: func(string, []common.Hash, *types.CustodyBitmap) error { return nil }, @@ -241,10 +244,8 @@ func TestBlobFetcherPartialFetch(t *testing.T) { init: func() *BlobFetcher { return NewBlobFetcher( BlobFetcherFunctions{ - HasPayload: func(common.Hash) bool { return false }, - AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { - return nil - }, + HasPayload: func(common.Hash) bool { return false }, AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) {}, + FetchPayloads: func(string, []common.Hash, *types.CustodyBitmap) error { return nil }, @@ -338,10 +339,8 @@ func TestBlobFetcherFullDelivery(t *testing.T) { init: func() *BlobFetcher { return NewBlobFetcher( BlobFetcherFunctions{ - HasPayload: func(common.Hash) bool { return false }, - AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { - return nil - }, + HasPayload: func(common.Hash) bool { return false }, AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) {}, + FetchPayloads: func(string, []common.Hash, *types.CustodyBitmap) error { return nil }, @@ -388,10 +387,8 @@ func TestBlobFetcherPartialDelivery(t *testing.T) { init: func() *BlobFetcher { return NewBlobFetcher( BlobFetcherFunctions{ - HasPayload: func(common.Hash) bool { return false }, - AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { - return nil - }, + HasPayload: func(common.Hash) bool { return false }, AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) {}, + FetchPayloads: func(string, []common.Hash, *types.CustodyBitmap) error { return nil }, @@ -526,10 +523,8 @@ func TestBlobFetcherAvailabilityTimeout(t *testing.T) { init: func() *BlobFetcher { return NewBlobFetcher( BlobFetcherFunctions{ - HasPayload: func(common.Hash) bool { return false }, - AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { - return nil - }, + HasPayload: func(common.Hash) bool { return false }, AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) {}, + FetchPayloads: func(string, []common.Hash, *types.CustodyBitmap) error { return nil }, @@ -570,10 +565,8 @@ func TestBlobFetcherPeerDrop(t *testing.T) { init: func() *BlobFetcher { return NewBlobFetcher( BlobFetcherFunctions{ - HasPayload: func(common.Hash) bool { return false }, - AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { - return nil - }, + HasPayload: func(common.Hash) bool { return false }, AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) {}, + FetchPayloads: func(string, []common.Hash, *types.CustodyBitmap) error { return nil }, @@ -649,10 +642,8 @@ func TestBlobFetcherFetchTimeout(t *testing.T) { init: func() *BlobFetcher { return NewBlobFetcher( BlobFetcherFunctions{ - HasPayload: func(common.Hash) bool { return false }, - AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { - return nil - }, + HasPayload: func(common.Hash) bool { return false }, AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) {}, + FetchPayloads: func(string, []common.Hash, *types.CustodyBitmap) error { return nil }, @@ -1040,7 +1031,7 @@ func TestMultiBlobDeliveryVerification(t *testing.T) { return NewBlobFetcher( BlobFetcherFunctions{ HasPayload: func(common.Hash) bool { return false }, - AddCells: func(h common.Hash, deliveries map[string]*PeerCellDelivery, custody *types.CustodyBitmap) error { + AddCells: func(h common.Hash, deliveries map[string]*PeerCellDelivery, custody *types.CustodyBitmap) { // Verify each peer's delivered cells pass KZG cell proof verification for _, d := range deliveries { var cellProofs []kzg4844.Proof @@ -1050,11 +1041,7 @@ func TestMultiBlobDeliveryVerification(t *testing.T) { } } verifyErr = kzg4844.VerifyCells(d.Cells, sidecar.Commitments, cellProofs, d.Indices) - if verifyErr != nil { - return verifyErr - } } - return nil }, FetchPayloads: func(string, []common.Hash, *types.CustodyBitmap) error { return nil diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index 20a6d031f2..9c530b082f 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -29,8 +29,11 @@ import ( "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/txpool" + "github.com/ethereum/go-ethereum/core/txpool/blobpool" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" ) const ( @@ -180,10 +183,12 @@ type TxFetcher struct { alternates map[common.Hash]map[string]struct{} // In-flight transaction alternate origins if retrieval fails // Callbacks - validateMeta func(common.Hash, byte) error // Validate a tx metadata based on the local txpool - addTxs func(string, []*types.Transaction) []error // Insert a batch of transactions into local txpool - fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer - dropPeer func(string) // Drops a peer in case of announcement violation + validateMeta func(common.Hash, byte) error // Validate a tx metadata based on the local txpool + addTxs func([]*types.Transaction) []error // Insert a batch of transactions into local txpool + fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer + dropPeer func(string) // Drops a peer in case of announcement violation + + buffer *blobpool.BlobBuffer step chan struct{} // Notification channel when the fetcher loop iterates clock mclock.Clock // Monotonic clock or simulated clock for tests @@ -194,16 +199,17 @@ type TxFetcher struct { // NewTxFetcher creates a transaction fetcher to retrieve transaction // based on hash announcements. // Chain can be nil to disable on-chain checks. -func NewTxFetcher(chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func(string, []*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string)) *TxFetcher { - return NewTxFetcherForTests(chain, validateMeta, addTxs, fetchTxs, dropPeer, mclock.System{}, time.Now, nil) +func NewTxFetcher(chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, + dropPeer func(string), buffer *blobpool.BlobBuffer) *TxFetcher { + return NewTxFetcherForTests(chain, validateMeta, addTxs, fetchTxs, dropPeer, buffer, mclock.System{}, time.Now, nil) } // NewTxFetcherForTests is a testing method to mock out the realtime clock with // a simulated version and the internal randomness with a deterministic one. // Chain can be nil to disable on-chain checks. func NewTxFetcherForTests( - chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func(string, []*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), - clock mclock.Clock, realTime func() time.Time, rand *mrand.Rand) *TxFetcher { + chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), + buffer *blobpool.BlobBuffer, clock mclock.Clock, realTime func() time.Time, rand *mrand.Rand) *TxFetcher { return &TxFetcher{ notify: make(chan *txAnnounce), cleanup: make(chan *txDelivery), @@ -224,6 +230,7 @@ func NewTxFetcherForTests( addTxs: addTxs, fetchTxs: fetchTxs, dropPeer: dropPeer, + buffer: buffer, clock: clock, realTime: realTime, rand: rand, @@ -312,26 +319,36 @@ func (f *TxFetcher) isKnownUnderpriced(hash common.Hash) bool { return ok } +type deliveryMetrics struct { + inMeter *metrics.Meter + knownMeter *metrics.Meter + underpricedMeter *metrics.Meter + otherRejectMeter *metrics.Meter +} + // Enqueue imports a batch of received transaction into the transaction pool // and the fetcher. This method may be called by both transaction broadcasts and // direct request replies. The differentiation is important so the fetcher can // re-schedule missing transactions as soon as possible. -func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) error { - var ( - inMeter = txReplyInMeter - knownMeter = txReplyKnownMeter - underpricedMeter = txReplyUnderpricedMeter - otherRejectMeter = txReplyOtherRejectMeter - violation error - ) +func (f *TxFetcher) Enqueue(peer string, version uint, txs []*types.Transaction, direct bool) error { + var violation error + + metrics := deliveryMetrics{ + inMeter: txReplyInMeter, + knownMeter: txReplyKnownMeter, + underpricedMeter: txReplyUnderpricedMeter, + otherRejectMeter: txReplyOtherRejectMeter, + } if !direct { - inMeter = txBroadcastInMeter - knownMeter = txBroadcastKnownMeter - underpricedMeter = txBroadcastUnderpricedMeter - otherRejectMeter = txBroadcastOtherRejectMeter + metrics = deliveryMetrics{ + inMeter: txBroadcastInMeter, + knownMeter: txBroadcastKnownMeter, + underpricedMeter: txBroadcastUnderpricedMeter, + otherRejectMeter: txBroadcastOtherRejectMeter, + } } // Keep track of all the propagated transactions - inMeter.Mark(int64(len(txs))) + metrics.inMeter.Mark(int64(len(txs))) // Push all the transactions into the pool, tracking underpriced ones to avoid // re-requesting them and dropping the peer in case of malicious transfers. @@ -345,38 +362,35 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) if end > len(txs) { end = len(txs) } - var ( - duplicate int64 - underpriced int64 - otherreject int64 - ) batch := txs[i:end] - - for j, err := range f.addTxs(peer, batch) { - // Track the transaction hash if the price is too low for us. - // Avoid re-request this transaction when we receive another - // announcement. - if errors.Is(err, txpool.ErrUnderpriced) || errors.Is(err, txpool.ErrReplaceUnderpriced) || errors.Is(err, txpool.ErrTxGasPriceTooLow) { - f.underpriced.Add(batch[j].Hash(), batch[j].Time()) + var ( + poolTxs []*types.Transaction + blobTxs []*types.Transaction + ) + if version >= eth.ETH72 { + for _, tx := range batch { + if tx.Type() == types.BlobTxType { + blobTxs = append(blobTxs, tx) + } else { + poolTxs = append(poolTxs, tx) + } } - // Track a few interesting failure types - switch { - case err == nil: // Noop, but need to handle to not count these + } else { + poolTxs = batch + } + batch = append(poolTxs, blobTxs...) + errs := append(f.addTxs(poolTxs), f.buffer.AddTx(blobTxs, peer)...) - case errors.Is(err, txpool.ErrAlreadyKnown): - duplicate++ - - case errors.Is(err, txpool.ErrUnderpriced) || errors.Is(err, txpool.ErrReplaceUnderpriced) || errors.Is(err, txpool.ErrTxGasPriceTooLow): - underpriced++ - - case errors.Is(err, txpool.ErrKZGVerificationError) || errors.Is(err, txpool.ErrSidecarFormatError): + hashes := make([]common.Hash, len(batch)) + for j := range batch { + hashes[j] = batch[j].Hash() + } + for j, err := range errs { + if errors.Is(err, txpool.ErrKZGVerificationError) || errors.Is(err, txpool.ErrSidecarFormatError) { // KZG verification failed, terminate transaction processing immediately. // Since KZG verification is computationally expensive, this acts as a // defensive measure against potential DoS attacks. violation = err - - default: - otherreject++ } added = append(added, batch[j].Hash()) metas = append(metas, txMetadata{ @@ -389,15 +403,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) break } } - knownMeter.Mark(duplicate) - underpricedMeter.Mark(underpriced) - otherRejectMeter.Mark(otherreject) - - // If 'other reject' is >25% of the deliveries in any batch, sleep a bit. - if otherreject > int64((len(batch)+3)/4) { - log.Debug("Peer delivering stale or invalid transactions", "peer", peer, "rejected", otherreject) - time.Sleep(200 * time.Millisecond) - } + f.handleAddErrors(hashes, errs, metrics) // If we encountered a protocol violation, disconnect this peer. if violation != nil { break @@ -411,6 +417,42 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) } } +func (f *TxFetcher) handleAddErrors(txs []common.Hash, errs []error, metrics deliveryMetrics) { + var ( + duplicate int64 + underpriced int64 + otherreject int64 + ) + for i, err := range errs { + // Track a few interesting failure types + switch { + case err == nil: // Noop, but need to handle to not count these + + case errors.Is(err, txpool.ErrAlreadyKnown): + duplicate++ + + // Track the transaction hash if the price is too low for us. + // Avoid re-request this transaction when we receive another + // announcement. + case errors.Is(err, txpool.ErrUnderpriced) || errors.Is(err, txpool.ErrReplaceUnderpriced) || errors.Is(err, txpool.ErrTxGasPriceTooLow): + f.underpriced.Add(txs[i], f.realTime()) + underpriced++ + + default: + otherreject++ + } + } + metrics.knownMeter.Mark(duplicate) + metrics.underpricedMeter.Mark(underpriced) + metrics.otherRejectMeter.Mark(otherreject) + + // If 'other reject' is >25% of the deliveries in any batch, sleep a bit. + if otherreject > int64((len(txs)+3)/4) { + log.Debug("Peer delivering stale or invalid transactions", "rejected", otherreject) + time.Sleep(200 * time.Millisecond) + } +} + // Drop should be called when a peer disconnects. It cleans up all the internal // data structures of the given node. func (f *TxFetcher) Drop(peer string) error { @@ -456,6 +498,14 @@ func (f *TxFetcher) loop() { } for { + txs, errs := f.buffer.Flush() + f.handleAddErrors(txs, errs, deliveryMetrics{ + inMeter: txReplyInMeter, + knownMeter: txReplyKnownMeter, + underpricedMeter: txReplyUnderpricedMeter, + otherRejectMeter: txReplyOtherRejectMeter, + }) + select { case ann := <-f.notify: // Drop part of the new announcements if there are too many accumulated. diff --git a/eth/fetcher/tx_fetcher_test.go b/eth/fetcher/tx_fetcher_test.go index 4e8ea14000..9c667dc2a0 100644 --- a/eth/fetcher/tx_fetcher_test.go +++ b/eth/fetcher/tx_fetcher_test.go @@ -28,9 +28,11 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/core/txpool" + "github.com/ethereum/go-ethereum/core/txpool/blobpool" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto/kzg4844" + "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/params" "github.com/holiman/uint256" ) @@ -60,9 +62,10 @@ type doTxNotify struct { sizes []uint32 } type doTxEnqueue struct { - peer string - txs []*types.Transaction - direct bool + peer string + version uint + txs []*types.Transaction + direct bool } type doWait struct { time time.Duration @@ -87,17 +90,28 @@ type txFetcherTest struct { steps []interface{} } +// newTestBlobBuffer returns a BlobBuffer with no-op callbacks for tests that +// don't exercise blob handling but still need a non-nil buffer. +func newTestBlobBuffer() *blobpool.BlobBuffer { + return blobpool.NewBlobBuffer( + func(*types.Transaction) error { return nil }, + func(*blobpool.BlobTxForPool) error { return nil }, + func(string) {}, + ) +} + // newTestTxFetcher creates a tx fetcher with noop callbacks, simulated clock, // and deterministic randomness. func newTestTxFetcher() *TxFetcher { return NewTxFetcher( nil, func(common.Hash, byte) error { return nil }, - func(_ string, txs []*types.Transaction) []error { + func(txs []*types.Transaction) []error { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, nil, + newTestBlobBuffer(), ) } @@ -1172,7 +1186,7 @@ func TestTransactionFetcherUnderpricedDedup(t *testing.T) { testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { f := newTestTxFetcher() - f.addTxs = func(_ string, txs []*types.Transaction) []error { + f.addTxs = func(txs []*types.Transaction) []error { errs := make([]error, len(txs)) for i := 0; i < len(errs); i++ { if i%3 == 0 { @@ -1270,7 +1284,7 @@ func TestTransactionFetcherUnderpricedDoSProtection(t *testing.T) { testTransactionFetcher(t, txFetcherTest{ init: func() *TxFetcher { f := newTestTxFetcher() - f.addTxs = func(_ string, txs []*types.Transaction) []error { + f.addTxs = func(txs []*types.Transaction) []error { errs := make([]error, len(txs)) for i := 0; i < len(errs); i++ { errs[i] = txpool.ErrUnderpriced @@ -1787,7 +1801,7 @@ func TestTransactionProtocolViolation(t *testing.T) { testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { f := newTestTxFetcher() - f.addTxs = func(_ string, txs []*types.Transaction) []error { + f.addTxs = func(txs []*types.Transaction) []error { var errs []error for range txs { errs = append(errs, txpool.ErrKZGVerificationError) @@ -1899,7 +1913,7 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) { } case doTxEnqueue: - if err := fetcher.Enqueue(step.peer, step.txs, step.direct); err != nil { + if err := fetcher.Enqueue(step.peer, step.version, step.txs, step.direct); err != nil { t.Errorf("step %d: %v", i, err) } <-wait // Fetcher needs to process this, wait until it's done @@ -2194,7 +2208,7 @@ func TestTransactionForgotten(t *testing.T) { fetcher := NewTxFetcherForTests( nil, func(common.Hash, byte) error { return nil }, - func(_ string, txs []*types.Transaction) []error { + func(txs []*types.Transaction) []error { errs := make([]error, len(txs)) for i := 0; i < len(errs); i++ { errs[i] = txpool.ErrUnderpriced @@ -2203,6 +2217,7 @@ func TestTransactionForgotten(t *testing.T) { }, func(string, []common.Hash) error { return nil }, func(string) {}, + newTestBlobBuffer(), mockClock, mockTime, rand.New(rand.NewSource(0)), // Use fixed seed for deterministic behavior @@ -2219,7 +2234,7 @@ func TestTransactionForgotten(t *testing.T) { tx2.SetTime(now) // Initial state: both transactions should be marked as underpriced - if err := fetcher.Enqueue("peer", []*types.Transaction{tx1, tx2}, false); err != nil { + if err := fetcher.Enqueue("peer", eth.ETH70, []*types.Transaction{tx1, tx2}, false); err != nil { t.Fatal(err) } if !fetcher.isKnownUnderpriced(tx1.Hash()) { @@ -2268,7 +2283,7 @@ func TestTransactionForgotten(t *testing.T) { // Re-enqueue tx1 with updated timestamp tx1.SetTime(mockTime()) - if err := fetcher.Enqueue("peer", []*types.Transaction{tx1}, false); err != nil { + if err := fetcher.Enqueue("peer", eth.ETH70, []*types.Transaction{tx1}, false); err != nil { t.Fatal(err) } if !fetcher.isKnownUnderpriced(tx1.Hash()) { diff --git a/eth/handler.go b/eth/handler.go index dc6f0ed1b4..cb86a33d7d 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -139,7 +139,6 @@ type handler struct { downloader *downloader.Downloader txFetcher *fetcher.TxFetcher blobFetcher *fetcher.BlobFetcher - blobBuffer *blobpool.BlobBuffer peers *peerSet txBroadcastKey [16]byte @@ -190,33 +189,13 @@ func newHandler(config *handlerConfig) (*handler, error) { } // Construct the blob buffer for assembling blob txs from separate tx and cell deliveries. - h.blobBuffer = blobpool.NewBlobBuffer(h.blobpool.ValidateTxBasics, h.blobpool.AddPooledTx, h.removePeer) + blobBuffer := blobpool.NewBlobBuffer(h.blobpool.ValidateTxBasics, h.blobpool.AddPooledTx, h.removePeer) - addTxs := func(peer string, txs []*types.Transaction) []error { - errs := make([]error, len(txs)) - p := h.peers.peer(peer) - isETH72 := p != nil && p.Version() >= eth.ETH72 - - var poolTxs []*types.Transaction - var index []int - for i, tx := range txs { - if isETH72 && tx.Type() == types.BlobTxType { - errs[i] = h.blobBuffer.AddTx(tx, peer) - } else { - poolTxs = append(poolTxs, tx) - index = append(index, i) - } - } - if len(poolTxs) > 0 { - poolErrs := h.txpool.Add(poolTxs, false) - for j, idx := range index { - errs[idx] = poolErrs[j] - } - } - return errs + addTxs := func(txs []*types.Transaction) []error { + return h.txpool.Add(txs, false) } validateMeta := func(tx common.Hash, kind byte) error { - if h.txpool.Has(tx) || h.blobBuffer.HasTx(tx) { + if h.txpool.Has(tx) || blobBuffer.HasTx(tx) { return txpool.ErrAlreadyKnown } if !h.txpool.FilterType(kind) { @@ -224,7 +203,7 @@ func newHandler(config *handlerConfig) (*handler, error) { } return nil } - h.txFetcher = fetcher.NewTxFetcher(h.chain, validateMeta, addTxs, fetchTx, h.removePeer) + h.txFetcher = fetcher.NewTxFetcher(h.chain, validateMeta, addTxs, fetchTx, h.removePeer, blobBuffer) // Construct the blob fetcher for cell-based blob data availability blobCallbacks := fetcher.BlobFetcherFunctions{ @@ -236,14 +215,14 @@ func newHandler(config *handlerConfig) (*handler, error) { return p.RequestPayload(hashes, cells) }, HasPayload: func(hash common.Hash) bool { - return h.blobpool.Has(hash) || h.blobBuffer.HasCells(hash) + return h.blobpool.Has(hash) || blobBuffer.HasCells(hash) }, - AddCells: func(hash common.Hash, deliveries map[string]*fetcher.PeerCellDelivery, custody *types.CustodyBitmap) error { + AddCells: func(hash common.Hash, deliveries map[string]*fetcher.PeerCellDelivery, custody *types.CustodyBitmap) { converted := make(map[string]*blobpool.PeerDelivery, len(deliveries)) for peer, d := range deliveries { converted[peer] = &blobpool.PeerDelivery{Cells: d.Cells, Indices: d.Indices} } - return h.blobBuffer.AddCells(hash, converted, custody) + blobBuffer.AddCells(hash, converted, custody) }, DropPeer: h.removePeer, } diff --git a/eth/handler_eth.go b/eth/handler_eth.go index 37573c3cbd..cb5e3d92a6 100644 --- a/eth/handler_eth.go +++ b/eth/handler_eth.go @@ -81,7 +81,7 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { if err := handleTransactions(peer, txs, true); err != nil { return fmt.Errorf("Transactions: %v", err) } - return h.txFetcher.Enqueue(peer.ID(), txs, false) + return h.txFetcher.Enqueue(peer.ID(), peer.Version(), txs, false) case *eth.PooledTransactionsPacket: txs, err := packet.List.Items() @@ -91,7 +91,7 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error { if err := handleTransactions(peer, txs, false); err != nil { return fmt.Errorf("PooledTransactions: %v", err) } - return h.txFetcher.Enqueue(peer.ID(), txs, true) + return h.txFetcher.Enqueue(peer.ID(), peer.Version(), txs, true) case *eth.CellsResponse: return h.blobFetcher.Enqueue(peer.ID(), packet.Hashes, packet.Cells, packet.Mask) diff --git a/tests/fuzzers/txfetcher/txfetcher_fuzzer.go b/tests/fuzzers/txfetcher/txfetcher_fuzzer.go index 1e15b991fa..8f8660a515 100644 --- a/tests/fuzzers/txfetcher/txfetcher_fuzzer.go +++ b/tests/fuzzers/txfetcher/txfetcher_fuzzer.go @@ -25,22 +25,28 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/core/txpool/blobpool" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/fetcher" + "github.com/ethereum/go-ethereum/eth/protocols/eth" ) var ( - peers []string - txs []*types.Transaction + peers []string + peerVersions map[string]uint + txs []*types.Transaction ) func init() { // Random is nice, but we need it deterministic rand := rand.New(rand.NewSource(0x3a29)) + supportedVersions := []uint{eth.ETH69, eth.ETH70, eth.ETH72} peers = make([]string, 10) + peerVersions = make(map[string]uint, len(peers)) for i := 0; i < len(peers); i++ { peers[i] = fmt.Sprintf("Peer #%d", i) + peerVersions[peers[i]] = supportedVersions[i%len(supportedVersions)] } txs = make([]*types.Transaction, 65536) // We need to bump enough to hit all the limits for i := 0; i < len(txs); i++ { @@ -80,11 +86,16 @@ func fuzz(input []byte) int { f := fetcher.NewTxFetcherForTests( nil, func(common.Hash, byte) error { return nil }, - func(_ string, txs []*types.Transaction) []error { + func(txs []*types.Transaction) []error { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, nil, + blobpool.NewBlobBuffer( + func(*types.Transaction) error { return nil }, + func(*blobpool.BlobTxForPool) error { return nil }, + func(string) {}, + ), clock, func() time.Time { nanoTime := int64(clock.Now()) @@ -180,7 +191,7 @@ func fuzz(input []byte) int { if verbose { fmt.Println("Enqueue", peer, deliverIdxs, direct) } - if err := f.Enqueue(peer, deliveries, direct); err != nil { + if err := f.Enqueue(peer, peerVersions[peer], deliveries, direct); err != nil { panic(err) }