go-ethereum/core/txpool/blobpool/cache.go
2026-06-26 14:02:26 +02:00

588 lines
16 KiB
Go

// Copyright 2026 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package blobpool
import (
"context"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/consensus/misc/eip1559"
"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool/txorder"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/internal/telemetry"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/holiman/uint256"
)
// Scheduling delays used by the cache loop when it re-arms the top-K timer.
const (
	// topKTimeout is the delay before the next top-K selection after a
	// top-K selection has been applied.
	topKTimeout = time.Second

	// hasBlobsTimeout is the delay before the next top-K selection after a
	// HasBlobs pin request has been applied.
	hasBlobsTimeout = time.Second
)
var (
	// The cache tracks three metrics: cache hits, cache misses, and the number
	// of entries it currently holds. Note that cache misses also include the
	// blobs that are missing on the lower level as well (in this case, the
	// blobpool). The number of blobs the cache merely failed to predict can be
	// calculated from the telemetry span as (blobs.filled - cache.hit).
	cacheHitMeter     = metrics.NewRegisteredMeter("blobpool/cache/hit", nil)
	cacheMissMeter    = metrics.NewRegisteredMeter("blobpool/cache/miss", nil)
	cacheEntriesGauge = metrics.NewRegisteredGauge("blobpool/cache/entries", nil)
)
// cachedBlob is a single cache entry, stored in Cache.entries under the blob's
// versioned hash. Depending on the mode it was loaded in, it carries either
// the custodied cells (cell mode) or the full blob (blob mode); the other
// representation is left nil.
type cachedBlob struct {
	// cell holds the custodied cells copied out of the transaction's cell
	// sidecar. GetCells matches them positionally against custody.Indices().
	// Nil for entries loaded in blob mode.
	cell []kzg4844.Cell

	// custody records which cell indices are held in cell. Left at its zero
	// value for entries loaded in blob mode.
	custody types.CustodyBitmap

	// blob is the full blob; nil for entries loaded in cell mode.
	blob *kzg4844.Blob

	commitment kzg4844.Commitment

	// proofs holds a single blob proof for version-0 sidecars. Otherwise it
	// holds the blob's cell proofs, which GetCells indexes by cell index; in
	// cell mode it is nil when the sidecar did not carry them.
	proofs []kzg4844.Proof

	// version is the sidecar version the entry was loaded from; GetBlobs only
	// serves entries whose version matches the request.
	version byte
}
// Cache holds the blobs that are likely to be requested by the GetBlobs engine API.
//
// Every `topKTimeout`, the cache selects the blobs of the top K most profitable
// transactions, and preloads them into the cache.
//
// For HasBlobs requests, it also causes the blobs requested by the CL to be loaded.
// (Note: the cache is not guaranteed to always hold such blobs, since the blobpool might
// drop the transaction in the window between the engine API response and the cache
// update.)
//
// Every update replaces the working set: entries that are not part of the
// latest requested set (a top-K selection or a HasBlobs pin) are evicted, and
// after a HasBlobs pin the next top-K selection is postponed by
// `hasBlobsTimeout`.
type Cache struct {
	blobpool *BlobPool
	clock    mclock.Clock

	// mu guards entries, which is read by the public API methods and written
	// by the loop goroutine and its background loaders.
	mu      sync.Mutex
	entries map[common.Hash]*cachedBlob

	// needCell is owned by the loop goroutine; it is only read and written
	// there, so it needs no synchronization. It is switched via cellModeCh.
	needCell bool

	// channels into loop
	quit        chan struct{}
	topkRequest chan struct{}      // pending top-K selection request; single slot, so repeated triggers coalesce
	topkTimer   mclock.Timer       // schedules the next top-K request; only touched by the loop goroutine
	hasBlobsCh  chan []common.Hash // list of versioned hashes that should be pinned
	cellModeCh  chan bool          // signals the loop to switch cell mode on/off

	step            func()             // test hook fired after each loop iteration
	cancelInflights context.CancelFunc // cancels the background loader goroutine of the current update
	inflight        sync.WaitGroup     // tracks all in-flight loader goroutines
	wg              sync.WaitGroup     // tracks the loop goroutine
}
// NewCache builds a blob cache on top of the given blobpool, driven by the
// real system clock and without a test hook, and starts its background loop.
// Call Stop to shut it down.
func NewCache(p *BlobPool) *Cache {
	var clock mclock.Clock = mclock.System{}
	return newCache(p, clock, nil)
}
// newCache is the injectable constructor behind NewCache: tests use it to
// supply a simulated clock and a hook that fires after every loop iteration.
// It allocates the entry map and the loop's channels (each signalling channel
// has a single-slot buffer), then launches the loop goroutine.
func newCache(p *BlobPool, clock mclock.Clock, step func()) *Cache {
	c := &Cache{
		blobpool:    p,
		clock:       clock,
		step:        step,
		entries:     make(map[common.Hash]*cachedBlob),
		quit:        make(chan struct{}),
		topkRequest: make(chan struct{}, 1),
		hasBlobsCh:  make(chan []common.Hash, 1),
		cellModeCh:  make(chan bool, 1),
	}
	c.wg.Add(1)
	go c.loop()
	return c
}
// Stop shuts the cache down. It signals the loop through the quit channel and
// waits for the loop goroutine to return; the loop itself waits for its
// background loaders before exiting. Stop closes a channel, so it must be
// called at most once.
func (c *Cache) Stop() {
	defer c.wg.Wait()
	close(c.quit)
}
// HasBlobs reports, for each versioned hash, whether the blob is available
// (in the cache or the blobpool), and asks the loop to pin the ones it found.
//
// The result is index-aligned with vhashes. It is nil if the cache has been
// stopped. If ctx is done before the loop accepts the pin request, the
// availability result is still returned but the (best-effort) pinning is
// skipped, so a caller with a deadline is not held up by a busy loop.
func (c *Cache) HasBlobs(ctx context.Context, vhashes []common.Hash) []bool {
	var (
		missIdx     []int
		missVhashes []common.Hash
		needPin     []common.Hash // available vhashes
	)
	available := make([]bool, len(vhashes))

	// First check the cache and pass the missing ones to the blobpool.
	c.mu.Lock()
	for i, vhash := range vhashes {
		if _, ok := c.entries[vhash]; ok {
			available[i] = true
			needPin = append(needPin, vhash)
		} else {
			missIdx = append(missIdx, i)
			missVhashes = append(missVhashes, vhash)
		}
	}
	c.mu.Unlock()

	if len(missVhashes) > 0 {
		pooled := c.blobpool.availableBlobs(missVhashes)
		// Merge the two results.
		for j, ok := range pooled {
			if ok {
				available[missIdx[j]] = true
				needPin = append(needPin, missVhashes[j])
			}
		}
	}
	// Tolerate a nil ctx: a nil channel never fires, which keeps the old
	// blocking behaviour for such callers instead of panicking.
	var done <-chan struct{}
	if ctx != nil {
		done = ctx.Done()
	}
	select {
	case c.hasBlobsCh <- needPin:
		// Note that we also send the ones we already have in cache,
		// since they can be dropped from the cache before this signal is processed.
		return available
	case <-done:
		// The caller gave up; availability is already known, only the pin is skipped.
		return available
	case <-c.quit:
		return nil
	}
}
// GetBlobs resolves the given versioned hashes to blobs, commitments and
// proofs. Cached entries whose version matches the request are served
// directly; every other hash is looked up in the blobpool. The three result
// slices are index-aligned with vhashes, and a hash that neither source can
// serve leaves nil/zero values in its slots.
//
// For instance, if the request is [A_versioned_hash, B_versioned_hash,
// C_versioned_hash] and blobpool has data for blobs A and C, but doesn't have
// data for B, the response MUST be [A, null, C].
//
// This is a utility method for the engine API, enabling consensus clients to
// retrieve blobs from the pools directly instead of the network.
//
// version selects the kind of proofs returned: blob proofs (version 0) or
// cell proofs (version 1). Proofs are never converted between the two kinds,
// as that is CPU intensive.
//
// Cache hits and misses are counted per requested position, reported to the
// cache meters and recorded on the telemetry span.
func (c *Cache) GetBlobs(ctx context.Context, vhashes []common.Hash, version byte) (_ []*kzg4844.Blob, _ []kzg4844.Commitment, _ [][]kzg4844.Proof, err error) {
	_, span, spanEnd := telemetry.StartSpan(ctx, "blobpool.GetBlobs")
	defer spanEnd(&err)

	total := len(vhashes)
	blobs := make([]*kzg4844.Blob, total)
	commitments := make([]kzg4844.Commitment, total)
	proofs := make([][]kzg4844.Proof, total)

	// Group request positions by hash so duplicated hashes are resolved once.
	positions := make(map[common.Hash][]int)
	for pos, vhash := range vhashes {
		positions[vhash] = append(positions[vhash], pos)
	}
	fill := func(vhash common.Hash, blob *kzg4844.Blob, commitment kzg4844.Commitment, proof []kzg4844.Proof) {
		for _, pos := range positions[vhash] {
			blobs[pos], commitments[pos], proofs[pos] = blob, commitment, proof
		}
	}

	var (
		misses []common.Hash
		hits   int
		missed int
	)
	c.mu.Lock()
	for vhash, pos := range positions {
		entry, found := c.entries[vhash]
		if found && entry.version == version && entry.blob != nil {
			hits += len(pos)
			fill(vhash, entry.blob, entry.commitment, entry.proofs)
		} else {
			missed += len(pos)
			misses = append(misses, vhash)
		}
	}
	c.mu.Unlock()

	if len(misses) != 0 {
		mb, mc, mp, poolErr := c.blobpool.getBlobs(misses, version)
		if poolErr != nil {
			return nil, nil, nil, poolErr
		}
		for j, vhash := range misses {
			if mb[j] != nil {
				fill(vhash, mb[j], mc[j], mp[j])
			}
		}
	}

	cacheHitMeter.Mark(int64(hits))
	cacheMissMeter.Mark(int64(missed))
	span.SetAttributes(
		telemetry.IntAttribute("cache.hit", hits),
		telemetry.IntAttribute("cache.miss", missed),
	)
	return blobs, commitments, proofs, nil
}
// GetCells returns cells for the given versioned blob hashes, filtered by the
// requested cell indices (mask). Each entry in the result corresponds to one
// vhash and has one slot per index in mask.Indices(); nil slots mean the cell
// (or its proof) was not available.
//
// Only cache entries that actually hold cells (loaded in cell mode) are served
// from the cache. Every other hash falls back to the blobpool. This includes
// entries that only hold a full blob.
func (c *Cache) GetCells(vhashes []common.Hash, mask types.CustodyBitmap) ([][]*kzg4844.Cell, [][]*kzg4844.Proof, error) {
	var (
		cells   = make([][]*kzg4844.Cell, len(vhashes))
		proofs  = make([][]*kzg4844.Proof, len(vhashes))
		indices = make(map[common.Hash][]int)
		misses  []common.Hash
	)
	for i, h := range vhashes {
		indices[h] = append(indices[h], i)
	}
	requested := mask.Indices()

	c.mu.Lock()
	for vhash, idxs := range indices {
		cached, ok := c.entries[vhash]
		if !ok || cached.cell == nil {
			// Entries loaded in blob mode carry no cells and a zero custody.
			// Answering from them would report every cell as missing, even
			// though the blobpool may be able to serve them.
			misses = append(misses, vhash)
			continue
		}
		// cached.cell is laid out in the order of cached.custody.Indices().
		// Map every custodied cell index to its position once, instead of
		// rescanning the index list for every requested cell.
		stored := cached.custody.Indices()
		position := make(map[int]int, len(stored))
		for k, s := range stored {
			if _, dup := position[int(s)]; !dup {
				position[int(s)] = k
			}
		}
		blobCells := make([]*kzg4844.Cell, len(requested))
		blobProofs := make([]*kzg4844.Proof, len(requested))
		for i, cellIdx := range requested {
			pos, found := position[int(cellIdx)]
			if !found || pos >= len(cached.cell) {
				continue
			}
			cell := cached.cell[pos]
			blobCells[i] = &cell
			// Cell proofs are indexed by cell index, not by storage position.
			if int(cellIdx) < len(cached.proofs) {
				pf := cached.proofs[cellIdx]
				blobProofs[i] = &pf
			}
		}
		for _, idx := range idxs {
			cells[idx] = blobCells
			proofs[idx] = blobProofs
		}
	}
	c.mu.Unlock()

	if len(misses) > 0 {
		mc, mp, err := c.blobpool.GetBlobCells(misses, mask)
		if err != nil {
			return nil, nil, err
		}
		for j, vhash := range misses {
			for _, idx := range indices[vhash] {
				cells[idx] = mc[j]
				proofs[idx] = mp[j]
			}
		}
	}
	return cells, proofs, nil
}
// SetCellMode switches how the cache loads entries:
//   - With on == true (cell mode), it stores the custodied cells directly,
//     without recovering blobs. This means transactions that hold too few
//     cells to recover their blobs can be cached as well.
//   - With on == false (blob mode), it only caches transactions whose full
//     blobs can be recovered.
//
// The request is handed to the loop. The loop re-selects transactions
// immediately, but only when the mode actually changes. SetCellMode blocks
// until the loop accepts the request, and returns without effect once the
// cache has been stopped.
func (c *Cache) SetCellMode(on bool) {
	select {
	case c.cellModeCh <- on:
	case <-c.quit:
	}
}
// loop is the cache's single event loop. It serialises every change to the
// working set: pin requests from HasBlobs, top-K selections and cell-mode
// switches. On quit it cancels the current loaders, stops the pending top-K
// timer and waits for the loaders before returning.
func (c *Cache) loop() {
	defer c.wg.Done()

	// Request a selection right away so the cache warms up at start-up.
	c.triggerTopK()

	for {
		select {
		case <-c.quit:
			c.cancelUpdate()
			if c.topkTimer != nil {
				c.topkTimer.Stop()
			}
			c.inflight.Wait()
			return

		case pinned := <-c.hasBlobsCh:
			// Apply the CL's pin request now and push the next top-K
			// selection back by hasBlobsTimeout.
			c.update(pinned)
			c.triggerTopKAfter(hasBlobsTimeout)

		case <-c.topkRequest:
			c.update(c.selectTopTxs())
			c.triggerTopKAfter(topKTimeout)

		case enable := <-c.cellModeCh:
			if enable == c.needCell {
				// Mode unchanged. Note that break only leaves the select,
				// so the step hook below still runs.
				break
			}
			// The CL changed its cell-proof support: flip the mode and
			// re-select immediately to force an update.
			c.needCell = enable
			c.triggerTopK()
		}
		if c.step != nil {
			c.step()
		}
	}
}
// cancelUpdate cancels the context of the most recent update, signalling its
// background loader to stop, and forgets the cancel function. Calling it when
// no update is active is a no-op.
func (c *Cache) cancelUpdate() {
	cancel := c.cancelInflights
	if cancel == nil {
		return
	}
	c.cancelInflights = nil
	cancel()
}
// update makes the cache hold the wanted vhashes. It works in two phases:
//   - Under the lock, it evicts entries that are no longer wanted, or that
//     were loaded in the other mode (blob vs cell).
//   - In the background, it loads the still-missing ones from the blobpool.
//
// Missing hashes are loaded in the order they appear in want, with duplicates
// dropped. Callers passing a priority-ordered list (such as the top-K
// selection) therefore get their most valuable blobs loaded first. This
// matters when the next update cancels this one part-way through.
//
// It must run on the loop goroutine: it reads needCell and replaces
// cancelInflights without synchronization.
//
// NOTE(review): a loader from a previous, already-cancelled update only checks
// its context between transactions. It may therefore still be inside
// loadCells/loadBlobs and insert entries for its own (older) want set after
// this update has evicted. Such entries stay until the next update.
func (c *Cache) update(want []common.Hash) {
	cellMode := c.needCell

	// Deduplicate while keeping the caller's order.
	wantSet := make(map[common.Hash]struct{}, len(want))
	ordered := make([]common.Hash, 0, len(want))
	for _, vh := range want {
		if _, seen := wantSet[vh]; seen {
			continue
		}
		wantSet[vh] = struct{}{}
		ordered = append(ordered, vh)
	}

	// Cancel the loader of the previous update before touching the entries.
	c.cancelUpdate()
	ctx, cancel := context.WithCancel(context.Background())
	c.cancelInflights = cancel

	c.mu.Lock()
	var missing []common.Hash
	for _, vh := range ordered {
		e, ok := c.entries[vh]
		if ok && ((cellMode && e.cell == nil) || (!cellMode && e.blob == nil)) {
			// Loaded in the other mode; drop it so it gets reloaded.
			delete(c.entries, vh)
			cacheEntriesGauge.Dec(1)
			ok = false
		}
		if !ok {
			missing = append(missing, vh)
		}
	}
	for vh := range c.entries {
		if _, ok := wantSet[vh]; ok {
			continue
		}
		delete(c.entries, vh)
		cacheEntriesGauge.Dec(1)
	}
	c.mu.Unlock()

	if len(missing) == 0 {
		return
	}
	c.inflight.Add(1)
	go func() {
		defer c.inflight.Done()
		for _, vh := range missing {
			if ctx.Err() != nil {
				return
			}
			// A previous iteration may already have loaded this hash as a
			// sibling blob of the same transaction.
			c.mu.Lock()
			_, loaded := c.entries[vh]
			c.mu.Unlock()
			if loaded {
				continue
			}
			ptx := c.blobpool.getByVhash(vh)
			if ptx == nil {
				continue
			}
			if cellMode {
				c.loadCells(ptx, wantSet)
			} else {
				c.loadBlobs(ptx, wantSet)
			}
		}
	}()
}
// loadCells creates cell-mode cache entries for the blobs of ptx whose
// versioned hash is in wantSet and that are not cached yet. For each such
// blob it copies the cells out of the transaction's cell sidecar. It also
// copies the blob's kzg4844.CellProofsPerBlob cell proofs, when the sidecar
// has them. Blobs whose cells lie outside the sidecar's cell slice are
// skipped. The cache lock is held for the whole pass.
func (c *Cache) loadCells(ptx *BlobTxForPool, wantSet map[common.Hash]struct{}) {
	sidecar := ptx.CellSidecar
	perBlob := sidecar.Custody.OneCount()

	c.mu.Lock()
	defer c.mu.Unlock()

	for blobIdx, vhash := range ptx.Tx.BlobHashes() {
		if _, wanted := wantSet[vhash]; !wanted {
			continue
		}
		if _, cached := c.entries[vhash]; cached {
			continue
		}
		// The sidecar stores perBlob cells for each blob, one blob after another.
		lo, hi := blobIdx*perBlob, (blobIdx+1)*perBlob
		if hi > len(sidecar.Cells) {
			continue
		}
		cells := make([]kzg4844.Cell, perBlob)
		copy(cells, sidecar.Cells[lo:hi])

		var cellProofs []kzg4844.Proof
		first := blobIdx * kzg4844.CellProofsPerBlob
		if last := first + kzg4844.CellProofsPerBlob; last <= len(sidecar.Proofs) {
			cellProofs = make([]kzg4844.Proof, kzg4844.CellProofsPerBlob)
			copy(cellProofs, sidecar.Proofs[first:last])
		}
		c.entries[vhash] = &cachedBlob{
			version:    sidecar.Version,
			commitment: sidecar.Commitments[blobIdx],
			custody:    sidecar.Custody,
			cell:       cells,
			proofs:     cellProofs,
		}
		cacheEntriesGauge.Inc(1)
	}
}
// loadBlobs creates blob-mode cache entries for the blobs of ptx whose
// versioned hash is in wantSet and that are not cached yet.
//
// It does nothing when the transaction custodies fewer than
// kzg4844.DataPerBlob cells, or when ptx.sidecar() fails or returns nil. The
// full blobs are produced by ptx.sidecar(), which runs before the cache lock
// is taken.
//
// Each entry carries the blob's single proof for version-0 sidecars, or its
// cell proofs for version-1 sidecars. Blobs whose cell proofs cannot be
// fetched are logged and skipped.
func (c *Cache) loadBlobs(ptx *BlobTxForPool, wantSet map[common.Hash]struct{}) {
	if ptx.CellSidecar.Custody.OneCount() < kzg4844.DataPerBlob {
		return
	}
	sc, err := ptx.sidecar()
	if err != nil || sc == nil {
		return
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	for i, vhash := range sc.BlobHashes() {
		if _, wanted := wantSet[vhash]; !wanted {
			continue
		}
		if _, cached := c.entries[vhash]; cached {
			continue
		}
		var proofs []kzg4844.Proof
		if sc.Version == types.BlobSidecarVersion0 {
			proofs = []kzg4844.Proof{sc.Proofs[i]}
		} else if sc.Version == types.BlobSidecarVersion1 {
			cellProofs, proofErr := sc.CellProofsAt(i)
			if proofErr != nil {
				log.Error("Failed to get cell proofs", "txhash", ptx.Tx.Hash(), "err", proofErr)
				continue
			}
			proofs = cellProofs
		}
		c.entries[vhash] = &cachedBlob{
			version:    sc.Version,
			blob:       &sc.Blobs[i],
			commitment: sc.Commitments[i],
			proofs:     proofs,
		}
		cacheEntriesGauge.Inc(1)
	}
}
// selectTopTxs picks the versioned hashes to preload. It does so by walking
// the pending blob transactions in price-and-nonce order, at fees derived
// from the current head.
//
// Whole transactions are taken until the collected blob count reaches the
// active fork's maxBlobsPerBlock. The limit is re-read on every call, so it
// follows BPO changes; the last transaction may overshoot it. It returns nil
// when no head is known yet. Must run on the loop goroutine, since it reads
// needCell.
func (c *Cache) selectTopTxs() []common.Hash {
	pool := c.blobpool
	head := pool.head.Load()
	if head == nil {
		return nil
	}
	var (
		config  = pool.chain.Config()
		baseFee = eip1559.CalcBaseFee(config, head)
	)
	filter := txpool.PendingFilter{
		BlobTxs:      true,
		BaseFee:      uint256.MustFromBig(baseFee),
		PartialCells: c.needCell,
		BlobVersion:  types.BlobSidecarVersion0,
	}
	if head.ExcessBlobGas != nil {
		filter.BlobFee = uint256.MustFromBig(eip4844.CalcBlobFee(config, head))
	}
	if config.IsOsaka(head.Number, head.Time) {
		filter.BlobVersion = types.BlobSidecarVersion1
	}

	pending, _ := pool.Pending(filter)
	vhashesOf := pool.vhashesByTx()
	order := txorder.NewTransactionsByPriceAndNonce(pool.signer, pending, baseFee)

	limit := uint(eip4844.MaxBlobsPerBlock(config, head.Time))
	var (
		selected []common.Hash
		count    uint
	)
	for count < limit {
		tx, _ := order.Peek()
		if tx == nil {
			break
		}
		if hashes, known := vhashesOf[tx.Hash]; known {
			selected = append(selected, hashes...)
			count += uint(len(hashes))
		}
		order.Shift()
	}
	return selected
}
// triggerTopKAfter (re)schedules the next top-K selection to happen after
// interval, dropping any previously scheduled or already queued request. It
// is meant to be called from the loop goroutine, which owns topkTimer.
func (c *Cache) triggerTopKAfter(interval time.Duration) {
	if timer := c.topkTimer; timer != nil {
		timer.Stop()
	}
	// Discard a request that is already queued so the selection does not
	// fire before the interval has elapsed.
	select {
	case <-c.topkRequest:
	default:
	}
	c.topkTimer = c.clock.AfterFunc(interval, c.triggerTopK)
}

// triggerTopK queues a top-K selection request for the loop. The send never
// blocks: if a request is already queued, this one is merged into it. That
// makes it safe to call from any goroutine, including timer callbacks.
func (c *Cache) triggerTopK() {
	select {
	case c.topkRequest <- struct{}{}:
	default:
		// A request is already pending.
	}
}