diff --git a/core/txpool/blobpool/buffer.go b/core/txpool/blobpool/buffer.go index 1205747dc8..519190e39e 100644 --- a/core/txpool/blobpool/buffer.go +++ b/core/txpool/blobpool/buffer.go @@ -20,6 +20,8 @@ import ( "cmp" "fmt" "slices" + "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -63,6 +65,8 @@ type cellEntry struct { } type BlobBuffer struct { + mu sync.Mutex + txs map[common.Hash]*txEntry cells map[common.Hash]*cellEntry @@ -70,7 +74,8 @@ type BlobBuffer struct { validateTx func(*types.Transaction) error dropPeer func(string) - completed []*BlobTxForPool + completed []*BlobTxForPool + completedCount atomic.Int32 } func NewBlobBuffer(validateTx func(*types.Transaction) error, addToPool func(*BlobTxForPool) error, dropPeer func(string)) *BlobBuffer { @@ -86,6 +91,14 @@ func NewBlobBuffer(validateTx func(*types.Transaction) error, addToPool func(*Bl // Flush adds all completed entries to the pool and returns the hashes // and corresponding errors (nil on success) for each attempted insert. func (b *BlobBuffer) Flush() ([]common.Hash, []error) { + // Read the count first and return early + // todo: increase threshold ? + if b.completedCount.Load() == 0 { + return nil, nil + } + b.mu.Lock() + defer b.mu.Unlock() + txs := make([]common.Hash, len(b.completed)) errs := make([]error, len(b.completed)) for i, ptx := range b.completed { @@ -93,12 +106,15 @@ func (b *BlobBuffer) Flush() ([]common.Hash, []error) { errs[i] = b.addToPool(ptx) } b.completed = nil + b.completedCount.Store(0) return txs, errs } // AddTx buffers a blob transaction (without blobs) from an ETH/72 peer. // If cells are already buffered, verification and pool insertion are attempted. func (b *BlobBuffer) AddTx(txs []*types.Transaction, peer string) []error { + b.mu.Lock() + defer b.mu.Unlock() defer b.updateMetrics()() // First remove any timed-out entries. @@ -131,6 +147,8 @@ func (b *BlobBuffer) AddTx(txs []*types.Transaction, peer string) []error { // AddCells buffers per-peer cell deliveries from the blob fetcher. // If the transaction is already buffered, verification and pool insertion are attempted. func (b *BlobBuffer) AddCells(hash common.Hash, deliveries map[string]*PeerDelivery, custody *types.CustodyBitmap) { + b.mu.Lock() + defer b.mu.Unlock() defer b.updateMetrics()() // First remove any timed-out entries. @@ -173,16 +191,21 @@ func (b *BlobBuffer) add(hash common.Hash, tx *types.Transaction, cells *cellEnt } b.completed = append(b.completed, pooledTx) + b.completedCount.Add(1) delete(b.cells, hash) delete(b.txs, hash) } func (b *BlobBuffer) HasTx(hash common.Hash) bool { + b.mu.Lock() + defer b.mu.Unlock() _, ok := b.txs[hash] return ok } func (b *BlobBuffer) HasCells(hash common.Hash) bool { + b.mu.Lock() + defer b.mu.Unlock() _, ok := b.cells[hash] return ok }