go-ethereum/eth/backend.go
Bosul Mun e595aedcd0
Some checks are pending
/ Linux Build (push) Waiting to run
/ Linux Build (arm) (push) Waiting to run
/ Keeper Build (push) Waiting to run
/ Windows Build (push) Waiting to run
/ Docker Image (push) Waiting to run
core/txpool/blobpool: add cache for GetBlobs request (#35124)
This PR introduces a cache for GetBlobs request.

The main purpose of this PR is to reduce the getBlobs latency by reading and
decoding blobs from the pool in advance of the actual query. This is important
especially in the context of a sparse blobpool, since it may be necessary to
recover blobs from cells on a getBlobs request.

Previously, the Engine API read and decoded blobs from the pool on every call.
Now those calls check the cache and only fall back to the pool on a miss.

The cache has two modes:

- In topK mode (default), it wakes up periodically, picks the most profitable
  pending blob transactions up to the current fork's maxBlobsPerBlock, and loads
  their blobs. The selection logic is shared with the miner's block-building
  logic. The selection size is derived from eip4844.MaxBlobsPerBlock at the
  current head.
- When the CL calls HasBlobs, the cache switches to hasBlobs mode and tries to
  pin the set it just reported as available. Cache updates (read, decode, and
  optionally conversion in the future) run in background goroutines.

---------

Co-authored-by: Felix Lange <fjl@twurst.com>
2026-06-11 17:26:15 +02:00

618 lines
21 KiB
Go

// Copyright 2014 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package eth implements the Ethereum protocol.
package eth
import (
"context"
"encoding/json"
"fmt"
"math"
"math/big"
"runtime"
"sync"
"time"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/history"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state/pruner"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool/blobpool"
"github.com/ethereum/go-ethereum/core/txpool/legacypool"
"github.com/ethereum/go-ethereum/core/txpool/locals"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/eth/gasprice"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/eth/protocols/snap"
"github.com/ethereum/go-ethereum/eth/tracers"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/internal/shutdowncheck"
"github.com/ethereum/go-ethereum/internal/version"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/miner"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/dnsdisc"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
gethversion "github.com/ethereum/go-ethereum/version"
)
const (
// discmixTimeout is the fairness knob for the discovery mixer. When looking for peers, we'll
// wait this long for a single source of candidates before moving on and trying other
// sources. If this timeout expires, the source will be skipped in this round, but it
// will continue to fetch in the background and will have a chance with a new timeout
// in the next rounds, giving it overall more time but a proportionally smaller share.
// We expect a normal source to produce ~10 candidates per second.
//
// The value is passed to enode.NewFairMix when the Ethereum object is assembled in New.
discmixTimeout = 100 * time.Millisecond
// discoveryPrefetchBuffer is the number of peers to pre-fetch from a discovery
// source. It is useful to avoid the negative effects of potential longer timeouts
// in the discovery, keeping dial progress while waiting for the next batch of
// candidates.
//
// The value is handed to enode.NewBufferIter for the discv4 and discv5 sources
// in setupDiscovery.
discoveryPrefetchBuffer = 32
// maxParallelENRRequests is the maximum number of parallel ENR requests that can be
// performed by a disc/v4 source.
//
// The value is handed to enode.AsyncFilter for the discv4 source in setupDiscovery.
maxParallelENRRequests = 16
)
// Config contains the configuration options of the ETH protocol.
// It is an alias of ethconfig.Config.
//
// Deprecated: use ethconfig.Config instead.
type Config = ethconfig.Config
// Ethereum implements the Ethereum full node service.
//
// Instances are built by New, which also registers the object as a lifecycle
// on the node; Start and Stop then drive the goroutines and components held
// in the fields below.
type Ethereum struct {
// core protocol objects
config *ethconfig.Config
txPool *txpool.TxPool
blobTxPool *blobpool.BlobPool
blobCache *blobpool.Cache // Built on top of blobTxPool in New, stopped in Stop
localTxTracker *locals.TxTracker // Only set when config.TxPool.NoLocals is false
blockchain *core.BlockChain
handler *handler
discmix *enode.FairMix // Mixer of discovery sources, populated in setupDiscovery
dropper *dropper
// DB interfaces
chainDb ethdb.Database // Block chain database
engine consensus.Engine
accountManager *accounts.Manager
filterMaps *filtermaps.FilterMaps
// closeFilterMaps is the shutdown handshake for updateFilterMapsHeads: Stop
// sends a channel on it and the loop closes that channel before returning.
closeFilterMaps chan chan struct{}
// Chain event subscriptions driving updateFilterMapsHeads. The
// subscriptions are registered and consumed in Start.
fmHeadEventCh chan core.ChainEvent
fmHeadSub event.Subscription
fmBlockProcCh chan bool
fmBlockProcSub event.Subscription
APIBackend *EthAPIBackend
miner *miner.Miner
gasPrice *big.Int // Initialised from config.Miner.GasPrice in New
networkID uint64 // config.NetworkId, or the chain ID when that is zero
netRPCService *ethapi.NetAPI
p2pServer *p2p.Server
lock sync.RWMutex // Protects the mutable fields (e.g. gas price)
shutdownTracker *shutdowncheck.ShutdownTracker // Tracks if and when the node has shutdown ungracefully
}
// New creates a new Ethereum object (including the initialisation of the common Ethereum object),
// whose lifecycle will be managed by the provided node.
//
// The function sanitizes config in place (miner gas price, trie cache split
// when pruning is disabled on the hash scheme, resolved txpool/blobpool paths),
// opens the "chaindata" database, and then builds, in order: the consensus
// engine, the blockchain, the filtermaps log index, the transaction pools and
// blob cache, the optional local transaction tracker, the network handler, the
// peer dropper, the miner and the API backend. Finally it registers the APIs,
// protocols and the Ethereum lifecycle on stack and marks the startup in the
// shutdown tracker.
//
// An error is returned when the sync or history mode is invalid, when the
// database version is newer than this build supports (unless
// config.SkipBcVersionCheck is set), or when any component fails to
// initialise. A failure to create the configured live tracer is wrapped so the
// cause stays reachable through errors.Is / errors.As.
//
// NOTE(review): components created before a late failure are not torn down by
// this function; presumably the owning node cleans up — confirm.
func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
	// Ensure configuration values are compatible and sane
	if !config.SyncMode.IsValid() {
		return nil, fmt.Errorf("invalid sync mode %d", config.SyncMode)
	}
	if !config.HistoryMode.IsValid() {
		return nil, fmt.Errorf("invalid history mode %d", config.HistoryMode)
	}
	if config.Miner.GasPrice == nil || config.Miner.GasPrice.Sign() <= 0 {
		log.Warn("Sanitizing invalid miner gas price", "provided", config.Miner.GasPrice, "updated", ethconfig.Defaults.Miner.GasPrice)
		config.Miner.GasPrice = new(big.Int).Set(ethconfig.Defaults.Miner.GasPrice)
	}
	// With pruning disabled on the hash scheme the dirty cache allowance is
	// redistributed to the clean cache (and the snapshot cache, if enabled).
	if config.NoPruning && config.TrieDirtyCache > 0 && config.StateScheme == rawdb.HashScheme {
		if config.SnapshotCache > 0 {
			config.TrieCleanCache += config.TrieDirtyCache * 3 / 5
			config.SnapshotCache += config.TrieDirtyCache * 2 / 5
		} else {
			config.TrieCleanCache += config.TrieDirtyCache
		}
		config.TrieDirtyCache = 0
	}
	log.Info("Allocated trie memory caches", "clean", common.StorageSize(config.TrieCleanCache)*1024*1024, "dirty", common.StorageSize(config.TrieDirtyCache)*1024*1024)

	dbOptions := node.DatabaseOptions{
		Cache:             config.DatabaseCache,
		Handles:           config.DatabaseHandles,
		AncientsDirectory: config.DatabaseFreezer,
		EraDirectory:      config.DatabaseEra,
		MetricsNamespace:  "eth/db/chaindata/",
	}
	chainDb, err := stack.OpenDatabaseWithOptions("chaindata", dbOptions)
	if err != nil {
		return nil, err
	}
	scheme, err := rawdb.ParseStateScheme(config.StateScheme, chainDb)
	if err != nil {
		return nil, err
	}
	// Try to recover offline state pruning only in hash-based.
	// A recovery failure is logged but does not abort construction.
	if scheme == rawdb.HashScheme {
		if err := pruner.RecoverPruning(stack.ResolvePath(""), chainDb); err != nil {
			log.Error("Failed to recover state", "error", err)
		}
	}
	// Here we determine genesis hash and active ChainConfig.
	// We need these to figure out the consensus parameters and to set up history pruning.
	chainConfig, genesisHash, err := core.LoadChainConfig(chainDb, config.Genesis)
	if err != nil {
		return nil, err
	}
	engine, err := ethconfig.CreateConsensusEngine(chainConfig, chainDb)
	if err != nil {
		return nil, err
	}
	// Set networkID to chainID by default.
	networkID := config.NetworkId
	if networkID == 0 {
		networkID = chainConfig.ChainID.Uint64()
	}

	// Assemble the Ethereum object.
	eth := &Ethereum{
		config:          config,
		chainDb:         chainDb,
		accountManager:  stack.AccountManager(),
		engine:          engine,
		networkID:       networkID,
		gasPrice:        config.Miner.GasPrice,
		p2pServer:       stack.Server(),
		discmix:         enode.NewFairMix(discmixTimeout),
		shutdownTracker: shutdowncheck.NewShutdownTracker(chainDb),
		fmHeadEventCh:   make(chan core.ChainEvent, 10),
		fmBlockProcCh:   make(chan bool, 10),
	}
	bcVersion := rawdb.ReadDatabaseVersion(chainDb)
	dbVer := "<nil>"
	if bcVersion != nil {
		dbVer = fmt.Sprintf("%d", *bcVersion)
	}
	log.Info("Initialising Ethereum protocol", "network", networkID, "dbversion", dbVer)

	// Create BlockChain object.
	if !config.SkipBcVersionCheck {
		if bcVersion != nil && *bcVersion > core.BlockChainVersion {
			return nil, fmt.Errorf("database version is v%d, Geth %s only supports v%d", *bcVersion, version.WithMeta, core.BlockChainVersion)
		} else if bcVersion == nil || *bcVersion < core.BlockChainVersion {
			if bcVersion != nil { // only print warning on upgrade, not on init
				log.Warn("Upgrade blockchain database version", "from", dbVer, "to", core.BlockChainVersion)
			}
			rawdb.WriteDatabaseVersion(chainDb, core.BlockChainVersion)
		}
	}
	histPolicy, err := history.NewPolicy(config.HistoryMode, genesisHash)
	if err != nil {
		return nil, err
	}
	options := &core.BlockChainConfig{
		TrieCleanLimit:          config.TrieCleanCache,
		NoPrefetch:              config.NoPrefetch,
		TrieDirtyLimit:          config.TrieDirtyCache,
		ArchiveMode:             config.NoPruning,
		TrieTimeLimit:           config.TrieTimeout,
		SnapshotLimit:           config.SnapshotCache,
		Preimages:               config.Preimages,
		StateHistory:            config.StateHistory,
		TrienodeHistory:         config.TrienodeHistory,
		NodeFullValueCheckpoint: config.NodeFullValueCheckpoint,
		BinTrieGroupDepth:       config.BinTrieGroupDepth,
		StateScheme:             scheme,
		HistoryPolicy:           histPolicy,
		TxLookupLimit:           int64(min(config.TransactionHistory, math.MaxInt64)),
		VmConfig: vm.Config{
			EnablePreimageRecording: config.EnablePreimageRecording,
		},
		// Enables file journaling for the trie database. The journal files will be stored
		// within the data directory. The corresponding paths will be either:
		// - DATADIR/triedb/merkle.journal
		// - DATADIR/triedb/verkle.journal
		TrieJournalDirectory:    stack.ResolvePath("triedb"),
		StateSizeTracking:       config.EnableStateSizeTracking,
		SlowBlockThreshold:      config.SlowBlockThreshold,
		StatelessSelfValidation: config.StatelessSelfValidation,
		EnableWitnessStats:      config.EnableWitnessStats,
	}
	if config.VMTrace != "" {
		traceConfig := json.RawMessage("{}")
		if config.VMTraceJsonConfig != "" {
			traceConfig = json.RawMessage(config.VMTraceJsonConfig)
		}
		t, err := tracers.LiveDirectory.New(config.VMTrace, traceConfig)
		if err != nil {
			// Wrap (rather than flatten) the cause so callers can inspect it.
			return nil, fmt.Errorf("failed to create tracer %s: %w", config.VMTrace, err)
		}
		options.VmConfig.Tracer = t
	}
	// Override the chain config with provided settings. Unset (nil) overrides
	// are copied through as nil, which matches the zero value of the struct.
	options.Overrides = &core.ChainOverrides{
		OverrideOsaka: config.OverrideOsaka,
		OverrideBPO1:  config.OverrideBPO1,
		OverrideBPO2:  config.OverrideBPO2,
		OverrideUBT:   config.OverrideUBT,
	}
	eth.blockchain, err = core.NewBlockChain(chainDb, config.Genesis, eth.engine, options)
	if err != nil {
		return nil, err
	}

	// Initialize filtermaps log index.
	fmConfig := filtermaps.Config{
		History:        config.LogHistory,
		Disabled:       config.LogNoHistory,
		ExportFileName: config.LogExportCheckpoints,
		HashScheme:     scheme == rawdb.HashScheme,
	}
	chainView := eth.newChainView(eth.blockchain.CurrentBlock())
	historyCutoff, _ := eth.blockchain.HistoryPruningCutoff()
	var finalBlock uint64
	if fb := eth.blockchain.CurrentFinalBlock(); fb != nil {
		finalBlock = fb.Number.Uint64()
	}
	filterMaps, err := filtermaps.NewFilterMaps(chainDb, chainView, historyCutoff, finalBlock, filtermaps.DefaultParams, fmConfig)
	if err != nil {
		return nil, err
	}
	eth.filterMaps = filterMaps
	// Unbuffered on purpose: Stop hands its completion channel directly to
	// the updateFilterMapsHeads loop.
	eth.closeFilterMaps = make(chan chan struct{})

	// TxPool
	if config.TxPool.Journal != "" {
		config.TxPool.Journal = stack.ResolvePath(config.TxPool.Journal)
	}
	legacyPool := legacypool.New(config.TxPool, eth.blockchain)
	if config.BlobPool.Datadir != "" {
		config.BlobPool.Datadir = stack.ResolvePath(config.BlobPool.Datadir)
	}
	eth.blobTxPool = blobpool.New(config.BlobPool, eth.blockchain, legacyPool.HasPendingAuth)
	eth.blobCache = blobpool.NewCache(eth.blobTxPool)
	eth.txPool, err = txpool.New(config.TxPool.PriceLimit, eth.blockchain, []txpool.SubPool{legacyPool, eth.blobTxPool})
	if err != nil {
		return nil, err
	}
	if !config.TxPool.NoLocals {
		rejournal := config.TxPool.Rejournal
		if rejournal < time.Second {
			log.Warn("Sanitizing invalid txpool journal time", "provided", rejournal, "updated", time.Second)
			rejournal = time.Second
		}
		eth.localTxTracker = locals.New(config.TxPool.Journal, rejournal, eth.blockchain.Config(), eth.txPool)
		stack.RegisterLifecycle(eth.localTxTracker)
	}

	// Permit the downloader to use the trie cache allowance during fast sync
	cacheLimit := options.TrieCleanLimit + options.TrieDirtyLimit + options.SnapshotLimit
	if eth.handler, err = newHandler(&handlerConfig{
		NodeID:         eth.p2pServer.Self().ID(),
		Database:       chainDb,
		Chain:          eth.blockchain,
		TxPool:         eth.txPool,
		Network:        networkID,
		Sync:           config.SyncMode,
		BloomCache:     uint64(cacheLimit),
		RequiredBlocks: config.RequiredBlocks,
		SnapV2:         config.SnapV2,
	}); err != nil {
		return nil, err
	}
	eth.dropper = newDropper(eth.p2pServer.MaxDialedConns(), eth.p2pServer.MaxInboundConns())

	eth.miner = miner.New(eth, config.Miner, eth.engine)
	eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData))
	eth.miner.SetPrioAddresses(config.TxPool.Locals)

	// The gas price oracle needs the backend itself, so it is filled in after
	// the backend has been constructed.
	eth.APIBackend = &EthAPIBackend{stack.Config().ExtRPCEnabled(), stack.Config().AllowUnprotectedTxs, eth, nil}
	if eth.APIBackend.allowUnprotectedTxs {
		log.Info("Unprotected transactions allowed")
	}
	eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, config.GPO, config.Miner.GasPrice)

	// Start the RPC service
	eth.netRPCService = ethapi.NewNetAPI(eth.p2pServer, networkID)

	// Register the backend on the node
	stack.RegisterAPIs(eth.APIs())
	stack.RegisterProtocols(eth.Protocols())
	stack.RegisterLifecycle(eth)

	// Successful startup; push a marker and check previous unclean shutdowns.
	eth.shutdownTracker.MarkStartup()

	return eth, nil
}
func makeExtraData(extra []byte) []byte {
if len(extra) == 0 {
// create default extradata
extra, _ = rlp.EncodeToBytes([]interface{}{
uint(gethversion.Major<<16 | gethversion.Minor<<8 | gethversion.Patch),
"geth",
runtime.Version(),
runtime.GOOS,
})
}
if uint64(len(extra)) > params.MaximumExtraDataSize {
log.Warn("Miner extra data exceed limit", "extra", hexutil.Bytes(extra), "limit", params.MaximumExtraDataSize)
extra = nil
}
return extra
}
// APIs returns the collection of RPC services the ethereum package offers:
// the services produced by ethapi.GetAPIs for the API backend, followed by
// the miner, downloader ("eth"), admin, debug and net services.
// NOTE, some of these services probably need to be moved to somewhere else.
func (s *Ethereum) APIs() []rpc.API {
	apis := ethapi.GetAPIs(s.APIBackend)

	// Tack the locally implemented services onto the shared ones.
	apis = append(apis,
		rpc.API{
			Namespace: "miner",
			Service:   NewMinerAPI(s),
		},
		rpc.API{
			Namespace: "eth",
			Service:   downloader.NewDownloaderAPI(s.handler.downloader, s.blockchain),
		},
		rpc.API{
			Namespace: "admin",
			Service:   NewAdminAPI(s),
		},
		rpc.API{
			Namespace: "debug",
			Service:   NewDebugAPI(s),
		},
		rpc.API{
			Namespace: "net",
			Service:   s.netRPCService,
		},
	)
	return apis
}
// ResetWithGenesisBlock forwards to the blockchain's ResetWithGenesisBlock
// method, passing the given genesis block gb through unchanged.
func (s *Ethereum) ResetWithGenesisBlock(gb *types.Block) {
s.blockchain.ResetWithGenesisBlock(gb)
}
// Miner returns the miner held by the service.
func (s *Ethereum) Miner() *miner.Miner {
	return s.miner
}

