From e595aedcd067304d77480fdccf504f5f121352f6 Mon Sep 17 00:00:00 2001 From: Bosul Mun Date: Thu, 11 Jun 2026 17:26:15 +0200 Subject: [PATCH] core/txpool/blobpool: add cache for GetBlobs request (#35124) This PR introduces a cache for GetBlobs request. The main purpose of this PR is to reduce the getBlobs latency by reading and decoding blobs from the pool in advance of the actual query. This is important especially in the context of a sparse blobpool, since it may be necessary to recover blobs from cells on a getBlobs request. Previously, the Engine API read and decoded blobs from the pool on every call. Now those calls check the cache and only fall back to the pool on a miss. The cache has two modes: - In topK mode (default), it wakes up periodically, picks the most profitable pending blob transactions up to the current fork's maxBlobsPerBlock, and loads their blobs. The selection logic is shared with the miner's block-building logic. The selection size is derived from eip4844.MaxBlobsPerBlock at the current head. - When the CL calls HasBlobs, the cache switches to hasBlobs mode and tries to pin the set it just reported as available. Cache updates (read, decode, and optionally conversion in the future) run in background goroutines. --------- Co-authored-by: Felix Lange --- core/txpool/blobpool/blobpool.go | 61 ++- core/txpool/blobpool/blobpool_test.go | 7 +- core/txpool/blobpool/cache.go | 442 ++++++++++++++++++ core/txpool/blobpool/cache_test.go | 297 ++++++++++++ {miner => core/txpool/txorder}/ordering.go | 20 +- .../txpool/txorder}/ordering_test.go | 6 +- eth/backend.go | 4 + eth/catalyst/api.go | 8 +- miner/worker.go | 13 +- 9 files changed, 811 insertions(+), 47 deletions(-) create mode 100644 core/txpool/blobpool/cache.go create mode 100644 core/txpool/blobpool/cache_test.go rename {miner => core/txpool/txorder}/ordering.go (90%) rename {miner => core/txpool/txorder}/ordering_test.go (97%) diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index 1ae2ef906a..fcd1f7566a 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -19,7 +19,6 @@ package blobpool import ( "container/heap" - "context" "errors" "fmt" "math" @@ -40,7 +39,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto/kzg4844" "github.com/ethereum/go-ethereum/event" - "github.com/ethereum/go-ethereum/internal/telemetry" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" @@ -1121,6 +1119,43 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) { p.updateStorageMetrics() } +// vhashesByTx returns a snapshot of the mapping between transaction hash and +// versioned hashes. +func (p *BlobPool) vhashesByTx() map[common.Hash][]common.Hash { + p.lock.RLock() + defer p.lock.RUnlock() + + out := make(map[common.Hash][]common.Hash) + for _, txs := range p.index { + for _, tx := range txs { + out[tx.hash] = tx.vhashes + } + } + return out +} + +// getByVhash reads and decodes the blob transaction which has the given +// versioned hash. Returns nil if unavailable. +func (p *BlobPool) getByVhash(vhash common.Hash) *blobTxForPool { + p.lock.RLock() + txID, exists := p.lookup.storeidOfBlob(vhash) + p.lock.RUnlock() + if !exists { + return nil + } + data, err := p.store.Get(txID) + if err != nil { + log.Error("Tracked blob transaction missing from store", "id", txID, "err", err) + return nil + } + var ptx blobTxForPool + if err := rlp.DecodeBytes(data, &ptx); err != nil { + log.Error("Blobs corrupted for tracked transaction", "id", txID, "err", err) + return nil + } + return &ptx +} + // reorg assembles all the transactors and missing transactions between an old // and new head to figure out which account's tx set needs to be rechecked and // which transactions need to be requeued. @@ -1591,24 +1626,10 @@ func (p *BlobPool) GetMetadata(hash common.Hash) *txpool.TxMetadata { } } -// GetBlobs returns a number of blobs and proofs for the given versioned hashes. +// getBlobs returns a number of blobs and proofs for the given versioned hashes. // Blobpool must place responses in the order given in the request, using null // for any missing blobs. -// -// For instance, if the request is [A_versioned_hash, B_versioned_hash, -// C_versioned_hash] and blobpool has data for blobs A and C, but doesn't have -// data for B, the response MUST be [A, null, C]. -// -// This is a utility method for the engine API, enabling consensus clients to -// retrieve blobs from the pools directly instead of the network. -// -// The version argument specifies the type of proofs to return, either the -// blob proofs (version 0) or the cell proofs (version 1). Proofs conversion is -// CPU intensive and prohibited in the blobpool explicitly. -func (p *BlobPool) GetBlobs(ctx context.Context, vhashes []common.Hash, version byte) (_ []*kzg4844.Blob, _ []kzg4844.Commitment, _ [][]kzg4844.Proof, err error) { - _, _, spanEnd := telemetry.StartSpan(ctx, "blobpool.GetBlobs") - defer spanEnd(&err) - +func (p *BlobPool) getBlobs(vhashes []common.Hash, version byte) (_ []*kzg4844.Blob, _ []kzg4844.Commitment, _ [][]kzg4844.Proof, err error) { var ( blobs = make([]*kzg4844.Blob, len(vhashes)) commitments = make([]kzg4844.Commitment, len(vhashes)) @@ -1683,8 +1704,8 @@ func (p *BlobPool) GetBlobs(ctx context.Context, vhashes []common.Hash, version return blobs, commitments, proofs, nil } -// AvailableBlobs returns whether the blobs are available in the subpool. -func (p *BlobPool) AvailableBlobs(vhashes []common.Hash) []bool { +// availableBlobs returns whether the blobs are available in the subpool. +func (p *BlobPool) availableBlobs(vhashes []common.Hash) []bool { available := make([]bool, len(vhashes)) p.lock.RLock() for i, vhash := range vhashes { diff --git a/core/txpool/blobpool/blobpool_test.go b/core/txpool/blobpool/blobpool_test.go index dfdaaf51b6..6b61591227 100644 --- a/core/txpool/blobpool/blobpool_test.go +++ b/core/txpool/blobpool/blobpool_test.go @@ -18,7 +18,6 @@ package blobpool import ( "bytes" - "context" "crypto/ecdsa" "crypto/sha256" "errors" @@ -441,11 +440,11 @@ func verifyBlobRetrievals(t *testing.T, pool *BlobPool) { hashes = append(hashes, tx.vhashes...) } } - blobs1, _, proofs1, err := pool.GetBlobs(context.Background(), hashes, types.BlobSidecarVersion0) + blobs1, _, proofs1, err := pool.getBlobs(hashes, types.BlobSidecarVersion0) if err != nil { t.Fatal(err) } - blobs2, _, proofs2, err := pool.GetBlobs(context.Background(), hashes, types.BlobSidecarVersion1) + blobs2, _, proofs2, err := pool.getBlobs(hashes, types.BlobSidecarVersion1) if err != nil { t.Fatal(err) } @@ -2088,7 +2087,7 @@ func TestGetBlobs(t *testing.T) { filled[len(vhashes)] = struct{}{} vhashes = append(vhashes, testrand.Hash()) } - blobs, _, proofs, err := pool.GetBlobs(context.Background(), vhashes, c.version) + blobs, _, proofs, err := pool.getBlobs(vhashes, c.version) if err != nil { t.Errorf("Unexpected error for case %d, %v", i, err) } diff --git a/core/txpool/blobpool/cache.go b/core/txpool/blobpool/cache.go new file mode 100644 index 0000000000..de4001a1b1 --- /dev/null +++ b/core/txpool/blobpool/cache.go @@ -0,0 +1,442 @@ +// Copyright 2026 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package blobpool + +import ( + "context" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/consensus/misc/eip1559" + "github.com/ethereum/go-ethereum/consensus/misc/eip4844" + "github.com/ethereum/go-ethereum/core/txpool" + "github.com/ethereum/go-ethereum/core/txpool/txorder" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto/kzg4844" + "github.com/ethereum/go-ethereum/internal/telemetry" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + "github.com/holiman/uint256" +) + +const ( + topKTimeout = 4 * time.Second + hasBlobsTimeout = 1 * time.Second +) + +var ( + // Cache tracks 3 metrics: cache hit, cache miss, and the number of blobs + // it contains. Note that cache miss includes the blobs that we are actually + // missing on the lower level (in this case, the blobpool). The amount that + // we failed to predict) can be calculated with the telemetry span + // (blobs.filled - cache.hit). + cacheHitMeter = metrics.NewRegisteredMeter("blobpool/cache/hit", nil) + cacheMissMeter = metrics.NewRegisteredMeter("blobpool/cache/miss", nil) + cacheBlobsGauge = metrics.NewRegisteredGauge("blobpool/cache/blobs", nil) +) + +type cachedBlob struct { + blob *kzg4844.Blob + commitment kzg4844.Commitment + proofs []kzg4844.Proof + version byte +} + +// Cache holds the blobs that are likely to be requested by the GetBlobs engine API. +// +// Every `topKTimeout`, the cache selects the blobs of the top K most profitable +// transactions, and preloads them into the cache. +// +// For HasBlobs requests, it also causes the blobs requested by the CL to be loaded. +// (Note: the cache is not guaranteed to always hold such blobs, since the blobpool might +// drop the transaction in the window between the engine API response and the cache +// update.) +type Cache struct { + blobpool *BlobPool + clock mclock.Clock + + mu sync.Mutex + entries map[common.Hash]*cachedBlob + + // channels into loop + quit chan struct{} + topkRequest chan struct{} + topkTimer mclock.Timer + hasBlobsCh chan []common.Hash // list of tx hashes that should be pinned + + step func() // test hook fired after each loop iteration + + cancelInflights context.CancelFunc // cancels the conversion/decode goroutines + inflight sync.WaitGroup // tracks all in-flight conversion/decode goroutines + wg sync.WaitGroup // tracks the loop goroutine +} + +// NewCache creates a blob cache backed by the given blobpool. +func NewCache(p *BlobPool) *Cache { + return newCache(p, mclock.System{}, nil) +} + +// newCache creates a blob cache for testing purposes. +// It allows injecting a clock and a step hook. +func newCache(p *BlobPool, clock mclock.Clock, step func()) *Cache { + c := &Cache{ + entries: make(map[common.Hash]*cachedBlob), + blobpool: p, + hasBlobsCh: make(chan []common.Hash, 1), + clock: clock, + step: step, + quit: make(chan struct{}), + topkRequest: make(chan struct{}, 1), + } + + c.wg.Add(1) + go c.loop() + return c +} + +// Stop terminates the cache loop and blocks until it and any in-flight work +// have stopped. +func (c *Cache) Stop() { + close(c.quit) + c.wg.Wait() +} + +// HasBlobs reports whether the blob is available (in the cache or the +// blobpool) and asks the loop to pin the ones it found. +func (c *Cache) HasBlobs(ctx context.Context, vhashes []common.Hash) []bool { + var ( + missIdx []int + missVhashes []common.Hash + needPin []common.Hash // available vhashes + ) + available := make([]bool, len(vhashes)) + + // First check cache and pass missing ones to blobpool. + c.mu.Lock() + for i, vhash := range vhashes { + if _, ok := c.entries[vhash]; ok { + available[i] = true + needPin = append(needPin, vhash) + } else { + missIdx = append(missIdx, i) + missVhashes = append(missVhashes, vhash) + } + } + c.mu.Unlock() + + if len(missVhashes) > 0 { + pooled := c.blobpool.availableBlobs(missVhashes) + // Merge two results + for j, ok := range pooled { + if ok { + available[missIdx[j]] = true + needPin = append(needPin, missVhashes[j]) + } + } + } + + select { + case c.hasBlobsCh <- needPin: + // Note that we also send the ones we already have in cache, + // since it can be dropped from the cache before this signal is processed. + return available + case <-c.quit: + return nil + } +} + +// GetBlobs returns the blobs and proofs for the given versioned hashes, serving +// them from the cache when possible and falling back to the blobpool for misses. +// Responses are placed in the order given in the request, using null for any +// missing blob. +// +// For instance, if the request is [A_versioned_hash, B_versioned_hash, +// C_versioned_hash] and blobpool has data for blobs A and C, but doesn't have +// data for B, the response MUST be [A, null, C]. +// +// This is a utility method for the engine API, enabling consensus clients to +// retrieve blobs from the pools directly instead of the network. +// +// The version argument specifies the type of proofs to return, either the +// blob proofs (version 0) or the cell proofs (version 1). Proofs conversion is +// CPU intensive and prohibited explicitly. +func (c *Cache) GetBlobs(ctx context.Context, vhashes []common.Hash, version byte) (_ []*kzg4844.Blob, _ []kzg4844.Commitment, _ [][]kzg4844.Proof, err error) { + _, span, spanEnd := telemetry.StartSpan(ctx, "blobpool.GetBlobs") + defer spanEnd(&err) + var ( + blobs = make([]*kzg4844.Blob, len(vhashes)) + commitments = make([]kzg4844.Commitment, len(vhashes)) + proofs = make([][]kzg4844.Proof, len(vhashes)) + + indices = make(map[common.Hash][]int) + misses []common.Hash + + cacheHits int + cacheMiss int + ) + for i, h := range vhashes { + indices[h] = append(indices[h], i) + } + + c.mu.Lock() + for vhash, idxs := range indices { + n := len(idxs) + + cached := c.entries[vhash] + if cached == nil || cached.version != version { + cacheMiss += n + if cached == nil { + misses = append(misses, vhash) + } + continue + } + cacheHits += n + for _, index := range idxs { + blobs[index] = cached.blob + commitments[index] = cached.commitment + proofs[index] = cached.proofs + } + } + c.mu.Unlock() + + if len(misses) > 0 { + mb, mc, mp, err := c.blobpool.getBlobs(misses, version) + if err != nil { + return nil, nil, nil, err + } + for j, vhash := range misses { + if mb[j] == nil { + continue + } + for _, index := range indices[vhash] { + blobs[index] = mb[j] + commitments[index] = mc[j] + proofs[index] = mp[j] + } + } + } + cacheHitMeter.Mark(int64(cacheHits)) + cacheMissMeter.Mark(int64(cacheMiss)) + span.SetAttributes( + telemetry.IntAttribute("cache.hit", cacheHits), + telemetry.IntAttribute("cache.miss", cacheMiss), + ) + + return blobs, commitments, proofs, nil +} + +func (c *Cache) loop() { + defer c.wg.Done() + + c.triggerTopK() + for { + select { + case want := <-c.hasBlobsCh: + // HasBlobs request was received. + // Update the cache once with the requested blobs, then reschedule topK. + c.update(want) + c.triggerTopKAfter(hasBlobsTimeout) + + case <-c.topkRequest: + want := c.selectTopTxs() + c.update(want) + c.triggerTopKAfter(topKTimeout) + + case <-c.quit: + c.cancelUpdate() + if c.topkTimer != nil { + c.topkTimer.Stop() + } + c.inflight.Wait() + return + } + + if c.step != nil { + c.step() + } + } +} + +// cancelUpdate stops the current update. +func (c *Cache) cancelUpdate() { + if c.cancelInflights != nil { + c.cancelInflights() + c.cancelInflights = nil + } +} + +// update updates the cache to hold the wanted vhashes. It evicts entries that +// are no longer wanted and loads the missing ones from the blobpool in the +// background. +func (c *Cache) update(want []common.Hash) { + wantSet := make(map[common.Hash]struct{}, len(want)) + for _, vh := range want { + wantSet[vh] = struct{}{} + } + + // Cancel the current updates. + c.cancelUpdate() + ctx, cancel := context.WithCancel(context.Background()) + c.cancelInflights = cancel + + c.mu.Lock() + var missing []common.Hash + for vh := range wantSet { + if _, ok := c.entries[vh]; !ok { + missing = append(missing, vh) + } + } + for vh := range c.entries { + if _, ok := wantSet[vh]; ok { + continue + } + delete(c.entries, vh) + cacheBlobsGauge.Dec(1) + } + c.mu.Unlock() + + c.inflight.Add(1) + go func() { + defer c.inflight.Done() + for _, vh := range missing { + select { + case <-ctx.Done(): + return + default: + } + c.mu.Lock() + _, loaded := c.entries[vh] + c.mu.Unlock() + if loaded { + continue + } + ptx := c.blobpool.getByVhash(vh) + if ptx == nil { + continue + } + sidecar := ptx.Sidecar() + if sidecar == nil { + continue + } + + c.mu.Lock() + for i, v := range sidecar.BlobHashes() { + if _, ok := wantSet[v]; !ok { + continue + } + if _, exists := c.entries[v]; exists { + continue // recompute only new entries + } + var pf []kzg4844.Proof + switch sidecar.Version { + case types.BlobSidecarVersion0: + pf = []kzg4844.Proof{sidecar.Proofs[i]} + case types.BlobSidecarVersion1: + cellProofs, err := sidecar.CellProofsAt(i) + if err != nil { + log.Error("Failed to get cell proofs", "txhash", ptx.Tx.Hash(), "err", err) + continue + } + pf = cellProofs + } + c.entries[v] = &cachedBlob{ + blob: &sidecar.Blobs[i], + commitment: sidecar.Commitments[i], + proofs: pf, + version: sidecar.Version, + } + cacheBlobsGauge.Inc(1) + } + c.mu.Unlock() + } + }() +} + +// selectTopTxs returns the vhashes of the top K most profitable pending blob +// transactions, up to the active fork's maxBlobsPerBlock. +func (c *Cache) selectTopTxs() []common.Hash { + p := c.blobpool + head := p.head.Load() + if head == nil { + return nil + } + config := p.chain.Config() + baseFee := eip1559.CalcBaseFee(config, head) + + filter := txpool.PendingFilter{ + BlobTxs: true, + BaseFee: uint256.MustFromBig(baseFee), + } + if head.ExcessBlobGas != nil { + filter.BlobFee = uint256.MustFromBig(eip4844.CalcBlobFee(config, head)) + } + if config.IsOsaka(head.Number, head.Time) { + filter.BlobVersion = types.BlobSidecarVersion1 + } else { + filter.BlobVersion = types.BlobSidecarVersion0 + } + pending, _ := p.Pending(filter) + vhashesOf := p.vhashesByTx() + + order := txorder.NewTransactionsByPriceAndNonce(p.signer, pending, baseFee) + + // Bound the selection by the active fork's blob limit so the cache follows + // BPO changes to maxBlobsPerBlock. + target := uint(eip4844.MaxBlobsPerBlock(config, head.Time)) + + var ( + vhashes []common.Hash + blobs uint + ) + for blobs < target { + tx, _ := order.Peek() + if tx == nil { + break + } + vh, ok := vhashesOf[tx.Hash] + if ok { + vhashes = append(vhashes, vh...) + blobs += uint(len(vh)) + } + order.Shift() + } + return vhashes +} + +// triggerTopKAfter makes a topK selection happen after the given interval. +func (c *Cache) triggerTopKAfter(interval time.Duration) { + if c.topkTimer != nil { + c.topkTimer.Stop() + } + // drain current request to avoid triggering before the interval + select { + case <-c.topkRequest: + default: + } + c.topkTimer = c.clock.AfterFunc(interval, c.triggerTopK) +} + +// triggerTopK causes another topK selection to happen. +// Note this is safe to call from anywhere, even outside of the loop goroutine. +func (c *Cache) triggerTopK() { + select { + case c.topkRequest <- struct{}{}: + default: + } +} diff --git a/core/txpool/blobpool/cache_test.go b/core/txpool/blobpool/cache_test.go new file mode 100644 index 0000000000..f7ece4c1b8 --- /dev/null +++ b/core/txpool/blobpool/cache_test.go @@ -0,0 +1,297 @@ +// Copyright 2026 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package blobpool + +import ( + "context" + "math/big" + "os" + "path/filepath" + "reflect" + "sort" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/tracing" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/params" + "github.com/holiman/billy" + "github.com/holiman/uint256" +) + +type txSpec struct { + blobs int + tip uint64 +} + +type testCache struct { + *Cache + clock *mclock.Simulated + iterCh chan struct{} + vhashes [][]common.Hash // vhashes in the pool + offset int // next blob index to use when injecting more txs +} + +// newTestCache creates a cache for test, with a pool that contains transactions +// specified in txConfig. The returned cache has the initial topK fetch already +// settled. +func newTestCache(t *testing.T, txConfig []txSpec) *testCache { + storage := t.TempDir() + if err := os.MkdirAll(filepath.Join(storage, pendingTransactionStore), 0700); err != nil { + t.Fatalf("mkdir: %v", err) + } + store, err := billy.Open(billy.Options{Path: filepath.Join(storage, pendingTransactionStore)}, newSlotter(params.BlobTxMaxBlobs), nil) + if err != nil { + t.Fatalf("billy open: %v", err) + } + + var ( + addrs = make([]common.Address, 0, len(txConfig)) + vhashes = make([][]common.Hash, 0, len(txConfig)) + offset int + ) + for _, s := range txConfig { + key, _ := crypto.GenerateKey() + tx := makeMultiBlobTx(0, s.tip, 1_000_000, 1_000_000, s.blobs, offset, key, types.BlobSidecarVersion1) + if _, err := store.Put(encodeForPool(tx)); err != nil { + t.Fatalf("store put: %v", err) + } + addrs = append(addrs, crypto.PubkeyToAddress(key.PublicKey)) + vhashes = append(vhashes, tx.BlobHashes()) + offset += s.blobs + } + store.Close() + + statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting()) + for _, a := range addrs { + statedb.AddBalance(a, uint256.NewInt(1_000_000_000_000), tracing.BalanceChangeUnspecified) + } + statedb.Commit(0, true, false) + + cancunTime := uint64(0) + config := ¶ms.ChainConfig{ + ChainID: big.NewInt(1), + LondonBlock: big.NewInt(0), + BerlinBlock: big.NewInt(0), + CancunTime: &cancunTime, + OsakaTime: &cancunTime, + BlobScheduleConfig: ¶ms.BlobScheduleConfig{ + Osaka: ¶ms.BlobConfig{ + Target: 1, + Max: 1, + UpdateFraction: params.DefaultCancunBlobConfig.UpdateFraction, + }, + }, + } + chain := &testBlockChain{ + config: config, + basefee: uint256.NewInt(1), + blobfee: uint256.NewInt(1), + statedb: statedb, + } + pool := New(Config{Datadir: storage}, chain, nil) + if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil { + t.Fatalf("init pool: %v", err) + } + t.Cleanup(func() { pool.Close() }) + + clock := &mclock.Simulated{} + iterCh := make(chan struct{}, 256) + step := func() { + select { + case iterCh <- struct{}{}: + default: + } + } + cache := newCache(pool, clock, step) + + tc := &testCache{ + Cache: cache, + clock: clock, + iterCh: iterCh, + vhashes: vhashes, + offset: offset, + } + // The loop performs the initial topK update immediately on startup and then + // arms the topK timer. Wait for the timer so we know the initial update has + // been issued, then let it settle. + clock.WaitForTimers(1) + tc.wait(t, 0) + return tc +} + +// inject adds a tx with the given spec directly to the pool's index and store, +// bypassing the normal Add path. Returns the tx's blob versioned hashes. +func (tc *testCache) inject(t *testing.T, spec txSpec) []common.Hash { + t.Helper() + key, _ := crypto.GenerateKey() + tx := makeMultiBlobTx(0, spec.tip, 1_000_000, 1_000_000, spec.blobs, tc.offset, key, types.BlobSidecarVersion1) + tc.offset += spec.blobs + + ptx := newBlobTxForPool(tx) + + tc.blobpool.lock.Lock() + defer tc.blobpool.lock.Unlock() + + id, err := tc.blobpool.store.Put(encodeForPool(tx)) + if err != nil { + t.Fatalf("store put: %v", err) + } + meta := newBlobTxMeta(id, ptx.TxSize(), tc.blobpool.store.Size(id), ptx) + addr := crypto.PubkeyToAddress(key.PublicKey) + tc.blobpool.index[addr] = append(tc.blobpool.index[addr], meta) + tc.blobpool.lookup.track(meta) + + return tx.BlobHashes() +} + +// wait advances simulated time by d (if > 0) and then blocks until the cache +// loop and any inflight fetch goroutines have settled. +func (tc *testCache) wait(t *testing.T, d time.Duration) { + t.Helper() + if d > 0 { + tc.clock.Run(d) + } + for { + select { + case <-tc.iterCh: + tc.inflight.Wait() + case <-time.After(50 * time.Millisecond): + tc.inflight.Wait() + return + } + } +} + +func (tc *testCache) expectEntries(t *testing.T, want ...common.Hash) { + t.Helper() + wantSet := make(map[common.Hash]struct{}, len(want)) + for _, w := range want { + wantSet[w] = struct{}{} + } + tc.mu.Lock() + have := make(map[common.Hash]struct{}, len(tc.entries)) + for k := range tc.entries { + have[k] = struct{}{} + } + tc.mu.Unlock() + if !reflect.DeepEqual(have, wantSet) { + t.Errorf("entries: got %s, want %s", hashSet(have), hashSet(wantSet)) + } +} + +func hashSet(m map[common.Hash]struct{}) []string { + out := make([]string, 0, len(m)) + for h := range m { + out = append(out, h.Hex()[:10]) + } + sort.Strings(out) + return out +} + +// TestCacheHasBlobsLoadsClaimedSet checks that a HasBlobs request loads +// exactly the txs whose vhashes the cache claimed available, regardless of +// whether the claim came from the cache itself or from the pool fallback. +func TestCacheHasBlobsLoadsClaimedSet(t *testing.T) { + tc := newTestCache(t, []txSpec{ + {blobs: 2, tip: 100}, + {blobs: 2, tip: 200}, + {blobs: 2, tip: 300}, + }) + + available := tc.HasBlobs(context.Background(), tc.vhashes[1]) + if !available[0] { + t.Fatalf("expected vhash to be reported available") + } + tc.wait(t, 0) + + tc.expectEntries(t, tc.vhashes[1]...) +} + +// TestCacheTopK exercises the initial topK update: after it settles in +// newTestCache, the cache entries equal the top-by-tip txs. +func TestCacheTopK(t *testing.T) { + tc := newTestCache(t, []txSpec{ + {blobs: 1, tip: 100}, + {blobs: 1, tip: 200}, + {blobs: 1, tip: 300}, + }) + + tc.expectEntries(t, tc.vhashes[2]...) +} + +// TestCacheHbTimerFallsBackToTopK checks the fallback after a HasBlobs +// request: when hasBlobsTimeout elapses, a topK update replaces the entries +// with the topK set. +func TestCacheHbTimerFallsBackToTopK(t *testing.T) { + tc := newTestCache(t, []txSpec{ + {blobs: 1, tip: 100}, + {blobs: 1, tip: 300}, + }) + + tc.HasBlobs(context.Background(), tc.vhashes[0]) + tc.wait(t, 0) + tc.expectEntries(t, tc.vhashes[0]...) + + tc.wait(t, hasBlobsTimeout) + tc.expectEntries(t, tc.vhashes[1]...) +} + +// TestCacheGetBlobs checks that GetBlobs returns the requested blobs and does +// not disturb the cached entries. +func TestCacheGetBlobs(t *testing.T) { + tc := newTestCache(t, []txSpec{ + {blobs: 1, tip: 100}, + {blobs: 1, tip: 300}, + }) + tc.expectEntries(t, tc.vhashes[1]...) + + blobs, _, proofs, err := tc.GetBlobs(context.Background(), tc.vhashes[1], types.BlobSidecarVersion1) + if err != nil { + t.Fatalf("GetBlobs: %v", err) + } + for i := range blobs { + if blobs[i] == nil { + t.Errorf("blob %d missing in GetBlobs response", i) + } + if len(proofs[i]) == 0 { + t.Errorf("proofs %d missing in GetBlobs response", i) + } + } + tc.wait(t, 0) + tc.expectEntries(t, tc.vhashes[1]...) +} + +// TestCacheTopKRefresh verifies that when a more profitable tx appears in the +// pool, the next topK tick replaces the cached entry with the better one. +func TestCacheTopKRefresh(t *testing.T) { + tc := newTestCache(t, []txSpec{ + {blobs: 1, tip: 100}, + {blobs: 1, tip: 200}, + {blobs: 1, tip: 300}, + }) + tc.expectEntries(t, tc.vhashes[2]...) + + better := tc.inject(t, txSpec{blobs: 1, tip: 400}) + + tc.wait(t, topKTimeout) + tc.expectEntries(t, better...) +} diff --git a/miner/ordering.go b/core/txpool/txorder/ordering.go similarity index 90% rename from miner/ordering.go rename to core/txpool/txorder/ordering.go index bcf7af46e8..a17a2cc251 100644 --- a/miner/ordering.go +++ b/core/txpool/txorder/ordering.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -package miner +package txorder import ( "container/heap" @@ -83,10 +83,10 @@ func (s *txByPriceAndTime) Pop() interface{} { return x } -// transactionsByPriceAndNonce represents a set of transactions that can return +// TransactionsByPriceAndNonce represents a set of transactions that can return // transactions in a profit-maximizing sorted order, while supporting removing // entire batches of transactions for non-executable accounts. -type transactionsByPriceAndNonce struct { +type TransactionsByPriceAndNonce struct { txs map[common.Address][]*txpool.LazyTransaction // Per account nonce-sorted list of transactions heads txByPriceAndTime // Next transaction for each unique account (price heap) signer types.Signer // Signer for the set of transactions @@ -98,7 +98,7 @@ type transactionsByPriceAndNonce struct { // // Note, the input map is reowned so the caller should not interact any more with // if after providing it to the constructor. -func newTransactionsByPriceAndNonce(signer types.Signer, txs map[common.Address][]*txpool.LazyTransaction, baseFee *big.Int) *transactionsByPriceAndNonce { +func NewTransactionsByPriceAndNonce(signer types.Signer, txs map[common.Address][]*txpool.LazyTransaction, baseFee *big.Int) *TransactionsByPriceAndNonce { // Convert the basefee from header format to uint256 format var baseFeeUint *uint256.Int if baseFee != nil { @@ -118,7 +118,7 @@ func newTransactionsByPriceAndNonce(signer types.Signer, txs map[common.Address] heap.Init(&heads) // Assemble and return the transaction set - return &transactionsByPriceAndNonce{ + return &TransactionsByPriceAndNonce{ txs: txs, heads: heads, signer: signer, @@ -127,7 +127,7 @@ func newTransactionsByPriceAndNonce(signer types.Signer, txs map[common.Address] } // Peek returns the next transaction by price. -func (t *transactionsByPriceAndNonce) Peek() (*txpool.LazyTransaction, *uint256.Int) { +func (t *TransactionsByPriceAndNonce) Peek() (*txpool.LazyTransaction, *uint256.Int) { if len(t.heads) == 0 { return nil, nil } @@ -135,7 +135,7 @@ func (t *transactionsByPriceAndNonce) Peek() (*txpool.LazyTransaction, *uint256. } // Shift replaces the current best head with the next one from the same account. -func (t *transactionsByPriceAndNonce) Shift() { +func (t *TransactionsByPriceAndNonce) Shift() { acc := t.heads[0].from if txs, ok := t.txs[acc]; ok && len(txs) > 0 { if wrapped, err := newTxWithMinerFee(txs[0], acc, t.baseFee); err == nil { @@ -150,17 +150,17 @@ func (t *transactionsByPriceAndNonce) Shift() { // Pop removes the best transaction, *not* replacing it with the next one from // the same account. This should be used when a transaction cannot be executed // and hence all subsequent ones should be discarded from the same account. -func (t *transactionsByPriceAndNonce) Pop() { +func (t *TransactionsByPriceAndNonce) Pop() { heap.Pop(&t.heads) } // Empty returns if the price heap is empty. It can be used to check it simpler // than calling peek and checking for nil return. -func (t *transactionsByPriceAndNonce) Empty() bool { +func (t *TransactionsByPriceAndNonce) Empty() bool { return len(t.heads) == 0 } // Clear removes the entire content of the heap. -func (t *transactionsByPriceAndNonce) Clear() { +func (t *TransactionsByPriceAndNonce) Clear() { t.heads, t.txs = nil, nil } diff --git a/miner/ordering_test.go b/core/txpool/txorder/ordering_test.go similarity index 97% rename from miner/ordering_test.go rename to core/txpool/txorder/ordering_test.go index 5a45261570..cb2b3bc7a2 100644 --- a/miner/ordering_test.go +++ b/core/txpool/txorder/ordering_test.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -package miner +package txorder import ( "crypto/ecdsa" @@ -102,7 +102,7 @@ func testTransactionPriceNonceSort(t *testing.T, baseFee *big.Int) { expectedCount += count } // Sort the transactions and cross check the nonce ordering - txset := newTransactionsByPriceAndNonce(signer, groups, baseFee) + txset := NewTransactionsByPriceAndNonce(signer, groups, baseFee) txs := types.Transactions{} for tx, _ := txset.Peek(); tx != nil; tx, _ = txset.Peek() { @@ -168,7 +168,7 @@ func TestTransactionTimeSort(t *testing.T) { }) } // Sort the transactions and cross check the nonce ordering - txset := newTransactionsByPriceAndNonce(signer, groups, nil) + txset := NewTransactionsByPriceAndNonce(signer, groups, nil) txs := types.Transactions{} for tx, _ := txset.Peek(); tx != nil; tx, _ = txset.Peek() { diff --git a/eth/backend.go b/eth/backend.go index 5853697d64..7438486696 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -95,6 +95,7 @@ type Ethereum struct { config *ethconfig.Config txPool *txpool.TxPool blobTxPool *blobpool.BlobPool + blobCache *blobpool.Cache localTxTracker *locals.TxTracker blockchain *core.BlockChain @@ -325,6 +326,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { config.BlobPool.Datadir = stack.ResolvePath(config.BlobPool.Datadir) } eth.blobTxPool = blobpool.New(config.BlobPool, eth.blockchain, legacyPool.HasPendingAuth) + eth.blobCache = blobpool.NewCache(eth.blobTxPool) eth.txPool, err = txpool.New(config.TxPool.PriceLimit, eth.blockchain, []txpool.SubPool{legacyPool, eth.blobTxPool}) if err != nil { @@ -436,6 +438,7 @@ func (s *Ethereum) AccountManager() *accounts.Manager { return s.accountManager func (s *Ethereum) BlockChain() *core.BlockChain { return s.blockchain } func (s *Ethereum) TxPool() *txpool.TxPool { return s.txPool } func (s *Ethereum) BlobTxPool() *blobpool.BlobPool { return s.blobTxPool } +func (s *Ethereum) BlobCache() *blobpool.Cache { return s.blobCache } func (s *Ethereum) Engine() consensus.Engine { return s.engine } func (s *Ethereum) ChainDb() ethdb.Database { return s.chainDb } func (s *Ethereum) IsListening() bool { return true } // Always listening @@ -601,6 +604,7 @@ func (s *Ethereum) Stop() error { s.closeFilterMaps <- ch <-ch s.filterMaps.Stop() + s.blobCache.Stop() s.txPool.Close() s.blockchain.Stop() s.engine.Close() diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index 1de2c80848..4b0c3f6ea3 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -574,7 +574,7 @@ func (api *ConsensusAPI) GetBlobsV1(ctx context.Context, hashes []common.Hash) ( if len(hashes) > 128 { return nil, engine.TooLargeRequest.With(fmt.Errorf("requested blob count too large: %v", len(hashes))) } - blobs, _, proofs, err := api.eth.BlobTxPool().GetBlobs(ctx, hashes, types.BlobSidecarVersion0) + blobs, _, proofs, err := api.eth.BlobCache().GetBlobs(ctx, hashes, types.BlobSidecarVersion0) if err != nil { return nil, engine.InvalidParams.With(err) } @@ -656,7 +656,7 @@ func (api *ConsensusAPI) getBlobs(ctx context.Context, hashes []common.Hash, v2 return nil, engine.TooLargeRequest.With(fmt.Errorf("requested blob count too large: %v", len(hashes))) } available := 0 - for _, ok := range api.eth.BlobTxPool().AvailableBlobs(hashes) { + for _, ok := range api.eth.BlobCache().HasBlobs(ctx, hashes) { if ok { available++ } @@ -671,7 +671,7 @@ func (api *ConsensusAPI) getBlobs(ctx context.Context, hashes []common.Hash, v2 } // Retrieve blobs from the pool. This operation is expensive and may involve // heavy disk I/O. - blobs, _, proofs, err := api.eth.BlobTxPool().GetBlobs(ctx, hashes, types.BlobSidecarVersion1) + blobs, _, proofs, err := api.eth.BlobCache().GetBlobs(ctx, hashes, types.BlobSidecarVersion1) if err != nil { return nil, engine.InvalidParams.With(err) } @@ -710,7 +710,7 @@ func (api *ConsensusAPI) getBlobs(ctx context.Context, hashes []common.Hash, v2 // HasBlobs reports availability for the requested blob-versioned-hashes. func (api *ConsensusAPI) HasBlobs(hashes []common.Hash) []bool { - return api.eth.BlobTxPool().AvailableBlobs(hashes) + return api.eth.BlobCache().HasBlobs(context.Background(), hashes) } // Helper for NewPayload* methods. diff --git a/miner/worker.go b/miner/worker.go index b0e144c0ab..01a14b8a02 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/stateless" "github.com/ethereum/go-ethereum/core/txpool" + "github.com/ethereum/go-ethereum/core/txpool/txorder" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types/bal" "github.com/ethereum/go-ethereum/core/vm" @@ -422,7 +423,7 @@ func (miner *Miner) applyTransaction(env *environment, tx *types.Transaction) (* return receipt, bal, nil } -func (miner *Miner) commitTransactions(ctx context.Context, env *environment, plainTxs, blobTxs *transactionsByPriceAndNonce, interrupt *atomic.Int32) error { +func (miner *Miner) commitTransactions(ctx context.Context, env *environment, plainTxs, blobTxs *txorder.TransactionsByPriceAndNonce, interrupt *atomic.Int32) error { ctx, _, spanEnd := telemetry.StartSpan(ctx, "miner.commitTransactions") defer spanEnd(nil) @@ -449,7 +450,7 @@ func (miner *Miner) commitTransactions(ctx context.Context, env *environment, pl // Retrieve the next transaction and abort if all done. var ( ltx *txpool.LazyTransaction - txs *transactionsByPriceAndNonce + txs *txorder.TransactionsByPriceAndNonce ) pltx, ptip := plainTxs.Peek() bltx, btip := blobTxs.Peek() @@ -592,16 +593,16 @@ func (miner *Miner) fillTransactions(ctx context.Context, interrupt *atomic.Int3 } // Fill the block with all available pending transactions. if len(prioPlainTxs) > 0 || len(prioBlobTxs) > 0 { - plainTxs := newTransactionsByPriceAndNonce(env.signer, prioPlainTxs, env.header.BaseFee) - blobTxs := newTransactionsByPriceAndNonce(env.signer, prioBlobTxs, env.header.BaseFee) + plainTxs := txorder.NewTransactionsByPriceAndNonce(env.signer, prioPlainTxs, env.header.BaseFee) + blobTxs := txorder.NewTransactionsByPriceAndNonce(env.signer, prioBlobTxs, env.header.BaseFee) if err := miner.commitTransactions(ctx, env, plainTxs, blobTxs, interrupt); err != nil { return err } } if len(normalPlainTxs) > 0 || len(normalBlobTxs) > 0 { - plainTxs := newTransactionsByPriceAndNonce(env.signer, normalPlainTxs, env.header.BaseFee) - blobTxs := newTransactionsByPriceAndNonce(env.signer, normalBlobTxs, env.header.BaseFee) + plainTxs := txorder.NewTransactionsByPriceAndNonce(env.signer, normalPlainTxs, env.header.BaseFee) + blobTxs := txorder.NewTransactionsByPriceAndNonce(env.signer, normalBlobTxs, env.header.BaseFee) if err := miner.commitTransactions(ctx, env, plainTxs, blobTxs, interrupt); err != nil { return err