go-ethereum/eth/backend.go
Csaba Kiraly 5a918be50d eth: protect high-value peers from random dropping based on inclusion stats
The dropper periodically disconnects random peers to create churn.
This was blind to peer quality. Add inclusion-based peer protection
using two categories:

1. Total inclusions: protects peers with the highest cumulative
   count of delivered txs that were included on chain
2. Recent inclusions (EMA): protects peers with the best recent
   inclusion rate, giving newly productive peers faster protection

Each category independently protects the top 10% of inbound and
top 10% of dialed peers. The union of both sets is protected. Only
peers with positive scores qualify.

The dropper defines its own PeerInclusionStats struct and callback
type (getPeerInclusionStatsFunc) so any stats provider (e.g. a
transaction tracker) can plug in without a package dependency. The
callback is nil by default (protection disabled until wired).

The protectionCategories slice is designed for easy extension —
adding a new category requires only appending a struct with a name,
scoring function, and protection fraction.
2026-04-10 08:23:30 +02:00

605 lines
20 KiB
Go

// Copyright 2014 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package eth implements the Ethereum protocol.
package eth
import (
"context"
"encoding/json"
"fmt"
"math"
"math/big"
"runtime"
"sync"
"time"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/filtermaps"
"github.com/ethereum/go-ethereum/core/history"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state/pruner"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool/blobpool"
"github.com/ethereum/go-ethereum/core/txpool/legacypool"
"github.com/ethereum/go-ethereum/core/txpool/locals"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/eth/gasprice"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/eth/protocols/snap"
"github.com/ethereum/go-ethereum/eth/tracers"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/internal/shutdowncheck"
"github.com/ethereum/go-ethereum/internal/version"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/miner"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/dnsdisc"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
gethversion "github.com/ethereum/go-ethereum/version"
)
// Tuning parameters for peer discovery. discmixTimeout configures the FairMix
// created in New; discoveryPrefetchBuffer and maxParallelENRRequests are applied
// to the discovery iterators assembled in setupDiscovery.
const (
// discmixTimeout is the fairness knob for the discovery mixer. When looking for
// peers, we'll wait this long for a single source of candidates before moving on
// and trying other sources. If this timeout expires, the source will be skipped in
// this round, but it will continue to fetch in the background and will have a
// chance with a new timeout in the next rounds, giving it overall more time but a
// proportionally smaller share.
// We expect a normal source to produce ~10 candidates per second.
discmixTimeout = 100 * time.Millisecond
// discoveryPrefetchBuffer is the number of peers to pre-fetch from a discovery
// source. It is useful to avoid the negative effects of potentially longer timeouts
// in the discovery, keeping dial progress while waiting for the next batch of
// candidates.
discoveryPrefetchBuffer = 32
// maxParallelENRRequests is the maximum number of parallel ENR requests that can be
// performed by a disc/v4 source.
maxParallelENRRequests = 16
)
// Config contains the configuration options of the ETH protocol.
//
// Deprecated: use ethconfig.Config instead.
type Config = ethconfig.Config
// Ethereum implements the Ethereum full node service.
//
// Instances are built by New, which also registers the object as a lifecycle on
// the node; Start and Stop then bring the networking, peer dropper and log
// indexer up and down.
type Ethereum struct {
// core protocol objects
config *ethconfig.Config // Configuration handed to New (sanitised in place there)
txPool *txpool.TxPool // Pool built in New from the legacy and blob subpools
blobTxPool *blobpool.BlobPool // Blob subpool, also exposed through BlobTxPool
localTxTracker *locals.TxTracker // Only set when config.TxPool.NoLocals is false
blockchain *core.BlockChain
handler *handler // Created by newHandler in New; started and stopped with the service
discmix *enode.FairMix // Mixer that setupDiscovery feeds with discovery sources
dropper *dropper // Connection manager started in Start and stopped in Stop
// DB interfaces
chainDb ethdb.Database // Block chain database
eventMux *event.TypeMux
engine consensus.Engine
accountManager *accounts.Manager
filterMaps *filtermaps.FilterMaps // Log index, started in Start and stopped in Stop
// closeFilterMaps carries the shutdown request for updateFilterMapsHeads: Stop
// sends a channel on it and the goroutine closes that channel before returning.
closeFilterMaps chan chan struct{}
APIBackend *EthAPIBackend
miner *miner.Miner
gasPrice *big.Int // Initialised from config.Miner.GasPrice in New
networkID uint64 // config.NetworkId, or the chain ID when that is zero
netRPCService *ethapi.NetAPI
p2pServer *p2p.Server
lock sync.RWMutex // Protects the variadic fields (e.g. gas price and etherbase)
shutdownTracker *shutdowncheck.ShutdownTracker // Tracks if and when the node has shutdown ungracefully
}
// New creates a new Ethereum object (including the initialisation of the common Ethereum object),
// whose lifecycle will be managed by the provided node.
//
// The steps are, in order: validate and sanitise the configuration, open the
// chain database, resolve the state scheme, chain config and consensus engine,
// create the blockchain, the filtermaps log index, the transaction pools, the
// protocol handler, the peer dropper, the miner and the API backend, and finally
// register APIs, protocols and the service itself on the node.
//
// Note that config is modified in place: an invalid miner gas price is replaced
// by the default, the trie cache allowances are redistributed for hash-scheme
// archive nodes, and the txpool journal and blobpool data directory are resolved
// relative to the node's data directory.
//
// Any error from the individual construction steps aborts the setup and is
// returned. Tracer construction failures wrap the underlying error so callers
// can inspect it with errors.Is / errors.As.
func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
	// Ensure configuration values are compatible and sane
	if !config.SyncMode.IsValid() {
		return nil, fmt.Errorf("invalid sync mode %d", config.SyncMode)
	}
	if !config.HistoryMode.IsValid() {
		return nil, fmt.Errorf("invalid history mode %d", config.HistoryMode)
	}
	if config.Miner.GasPrice == nil || config.Miner.GasPrice.Sign() <= 0 {
		log.Warn("Sanitizing invalid miner gas price", "provided", config.Miner.GasPrice, "updated", ethconfig.Defaults.Miner.GasPrice)
		config.Miner.GasPrice = new(big.Int).Set(ethconfig.Defaults.Miner.GasPrice)
	}
	// For hash-scheme archive nodes the dirty cache allowance is handed over to
	// the clean cache (and to the snapshot cache when snapshots are enabled).
	if config.NoPruning && config.TrieDirtyCache > 0 && config.StateScheme == rawdb.HashScheme {
		if config.SnapshotCache > 0 {
			config.TrieCleanCache += config.TrieDirtyCache * 3 / 5
			config.SnapshotCache += config.TrieDirtyCache * 2 / 5
		} else {
			config.TrieCleanCache += config.TrieDirtyCache
		}
		config.TrieDirtyCache = 0
	}
	log.Info("Allocated trie memory caches", "clean", common.StorageSize(config.TrieCleanCache)*1024*1024, "dirty", common.StorageSize(config.TrieDirtyCache)*1024*1024)

	dbOptions := node.DatabaseOptions{
		Cache:             config.DatabaseCache,
		Handles:           config.DatabaseHandles,
		AncientsDirectory: config.DatabaseFreezer,
		EraDirectory:      config.DatabaseEra,
		MetricsNamespace:  "eth/db/chaindata/",
	}
	chainDb, err := stack.OpenDatabaseWithOptions("chaindata", dbOptions)
	if err != nil {
		return nil, err
	}
	scheme, err := rawdb.ParseStateScheme(config.StateScheme, chainDb)
	if err != nil {
		return nil, err
	}
	// Try to recover offline state pruning only in hash-based.
	// A failed recovery is logged but does not abort the startup.
	if scheme == rawdb.HashScheme {
		if err := pruner.RecoverPruning(stack.ResolvePath(""), chainDb); err != nil {
			log.Error("Failed to recover state", "error", err)
		}
	}
	// Here we determine genesis hash and active ChainConfig.
	// We need these to figure out the consensus parameters and to set up history pruning.
	chainConfig, genesisHash, err := core.LoadChainConfig(chainDb, config.Genesis)
	if err != nil {
		return nil, err
	}
	engine, err := ethconfig.CreateConsensusEngine(chainConfig, chainDb)
	if err != nil {
		return nil, err
	}
	// Set networkID to chainID by default.
	networkID := config.NetworkId
	if networkID == 0 {
		networkID = chainConfig.ChainID.Uint64()
	}

	// Assemble the Ethereum object.
	eth := &Ethereum{
		config:          config,
		chainDb:         chainDb,
		eventMux:        stack.EventMux(),
		accountManager:  stack.AccountManager(),
		engine:          engine,
		networkID:       networkID,
		gasPrice:        config.Miner.GasPrice,
		p2pServer:       stack.Server(),
		discmix:         enode.NewFairMix(discmixTimeout),
		shutdownTracker: shutdowncheck.NewShutdownTracker(chainDb),
	}
	bcVersion := rawdb.ReadDatabaseVersion(chainDb)
	var dbVer = "<nil>"
	if bcVersion != nil {
		dbVer = fmt.Sprintf("%d", *bcVersion)
	}
	log.Info("Initialising Ethereum protocol", "network", networkID, "dbversion", dbVer)

	// Create BlockChain object.
	// Refuse databases written by a newer version; stamp missing or older ones
	// with the current version.
	if !config.SkipBcVersionCheck {
		if bcVersion != nil && *bcVersion > core.BlockChainVersion {
			return nil, fmt.Errorf("database version is v%d, Geth %s only supports v%d", *bcVersion, version.WithMeta, core.BlockChainVersion)
		} else if bcVersion == nil || *bcVersion < core.BlockChainVersion {
			if bcVersion != nil { // only print warning on upgrade, not on init
				log.Warn("Upgrade blockchain database version", "from", dbVer, "to", core.BlockChainVersion)
			}
			rawdb.WriteDatabaseVersion(chainDb, core.BlockChainVersion)
		}
	}
	histPolicy, err := history.NewPolicy(config.HistoryMode, genesisHash)
	if err != nil {
		return nil, err
	}
	var (
		options = &core.BlockChainConfig{
			TrieCleanLimit:          config.TrieCleanCache,
			NoPrefetch:              config.NoPrefetch,
			TrieDirtyLimit:          config.TrieDirtyCache,
			ArchiveMode:             config.NoPruning,
			TrieTimeLimit:           config.TrieTimeout,
			SnapshotLimit:           config.SnapshotCache,
			Preimages:               config.Preimages,
			StateHistory:            config.StateHistory,
			TrienodeHistory:         config.TrienodeHistory,
			NodeFullValueCheckpoint: config.NodeFullValueCheckpoint,
			StateScheme:             scheme,
			HistoryPolicy:           histPolicy,
			TxLookupLimit:           int64(min(config.TransactionHistory, math.MaxInt64)),
			VmConfig: vm.Config{
				EnablePreimageRecording: config.EnablePreimageRecording,
			},
			// Enables file journaling for the trie database. The journal files will be stored
			// within the data directory. The corresponding paths will be either:
			// - DATADIR/triedb/merkle.journal
			// - DATADIR/triedb/verkle.journal
			TrieJournalDirectory:    stack.ResolvePath("triedb"),
			StateSizeTracking:       config.EnableStateSizeTracking,
			SlowBlockThreshold:      config.SlowBlockThreshold,
			StatelessSelfValidation: config.StatelessSelfValidation,
			EnableWitnessStats:      config.EnableWitnessStats,
		}
	)
	if config.VMTrace != "" {
		traceConfig := json.RawMessage("{}")
		if config.VMTraceJsonConfig != "" {
			traceConfig = json.RawMessage(config.VMTraceJsonConfig)
		}
		t, err := tracers.LiveDirectory.New(config.VMTrace, traceConfig)
		if err != nil {
			// Wrap (rather than flatten) the cause so it stays inspectable.
			return nil, fmt.Errorf("failed to create tracer %s: %w", config.VMTrace, err)
		}
		options.VmConfig.Tracer = t
	}
	// Override the chain config with provided settings.
	var overrides core.ChainOverrides
	if config.OverrideOsaka != nil {
		overrides.OverrideOsaka = config.OverrideOsaka
	}
	if config.OverrideBPO1 != nil {
		overrides.OverrideBPO1 = config.OverrideBPO1
	}
	if config.OverrideBPO2 != nil {
		overrides.OverrideBPO2 = config.OverrideBPO2
	}
	if config.OverrideVerkle != nil {
		overrides.OverrideVerkle = config.OverrideVerkle
	}
	options.Overrides = &overrides

	eth.blockchain, err = core.NewBlockChain(chainDb, config.Genesis, eth.engine, options)
	if err != nil {
		return nil, err
	}

	// Initialize filtermaps log index.
	fmConfig := filtermaps.Config{
		History:        config.LogHistory,
		Disabled:       config.LogNoHistory,
		ExportFileName: config.LogExportCheckpoints,
		HashScheme:     scheme == rawdb.HashScheme,
	}
	chainView := eth.newChainView(eth.blockchain.CurrentBlock())
	historyCutoff, _ := eth.blockchain.HistoryPruningCutoff()
	var finalBlock uint64
	if fb := eth.blockchain.CurrentFinalBlock(); fb != nil {
		finalBlock = fb.Number.Uint64()
	}
	filterMaps, err := filtermaps.NewFilterMaps(chainDb, chainView, historyCutoff, finalBlock, filtermaps.DefaultParams, fmConfig)
	if err != nil {
		return nil, err
	}
	eth.filterMaps = filterMaps
	eth.closeFilterMaps = make(chan chan struct{})

	// TxPool
	if config.TxPool.Journal != "" {
		config.TxPool.Journal = stack.ResolvePath(config.TxPool.Journal)
	}
	legacyPool := legacypool.New(config.TxPool, eth.blockchain)

	if config.BlobPool.Datadir != "" {
		config.BlobPool.Datadir = stack.ResolvePath(config.BlobPool.Datadir)
	}
	eth.blobTxPool = blobpool.New(config.BlobPool, eth.blockchain, legacyPool.HasPendingAuth)

	eth.txPool, err = txpool.New(config.TxPool.PriceLimit, eth.blockchain, []txpool.SubPool{legacyPool, eth.blobTxPool})
	if err != nil {
		return nil, err
	}
	// Local transaction tracking is optional and gets its own node lifecycle.
	if !config.TxPool.NoLocals {
		rejournal := config.TxPool.Rejournal
		if rejournal < time.Second {
			log.Warn("Sanitizing invalid txpool journal time", "provided", rejournal, "updated", time.Second)
			rejournal = time.Second
		}
		eth.localTxTracker = locals.New(config.TxPool.Journal, rejournal, eth.blockchain.Config(), eth.txPool)
		stack.RegisterLifecycle(eth.localTxTracker)
	}

	// Permit the downloader to use the trie cache allowance during fast sync
	cacheLimit := options.TrieCleanLimit + options.TrieDirtyLimit + options.SnapshotLimit
	if eth.handler, err = newHandler(&handlerConfig{
		NodeID:         eth.p2pServer.Self().ID(),
		Database:       chainDb,
		Chain:          eth.blockchain,
		TxPool:         eth.txPool,
		Network:        networkID,
		Sync:           config.SyncMode,
		BloomCache:     uint64(cacheLimit),
		EventMux:       eth.eventMux,
		RequiredBlocks: config.RequiredBlocks,
	}); err != nil {
		return nil, err
	}

	eth.dropper = newDropper(eth.p2pServer.MaxDialedConns(), eth.p2pServer.MaxInboundConns())

	eth.miner = miner.New(eth, config.Miner, eth.engine)
	eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData))
	eth.miner.SetPrioAddresses(config.TxPool.Locals)

	eth.APIBackend = &EthAPIBackend{stack.Config().ExtRPCEnabled(), stack.Config().AllowUnprotectedTxs, eth, nil}
	if eth.APIBackend.allowUnprotectedTxs {
		log.Info("Unprotected transactions allowed")
	}
	eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, config.GPO, config.Miner.GasPrice)

	// Start the RPC service
	eth.netRPCService = ethapi.NewNetAPI(eth.p2pServer, networkID)

	// Register the backend on the node
	stack.RegisterAPIs(eth.APIs())
	stack.RegisterProtocols(eth.Protocols())
	stack.RegisterLifecycle(eth)

	// Successful startup; push a marker and check previous unclean shutdowns.
	eth.shutdownTracker.MarkStartup()

	return eth, nil
}
func makeExtraData(extra []byte) []byte {
if len(extra) == 0 {
// create default extradata
extra, _ = rlp.EncodeToBytes([]interface{}{
uint(gethversion.Major<<16 | gethversion.Minor<<8 | gethversion.Patch),
"geth",
runtime.Version(),
runtime.GOOS,
})
}
if uint64(len(extra)) > params.MaximumExtraDataSize {
log.Warn("Miner extra data exceed limit", "extra", hexutil.Bytes(extra), "limit", params.MaximumExtraDataSize)
extra = nil
}
return extra
}
// APIs returns the collection of RPC services the ethereum package offers: the
// generic services obtained from ethapi.GetAPIs followed by the miner, eth
// (downloader), admin, debug and net namespaces defined locally.
// NOTE, some of these services probably need to be moved to somewhere else.
func (s *Ethereum) APIs() []rpc.API {
	// Start from the shared backend APIs, then tack the local ones on.
	services := ethapi.GetAPIs(s.APIBackend)

	return append(services,
		rpc.API{
			Namespace: "miner",
			Service:   NewMinerAPI(s),
		},
		rpc.API{
			Namespace: "eth",
			Service:   downloader.NewDownloaderAPI(s.handler.downloader, s.blockchain, s.eventMux),
		},
		rpc.API{
			Namespace: "admin",
			Service:   NewAdminAPI(s),
		},
		rpc.API{
			Namespace: "debug",
			Service:   NewDebugAPI(s),
		},
		rpc.API{
			Namespace: "net",
			Service:   s.netRPCService,
		},
	)
}
// ResetWithGenesisBlock forwards the given genesis block to the underlying
// blockchain's ResetWithGenesisBlock method.
func (s *Ethereum) ResetWithGenesisBlock(gb *types.Block) {
s.blockchain.ResetWithGenesisBlock(gb)
}
// Miner returns the miner instance created in New.
func (s *Ethereum) Miner() *miner.Miner { return s.miner }
// AccountManager returns the account manager obtained from the node.
func (s *Ethereum) AccountManager() *accounts.Manager { return s.accountManager }
// BlockChain returns the blockchain object backing this service.
func (s *Ethereum) BlockChain() *core.BlockChain { return s.blockchain }
// TxPool returns the transaction pool.
func (s *Ethereum) TxPool() *txpool.TxPool { return s.txPool }
// BlobTxPool returns the blob transaction subpool.
func (s *Ethereum) BlobTxPool() *blobpool.BlobPool { return s.blobTxPool }
// Engine returns the consensus engine.
func (s *Ethereum) Engine() consensus.Engine { return s.engine }
// ChainDb returns the chain database.
func (s *Ethereum) ChainDb() ethdb.Database { return s.chainDb }
// IsListening unconditionally reports true.
func (s *Ethereum) IsListening() bool { return true } // Always listening
// Downloader returns the downloader owned by the protocol handler.
func (s *Ethereum) Downloader() *downloader.Downloader { return s.handler.downloader }
// Synced reports the current value of the handler's synced flag.
func (s *Ethereum) Synced() bool { return s.handler.synced.Load() }
// SetSynced asks the handler to enable its synced features.
func (s *Ethereum) SetSynced() { s.handler.enableSyncedFeatures() }
// ArchiveMode reports whether pruning is disabled in the configuration (NoPruning).
func (s *Ethereum) ArchiveMode() bool { return s.config.NoPruning }
// Protocols returns the network protocols this service wants to run: the eth
// protocols always, plus the snap protocols whenever a snapshot cache is
// configured (SnapshotCache > 0).
func (s *Ethereum) Protocols() []p2p.Protocol {
	protocols := eth.MakeProtocols((*ethHandler)(s.handler), s.networkID, s.discmix)
	if s.config.SnapshotCache <= 0 {
		// Snapshots disabled, eth is all we serve.
		return protocols
	}
	return append(protocols, snap.MakeProtocols((*snapHandler)(s.handler))...)
}
// Start implements node.Lifecycle. It wires up the discovery sources and then
// launches, in order, the shutdown tracker, the protocol handler, the peer
// dropper, the log indexer and the goroutine that keeps the log indexer's
// target head up to date. Only a discovery setup failure is reported as an error.
func (s *Ethereum) Start() error {
	err := s.setupDiscovery()
	if err != nil {
		return err
	}

	// Keep the shutdown marker fresh while the node runs.
	s.shutdownTracker.Start()

	// Bring up the networking layer.
	s.handler.Start(s.p2pServer.MaxPeers)

	// Bring up the connection manager. It is given a predicate reporting whether
	// the node is still unsynced; the last argument is left nil.
	notSynced := func() bool { return !s.Synced() }
	s.dropper.Start(s.p2pServer, notSynced, nil)

	// Bring up the log indexer together with its head updater.
	s.filterMaps.Start()
	go s.updateFilterMapsHeads()

	return nil
}
// newChainView builds a filtermaps chain view of s.blockchain anchored at the
// given header's number and hash. A nil header yields a nil view.
func (s *Ethereum) newChainView(head *types.Header) *filtermaps.ChainView {
	if head != nil {
		return filtermaps.NewChainView(s.blockchain, head.Number.Uint64(), head.Hash())
	}
	return nil
}
// updateFilterMapsHeads is the loop that keeps the log indexer pointed at the
// current chain head. It reacts to chain events and block-processing events,
// re-reads the current block whenever ten seconds pass without any event, and
// exits when a channel arrives on s.closeFilterMaps, closing that channel as
// the acknowledgement. On exit both subscriptions are cancelled and whatever
// is still buffered in the event channels is discarded.
func (s *Ethereum) updateFilterMapsHeads() {
	var (
		chainEvents = make(chan core.ChainEvent, 10)
		procEvents  = make(chan bool, 10)
		chainSub    = s.blockchain.SubscribeChainEvent(chainEvents)
		procSub     = s.blockchain.SubscribeBlockProcessingEvent(procEvents)
	)
	defer func() {
		chainSub.Unsubscribe()
		procSub.Unsubscribe()
		// Empty both buffers without blocking.
		for drained := false; !drained; {
			select {
			case <-chainEvents:
			case <-procEvents:
			default:
				drained = true
			}
		}
	}()

	// current is the last header forwarded to the indexer.
	var current *types.Header
	retarget := func(candidate *types.Header) {
		if candidate == nil {
			return
		}
		if current != nil && candidate.Hash() == current.Hash() {
			// Same head as before, nothing to update.
			return
		}
		current = candidate
		view := s.newChainView(current)
		if view == nil {
			return
		}
		cutoff, _ := s.blockchain.HistoryPruningCutoff()
		var finalized uint64
		if fb := s.blockchain.CurrentFinalBlock(); fb != nil {
			finalized = fb.Number.Uint64()
		}
		s.filterMaps.SetTarget(view, cutoff, finalized)
	}
	retarget(s.blockchain.CurrentBlock())

	for {
		select {
		case event := <-chainEvents:
			retarget(event.Header)
		case processing := <-procEvents:
			s.filterMaps.SetBlockProcessing(processing)
		case <-time.After(10 * time.Second):
			retarget(s.blockchain.CurrentBlock())
		case ack := <-s.closeFilterMaps:
			close(ack)
			return
		}
	}
}
// setupDiscovery starts the ENR updater and registers every available peer
// source with the discovery mixer: DNS iterators for the configured eth and
// snap discovery URLs, followed by the discv4 and discv5 random-node iterators
// when those services are present on the p2p server. It fails only if a DNS
// iterator cannot be created.
func (s *Ethereum) setupDiscovery() error {
	eth.StartENRUpdater(s.blockchain, s.p2pServer.LocalNode())

	// DNS-based sources: eth nodes first, then snap nodes. Empty lists are skipped.
	dnsclient := dnsdisc.NewClient(dnsdisc.Config{})
	for _, urls := range [][]string{s.config.EthDiscoveryURLs, s.config.SnapDiscoveryURLs} {
		if len(urls) == 0 {
			continue
		}
		source, err := dnsclient.NewIterator(urls...)
		if err != nil {
			return err
		}
		s.discmix.AddSource(source)
	}

	// DHT nodes from discv4: resolve ENRs in parallel, filter, then buffer.
	if s.p2pServer.DiscoveryV4() != nil {
		nodes := s.p2pServer.DiscoveryV4().RandomNodes()
		resolveENR := func(_ context.Context, n *enode.Node) *enode.Node {
			// RequestENR does not yet support context. It will simply time out.
			// A node whose ENR can't be resolved comes back as nil; the concrete
			// error is of no interest here and is dropped on purpose.
			resolved, _ := s.p2pServer.DiscoveryV4().RequestENR(n)
			return resolved
		}
		nodes = enode.AsyncFilter(nodes, resolveENR, maxParallelENRRequests)
		nodes = enode.Filter(nodes, eth.NewNodeFilter(s.blockchain))
		nodes = enode.NewBufferIter(nodes, discoveryPrefetchBuffer)
		s.discmix.AddSource(nodes)
	}

	// DHT nodes from discv5: filter, then buffer.
	if s.p2pServer.DiscoveryV5() != nil {
		nodeFilter := eth.NewNodeFilter(s.blockchain)
		nodes := enode.Filter(s.p2pServer.DiscoveryV5().RandomNodes(), nodeFilter)
		nodes = enode.NewBufferIter(nodes, discoveryPrefetchBuffer)
		s.discmix.AddSource(nodes)
	}
	return nil
}
// Stop implements node.Lifecycle, terminating all internal goroutines used by the
// Ethereum protocol.
//
// Components are shut down in a fixed order: peer-facing pieces first, then the
// log indexer, transaction pool, blockchain and consensus engine, and finally the
// shutdown tracker, chain database and event mux. It always returns nil.
func (s *Ethereum) Stop() error {
// Stop all the peer-related stuff first.
s.discmix.Close()
s.dropper.Stop()
s.handler.Stop()
// Then stop everything else.
// Ask updateFilterMapsHeads to exit and wait until it acknowledges by closing ch.
// NOTE(review): closeFilterMaps is unbuffered, so this send blocks unless that
// goroutine (launched in Start) is running — confirm Stop is never reached
// without a successful Start.
ch := make(chan struct{})
s.closeFilterMaps <- ch
<-ch
s.filterMaps.Stop()
s.txPool.Close()
s.blockchain.Stop()
s.engine.Close()
// Clean shutdown marker as the last thing before closing db
s.shutdownTracker.Stop()
s.chainDb.Close()
s.eventMux.Stop()
return nil
}