core/txpool: add lock in buffer.go

This commit is contained in:
healthykim 2026-05-21 16:43:09 +02:00
parent 72d31996c7
commit 8f2f286ae0

View file

@ -20,6 +20,8 @@ import (
"cmp" "cmp"
"fmt" "fmt"
"slices" "slices"
"sync"
"sync/atomic"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -63,6 +65,8 @@ type cellEntry struct {
} }
type BlobBuffer struct { type BlobBuffer struct {
mu sync.Mutex
txs map[common.Hash]*txEntry txs map[common.Hash]*txEntry
cells map[common.Hash]*cellEntry cells map[common.Hash]*cellEntry
@ -70,7 +74,8 @@ type BlobBuffer struct {
validateTx func(*types.Transaction) error validateTx func(*types.Transaction) error
dropPeer func(string) dropPeer func(string)
completed []*BlobTxForPool completed []*BlobTxForPool
completedCount atomic.Int32
} }
func NewBlobBuffer(validateTx func(*types.Transaction) error, addToPool func(*BlobTxForPool) error, dropPeer func(string)) *BlobBuffer { func NewBlobBuffer(validateTx func(*types.Transaction) error, addToPool func(*BlobTxForPool) error, dropPeer func(string)) *BlobBuffer {
@ -86,6 +91,14 @@ func NewBlobBuffer(validateTx func(*types.Transaction) error, addToPool func(*Bl
// Flush adds all completed entries to the pool and returns the hashes // Flush adds all completed entries to the pool and returns the hashes
// and corresponding errors (nil on success) for each attempted insert. // and corresponding errors (nil on success) for each attempted insert.
func (b *BlobBuffer) Flush() ([]common.Hash, []error) { func (b *BlobBuffer) Flush() ([]common.Hash, []error) {
// Read the count first and return early
// todo: increase threshold ?
if b.completedCount.Load() == 0 {
return nil, nil
}
b.mu.Lock()
defer b.mu.Unlock()
txs := make([]common.Hash, len(b.completed)) txs := make([]common.Hash, len(b.completed))
errs := make([]error, len(b.completed)) errs := make([]error, len(b.completed))
for i, ptx := range b.completed { for i, ptx := range b.completed {
@ -93,12 +106,15 @@ func (b *BlobBuffer) Flush() ([]common.Hash, []error) {
errs[i] = b.addToPool(ptx) errs[i] = b.addToPool(ptx)
} }
b.completed = nil b.completed = nil
b.completedCount.Store(0)
return txs, errs return txs, errs
} }
// AddTx buffers a blob transaction (without blobs) from an ETH/72 peer. // AddTx buffers a blob transaction (without blobs) from an ETH/72 peer.
// If cells are already buffered, verification and pool insertion are attempted. // If cells are already buffered, verification and pool insertion are attempted.
func (b *BlobBuffer) AddTx(txs []*types.Transaction, peer string) []error { func (b *BlobBuffer) AddTx(txs []*types.Transaction, peer string) []error {
b.mu.Lock()
defer b.mu.Unlock()
defer b.updateMetrics()() defer b.updateMetrics()()
// First remove any timed-out entries. // First remove any timed-out entries.
@ -131,6 +147,8 @@ func (b *BlobBuffer) AddTx(txs []*types.Transaction, peer string) []error {
// AddCells buffers per-peer cell deliveries from the blob fetcher. // AddCells buffers per-peer cell deliveries from the blob fetcher.
// If the transaction is already buffered, verification and pool insertion are attempted. // If the transaction is already buffered, verification and pool insertion are attempted.
func (b *BlobBuffer) AddCells(hash common.Hash, deliveries map[string]*PeerDelivery, custody *types.CustodyBitmap) { func (b *BlobBuffer) AddCells(hash common.Hash, deliveries map[string]*PeerDelivery, custody *types.CustodyBitmap) {
b.mu.Lock()
defer b.mu.Unlock()
defer b.updateMetrics()() defer b.updateMetrics()()
// First remove any timed-out entries. // First remove any timed-out entries.
@ -173,16 +191,21 @@ func (b *BlobBuffer) add(hash common.Hash, tx *types.Transaction, cells *cellEnt
} }
b.completed = append(b.completed, pooledTx) b.completed = append(b.completed, pooledTx)
b.completedCount.Add(1)
delete(b.cells, hash) delete(b.cells, hash)
delete(b.txs, hash) delete(b.txs, hash)
} }
func (b *BlobBuffer) HasTx(hash common.Hash) bool { func (b *BlobBuffer) HasTx(hash common.Hash) bool {
b.mu.Lock()
defer b.mu.Unlock()
_, ok := b.txs[hash] _, ok := b.txs[hash]
return ok return ok
} }
func (b *BlobBuffer) HasCells(hash common.Hash) bool { func (b *BlobBuffer) HasCells(hash common.Hash) bool {
b.mu.Lock()
defer b.mu.Unlock()
_, ok := b.cells[hash] _, ok := b.cells[hash]
return ok return ok
} }