// AccountManager returns the account manager held by the service.
func (s *Ethereum) AccountManager() *accounts.Manager {
	return s.accountManager
}

// BlockChain returns the blockchain held by the service.
func (s *Ethereum) BlockChain() *core.BlockChain {
	return s.blockchain
}

// TxPool returns the transaction pool held by the service.
func (s *Ethereum) TxPool() *txpool.TxPool {
	return s.txPool
}

// BlobTxPool returns the blob transaction pool held by the service.
func (s *Ethereum) BlobTxPool() *blobpool.BlobPool {
	return s.blobTxPool
}

// BlobCache returns the blob cache held by the service.
func (s *Ethereum) BlobCache() *blobpool.Cache {
	return s.blobCache
}

// Engine returns the consensus engine held by the service.
func (s *Ethereum) Engine() consensus.Engine {
	return s.engine
}

// ChainDb returns the chain database held by the service.
func (s *Ethereum) ChainDb() ethdb.Database {
	return s.chainDb
}

// IsListening always reports true.
func (s *Ethereum) IsListening() bool {
	return true // Always listening
}

// Downloader returns the downloader owned by the network handler.
func (s *Ethereum) Downloader() *downloader.Downloader {
	return s.handler.downloader
}

// Synced reports the current value of the handler's synced flag.
func (s *Ethereum) Synced() bool {
	return s.handler.synced.Load()
}

