core/txpool/blobpool: do disk read for getRLP/Get out of pool lock

This commit is contained in:
agwab 2026-04-27 00:04:28 +09:00
parent f63e9f3a80
commit 4343e586d0
3 changed files with 151 additions and 30 deletions

View file

@ -1373,46 +1373,64 @@ func (p *BlobPool) Has(hash common.Hash) bool {
return poolHas || gapped return poolHas || gapped
} }
func (p *BlobPool) getRLP(hash common.Hash) []byte { // getAndVerify resolves a hash to its on-disk payload with the pool lock
// Track the amount of time waiting to retrieve a fully resolved blob tx from // released across the disk read, retrying once if a concurrent reinject moved
// the pool and the amount of time actually spent on pulling the data from disk. // the same hash to a new slot.
func (p *BlobPool) getAndVerify(hash common.Hash) ([]byte, *types.Transaction) {
getStart := time.Now() getStart := time.Now()
p.lock.RLock() p.lock.RLock()
getwaitHist.Update(time.Since(getStart).Nanoseconds()) getwaitHist.Update(time.Since(getStart).Nanoseconds())
defer p.lock.RUnlock()
id, ok := p.lookup.storeidOfTx(hash)
p.lock.RUnlock()
if !ok {
return nil, nil
}
defer func(start time.Time) { defer func(start time.Time) {
gettimeHist.Update(time.Since(start).Nanoseconds()) gettimeHist.Update(time.Since(start).Nanoseconds())
}(time.Now()) }(time.Now())
// Pull the blob from disk and return an assembled response for retry := 0; retry < 2; retry++ {
id, ok := p.lookup.storeidOfTx(hash) data, err := p.store.Get(id)
if !ok { if err != nil {
return nil log.Warn("Failed to read blob tx from store", "hash", hash, "id", id, "err", err)
} return nil, nil
data, err := p.store.Get(id) }
if err != nil { tx := new(types.Transaction)
log.Error("Tracked blob transaction missing from store", "hash", hash, "id", id, "err", err) if err := rlp.DecodeBytes(data, tx); err != nil {
return nil log.Warn("Failed to decode blob tx from store", "hash", hash, "id", id, "err", err)
return nil, nil
}
p.lock.RLock()
currentID, stillTracked := p.lookup.storeidOfTx(hash)
p.lock.RUnlock()
if !stillTracked {
return nil, nil
}
if currentID == id && tx.Hash() == hash {
return data, tx
}
if tx.Hash() != hash {
log.Debug("Blob tx slot reused by concurrent writer", "hash", hash, "id", id, "got", tx.Hash())
}
if currentID == id {
return nil, nil
}
id = currentID
} }
return nil, nil
}
// getRLP returns a RLP-encoded transaction if it is contained in the pool.
func (p *BlobPool) getRLP(hash common.Hash) []byte {
data, _ := p.getAndVerify(hash)
return data return data
} }
// Get returns a transaction if it is contained in the pool, or nil otherwise. // Get returns a transaction if it is contained in the pool, or nil otherwise.
func (p *BlobPool) Get(hash common.Hash) *types.Transaction { func (p *BlobPool) Get(hash common.Hash) *types.Transaction {
data := p.getRLP(hash) _, tx := p.getAndVerify(hash)
if len(data) == 0 { return tx
return nil
}
item := new(types.Transaction)
if err := rlp.DecodeBytes(data, item); err != nil {
id, _ := p.lookup.storeidOfTx(hash)
log.Error("Blobs corrupted for traced transaction",
"hash", hash, "id", id, "err", err)
return nil
}
return item
} }
// GetRLP returns a RLP-encoded transaction if it is contained in the pool. // GetRLP returns a RLP-encoded transaction if it is contained in the pool.

View file

