diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index 1ae2ef906a..fcd1f7566a 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -19,7 +19,6 @@ package blobpool import ( "container/heap" - "context" "errors" "fmt" "math" @@ -40,7 +39,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto/kzg4844" "github.com/ethereum/go-ethereum/event" - "github.com/ethereum/go-ethereum/internal/telemetry" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" @@ -1121,6 +1119,43 @@ func (p *BlobPool) Reset(oldHead, newHead *types.Header) { p.updateStorageMetrics() } +// vhashesByTx returns a snapshot of the mapping between transaction hash and +// versioned hashes. +func (p *BlobPool) vhashesByTx() map[common.Hash][]common.Hash { + p.lock.RLock() + defer p.lock.RUnlock() + + out := make(map[common.Hash][]common.Hash) + for _, txs := range p.index { + for _, tx := range txs { + out[tx.hash] = tx.vhashes + } + } + return out +} + +// getByVhash reads and decodes the blob transaction which has the given +// versioned hash. Returns nil if unavailable. +func (p *BlobPool) getByVhash(vhash common.Hash) *blobTxForPool { + p.lock.RLock() + txID, exists := p.lookup.storeidOfBlob(vhash) + p.lock.RUnlock() + if !exists { + return nil + } + data, err := p.store.Get(txID) + if err != nil { + log.Error("Tracked blob transaction missing from store", "id", txID, "err", err) + return nil + } + var ptx blobTxForPool + if err := rlp.DecodeBytes(data, &ptx); err != nil { + log.Error("Blobs corrupted for tracked transaction", "id", txID, "err", err) + return nil + } + return &ptx +} + // reorg assembles all the transactors and missing transactions between an old // and new head to figure out which account's tx set needs to be rechecked and // which transactions need to be requeued. @@ -1591,24 +1626,10 @@ func (p *BlobPool) GetMetadata(hash common.Hash) *txpool.TxMetadata { } } -// GetBlobs returns a number of blobs and proofs for the given versioned hashes. +// getBlobs returns a number of blobs and proofs for the given versioned hashes. // Blobpool must place responses in the order given in the request, using null // for any missing blobs. -// -// For instance, if the request is [A_versioned_hash, B_versioned_hash, -// C_versioned_hash] and blobpool has data for blobs A and C, but doesn't have -// data for B, the response MUST be [A, null, C]. -// -// This is a utility method for the engine API, enabling consensus clients to -// retrieve blobs from the pools directly instead of the network. -// -// The version argument specifies the type of proofs to return, either the -// blob proofs (version 0) or the cell proofs (version 1). Proofs conversion is -// CPU intensive and prohibited in the blobpool explicitly. -func (p *BlobPool) GetBlobs(ctx context.Context, vhashes []common.Hash, version byte) (_ []*kzg4844.Blob, _ []kzg4844.Commitment, _ [][]kzg4844.Proof, err error) { - _, _, spanEnd := telemetry.StartSpan(ctx, "blobpool.GetBlobs") - defer spanEnd(&err) - +func (p *BlobPool) getBlobs(vhashes []common.Hash, version byte) (_ []*kzg4844.Blob, _ []kzg4844.Commitment, _ [][]kzg4844.Proof, err error) { var ( blobs = make([]*kzg4844.Blob, len(vhashes)) commitments = make([]kzg4844.Commitment, len(vhashes)) @@ -1683,8 +1704,8 @@ func (p *BlobPool) GetBlobs(ctx context.Context, vhashes []common.Hash, version return blobs, commitments, proofs, nil } -// AvailableBlobs returns whether the blobs are available in the subpool. -func (p *BlobPool) AvailableBlobs(vhashes []common.Hash) []bool { +// availableBlobs returns whether the blobs are available in the subpool. +func (p *BlobPool) availableBlobs(vhashes []common.Hash) []bool { available := make([]bool, len(vhashes)) p.lock.RLock() for i, vhash := range vhashes { diff --git a/core/txpool/blobpool/blobpool_test.go b/core/txpool/blobpool/blobpool_test.go index dfdaaf51b6..6b61591227 100644 --- a/core/txpool/blobpool/blobpool_test.go +++ b/core/txpool/blobpool/blobpool_test.go @@ -18,7 +18,6 @@ package blobpool import ( "bytes" - "context" "crypto/ecdsa" "crypto/sha256" "errors" @@ -441,11 +440,11 @@ func verifyBlobRetrievals(t *testing.T, pool *BlobPool) { hashes = append(hashes, tx.vhashes...) } } - blobs1, _, proofs1, err := pool.GetBlobs(context.Background(), hashes, types.BlobSidecarVersion0) + blobs1, _, proofs1, err := pool.getBlobs(hashes, types.BlobSidecarVersion0) if err != nil { t.Fatal(err) } - blobs2, _, proofs2, err := pool.GetBlobs(context.Background(), hashes, types.BlobSidecarVersion1) + blobs2, _, proofs2, err := pool.getBlobs(hashes, types.BlobSidecarVersion1) if err != nil { t.Fatal(err) } @@ -2088,7 +2087,7 @@ func TestGetBlobs(t *testing.T) { filled[len(vhashes)] = struct{}{} vhashes = append(vhashes, testrand.Hash()) } - blobs, _, proofs, err := pool.GetBlobs(context.Background(), vhashes, c.version) + blobs, _, proofs, err := pool.getBlobs(vhashes, c.version) if err != nil { t.Errorf("Unexpected error for case %d, %v", i, err) } diff --git a/core/txpool/blobpool/cache.go b/core/txpool/blobpool/cache.go new file mode 100644 index 0000000000..de4001a1b1 --- /dev/null +++ b/core/txpool/blobpool/cache.go @@ -0,0 +1,442 @@ +// Copyright 2026 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package blobpool + +import ( + "context" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/consensus/misc/eip1559" + "github.com/ethereum/go-ethereum/consensus/misc/eip4844" + "github.com/ethereum/go-ethereum/core/txpool" + "github.com/ethereum/go-ethereum/core/txpool/txorder" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto/kzg4844" + "github.com/ethereum/go-ethereum/internal/telemetry" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + "github.com/holiman/uint256" +) + +const ( + topKTimeout = 4 * time.Second + hasBlobsTimeout = 1 * time.Second +) + +var ( + // Cache tracks 3 metrics: cache hit, cache miss, and the number of blobs + // it contains. Note that cache miss includes the blobs that we are actually + // missing on the lower level (in this case, the blobpool). The amount that + // we failed to predict) can be calculated with the telemetry span + // (blobs.filled - cache.hit). + cacheHitMeter = metrics.NewRegisteredMeter("blobpool/cache/hit", nil) + cacheMissMeter = metrics.NewRegisteredMeter("blobpool/cache/miss", nil) + cacheBlobsGauge = metrics.NewRegisteredGauge("blobpool/cache/blobs", nil) +) + +type cachedBlob struct { + blob *kzg4844.Blob + commitment kzg4844.Commitment + proofs []kzg4844.Proof + version byte +} + +// Cache holds the blobs that are likely to be requested by the GetBlobs engine API. +// +// Every `topKTimeout`, the cache selects the blobs of the top K most profitable +// transactions, and preloads them into the cache. +// +// For HasBlobs requests, it also causes the blobs requested by the CL to be loaded. +// (Note: the cache is not guaranteed to always hold such blobs, since the blobpool might +// drop the transaction in the window between the engine API response and the cache +// update.) +type Cache struct { + blobpool *BlobPool + clock mclock.Clock + + mu sync.Mutex + entries map[common.Hash]*cachedBlob + + // channels into loop + quit chan struct{} + topkRequest chan struct{} + topkTimer mclock.Timer + hasBlobsCh chan []common.Hash // list of tx hashes that should be pinned + + step func() // test hook fired after each loop iteration + + cancelInflights context.CancelFunc // cancels the conversion/decode goroutines + inflight sync.WaitGroup // tracks all in-flight conversion/decode goroutines + wg sync.WaitGroup // tracks the loop goroutine +} + +// NewCache creates a blob cache backed by the given blobpool. +func NewCache(p *BlobPool) *Cache { + return newCache(p, mclock.System{}, nil) +} + +// newCache creates a blob cache for testing purposes. +// It allows injecting a clock and a step hook. +func newCache(p *BlobPool, clock mclock.Clock, step func()) *Cache { + c := &Cache{ + entries: make(map[common.Hash]*cachedBlob), + blobpool: p, + hasBlobsCh: make(chan []common.Hash, 1), + clock: clock, + step: step, + quit: make(chan struct{}), + topkRequest: make(chan struct{}, 1), + } + + c.wg.Add(1) + go c.loop() + return c +} + +// Stop terminates the cache loop and blocks until it and any in-flight work +// have stopped. +func (c *Cache) Stop() { + close(c.quit) + c.wg.Wait() +} + +// HasBlobs reports whether the blob is available (in the cache or the +// blobpool) and asks the loop to pin the ones it found. +func (c *Cache) HasBlobs(ctx context.Context, vhashes []common.Hash) []bool { + var ( + missIdx []int + missVhashes []common.Hash + needPin []common.Hash // available vhashes + ) + available := make([]bool, len(vhashes)) + + // First check cache and pass missing ones to blobpool. + c.mu.Lock() + for i, vhash := range vhashes { + if _, ok := c.entries[vhash]; ok { + available[i] = true + needPin = append(needPin, vhash) + } else { + missIdx = append(missIdx, i) + missVhashes = append(missVhashes, vhash) + } + } + c.mu.Unlock() + + if len(missVhashes) > 0 { + pooled := c.blobpool.availableBlobs(missVhashes) + // Merge two results + for j, ok := range pooled { + if ok { + available[missIdx[j]] = true + needPin = append(needPin, missVhashes[j]) + } + } + } + + select { + case c.hasBlobsCh <- needPin: + // Note that we also send the ones we already have in cache, + // since it can be dropped from the cache before this signal is processed. + return available + case <-c.quit: + return nil + } +} + +// GetBlobs returns the blobs and proofs for the given versioned hashes, serving +// them from the cache when possible and falling back to the blobpool for misses. +// Responses are placed in the order given in the request, using null for any +// missing blob. +// +// For instance, if the request is [A_versioned_hash, B_versioned_hash, +// C_versioned_hash] and blobpool has data for blobs A and C, but doesn't have +// data for B, the response MUST be [A, null, C]. +// +// This is a utility method for the engine API, enabling consensus clients to +// retrieve blobs from the pools directly instead of the network. +// +// The version argument specifies the type of proofs to return, either the +// blob proofs (version 0) or the cell proofs (version 1). Proofs conversion is +// CPU intensive and prohibited explicitly. +func (c *Cache) GetBlobs(ctx context.Context, vhashes []common.Hash, version byte) (_ []*kzg4844.Blob, _ []kzg4844.Commitment, _ [][]kzg4844.Proof, err error) { + _, span, spanEnd := telemetry.StartSpan(ctx, "blobpool.GetBlobs") + defer spanEnd(&err) + var ( + blobs = make([]*kzg4844.Blob, len(vhashes)) + commitments = make([]kzg4844.Commitment, len(vhashes)) + proofs = make([][]kzg4844.Proof, len(vhashes)) + + indices = make(map[common.Hash][]int) + misses []common.Hash + + cacheHits int + cacheMiss int + ) + for i, h := range vhashes { + indices[h] = append(indices[h], i) + } + + c.mu.Lock() + for vhash, idxs := range indices { + n := len(idxs) + + cached := c.entries[vhash] + if cached == nil || cached.version != version { + cacheMiss += n + if cached == nil { + misses = append(misses, vhash) + } + continue + } + cacheHits += n + for _, index := range idxs { + blobs[index] = cached.blob + commitments[index] = cached.commitment + proofs[index] = cached.proofs + } + } + c.mu.Unlock() + + if len(misses) > 0 { + mb, mc, mp, err := c.blobpool.getBlobs(misses, version) + if err != nil { + return nil, nil, nil, err + } + for j, vhash := range misses { + if mb[j] == nil { + continue + } + for _, index := range indices[vhash] { + blobs[index] = mb[j] + commitments[index] = mc[j] + proofs[index] = mp[j] + } + } + } + cacheHitMeter.Mark(int64(cacheHits)) + cacheMissMeter.Mark(int64(cacheMiss)) + span.SetAttributes( + telemetry.IntAttribute("cache.hit", cacheHits), + telemetry.IntAttribute("cache.miss", cacheMiss), + ) + + return blobs, commitments, proofs, nil +} + +func (c *Cache) loop() { + defer c.wg.Done() + + c.triggerTopK() + for { + select { + case want := <-c.hasBlobsCh: + // HasBlobs request was received. + // Update the cache once with the requested blobs, then reschedule topK. + c.update(want) + c.triggerTopKAfter(hasBlobsTimeout) + + case <-c.topkRequest: + want := c.selectTopTxs() + c.update(want) + c.triggerTopKAfter(topKTimeout) + + case <-c.quit: + c.cancelUpdate() + if c.topkTimer != nil { + c.topkTimer.Stop() + } + c.inflight.Wait() + return + } + + if c.step != nil { + c.step() + } + } +} + +// cancelUpdate stops the current update. +func (c *Cache) cancelUpdate() { + if c.cancelInflights != nil { + c.cancelInflights() + c.cancelInflights = nil + } +} + +// update updates the cache to hold the wanted vhashes. It evicts entries that +// are no longer wanted and loads the missing ones from the blobpool in the +// background. +func (c *Cache) update(want []common.Hash) { + wantSet := make(map[common.Hash]struct{}, len(want)) + for _, vh := range want { + wantSet[vh] = struct{}{} + } + + // Cancel the current updates. + c.cancelUpdate() + ctx, cancel := context.WithCancel(context.Background()) + c.cancelInflights = cancel + + c.mu.Lock() + var missing []common.Hash + for vh := range wantSet { + if _, ok := c.entries[vh]; !ok { + missing = append(missing, vh) + } + } + for vh := range c.entries { + if _, ok := wantSet[vh]; ok { + continue + } + delete(c.entries, vh) + cacheBlobsGauge.Dec(1) + } + c.mu.Unlock() + + c.inflight.Add(1) + go func() { + defer c.inflight.Done() + for _, vh := range missing { + select { + case <-ctx.Done(): + return + default: + } + c.mu.Lock() + _, loaded := c.entries[vh] + c.mu.Unlock() + if loaded { + continue + } + ptx := c.blobpool.getByVhash(vh) + if ptx == nil { + continue + } + sidecar := ptx.Sidecar() + if sidecar == nil { + continue + } + + c.mu.Lock() + for i, v := range sidecar.BlobHashes() { + if _, ok := wantSet[v]; !ok { + continue + } + if _, exists := c.entries[v]; exists { + continue // recompute only new entries + } + var pf []kzg4844.Proof + switch sidecar.Version { + case types.BlobSidecarVersion0: + pf = []kzg4844.Proof{sidecar.Proofs[i]} + case types.BlobSidecarVersion1: + cellProofs, err := sidecar.CellProofsAt(i) + if err != nil { + log.Error("Failed to get cell proofs", "txhash", ptx.Tx.Hash(), "err", err) + continue + } + pf = cellProofs + } + c.entries[v] = &cachedBlob{ + blob: &sidecar.Blobs[i], + commitment: sidecar.Commitments[i], + proofs: pf, + version: sidecar.Version, + } + cacheBlobsGauge.Inc(1) + } + c.mu.Unlock() + } + }() +} + +// selectTopTxs returns the vhashes of the top K most profitable pending blob +// transactions, up to the active fork's maxBlobsPerBlock. +func (c *Cache) selectTopTxs() []common.Hash { + p := c.blobpool + head := p.head.Load() + if head == nil { + return nil + } + config := p.chain.Config() + baseFee := eip1559.CalcBaseFee(config, head) + + filter := txpool.PendingFilter{ + BlobTxs: true, + BaseFee: uint256.MustFromBig(baseFee), + } + if head.ExcessBlobGas != nil { + filter.BlobFee = uint256.MustFromBig(eip4844.CalcBlobFee(config, head)) + } + if config.IsOsaka(head.Number, head.Time) { + filter.BlobVersion = types.BlobSidecarVersion1 + } else { + filter.BlobVersion = types.BlobSidecarVersion0 + } + pending, _ := p.Pending(filter) + vhashesOf := p.vhashesByTx() + + order := txorder.NewTransactionsByPriceAndNonce(p.signer, pending, baseFee) + + // Bound the selection by the active fork's blob limit so the cache follows + // BPO changes to maxBlobsPerBlock. + target := uint(eip4844.MaxBlobsPerBlock(config, head.Time)) + + var ( + vhashes []common.Hash + blobs uint + ) + for blobs < target { + tx, _ := order.Peek() + if tx == nil { + break + } + vh, ok := vhashesOf[tx.Hash] + if ok { + vhashes = append(vhashes, vh...) + blobs += uint(len(vh)) + } + order.Shift() + } + return vhashes +} + +// triggerTopKAfter makes a topK selection happen after the given interval. +func (c *Cache) triggerTopKAfter(interval time.Duration) { + if c.topkTimer != nil { + c.topkTimer.Stop() + } + // drain current request to avoid triggering before the interval + select { + case <-c.topkRequest: + default: + } + c.topkTimer = c.clock.AfterFunc(interval, c.triggerTopK) +} + +// triggerTopK causes another topK selection to happen. +// Note this is safe to call from anywhere, even outside of the loop goroutine. +func (c *Cache) triggerTopK() { + select { + case c.topkRequest <- struct{}{}: + default: + } +} diff --git a/core/txpool/blobpool/cache_test.go b/core/txpool/blobpool/cache_test.go new file mode 100644 index 0000000000..f7ece4c1b8 --- /dev/null +++ b/core/txpool/blobpool/cache_test.go @@ -0,0 +1,297 @@ +// Copyright 2026 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package blobpool + +import ( + "context" + "math/big" + "os" + "path/filepath" + "reflect" + "sort" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/tracing" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/params" + "github.com/holiman/billy" + "github.com/holiman/uint256" +) + +type txSpec struct { + blobs int + tip uint64 +} + +type testCache struct { + *Cache + clock *mclock.Simulated + iterCh chan struct{} + vhashes [][]common.Hash // vhashes in the pool + offset int // next blob index to use when injecting more txs +} + +// newTestCache creates a cache for test, with a pool that contains transactions +// specified in txConfig. The returned cache has the initial topK fetch already +// settled. +func newTestCache(t *testing.T, txConfig []txSpec) *testCache { + storage := t.TempDir() + if err := os.MkdirAll(filepath.Join(storage, pendingTransactionStore), 0700); err != nil { + t.Fatalf("mkdir: %v", err) + } + store, err := billy.Open(billy.Options{Path: filepath.Join(storage, pendingTransactionStore)}, newSlotter(params.BlobTxMaxBlobs), nil) + if err != nil { + t.Fatalf("billy open: %v", err) + } + + var ( + addrs = make([]common.Address, 0, len(txConfig)) + vhashes = make([][]common.Hash, 0, len(txConfig)) + offset int + ) + for _, s := range txConfig { + key, _ := crypto.GenerateKey() + tx := makeMultiBlobTx(0, s.tip, 1_000_000, 1_000_000, s.blobs, offset, key, types.BlobSidecarVersion1) + if _, err := store.Put(encodeForPool(tx)); err != nil { + t.Fatalf("store put: %v", err) + } + addrs = append(addrs, crypto.PubkeyToAddress(key.PublicKey)) + vhashes = append(vhashes, tx.BlobHashes()) + offset += s.blobs + } + store.Close() + + statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting()) + for _, a := range addrs { + statedb.AddBalance(a, uint256.NewInt(1_000_000_000_000), tracing.BalanceChangeUnspecified) + } + statedb.Commit(0, true, false) + + cancunTime := uint64(0) + config := ¶ms.ChainConfig{ + ChainID: big.NewInt(1), + LondonBlock: big.NewInt(0), + BerlinBlock: big.NewInt(0), + CancunTime: &cancunTime, + OsakaTime: &cancunTime, + BlobScheduleConfig: ¶ms.BlobScheduleConfig{ + Osaka: ¶ms.BlobConfig{ + Target: 1, + Max: 1, + UpdateFraction: params.DefaultCancunBlobConfig.UpdateFraction, + }, + }, + } + chain := &testBlockChain{ + config: config, + basefee: uint256.NewInt(1), + blobfee: uint256.NewInt(1), + statedb: statedb, + } + pool := New(Config{Datadir: storage}, chain, nil) + if err := pool.Init(1, chain.CurrentBlock(), newReserver()); err != nil { + t.Fatalf("init pool: %v", err) + } + t.Cleanup(func() { pool.Close() }) + + clock := &mclock.Simulated{} + iterCh := make(chan struct{}, 256) + step := func() { + select { + case iterCh <- struct{}{}: + default: + } + } + cache := newCache(pool, clock, step) + + tc := &testCache{ + Cache: cache, + clock: clock, + iterCh: iterCh, + vhashes: vhashes, + offset: offset, + } + // The loop performs the initial topK update immediately on startup and then + // arms the topK timer. Wait for the timer so we know the initial update has + // been issued, then let it settle. + clock.WaitForTimers(1) + tc.wait(t, 0) + return tc +} + +// inject adds a tx with the given spec directly to the pool's index and store, +// bypassing the normal Add path. Returns the tx's blob versioned hashes. +func (tc *testCache) inject(t *testing.T, spec txSpec) []common.Hash { + t.Helper() + key, _ := crypto.GenerateKey() + tx := makeMultiBlobTx(0, spec.tip, 1_000_000, 1_000_000, spec.blobs, tc.offset, key, types.BlobSidecarVersion1) + tc.offset += spec.blobs + + ptx := newBlobTxForPool(tx) + + tc.blobpool.lock.Lock() + defer tc.blobpool.lock.Unlock() + + id, err := tc.blobpool.store.Put(encodeForPool(tx)) + if err != nil { + t.Fatalf("store put: %v", err) + } + meta := newBlobTxMeta(id, ptx.TxSize(), tc.blobpool.store.Size(id), ptx) + addr := crypto.PubkeyToAddress(key.PublicKey) + tc.blobpool.index[addr] = append(tc.blobpool.index[addr], meta) + tc.blobpool.lookup.track(meta) + + return tx.BlobHashes() +} + +// wait advances simulated time by d (if > 0) and then blocks until the cache +// loop and any inflight fetch goroutines have settled. +func (tc *testCache) wait(t *testing.T, d time.Duration) { + t.Helper() + if d > 0 { + tc.clock.Run(d) + } + for { + select { + case <-tc.iterCh: + tc.inflight.Wait() + case <-time.After(50 * time.Millisecond): + tc.inflight.Wait() + return + } + } +} + +func (tc *testCache) expectEntries(t *testing.T, want ...common.Hash) { + t.Helper() + wantSet := make(map[common.Hash]struct{}, len(want)) + for _, w := range want { + wantSet[w] = struct{}{} + } + tc.mu.Lock() + have := make(map[common.Hash]struct{}, len(tc.entries)) + for k := range tc.entries { + have[k] = struct{}{} + } + tc.mu.Unlock() + if !reflect.DeepEqual(have, wantSet) { + t.Errorf("entries: got %s, want %s", hashSet(have), hashSet(wantSet)) + } +} + +func hashSet(m map[common.Hash]struct{}) []string { + out := make([]string, 0, len(m)) + for h := range m { + out = append(out, h.Hex()[:10]) + } + sort.Strings(out) + return out +} + +// TestCacheHasBlobsLoadsClaimedSet checks that a HasBlobs request loads +// exactly the txs whose vhashes the cache claimed available, regardless of +// whether the claim came from the cache itself or from the pool fallback. +func TestCacheHasBlobsLoadsClaimedSet(t *testing.T) { + tc := newTestCache(t, []txSpec{ + {blobs: 2, tip: 100}, + {blobs: 2, tip: 200}, + {blobs: 2, tip: 300}, + }) + + available := tc.HasBlobs(context.Background(), tc.vhashes[1]) + if !available[0] { + t.Fatalf("expected vhash to be reported available") + } + tc.wait(t, 0) + + tc.expectEntries(t, tc.vhashes[1]...) +} + +// TestCacheTopK exercises the initial topK update: after it settles in +// newTestCache, the cache entries equal the top-by-tip txs. +func TestCacheTopK(t *testing.T) { + tc := newTestCache(t, []txSpec{ + {blobs: 1, tip: 100}, + {blobs: 1, tip: 200}, + {blobs: 1, tip: 300}, + }) + + tc.expectEntries(t, tc.vhashes[2]...) +} + +// TestCacheHbTimerFallsBackToTopK checks the fallback after a HasBlobs +// request: when hasBlobsTimeout elapses, a topK update replaces the entries +// with the topK set. +func TestCacheHbTimerFallsBackToTopK(t *testing.T) { + tc := newTestCache(t, []txSpec{ + {blobs: 1, tip: 100}, + {blobs: 1, tip: 300}, + }) + + tc.HasBlobs(context.Background(), tc.vhashes[0]) + tc.wait(t, 0) + tc.expectEntries(t, tc.vhashes[0]...) + + tc.wait(t, hasBlobsTimeout) + tc.expectEntries(t, tc.vhashes[1]...) +} + +// TestCacheGetBlobs checks that GetBlobs returns the requested blobs and does +// not disturb the cached entries. +func TestCacheGetBlobs(t *testing.T) { + tc := newTestCache(t, []txSpec{ + {blobs: 1, tip: 100}, + {blobs: 1, tip: 300}, + }) + tc.expectEntries(t, tc.vhashes[1]...) + + blobs, _, proofs, err := tc.GetBlobs(context.Background(), tc.vhashes[1], types.BlobSidecarVersion1) + if err != nil { + t.Fatalf("GetBlobs: %v", err) + } + for i := range blobs { + if blobs[i] == nil { + t.Errorf("blob %d missing in GetBlobs response", i) + } + if len(proofs[i]) == 0 { + t.Errorf("proofs %d missing in GetBlobs response", i) + } + } + tc.wait(t, 0) + tc.expectEntries(t, tc.vhashes[1]...) +} + +// TestCacheTopKRefresh verifies that when a more profitable tx appears in the +// pool, the next topK tick replaces the cached entry with the better one. +func TestCacheTopKRefresh(t *testing.T) { + tc := newTestCache(t, []txSpec{ + {blobs: 1, tip: 100}, + {blobs: 1, tip: 200}, + {blobs: 1, tip: 300}, + }) + tc.expectEntries(t, tc.vhashes[2]...) + + better := tc.inject(t, txSpec{blobs: 1, tip: 400}) + + tc.wait(t, topKTimeout) + tc.expectEntries(t, better...) +} diff --git a/miner/ordering.go b/core/txpool/txorder/ordering.go similarity index 90% rename from miner/ordering.go rename to core/txpool/txorder/ordering.go index bcf7af46e8..a17a2cc251 100644 --- a/miner/ordering.go +++ b/core/txpool/txorder/ordering.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -package miner +package txorder import ( "container/heap" @@ -83,10 +83,10 @@ func (s *txByPriceAndTime) Pop() interface{} { return x } -// transactionsByPriceAndNonce represents a set of transactions that can return +// TransactionsByPriceAndNonce represents a set of transactions that can return // transactions in a profit-maximizing sorted order, while supporting removing // entire batches of transactions for non-executable accounts. -type transactionsByPriceAndNonce struct { +type TransactionsByPriceAndNonce struct { txs map[common.Address][]*txpool.LazyTransaction // Per account nonce-sorted list of transactions heads txByPriceAndTime // Next transaction for each unique account (price heap) signer types.Signer // Signer for the set of transactions @@ -98,7 +98,7 @@ type transactionsByPriceAndNonce struct { // // Note, the input map is reowned so the caller should not interact any more with // if after providing it to the constructor. -func newTransactionsByPriceAndNonce(signer types.Signer, txs map[common.Address][]*txpool.LazyTransaction, baseFee *big.Int) *transactionsByPriceAndNonce { +func NewTransactionsByPriceAndNonce(signer types.Signer, txs map[common.Address][]*txpool.LazyTransaction, baseFee *big.Int) *TransactionsByPriceAndNonce { // Convert the basefee from header format to uint256 format var baseFeeUint *uint256.Int if baseFee != nil { @@ -118,7 +118,7 @@ func newTransactionsByPriceAndNonce(signer types.Signer, txs map[common.Address] heap.Init(&heads) // Assemble and return the transaction set - return &transactionsByPriceAndNonce{ + return &TransactionsByPriceAndNonce{ txs: txs, heads: heads, signer: signer, @@ -127,7 +127,7 @@ func newTransactionsByPriceAndNonce(signer types.Signer, txs map[common.Address] } // Peek returns the next transaction by price. -func (t *transactionsByPriceAndNonce) Peek() (*txpool.LazyTransaction, *uint256.Int) { +func (t *TransactionsByPriceAndNonce) Peek() (*txpool.LazyTransaction, *uint256.Int) { if len(t.heads) == 0 { return nil, nil } @@ -135,7 +135,7 @@ func (t *transactionsByPriceAndNonce) Peek() (*txpool.LazyTransaction, *uint256. } // Shift replaces the current best head with the next one from the same account. -func (t *transactionsByPriceAndNonce) Shift() { +func (t *TransactionsByPriceAndNonce) Shift() { acc := t.heads[0].from if txs, ok := t.txs[acc]; ok && len(txs) > 0 { if wrapped, err := newTxWithMinerFee(txs[0], acc, t.baseFee); err == nil { @@ -150,17 +150,17 @@ func (t *transactionsByPriceAndNonce) Shift() { // Pop removes the best transaction, *not* replacing it with the next one from // the same account. This should be used when a transaction cannot be executed // and hence all subsequent ones should be discarded from the same account. -func (t *transactionsByPriceAndNonce) Pop() { +func (t *TransactionsByPriceAndNonce) Pop() { heap.Pop(&t.heads) } // Empty returns if the price heap is empty. It can be used to check it simpler // than calling peek and checking for nil return. -func (t *transactionsByPriceAndNonce) Empty() bool { +func (t *TransactionsByPriceAndNonce) Empty() bool { return len(t.heads) == 0 } // Clear removes the entire content of the heap. -func (t *transactionsByPriceAndNonce) Clear() { +func (t *TransactionsByPriceAndNonce) Clear() { t.heads, t.txs = nil, nil } diff --git a/miner/ordering_test.go b/core/txpool/txorder/ordering_test.go similarity index 97% rename from miner/ordering_test.go rename to core/txpool/txorder/ordering_test.go index 5a45261570..cb2b3bc7a2 100644 --- a/miner/ordering_test.go +++ b/core/txpool/txorder/ordering_test.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -package miner +package txorder import ( "crypto/ecdsa" @@ -102,7 +102,7 @@ func testTransactionPriceNonceSort(t *testing.T, baseFee *big.Int) { expectedCount += count } // Sort the transactions and cross check the nonce ordering - txset := newTransactionsByPriceAndNonce(signer, groups, baseFee) + txset := NewTransactionsByPriceAndNonce(signer, groups, baseFee) txs := types.Transactions{} for tx, _ := txset.Peek(); tx != nil; tx, _ = txset.Peek() { @@ -168,7 +168,7 @@ func TestTransactionTimeSort(t *testing.T) { }) } // Sort the transactions and cross check the nonce ordering - txset := newTransactionsByPriceAndNonce(signer, groups, nil) + txset := NewTransactionsByPriceAndNonce(signer, groups, nil) txs := types.Transactions{} for tx, _ := txset.Peek(); tx != nil; tx, _ = txset.Peek() { diff --git a/eth/backend.go b/eth/backend.go index 5853697d64..7438486696 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -95,6 +95,7 @@ type Ethereum struct { config *ethconfig.Config txPool *txpool.TxPool blobTxPool *blobpool.BlobPool + blobCache *blobpool.Cache localTxTracker *locals.TxTracker blockchain *core.BlockChain @@ -325,6 +326,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { config.BlobPool.Datadir = stack.ResolvePath(config.BlobPool.Datadir) } eth.blobTxPool = blobpool.New(config.BlobPool, eth.blockchain, legacyPool.HasPendingAuth) + eth.blobCache = blobpool.NewCache(eth.blobTxPool) eth.txPool, err = txpool.New(config.TxPool.PriceLimit, eth.blockchain, []txpool.SubPool{legacyPool, eth.blobTxPool}) if err != nil { @@ -436,6 +438,7 @@ func (s *Ethereum) AccountManager() *accounts.Manager { return s.accountManager func (s *Ethereum) BlockChain() *core.BlockChain { return s.blockchain } func (s *Ethereum) TxPool() *txpool.TxPool { return s.txPool } func (s *Ethereum) BlobTxPool() *blobpool.BlobPool { return s.blobTxPool } +func (s *Ethereum) BlobCache() *blobpool.Cache { return s.blobCache } func (s *Ethereum) Engine() consensus.Engine { return s.engine } func (s *Ethereum) ChainDb() ethdb.Database { return s.chainDb } func (s *Ethereum) IsListening() bool { return true } // Always listening @@ -601,6 +604,7 @@ func (s *Ethereum) Stop() error { s.closeFilterMaps <- ch <-ch s.filterMaps.Stop() + s.blobCache.Stop() s.txPool.Close() s.blockchain.Stop() s.engine.Close() diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index 1de2c80848..4b0c3f6ea3 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -574,7 +574,7 @@ func (api *ConsensusAPI) GetBlobsV1(ctx context.Context, hashes []common.Hash) ( if len(hashes) > 128 { return nil, engine.TooLargeRequest.With(fmt.Errorf("requested blob count too large: %v", len(hashes))) } - blobs, _, proofs, err := api.eth.BlobTxPool().GetBlobs(ctx, hashes, types.BlobSidecarVersion0) + blobs, _, proofs, err := api.eth.BlobCache().GetBlobs(ctx, hashes, types.BlobSidecarVersion0) if err != nil { return nil, engine.InvalidParams.With(err) } @@ -656,7 +656,7 @@ func (api *ConsensusAPI) getBlobs(ctx context.Context, hashes []common.Hash, v2 return nil, engine.TooLargeRequest.With(fmt.Errorf("requested blob count too large: %v", len(hashes))) } available := 0 - for _, ok := range api.eth.BlobTxPool().AvailableBlobs(hashes) { + for _, ok := range api.eth.BlobCache().HasBlobs(ctx, hashes) { if ok { available++ } @@ -671,7 +671,7 @@ func (api *ConsensusAPI) getBlobs(ctx context.Context, hashes []common.Hash, v2 } // Retrieve blobs from the pool. This operation is expensive and may involve // heavy disk I/O. - blobs, _, proofs, err := api.eth.BlobTxPool().GetBlobs(ctx, hashes, types.BlobSidecarVersion1) + blobs, _, proofs, err := api.eth.BlobCache().GetBlobs(ctx, hashes, types.BlobSidecarVersion1) if err != nil { return nil, engine.InvalidParams.With(err) } @@ -710,7 +710,7 @@ func (api *ConsensusAPI) getBlobs(ctx context.Context, hashes []common.Hash, v2 // HasBlobs reports availability for the requested blob-versioned-hashes. func (api *ConsensusAPI) HasBlobs(hashes []common.Hash) []bool { - return api.eth.BlobTxPool().AvailableBlobs(hashes) + return api.eth.BlobCache().HasBlobs(context.Background(), hashes) } // Helper for NewPayload* methods. diff --git a/miner/worker.go b/miner/worker.go index b0e144c0ab..01a14b8a02 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/stateless" "github.com/ethereum/go-ethereum/core/txpool" + "github.com/ethereum/go-ethereum/core/txpool/txorder" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types/bal" "github.com/ethereum/go-ethereum/core/vm" @@ -422,7 +423,7 @@ func (miner *Miner) applyTransaction(env *environment, tx *types.Transaction) (* return receipt, bal, nil } -func (miner *Miner) commitTransactions(ctx context.Context, env *environment, plainTxs, blobTxs *transactionsByPriceAndNonce, interrupt *atomic.Int32) error { +func (miner *Miner) commitTransactions(ctx context.Context, env *environment, plainTxs, blobTxs *txorder.TransactionsByPriceAndNonce, interrupt *atomic.Int32) error { ctx, _, spanEnd := telemetry.StartSpan(ctx, "miner.commitTransactions") defer spanEnd(nil) @@ -449,7 +450,7 @@ func (miner *Miner) commitTransactions(ctx context.Context, env *environment, pl // Retrieve the next transaction and abort if all done. var ( ltx *txpool.LazyTransaction - txs *transactionsByPriceAndNonce + txs *txorder.TransactionsByPriceAndNonce ) pltx, ptip := plainTxs.Peek() bltx, btip := blobTxs.Peek() @@ -592,16 +593,16 @@ func (miner *Miner) fillTransactions(ctx context.Context, interrupt *atomic.Int3 } // Fill the block with all available pending transactions. if len(prioPlainTxs) > 0 || len(prioBlobTxs) > 0 { - plainTxs := newTransactionsByPriceAndNonce(env.signer, prioPlainTxs, env.header.BaseFee) - blobTxs := newTransactionsByPriceAndNonce(env.signer, prioBlobTxs, env.header.BaseFee) + plainTxs := txorder.NewTransactionsByPriceAndNonce(env.signer, prioPlainTxs, env.header.BaseFee) + blobTxs := txorder.NewTransactionsByPriceAndNonce(env.signer, prioBlobTxs, env.header.BaseFee) if err := miner.commitTransactions(ctx, env, plainTxs, blobTxs, interrupt); err != nil { return err } } if len(normalPlainTxs) > 0 || len(normalBlobTxs) > 0 { - plainTxs := newTransactionsByPriceAndNonce(env.signer, normalPlainTxs, env.header.BaseFee) - blobTxs := newTransactionsByPriceAndNonce(env.signer, normalBlobTxs, env.header.BaseFee) + plainTxs := txorder.NewTransactionsByPriceAndNonce(env.signer, normalPlainTxs, env.header.BaseFee) + blobTxs := txorder.NewTransactionsByPriceAndNonce(env.signer, normalBlobTxs, env.header.BaseFee) if err := miner.commitTransactions(ctx, env, plainTxs, blobTxs, interrupt); err != nil { return err