diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index 7155a67a9b..02cb5d971e 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -1373,46 +1373,64 @@ func (p *BlobPool) Has(hash common.Hash) bool { return poolHas || gapped } -func (p *BlobPool) getRLP(hash common.Hash) []byte { - // Track the amount of time waiting to retrieve a fully resolved blob tx from - // the pool and the amount of time actually spent on pulling the data from disk. +// getAndVerify resolves a hash to its on-disk payload with the pool lock +// released across the disk read, retrying once if a concurrent reinject moved +// the same hash to a new slot. +func (p *BlobPool) getAndVerify(hash common.Hash) ([]byte, *types.Transaction) { getStart := time.Now() p.lock.RLock() getwaitHist.Update(time.Since(getStart).Nanoseconds()) - defer p.lock.RUnlock() + id, ok := p.lookup.storeidOfTx(hash) + p.lock.RUnlock() + if !ok { + return nil, nil + } defer func(start time.Time) { gettimeHist.Update(time.Since(start).Nanoseconds()) }(time.Now()) - // Pull the blob from disk and return an assembled response - id, ok := p.lookup.storeidOfTx(hash) - if !ok { - return nil - } - data, err := p.store.Get(id) - if err != nil { - log.Error("Tracked blob transaction missing from store", "hash", hash, "id", id, "err", err) - return nil + for retry := 0; retry < 2; retry++ { + data, err := p.store.Get(id) + if err != nil { + log.Warn("Failed to read blob tx from store", "hash", hash, "id", id, "err", err) + return nil, nil + } + tx := new(types.Transaction) + if err := rlp.DecodeBytes(data, tx); err != nil { + log.Warn("Failed to decode blob tx from store", "hash", hash, "id", id, "err", err) + return nil, nil + } + p.lock.RLock() + currentID, stillTracked := p.lookup.storeidOfTx(hash) + p.lock.RUnlock() + if !stillTracked { + return nil, nil + } + if currentID == id && tx.Hash() == hash { + return data, tx + } + if tx.Hash() != hash { + log.Debug("Blob tx slot reused by concurrent writer", "hash", hash, "id", id, "got", tx.Hash()) + } + if currentID == id { + return nil, nil + } + id = currentID } + return nil, nil +} + +// getRLP returns a RLP-encoded transaction if it is contained in the pool. +func (p *BlobPool) getRLP(hash common.Hash) []byte { + data, _ := p.getAndVerify(hash) return data } // Get returns a transaction if it is contained in the pool, or nil otherwise. func (p *BlobPool) Get(hash common.Hash) *types.Transaction { - data := p.getRLP(hash) - if len(data) == 0 { - return nil - } - item := new(types.Transaction) - if err := rlp.DecodeBytes(data, item); err != nil { - id, _ := p.lookup.storeidOfTx(hash) - - log.Error("Blobs corrupted for traced transaction", - "hash", hash, "id", id, "err", err) - return nil - } - return item + _, tx := p.getAndVerify(hash) + return tx } // GetRLP returns a RLP-encoded transaction if it is contained in the pool. diff --git a/core/txpool/blobpool/blobpool_test.go b/core/txpool/blobpool/blobpool_test.go index ba96bea8ed..507e759f4c 100644 --- a/core/txpool/blobpool/blobpool_test.go +++ b/core/txpool/blobpool/blobpool_test.go @@ -31,6 +31,7 @@ import ( "slices" "sync" "testing" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/misc/eip1559" @@ -2133,3 +2134,104 @@ func benchmarkPoolPending(b *testing.B, datacap uint64) { } } } + +// setupGetBenchPool provisions a blob pool with numTxs blob transactions and +// returns it together with the inserted transaction hashes and the first sender. +func setupGetBenchPool(b *testing.B, numTxs int) (*BlobPool, []common.Hash, common.Address) { + b.Helper() + var ( + basefee = uint64(1050) + blobfee = uint64(105) + signer = types.LatestSigner(params.MainnetChainConfig) + statedb, _ = state.New(types.EmptyRootHash, state.NewDatabaseForTesting()) + chain = &testBlockChain{ + config: params.MainnetChainConfig, + basefee: uint256.NewInt(basefee), + blobfee: uint256.NewInt(blobfee), + statedb: statedb, + } + pool = New(Config{Datadir: b.TempDir()}, chain, nil) + ) + if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil { + b.Fatalf("failed to create blob pool: %v", err) + } + hashes := make([]common.Hash, 0, numTxs) + var firstAddr common.Address + for i := 0; i < numTxs; i++ { + blobtx := makeUnsignedTx(0, 10, basefee+10, blobfee) + blobtx.R = uint256.NewInt(1) + blobtx.S = uint256.NewInt(uint64(100 + i)) + blobtx.V = uint256.NewInt(0) + tx := types.NewTx(blobtx) + addr, err := types.Sender(signer, tx) + if err != nil { + b.Fatal(err) + } + statedb.AddBalance(addr, uint256.NewInt(1_000_000_000), tracing.BalanceChangeUnspecified) + if err := pool.add(tx); err != nil { + b.Fatalf("add tx %d: %v", i, err) + } + hashes = append(hashes, tx.Hash()) + if i == 0 { + firstAddr = addr + } + } + statedb.Commit(0, true, false) + return pool, hashes, firstAddr +} + +// BenchmarkGetRLP measures the per-call cost of a successful GetRLP against +// a real billy store (disk Get + RLP decode + hash verification). +func BenchmarkGetRLP(b *testing.B) { + pool, hashes, _ := setupGetBenchPool(b, 1024) + defer pool.Close() + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + if data := pool.GetRLP(hashes[i%len(hashes)]); data == nil { + b.Fatalf("GetRLP returned nil for known hash") + } + } +} + +// BenchmarkWriteUnderReaderLoad measures the latency of a write-lock probe +// (Nonce on a tracked address) while readerConcurrency goroutines are +// calling GetRLP in a tight loop. The probe does near-zero in-lock work, so +// the per-iter time is dominated by lock-acquisition wait. +func BenchmarkWriteUnderReaderLoad(b *testing.B) { + pool, hashes, addr := setupGetBenchPool(b, 1024) + defer pool.Close() + + const readerConcurrency = 4 + var ( + stop = make(chan struct{}) + wg sync.WaitGroup + ) + for i := 0; i < readerConcurrency; i++ { + wg.Add(1) + go func(seed int) { + defer wg.Done() + r := rand.New(rand.NewSource(int64(seed))) + for { + select { + case <-stop: + return + default: + pool.GetRLP(hashes[r.Intn(len(hashes))]) + } + } + }(i) + } + time.Sleep(100 * time.Millisecond) + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + pool.Nonce(addr) + } + b.StopTimer() + + close(stop) + wg.Wait() +} diff --git a/core/txpool/blobpool/metrics.go b/core/txpool/blobpool/metrics.go index 52419ade09..a25a264741 100644 --- a/core/txpool/blobpool/metrics.go +++ b/core/txpool/blobpool/metrics.go @@ -67,10 +67,11 @@ var ( // addwait/time, resetwait/time and getwait/time track the rough health of // the pool and whether it's capable of keeping up with the load from the // network. - addwaitHist = metrics.NewRegisteredHistogram("blobpool/addwait", nil, metrics.NewExpDecaySample(1028, 0.015)) - addtimeHist = metrics.NewRegisteredHistogram("blobpool/addtime", nil, metrics.NewExpDecaySample(1028, 0.015)) - getwaitHist = metrics.NewRegisteredHistogram("blobpool/getwait", nil, metrics.NewExpDecaySample(1028, 0.015)) - gettimeHist = metrics.NewRegisteredHistogram("blobpool/gettime", nil, metrics.NewExpDecaySample(1028, 0.015)) + addwaitHist = metrics.NewRegisteredHistogram("blobpool/addwait", nil, metrics.NewExpDecaySample(1028, 0.015)) + addtimeHist = metrics.NewRegisteredHistogram("blobpool/addtime", nil, metrics.NewExpDecaySample(1028, 0.015)) + getwaitHist = metrics.NewRegisteredHistogram("blobpool/getwait", nil, metrics.NewExpDecaySample(1028, 0.015)) + gettimeHist = metrics.NewRegisteredHistogram("blobpool/gettime", nil, metrics.NewExpDecaySample(1028, 0.015)) + pendwaitHist = metrics.NewRegisteredHistogram("blobpool/pendwait", nil, metrics.NewExpDecaySample(1028, 0.015)) pendtimeHist = metrics.NewRegisteredHistogram("blobpool/pendtime", nil, metrics.NewExpDecaySample(1028, 0.015)) resetwaitHist = metrics.NewRegisteredHistogram("blobpool/resetwait", nil, metrics.NewExpDecaySample(1028, 0.015))