@ -31,6 +31,7 @@ import (
"slices" "slices"
"sync" "sync"
"testing" "testing"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/misc/eip1559" "github.com/ethereum/go-ethereum/consensus/misc/eip1559"
@ -2133,3 +2134,104 @@ func benchmarkPoolPending(b *testing.B, datacap uint64) {
} }
} }
} }
// setupGetBenchPool provisions a blob pool with numTxs blob transactions and
// returns it together with the inserted transaction hashes and the first sender.
func setupGetBenchPool(b *testing.B, numTxs int) (*BlobPool, []common.Hash, common.Address) {
b.Helper()
var (
basefee = uint64(1050)
blobfee = uint64(105)
signer = types.LatestSigner(params.MainnetChainConfig)
statedb, _ = state.New(types.EmptyRootHash, state.NewDatabaseForTesting())
chain = &testBlockChain{
config: params.MainnetChainConfig,
basefee: uint256.NewInt(basefee),
blobfee: uint256.NewInt(blobfee),
statedb: statedb,
}
pool = New(Config{Datadir: b.TempDir()}, chain, nil)
)
if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil {
b.Fatalf("failed to create blob pool: %v", err)
}
hashes := make([]common.Hash, 0, numTxs)
var firstAddr common.Address
for i := 0; i < numTxs; i++ {
blobtx := makeUnsignedTx(0, 10, basefee+10, blobfee)
blobtx.R = uint256.NewInt(1)
blobtx.S = uint256.NewInt(uint64(100 + i))
blobtx.V = uint256.NewInt(0)
tx := types.NewTx(blobtx)
addr, err := types.Sender(signer, tx)
if err != nil {
b.Fatal(err)
}
statedb.AddBalance(addr, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified)
if err := pool.add(tx); err != nil {
b.Fatalf("add tx %d: %v", i, err)
}
hashes = append(hashes, tx.Hash())
if i == 0 {
firstAddr = addr
}
}
statedb.Commit(0, true, false)
return pool, hashes, firstAddr
}
// BenchmarkGetRLP measures the per-call cost of a successful GetRLP against
// a real billy store (disk Get + RLP decode + hash verification).
func BenchmarkGetRLP(b *testing.B) {
pool, hashes, _ := setupGetBenchPool(b, 1024)
defer pool.Close()
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
if data := pool.GetRLP(hashes[i%len(hashes)]); data == nil {
b.Fatalf("GetRLP returned nil for known hash")
}
}
}
// BenchmarkWriteUnderReaderLoad measures the latency of a write-lock probe
// (Nonce on a tracked address) while readerConcurrency goroutines are
// calling GetRLP in a tight loop. The probe does near-zero in-lock work, so
// the per-iter time is dominated by lock-acquisition wait.
func BenchmarkWriteUnderReaderLoad(b *testing.B) {
pool, hashes, addr := setupGetBenchPool(b, 1024)
defer pool.Close()
const readerConcurrency = 4
var (
stop = make(chan struct{})
wg sync.WaitGroup
)
for i := 0; i < readerConcurrency; i++ {
wg.Add(1)
go func(seed int) {
defer wg.Done()
r := rand.New(rand.NewSource(int64(seed)))
for {
select {
case <-stop:
return
default:
pool.GetRLP(hashes[r.Intn(len(hashes))])
}
}
}(i)
}
time.Sleep(100 * time.Millisecond)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
pool.Nonce(addr)
}
b.StopTimer()
close(stop)
wg.Wait()
}

View file

@ -67,10 +67,11 @@ var (
// addwait/time, resetwait/time and getwait/time track the rough health of // addwait/time, resetwait/time and getwait/time track the rough health of
// the pool and whether it's capable of keeping up with the load from the // the pool and whether it's capable of keeping up with the load from the
// network. // network.
addwaitHist = metrics.NewRegisteredHistogram("blobpool/addwait", nil, metrics.NewExpDecaySample(1028, 0.015)) addwaitHist = metrics.NewRegisteredHistogram("blobpool/addwait", nil, metrics.NewExpDecaySample(1028, 0.015))
addtimeHist = metrics.NewRegisteredHistogram("blobpool/addtime", nil, metrics.NewExpDecaySample(1028, 0.015)) addtimeHist = metrics.NewRegisteredHistogram("blobpool/addtime", nil, metrics.NewExpDecaySample(1028, 0.015))
getwaitHist = metrics.NewRegisteredHistogram("blobpool/getwait", nil, metrics.NewExpDecaySample(1028, 0.015)) getwaitHist = metrics.NewRegisteredHistogram("blobpool/getwait", nil, metrics.NewExpDecaySample(1028, 0.015))
gettimeHist = metrics.NewRegisteredHistogram("blobpool/gettime", nil, metrics.NewExpDecaySample(1028, 0.015)) gettimeHist = metrics.NewRegisteredHistogram("blobpool/gettime", nil, metrics.NewExpDecaySample(1028, 0.015))
pendwaitHist = metrics.NewRegisteredHistogram("blobpool/pendwait", nil, metrics.NewExpDecaySample(1028, 0.015)) pendwaitHist = metrics.NewRegisteredHistogram("blobpool/pendwait", nil, metrics.NewExpDecaySample(1028, 0.015))
pendtimeHist = metrics.NewRegisteredHistogram("blobpool/pendtime", nil, metrics.NewExpDecaySample(1028, 0.015)) pendtimeHist = metrics.NewRegisteredHistogram("blobpool/pendtime", nil, metrics.NewExpDecaySample(1028, 0.015))
resetwaitHist = metrics.NewRegisteredHistogram("blobpool/resetwait", nil, metrics.NewExpDecaySample(1028, 0.015)) resetwaitHist = metrics.NewRegisteredHistogram("blobpool/resetwait", nil, metrics.NewExpDecaySample(1028, 0.015))