// SetSynced asks the handler to enable its synced features.
func (s *Ethereum) SetSynced() {
	s.handler.enableSyncedFeatures()
}

// ArchiveMode reports whether the service was configured with NoPruning.
func (s *Ethereum) ArchiveMode() bool {
	return s.config.NoPruning
}
// Protocols returns all the currently configured network protocols to start.
// The eth protocols are always included; the snap protocols are appended only
// when the configured snapshot cache is larger than zero.
func (s *Ethereum) Protocols() []p2p.Protocol {
	protos := eth.MakeProtocols((*ethHandler)(s.handler), s.networkID, s.discmix)
	if s.config.SnapshotCache <= 0 {
		return protos
	}
	return append(protos, snap.MakeProtocols((*snapHandler)(s.handler), s.config.SnapV2)...)
}
// Start implements node.Lifecycle, starting all internal goroutines needed by the
// Ethereum protocol implementation.
//
// Discovery is set up first and its error, if any, is returned before anything
// else is started. Otherwise the shutdown tracker, network handler, peer
// dropper, chain event subscriptions and log indexer are started in that
// order, and nil is returned.
func (s *Ethereum) Start() error {
	err := s.setupDiscovery()
	if err != nil {
		return err
	}

	// Keep the shutdown marker refreshed while the node is running.
	s.shutdownTracker.Start()

	// Bring up the networking layer.
	s.handler.Start(s.p2pServer.MaxPeers)

	// Bring up the connection manager; the callback reports true for as long
	// as the node has not been marked as synced.
	notSynced := func() bool { return !s.Synced() }
	s.dropper.Start(s.p2pServer, notSynced)

	// Register the chain event feeds consumed by the filterMaps head updater.
	s.fmHeadSub = s.blockchain.SubscribeChainEvent(s.fmHeadEventCh)
	s.fmBlockProcSub = s.blockchain.SubscribeBlockProcessingEvent(s.fmBlockProcCh)

	// Launch the log indexer together with its head updater loop.
	s.filterMaps.Start()
	go s.updateFilterMapsHeads()

	return nil
}
// newChainView builds a filtermaps chain view of the service's blockchain at
// the given head, identified by its number and hash. A nil head yields a nil
// view.
func (s *Ethereum) newChainView(head *types.Header) *filtermaps.ChainView {
	if head != nil {
		return filtermaps.NewChainView(s.blockchain, head.Number.Uint64(), head.Hash())
	}
	return nil
}
// updateFilterMapsHeads is the loop that keeps the log indexer's target in
// step with the chain head. It retargets the indexer on every chain event,
// forwards block-processing notifications, re-reads the current block when
// nothing has happened for ten seconds, and exits when a channel arrives on
// closeFilterMaps (which it closes as acknowledgement).
//
// On exit both chain subscriptions are dropped and any events still queued
// in their channels are discarded.
func (s *Ethereum) updateFilterMapsHeads() {
	var (
		headCh = s.fmHeadEventCh
		procCh = s.fmBlockProcCh
	)
	defer func() {
		s.fmHeadSub.Unsubscribe()
		s.fmBlockProcSub.Unsubscribe()
		// Empty both channels without blocking.
		for {
			select {
			case <-headCh:
			case <-procCh:
			default:
				return
			}
		}
	}()

	// current is the header most recently accepted by retarget.
	var current *types.Header
	retarget := func(candidate *types.Header) {
		if candidate == nil {
			return
		}
		// Nothing to do when the head hash has not changed.
		if current != nil && candidate.Hash() == current.Hash() {
			return
		}
		current = candidate
		view := s.newChainView(current)
		if view == nil {
			return
		}
		cutoff, _ := s.blockchain.HistoryPruningCutoff()
		var finalized uint64
		if fb := s.blockchain.CurrentFinalBlock(); fb != nil {
			finalized = fb.Number.Uint64()
		}
		s.filterMaps.SetTarget(view, cutoff, finalized)
	}
	retarget(s.blockchain.CurrentBlock())

	for {
		select {
		case ev := <-headCh:
			retarget(ev.Header)
		case processing := <-procCh:
			s.filterMaps.SetBlockProcessing(processing)
		case <-time.After(10 * time.Second):
			// Idle fallback: pick up the head even without an event.
			retarget(s.blockchain.CurrentBlock())
		case done := <-s.closeFilterMaps:
			close(done)
			return
		}
	}
}
// setupDiscovery starts the ENR updater and registers the peer discovery
// sources on the discovery mixer: DNS iterators for the configured eth and
// snap discovery URLs, then the discv4 and discv5 random-node iterators when
// the respective service is available on the p2p server.
//
// It returns the error of the first DNS iterator that cannot be created.
func (s *Ethereum) setupDiscovery() error {
	eth.StartENRUpdater(s.blockchain, s.p2pServer.LocalNode())

	// DNS sources: eth node lists first, snap node lists second. Empty URL
	// lists contribute no source.
	dnsclient := dnsdisc.NewClient(dnsdisc.Config{})
	for _, urls := range [][]string{s.config.EthDiscoveryURLs, s.config.SnapDiscoveryURLs} {
		if len(urls) == 0 {
			continue
		}
		source, err := dnsclient.NewIterator(urls...)
		if err != nil {
			return err
		}
		s.discmix.AddSource(source)
	}

	// DHT source backed by discv4.
	if s.p2pServer.DiscoveryV4() != nil {
		nodes := s.p2pServer.DiscoveryV4().RandomNodes()
		// RequestENR does not yet support context. It will simply time out.
		// If the ENR can't be resolved, RequestENR will return nil. We don't
		// care about the specific error here, so we ignore it.
		resolve := func(ctx context.Context, enr *enode.Node) *enode.Node {
			resolved, _ := s.p2pServer.DiscoveryV4().RequestENR(enr)
			return resolved
		}
		resolvedNodes := enode.AsyncFilter(nodes, resolve, maxParallelENRRequests)
		filtered := enode.Filter(resolvedNodes, eth.NewNodeFilter(s.blockchain))
		s.discmix.AddSource(enode.NewBufferIter(filtered, discoveryPrefetchBuffer))
	}

	// DHT source backed by discv5.
	if s.p2pServer.DiscoveryV5() != nil {
		filter := eth.NewNodeFilter(s.blockchain)
		filtered := enode.Filter(s.p2pServer.DiscoveryV5().RandomNodes(), filter)
		s.discmix.AddSource(enode.NewBufferIter(filtered, discoveryPrefetchBuffer))
	}
	return nil
}
// Stop implements node.Lifecycle, terminating all internal goroutines used by the
// Ethereum protocol.
//
// Components are shut down in a fixed order: peer-facing parts first, then the
// filterMaps head updater and indexer, the blob cache, the transaction pool,
// the blockchain and the consensus engine, and finally the shutdown tracker
// and the chain database. The method always returns nil.
func (s *Ethereum) Stop() error {
// Stop all the peer-related stuff first.
s.discmix.Close()
s.dropper.Stop()
s.handler.Stop()
// Then stop everything else.
// Hand the updateFilterMapsHeads loop a completion channel and wait until the
// loop closes it, which it does right before returning.
// NOTE(review): closeFilterMaps is unbuffered, so this send blocks until that
// loop receives it; it relies on Start having launched the goroutine.
ch := make(chan struct{})
s.closeFilterMaps <- ch
<-ch
s.filterMaps.Stop()
s.blobCache.Stop()
s.txPool.Close()
s.blockchain.Stop()
s.engine.Close()
// Clean shutdown marker as the last thing before closing db
s.shutdownTracker.Stop()
s.chainDb.Close()
return nil
}