add Flush() function called in tx fetcher

This commit is contained in:
healthykim 2026-05-20 23:11:46 +02:00
parent feff09edc5
commit a44f886891
9 changed files with 242 additions and 257 deletions

View file

@ -23,6 +23,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/log"
@ -68,6 +69,8 @@ type BlobBuffer struct {
addToPool func(*BlobTxForPool) error
validateTx func(*types.Transaction) error
dropPeer func(string)
completed []*BlobTxForPool
}
func NewBlobBuffer(validateTx func(*types.Transaction) error, addToPool func(*BlobTxForPool) error, dropPeer func(string)) *BlobBuffer {
@ -80,47 +83,56 @@ func NewBlobBuffer(validateTx func(*types.Transaction) error, addToPool func(*Bl
}
}
// Flush adds all completed entries to the pool and returns the hashes
// and errors for any that failed add.
func (b *BlobBuffer) Flush() ([]common.Hash, []error) {
var errs []error
var txs []common.Hash
for _, ptx := range b.completed {
if err := b.addToPool(ptx); err != nil {
errs = append(errs, err)
txs = append(txs, ptx.Tx.Hash())
}
}
return txs, errs
}
// AddTx buffers a blob transaction (without blobs) from an ETH/72 peer.
// If cells are already buffered, verification and pool insertion are attempted.
func (b *BlobBuffer) AddTx(tx *types.Transaction, peer string) error {
func (b *BlobBuffer) AddTx(txs []*types.Transaction, peer string) []error {
defer b.updateMetrics()()
// First remove any timed-out entries.
b.evict()
hash := tx.Hash()
sidecar := tx.BlobTxSidecar()
if sidecar == nil {
return fmt.Errorf("blob transaction without sidecar")
errs := make([]error, len(txs))
for i, tx := range txs {
hash := tx.Hash()
sidecar := tx.BlobTxSidecar()
if sidecar == nil {
errs[i] = fmt.Errorf("blob transaction without sidecar")
continue
}
// tx validation (basic w/o lock)
// error will be handled by tx fetcher
if err := b.validateTx(tx); err != nil {
errs[i] = err
continue
}
if entry, ok := b.cells[hash]; ok {
b.add(hash, tx, entry)
continue
}
blobBufferTxFirstCounter.Inc(1)
b.txs[hash] = &txEntry{tx: tx, peer: peer, added: time.Now()}
}
// tx validation
if err := b.validateTx(tx); err != nil {
log.Warn("Transaction validation failed, dropping peer", "peer", peer, "err", err)
b.dropPeer(peer)
return err
}
// vhash check
if err := sidecar.ValidateBlobCommitmentHashes(tx.BlobHashes()); err != nil {
log.Warn("Commitment hash mismatch, dropping peer", "peer", peer, "err", err)
b.dropPeer(peer)
return err
}
// proof count check
if len(sidecar.Proofs) < len(sidecar.Commitments)*kzg4844.CellProofsPerBlob {
b.dropPeer(peer)
return fmt.Errorf("insufficient proofs in sidecar")
}
if entry, ok := b.cells[hash]; ok {
return b.add(hash, tx, entry)
}
blobBufferTxFirstCounter.Inc(1)
b.txs[hash] = &txEntry{tx: tx, peer: peer, added: time.Now()}
return nil
return errs
}
// AddCells buffers per-peer cell deliveries from the blob fetcher.
// If the transaction is already buffered, verification and pool insertion are attempted.
func (b *BlobBuffer) AddCells(hash common.Hash, deliveries map[string]*PeerDelivery, custody *types.CustodyBitmap) error {
func (b *BlobBuffer) AddCells(hash common.Hash, deliveries map[string]*PeerDelivery, custody *types.CustodyBitmap) {
defer b.updateMetrics()()
// First remove any timed-out entries.
@ -132,15 +144,13 @@ func (b *BlobBuffer) AddCells(hash common.Hash, deliveries map[string]*PeerDeliv
added: time.Now(),
}
if txe, ok := b.txs[hash]; ok {
return b.add(hash, txe.tx, b.cells[hash])
b.add(hash, txe.tx, b.cells[hash])
}
blobBufferCellsFirstCounter.Inc(1)
return nil
}
// todo: this is very strange
// add verifies cells per-peer, sorts them, and adds to the pool.
func (b *BlobBuffer) add(hash common.Hash, tx *types.Transaction, cells *cellEntry) error {
func (b *BlobBuffer) add(hash common.Hash, tx *types.Transaction, cells *cellEntry) {
sidecar := tx.BlobTxSidecar()
// Per-peer cell verification
@ -148,7 +158,6 @@ func (b *BlobBuffer) add(hash common.Hash, tx *types.Transaction, cells *cellEnt
b.dropPeers(badPeers)
delete(b.cells, hash)
delete(b.txs, hash)
return fmt.Errorf("cell verification failed")
}
blobCount := len(tx.BlobHashes())
sorted, custody := sortCells(cells, blobCount)
@ -165,10 +174,9 @@ func (b *BlobBuffer) add(hash common.Hash, tx *types.Transaction, cells *cellEnt
CellSidecar: &cellSidecar,
}
err := b.addToPool(pooledTx)
b.completed = append(b.completed, pooledTx)
delete(b.cells, hash)
delete(b.txs, hash)
return err
}
func (b *BlobBuffer) HasTx(hash common.Hash) bool {
@ -221,12 +229,20 @@ func (b *BlobBuffer) updateMetrics() func() {
}
}
// verifyCells verifies each peer's cells against the sidecar.
// verifyCells verifies each peer's cells against the sidecar by treating each
// per-peer delivery as a mini BlobTxCellSidecar and reusing txpool.ValidateCells.
// Returns the list of peers whose cells failed verification.
func (b *BlobBuffer) verifyCells(entry *cellEntry, sidecar *types.BlobTxSidecar) []string {
var badPeers []string
for peer, delivery := range entry.deliveries {
if err := verifyPeerCells(delivery, sidecar); err != nil {
perPeer := &types.BlobTxCellSidecar{
Version: sidecar.Version,
Cells: delivery.Cells,
Commitments: sidecar.Commitments,
Proofs: sidecar.Proofs,
Custody: types.NewCustodyBitmap(delivery.Indices),
}
if err := txpool.ValidateCells(perPeer); err != nil {
log.Debug("Cell verification failed", "peer", peer, "err", err)
badPeers = append(badPeers, peer)
}
@ -234,28 +250,6 @@ func (b *BlobBuffer) verifyCells(entry *cellEntry, sidecar *types.BlobTxSidecar)
return badPeers
}
// verifyPeerCells verifies a single peer's cells against the sidecar proofs.
// delivery.Cells is blob-major: [blob0_cell0..blob0_cellN, blob1_cell0..blob1_cellN, ...]
func verifyPeerCells(delivery *PeerDelivery, sidecar *types.BlobTxSidecar) error {
cellsPerBlob := len(delivery.Indices)
blobCount := len(delivery.Cells) / cellsPerBlob
if blobCount == 0 || blobCount != len(sidecar.Commitments) {
return fmt.Errorf("blob count mismatch: delivery %d, commitments %d", blobCount, len(sidecar.Commitments))
}
// Extract proofs corresponding to this peer's cell indices
var proofs []kzg4844.Proof
for blobIdx := 0; blobIdx < blobCount; blobIdx++ {
for _, cellIdx := range delivery.Indices {
proofIdx := blobIdx*kzg4844.CellProofsPerBlob + int(cellIdx)
if proofIdx >= len(sidecar.Proofs) {
return fmt.Errorf("proof index out of range: %d", proofIdx)
}
proofs = append(proofs, sidecar.Proofs[proofIdx])
}
}
return kzg4844.VerifyCells(delivery.Cells, sidecar.Commitments, proofs, delivery.Indices)
}
// sortCells merges all per-peer deliveries into a single flat cell array
// sorted by custody index.
//

View file

@ -4,12 +4,9 @@ import (
"crypto/ecdsa"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/params"
"github.com/holiman/uint256"
)
// makeV1Tx creates a V1 blob transaction with cell proofs, then strips blobs
@ -95,7 +92,7 @@ func TestAddTxThenCells(t *testing.T) {
tx := makeV1Tx(t, 0, blobCount, 0, key)
hash := tx.Hash()
if err := buf.AddTx(tx, "peerA"); err != nil {
if err := buf.AddTx([]*types.Transaction{tx}, "peerA")[0]; err != nil {
t.Fatal(err)
}
if !buf.HasTx(hash) {
@ -109,9 +106,7 @@ func TestAddTxThenCells(t *testing.T) {
delivery := makePeerDelivery(t, 0, blobCount, dataIndices)
custody := types.NewCustodyBitmap(dataIndices)
if err := buf.AddCells(hash, map[string]*PeerDelivery{"peerB": delivery}, &custody); err != nil {
t.Fatal(err)
}
buf.AddCells(hash, map[string]*PeerDelivery{"peerB": delivery}, &custody)
if buf.HasTx(hash) || buf.HasCells(hash) {
t.Fatal("buffer should be empty after add")
}
@ -132,14 +127,12 @@ func TestAddCellsThenTx(t *testing.T) {
delivery := makePeerDelivery(t, 0, blobCount, dataIndices)
custody := types.NewCustodyBitmap(dataIndices)
if err := buf.AddCells(hash, map[string]*PeerDelivery{"peerB": delivery}, &custody); err != nil {
t.Fatal(err)
}
buf.AddCells(hash, map[string]*PeerDelivery{"peerB": delivery}, &custody)
if !buf.HasCells(hash) {
t.Fatal("cells should be buffered")
}
if err := buf.AddTx(tx, "peerA"); err != nil {
if err := buf.AddTx([]*types.Transaction{tx}, "peerA")[0]; err != nil {
t.Fatal(err)
}
if buf.HasTx(hash) || buf.HasCells(hash) {
@ -154,7 +147,7 @@ func TestMultiPeerDelivery(t *testing.T) {
tx := makeV1Tx(t, 0, blobCount, 0, key)
hash := tx.Hash()
buf.AddTx(tx, "peerA")
buf.AddTx([]*types.Transaction{tx}, "peerA")
indicesA := []uint64{0, 2, 4, 6}
indicesB := []uint64{1, 3, 5, 7}
@ -164,12 +157,10 @@ func TestMultiPeerDelivery(t *testing.T) {
allIndices := append(indicesA, indicesB...)
custody := types.NewCustodyBitmap(allIndices)
if err := buf.AddCells(hash, map[string]*PeerDelivery{
buf.AddCells(hash, map[string]*PeerDelivery{
"peerB": deliveryA,
"peerC": deliveryB,
}, &custody); err != nil {
t.Fatal(err)
}
}, &custody)
if buf.HasTx(hash) || buf.HasCells(hash) {
t.Fatal("buffer should be empty after add")
}
@ -188,7 +179,7 @@ func TestBadCell(t *testing.T) {
tx := makeV1Tx(t, 0, blobCount, 0, key)
hash := tx.Hash()
buf.AddTx(tx, "peerA")
buf.AddTx([]*types.Transaction{tx}, "peerA")
goodDelivery := makePeerDelivery(t, 0, blobCount, []uint64{0, 1, 2, 3})
badDelivery := makePeerDelivery(t, 0, blobCount, []uint64{4, 5, 6, 7})
@ -201,13 +192,10 @@ func TestBadCell(t *testing.T) {
allIndices := []uint64{0, 1, 2, 3, 4, 5, 6, 7}
custody := types.NewCustodyBitmap(allIndices)
err := buf.AddCells(hash, map[string]*PeerDelivery{
buf.AddCells(hash, map[string]*PeerDelivery{
"peerB": goodDelivery,
"peerC": badDelivery,
}, &custody)
if err == nil {
t.Fatal("expected error from bad cells")
}
if len(dropped) != 1 || dropped[0] != "peerC" {
t.Fatalf("only peerC should have been dropped, got: %v", dropped)
@ -216,42 +204,3 @@ func TestBadCell(t *testing.T) {
t.Fatal("buffer should be empty after bad cell drop")
}
}
func TestBadTx(t *testing.T) {
key, _ := crypto.GenerateKey()
var dropped []string
buf := NewBlobBuffer(
func(tx *types.Transaction) error { return nil },
func(ptx *BlobTxForPool) error { return nil },
func(peer string) { dropped = append(dropped, peer) },
)
blobtx := &types.BlobTx{
ChainID: uint256.MustFromBig(params.MainnetChainConfig.ChainID),
Nonce: 0,
GasTipCap: uint256.NewInt(1),
GasFeeCap: uint256.NewInt(1),
Gas: 21000,
BlobFeeCap: uint256.NewInt(1),
BlobHashes: []common.Hash{testBlobVHashes[0]},
Value: uint256.NewInt(100),
Sidecar: types.NewBlobTxSidecar(types.BlobSidecarVersion1,
nil,
[]kzg4844.Commitment{testBlobCommits[1]},
testBlobCellProofs[1],
),
}
tx := types.MustSignNewTx(key, types.LatestSigner(params.MainnetChainConfig), blobtx)
err := buf.AddTx(tx, "peerA")
if err == nil {
t.Fatal("expected error from commitment mismatch")
}
if len(dropped) != 1 || dropped[0] != "peerA" {
t.Fatalf("only peerA should have been dropped, got: %v", dropped)
}
if buf.HasTx(tx.Hash()) {
t.Fatal("tx should not be buffered")
}
}

View file

@ -91,7 +91,7 @@ type fetchStatus struct {
type BlobFetcherFunctions struct {
HasPayload func(common.Hash) bool
AddCells func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error
AddCells func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap)
FetchPayloads func(string, []common.Hash, *types.CustodyBitmap) error
DropPeer func(string)
}

View file

@ -47,9 +47,14 @@ func makeTestCellSidecar(blobCount int) *types.BlobTxCellSidecar {
proofs = append(proofs, cellProofs...)
}
sidecar, _ := types.NewBlobTxSidecar(types.BlobSidecarVersion1, blobs, commitments, proofs).ToBlobTxCellSidecar()
return sidecar
cells, _ := kzg4844.ComputeCells(blobs)
return &types.BlobTxCellSidecar{
Version: types.BlobSidecarVersion1,
Cells: cells,
Commitments: commitments,
Proofs: proofs,
Custody: *types.CustodyBitmapAll,
}
}
func selectCells(cells []kzg4844.Cell, custody *types.CustodyBitmap) []kzg4844.Cell {
@ -149,9 +154,7 @@ func TestBlobFetcherFullFetch(t *testing.T) {
return NewBlobFetcher(
BlobFetcherFunctions{
HasPayload: func(common.Hash) bool { return false },
AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error {
return nil
},
AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) {},
FetchPayloads: func(string, []common.Hash, *types.CustodyBitmap) error {
return nil
},
@ -241,10 +244,8 @@ func TestBlobFetcherPartialFetch(t *testing.T) {
init: func() *BlobFetcher {
return NewBlobFetcher(
BlobFetcherFunctions{
HasPayload: func(common.Hash) bool { return false },
AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error {
return nil
},
HasPayload: func(common.Hash) bool { return false }, AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) {},
FetchPayloads: func(string, []common.Hash, *types.CustodyBitmap) error {
return nil
},
@ -338,10 +339,8 @@ func TestBlobFetcherFullDelivery(t *testing.T) {
init: func() *BlobFetcher {
return NewBlobFetcher(
BlobFetcherFunctions{
HasPayload: func(common.Hash) bool { return false },
AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error {
return nil
},
HasPayload: func(common.Hash) bool { return false }, AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) {},
FetchPayloads: func(string, []common.Hash, *types.CustodyBitmap) error {
return nil
},
@ -388,10 +387,8 @@ func TestBlobFetcherPartialDelivery(t *testing.T) {
init: func() *BlobFetcher {
return NewBlobFetcher(
BlobFetcherFunctions{
HasPayload: func(common.Hash) bool { return false },
AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error {
return nil
},
HasPayload: func(common.Hash) bool { return false }, AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) {},
FetchPayloads: func(string, []common.Hash, *types.CustodyBitmap) error {
return nil
},
@ -526,10 +523,8 @@ func TestBlobFetcherAvailabilityTimeout(t *testing.T) {
init: func() *BlobFetcher {
return NewBlobFetcher(
BlobFetcherFunctions{
HasPayload: func(common.Hash) bool { return false },
AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error {
return nil
},
HasPayload: func(common.Hash) bool { return false }, AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) {},
FetchPayloads: func(string, []common.Hash, *types.CustodyBitmap) error {
return nil
},
@ -570,10 +565,8 @@ func TestBlobFetcherPeerDrop(t *testing.T) {
init: func() *BlobFetcher {
return NewBlobFetcher(
BlobFetcherFunctions{
HasPayload: func(common.Hash) bool { return false },
AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error {
return nil
},
HasPayload: func(common.Hash) bool { return false }, AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) {},
FetchPayloads: func(string, []common.Hash, *types.CustodyBitmap) error {
return nil
},
@ -649,10 +642,8 @@ func TestBlobFetcherFetchTimeout(t *testing.T) {
init: func() *BlobFetcher {
return NewBlobFetcher(
BlobFetcherFunctions{
HasPayload: func(common.Hash) bool { return false },
AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error {
return nil
},
HasPayload: func(common.Hash) bool { return false }, AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) {},
FetchPayloads: func(string, []common.Hash, *types.CustodyBitmap) error {
return nil
},
@ -1040,7 +1031,7 @@ func TestMultiBlobDeliveryVerification(t *testing.T) {
return NewBlobFetcher(
BlobFetcherFunctions{
HasPayload: func(common.Hash) bool { return false },
AddCells: func(h common.Hash, deliveries map[string]*PeerCellDelivery, custody *types.CustodyBitmap) error {
AddCells: func(h common.Hash, deliveries map[string]*PeerCellDelivery, custody *types.CustodyBitmap) {
// Verify each peer's delivered cells pass KZG cell proof verification
for _, d := range deliveries {
var cellProofs []kzg4844.Proof
@ -1050,11 +1041,7 @@ func TestMultiBlobDeliveryVerification(t *testing.T) {
}
}
verifyErr = kzg4844.VerifyCells(d.Cells, sidecar.Commitments, cellProofs, d.Indices)
if verifyErr != nil {
return verifyErr
}
}
return nil
},
FetchPayloads: func(string, []common.Hash, *types.CustodyBitmap) error {
return nil

View file

@ -29,8 +29,11 @@ import (
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool/blobpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
)
const (
@ -180,10 +183,12 @@ type TxFetcher struct {
alternates map[common.Hash]map[string]struct{} // In-flight transaction alternate origins if retrieval fails
// Callbacks
validateMeta func(common.Hash, byte) error // Validate a tx metadata based on the local txpool
addTxs func(string, []*types.Transaction) []error // Insert a batch of transactions into local txpool
fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer
dropPeer func(string) // Drops a peer in case of announcement violation
validateMeta func(common.Hash, byte) error // Validate a tx metadata based on the local txpool
addTxs func([]*types.Transaction) []error // Insert a batch of transactions into local txpool
fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer
dropPeer func(string) // Drops a peer in case of announcement violation
buffer *blobpool.BlobBuffer
step chan struct{} // Notification channel when the fetcher loop iterates
clock mclock.Clock // Monotonic clock or simulated clock for tests
@ -194,16 +199,17 @@ type TxFetcher struct {
// NewTxFetcher creates a transaction fetcher to retrieve transaction
// based on hash announcements.
// Chain can be nil to disable on-chain checks.
func NewTxFetcher(chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func(string, []*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string)) *TxFetcher {
return NewTxFetcherForTests(chain, validateMeta, addTxs, fetchTxs, dropPeer, mclock.System{}, time.Now, nil)
func NewTxFetcher(chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error,
dropPeer func(string), buffer *blobpool.BlobBuffer) *TxFetcher {
return NewTxFetcherForTests(chain, validateMeta, addTxs, fetchTxs, dropPeer, buffer, mclock.System{}, time.Now, nil)
}
// NewTxFetcherForTests is a testing method to mock out the realtime clock with
// a simulated version and the internal randomness with a deterministic one.
// Chain can be nil to disable on-chain checks.
func NewTxFetcherForTests(
chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func(string, []*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string),
clock mclock.Clock, realTime func() time.Time, rand *mrand.Rand) *TxFetcher {
chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string),
buffer *blobpool.BlobBuffer, clock mclock.Clock, realTime func() time.Time, rand *mrand.Rand) *TxFetcher {
return &TxFetcher{
notify: make(chan *txAnnounce),
cleanup: make(chan *txDelivery),
@ -224,6 +230,7 @@ func NewTxFetcherForTests(
addTxs: addTxs,
fetchTxs: fetchTxs,
dropPeer: dropPeer,
buffer: buffer,
clock: clock,
realTime: realTime,
rand: rand,
@ -312,26 +319,36 @@ func (f *TxFetcher) isKnownUnderpriced(hash common.Hash) bool {
return ok
}
type deliveryMetrics struct {
inMeter *metrics.Meter
knownMeter *metrics.Meter
underpricedMeter *metrics.Meter
otherRejectMeter *metrics.Meter
}
// Enqueue imports a batch of received transaction into the transaction pool
// and the fetcher. This method may be called by both transaction broadcasts and
// direct request replies. The differentiation is important so the fetcher can
// re-schedule missing transactions as soon as possible.
func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) error {
var (
inMeter = txReplyInMeter
knownMeter = txReplyKnownMeter
underpricedMeter = txReplyUnderpricedMeter
otherRejectMeter = txReplyOtherRejectMeter
violation error
)
func (f *TxFetcher) Enqueue(peer string, version uint, txs []*types.Transaction, direct bool) error {
var violation error
metrics := deliveryMetrics{
inMeter: txReplyInMeter,
knownMeter: txReplyKnownMeter,
underpricedMeter: txReplyUnderpricedMeter,
otherRejectMeter: txReplyOtherRejectMeter,
}
if !direct {
inMeter = txBroadcastInMeter
knownMeter = txBroadcastKnownMeter
underpricedMeter = txBroadcastUnderpricedMeter
otherRejectMeter = txBroadcastOtherRejectMeter
metrics = deliveryMetrics{
inMeter: txBroadcastInMeter,
knownMeter: txBroadcastKnownMeter,
underpricedMeter: txBroadcastUnderpricedMeter,
otherRejectMeter: txBroadcastOtherRejectMeter,
}
}
// Keep track of all the propagated transactions
inMeter.Mark(int64(len(txs)))
metrics.inMeter.Mark(int64(len(txs)))
// Push all the transactions into the pool, tracking underpriced ones to avoid
// re-requesting them and dropping the peer in case of malicious transfers.
@ -345,38 +362,35 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
if end > len(txs) {
end = len(txs)
}
var (
duplicate int64
underpriced int64
otherreject int64
)
batch := txs[i:end]
for j, err := range f.addTxs(peer, batch) {
// Track the transaction hash if the price is too low for us.
// Avoid re-request this transaction when we receive another
// announcement.
if errors.Is(err, txpool.ErrUnderpriced) || errors.Is(err, txpool.ErrReplaceUnderpriced) || errors.Is(err, txpool.ErrTxGasPriceTooLow) {
f.underpriced.Add(batch[j].Hash(), batch[j].Time())
var (
poolTxs []*types.Transaction
blobTxs []*types.Transaction
)
if version >= eth.ETH72 {
for _, tx := range batch {
if tx.Type() == types.BlobTxType {
blobTxs = append(blobTxs, tx)
} else {
poolTxs = append(poolTxs, tx)
}
}
// Track a few interesting failure types
switch {
case err == nil: // Noop, but need to handle to not count these
} else {
poolTxs = batch
}
batch = append(poolTxs, blobTxs...)
errs := append(f.addTxs(poolTxs), f.buffer.AddTx(blobTxs, peer)...)
case errors.Is(err, txpool.ErrAlreadyKnown):
duplicate++
case errors.Is(err, txpool.ErrUnderpriced) || errors.Is(err, txpool.ErrReplaceUnderpriced) || errors.Is(err, txpool.ErrTxGasPriceTooLow):
underpriced++
case errors.Is(err, txpool.ErrKZGVerificationError) || errors.Is(err, txpool.ErrSidecarFormatError):
hashes := make([]common.Hash, len(batch))
for j := range batch {
hashes[j] = batch[j].Hash()
}
for j, err := range errs {
if errors.Is(err, txpool.ErrKZGVerificationError) || errors.Is(err, txpool.ErrSidecarFormatError) {
// KZG verification failed, terminate transaction processing immediately.
// Since KZG verification is computationally expensive, this acts as a
// defensive measure against potential DoS attacks.
violation = err
default:
otherreject++
}
added = append(added, batch[j].Hash())
metas = append(metas, txMetadata{
@ -389,15 +403,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
break
}
}
knownMeter.Mark(duplicate)
underpricedMeter.Mark(underpriced)
otherRejectMeter.Mark(otherreject)
// If 'other reject' is >25% of the deliveries in any batch, sleep a bit.
if otherreject > int64((len(batch)+3)/4) {
log.Debug("Peer delivering stale or invalid transactions", "peer", peer, "rejected", otherreject)
time.Sleep(200 * time.Millisecond)
}
f.handleAddErrors(hashes, errs, metrics)
// If we encountered a protocol violation, disconnect this peer.
if violation != nil {
break
@ -411,6 +417,42 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
}
}
func (f *TxFetcher) handleAddErrors(txs []common.Hash, errs []error, metrics deliveryMetrics) {
var (
duplicate int64
underpriced int64
otherreject int64
)
for i, err := range errs {
// Track a few interesting failure types
switch {
case err == nil: // Noop, but need to handle to not count these
case errors.Is(err, txpool.ErrAlreadyKnown):
duplicate++
// Track the transaction hash if the price is too low for us.
// Avoid re-request this transaction when we receive another
// announcement.
case errors.Is(err, txpool.ErrUnderpriced) || errors.Is(err, txpool.ErrReplaceUnderpriced) || errors.Is(err, txpool.ErrTxGasPriceTooLow):
f.underpriced.Add(txs[i], f.realTime())
underpriced++
default:
otherreject++
}
}
metrics.knownMeter.Mark(duplicate)
metrics.underpricedMeter.Mark(underpriced)
metrics.otherRejectMeter.Mark(otherreject)
// If 'other reject' is >25% of the deliveries in any batch, sleep a bit.
if otherreject > int64((len(txs)+3)/4) {
log.Debug("Peer delivering stale or invalid transactions", "rejected", otherreject)
time.Sleep(200 * time.Millisecond)
}
}
// Drop should be called when a peer disconnects. It cleans up all the internal
// data structures of the given node.
func (f *TxFetcher) Drop(peer string) error {
@ -456,6 +498,14 @@ func (f *TxFetcher) loop() {
}
for {
txs, errs := f.buffer.Flush()
f.handleAddErrors(txs, errs, deliveryMetrics{
inMeter: txReplyInMeter,
knownMeter: txReplyKnownMeter,
underpricedMeter: txReplyUnderpricedMeter,
otherRejectMeter: txReplyOtherRejectMeter,
})
select {
case ann := <-f.notify:
// Drop part of the new announcements if there are too many accumulated.

View file

@ -28,9 +28,11 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool/blobpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/params"
"github.com/holiman/uint256"
)
@ -60,9 +62,10 @@ type doTxNotify struct {
sizes []uint32
}
type doTxEnqueue struct {
peer string
txs []*types.Transaction
direct bool
peer string
version uint
txs []*types.Transaction
direct bool
}
type doWait struct {
time time.Duration
@ -87,17 +90,28 @@ type txFetcherTest struct {
steps []interface{}
}
// newTestBlobBuffer returns a BlobBuffer with no-op callbacks for tests that
// don't exercise blob handling but still need a non-nil buffer.
func newTestBlobBuffer() *blobpool.BlobBuffer {
return blobpool.NewBlobBuffer(
func(*types.Transaction) error { return nil },
func(*blobpool.BlobTxForPool) error { return nil },
func(string) {},
)
}
// newTestTxFetcher creates a tx fetcher with noop callbacks, simulated clock,
// and deterministic randomness.
func newTestTxFetcher() *TxFetcher {
return NewTxFetcher(
nil,
func(common.Hash, byte) error { return nil },
func(_ string, txs []*types.Transaction) []error {
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
newTestBlobBuffer(),
)
}
@ -1172,7 +1186,7 @@ func TestTransactionFetcherUnderpricedDedup(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
f := newTestTxFetcher()
f.addTxs = func(_ string, txs []*types.Transaction) []error {
f.addTxs = func(txs []*types.Transaction) []error {
errs := make([]error, len(txs))
for i := 0; i < len(errs); i++ {
if i%3 == 0 {
@ -1270,7 +1284,7 @@ func TestTransactionFetcherUnderpricedDoSProtection(t *testing.T) {
testTransactionFetcher(t, txFetcherTest{
init: func() *TxFetcher {
f := newTestTxFetcher()
f.addTxs = func(_ string, txs []*types.Transaction) []error {
f.addTxs = func(txs []*types.Transaction) []error {
errs := make([]error, len(txs))
for i := 0; i < len(errs); i++ {
errs[i] = txpool.ErrUnderpriced
@ -1787,7 +1801,7 @@ func TestTransactionProtocolViolation(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
f := newTestTxFetcher()
f.addTxs = func(_ string, txs []*types.Transaction) []error {
f.addTxs = func(txs []*types.Transaction) []error {
var errs []error
for range txs {
errs = append(errs, txpool.ErrKZGVerificationError)
@ -1899,7 +1913,7 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
}
case doTxEnqueue:
if err := fetcher.Enqueue(step.peer, step.txs, step.direct); err != nil {
if err := fetcher.Enqueue(step.peer, step.version, step.txs, step.direct); err != nil {
t.Errorf("step %d: %v", i, err)
}
<-wait // Fetcher needs to process this, wait until it's done
@ -2194,7 +2208,7 @@ func TestTransactionForgotten(t *testing.T) {
fetcher := NewTxFetcherForTests(
nil,
func(common.Hash, byte) error { return nil },
func(_ string, txs []*types.Transaction) []error {
func(txs []*types.Transaction) []error {
errs := make([]error, len(txs))
for i := 0; i < len(errs); i++ {
errs[i] = txpool.ErrUnderpriced
@ -2203,6 +2217,7 @@ func TestTransactionForgotten(t *testing.T) {
},
func(string, []common.Hash) error { return nil },
func(string) {},
newTestBlobBuffer(),
mockClock,
mockTime,
rand.New(rand.NewSource(0)), // Use fixed seed for deterministic behavior
@ -2219,7 +2234,7 @@ func TestTransactionForgotten(t *testing.T) {
tx2.SetTime(now)
// Initial state: both transactions should be marked as underpriced
if err := fetcher.Enqueue("peer", []*types.Transaction{tx1, tx2}, false); err != nil {
if err := fetcher.Enqueue("peer", eth.ETH70, []*types.Transaction{tx1, tx2}, false); err != nil {
t.Fatal(err)
}
if !fetcher.isKnownUnderpriced(tx1.Hash()) {
@ -2268,7 +2283,7 @@ func TestTransactionForgotten(t *testing.T) {
// Re-enqueue tx1 with updated timestamp
tx1.SetTime(mockTime())
if err := fetcher.Enqueue("peer", []*types.Transaction{tx1}, false); err != nil {
if err := fetcher.Enqueue("peer", eth.ETH70, []*types.Transaction{tx1}, false); err != nil {
t.Fatal(err)
}
if !fetcher.isKnownUnderpriced(tx1.Hash()) {

View file

@ -139,7 +139,6 @@ type handler struct {
downloader *downloader.Downloader
txFetcher *fetcher.TxFetcher
blobFetcher *fetcher.BlobFetcher
blobBuffer *blobpool.BlobBuffer
peers *peerSet
txBroadcastKey [16]byte
@ -190,33 +189,13 @@ func newHandler(config *handlerConfig) (*handler, error) {
}
// Construct the blob buffer for assembling blob txs from separate tx and cell deliveries.
h.blobBuffer = blobpool.NewBlobBuffer(h.blobpool.ValidateTxBasics, h.blobpool.AddPooledTx, h.removePeer)
blobBuffer := blobpool.NewBlobBuffer(h.blobpool.ValidateTxBasics, h.blobpool.AddPooledTx, h.removePeer)
addTxs := func(peer string, txs []*types.Transaction) []error {
errs := make([]error, len(txs))
p := h.peers.peer(peer)
isETH72 := p != nil && p.Version() >= eth.ETH72
var poolTxs []*types.Transaction
var index []int
for i, tx := range txs {
if isETH72 && tx.Type() == types.BlobTxType {
errs[i] = h.blobBuffer.AddTx(tx, peer)
} else {
poolTxs = append(poolTxs, tx)
index = append(index, i)
}
}
if len(poolTxs) > 0 {
poolErrs := h.txpool.Add(poolTxs, false)
for j, idx := range index {
errs[idx] = poolErrs[j]
}
}
return errs
addTxs := func(txs []*types.Transaction) []error {
return h.txpool.Add(txs, false)
}
validateMeta := func(tx common.Hash, kind byte) error {
if h.txpool.Has(tx) || h.blobBuffer.HasTx(tx) {
if h.txpool.Has(tx) || blobBuffer.HasTx(tx) {
return txpool.ErrAlreadyKnown
}
if !h.txpool.FilterType(kind) {
@ -224,7 +203,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
}
return nil
}
h.txFetcher = fetcher.NewTxFetcher(h.chain, validateMeta, addTxs, fetchTx, h.removePeer)
h.txFetcher = fetcher.NewTxFetcher(h.chain, validateMeta, addTxs, fetchTx, h.removePeer, blobBuffer)
// Construct the blob fetcher for cell-based blob data availability
blobCallbacks := fetcher.BlobFetcherFunctions{
@ -236,14 +215,14 @@ func newHandler(config *handlerConfig) (*handler, error) {
return p.RequestPayload(hashes, cells)
},
HasPayload: func(hash common.Hash) bool {
return h.blobpool.Has(hash) || h.blobBuffer.HasCells(hash)
return h.blobpool.Has(hash) || blobBuffer.HasCells(hash)
},
AddCells: func(hash common.Hash, deliveries map[string]*fetcher.PeerCellDelivery, custody *types.CustodyBitmap) error {
AddCells: func(hash common.Hash, deliveries map[string]*fetcher.PeerCellDelivery, custody *types.CustodyBitmap) {
converted := make(map[string]*blobpool.PeerDelivery, len(deliveries))
for peer, d := range deliveries {
converted[peer] = &blobpool.PeerDelivery{Cells: d.Cells, Indices: d.Indices}
}
return h.blobBuffer.AddCells(hash, converted, custody)
blobBuffer.AddCells(hash, converted, custody)
},
DropPeer: h.removePeer,
}

View file

@ -81,7 +81,7 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
if err := handleTransactions(peer, txs, true); err != nil {
return fmt.Errorf("Transactions: %v", err)
}
return h.txFetcher.Enqueue(peer.ID(), txs, false)
return h.txFetcher.Enqueue(peer.ID(), peer.Version(), txs, false)
case *eth.PooledTransactionsPacket:
txs, err := packet.List.Items()
@ -91,7 +91,7 @@ func (h *ethHandler) Handle(peer *eth.Peer, packet eth.Packet) error {
if err := handleTransactions(peer, txs, false); err != nil {
return fmt.Errorf("PooledTransactions: %v", err)
}
return h.txFetcher.Enqueue(peer.ID(), txs, true)
return h.txFetcher.Enqueue(peer.ID(), peer.Version(), txs, true)
case *eth.CellsResponse:
return h.blobFetcher.Enqueue(peer.ID(), packet.Hashes, packet.Cells, packet.Mask)

View file

@ -25,22 +25,28 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/core/txpool/blobpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/fetcher"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
)
var (
peers []string
txs []*types.Transaction
peers []string
peerVersions map[string]uint
txs []*types.Transaction
)
func init() {
// Random is nice, but we need it deterministic
rand := rand.New(rand.NewSource(0x3a29))
supportedVersions := []uint{eth.ETH69, eth.ETH70, eth.ETH72}
peers = make([]string, 10)
peerVersions = make(map[string]uint, len(peers))
for i := 0; i < len(peers); i++ {
peers[i] = fmt.Sprintf("Peer #%d", i)
peerVersions[peers[i]] = supportedVersions[i%len(supportedVersions)]
}
txs = make([]*types.Transaction, 65536) // We need to bump enough to hit all the limits
for i := 0; i < len(txs); i++ {
@ -80,11 +86,16 @@ func fuzz(input []byte) int {
f := fetcher.NewTxFetcherForTests(
nil,
func(common.Hash, byte) error { return nil },
func(_ string, txs []*types.Transaction) []error {
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
blobpool.NewBlobBuffer(
func(*types.Transaction) error { return nil },
func(*blobpool.BlobTxForPool) error { return nil },
func(string) {},
),
clock,
func() time.Time {
nanoTime := int64(clock.Now())
@ -180,7 +191,7 @@ func fuzz(input []byte) int {
if verbose {
fmt.Println("Enqueue", peer, deliverIdxs, direct)
}
if err := f.Enqueue(peer, deliveries, direct); err != nil {
if err := f.Enqueue(peer, peerVersions[peer], deliveries, direct); err != nil {
panic(err)
}