// Copyright 2014 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // The go-ethereum library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . // Package eth implements the Ethereum protocol. package eth import ( "context" "encoding/json" "fmt" "math" "math/big" "runtime" "sync" "time" "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/filtermaps" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state/pruner" "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/txpool/blobpool" "github.com/ethereum/go-ethereum/core/txpool/legacypool" "github.com/ethereum/go-ethereum/core/txpool/locals" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/ethereum/go-ethereum/eth/gasprice" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" "github.com/ethereum/go-ethereum/eth/tracers" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/internal/ethapi" "github.com/ethereum/go-ethereum/internal/shutdowncheck" "github.com/ethereum/go-ethereum/internal/version" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/miner" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/dnsdisc" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" gethversion "github.com/ethereum/go-ethereum/version" ) const ( // This is the fairness knob for the discovery mixer. When looking for peers, we'll // wait this long for a single source of candidates before moving on and trying other // sources. If this timeout expires, the source will be skipped in this round, but it // will continue to fetch in the background and will have a chance with a new timeout // in the next rounds, giving it overall more time but a proportionally smaller share. // We expect a normal source to produce ~10 candidates per second. discmixTimeout = 100 * time.Millisecond // discoveryPrefetchBuffer is the number of peers to pre-fetch from a discovery // source. It is useful to avoid the negative effects of potential longer timeouts // in the discovery, keeping dial progress while waiting for the next batch of // candidates. discoveryPrefetchBuffer = 32 // maxParallelENRRequests is the maximum number of parallel ENR requests that can be // performed by a disc/v4 source. maxParallelENRRequests = 16 ) // Config contains the configuration options of the ETH protocol. // Deprecated: use ethconfig.Config instead. type Config = ethconfig.Config // Ethereum implements the Ethereum full node service. type Ethereum struct { // core protocol objects config *ethconfig.Config txPool *txpool.TxPool blobTxPool *blobpool.BlobPool localTxTracker *locals.TxTracker blockchain *core.BlockChain handler *handler discmix *enode.FairMix dropper *dropper // DB interfaces chainDb ethdb.Database // Block chain database eventMux *event.TypeMux engine consensus.Engine accountManager *accounts.Manager filterMaps *filtermaps.FilterMaps closeFilterMaps chan chan struct{} APIBackend *EthAPIBackend miner *miner.Miner gasPrice *big.Int networkID uint64 netRPCService *ethapi.NetAPI p2pServer *p2p.Server lock sync.RWMutex // Protects the variadic fields (e.g. gas price and etherbase) shutdownTracker *shutdowncheck.ShutdownTracker // Tracks if and when the node has shutdown ungracefully } // New creates a new Ethereum object (including the initialisation of the common Ethereum object), // whose lifecycle will be managed by the provided node. func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { // Ensure configuration values are compatible and sane if !config.SyncMode.IsValid() { return nil, fmt.Errorf("invalid sync mode %d", config.SyncMode) } if !config.HistoryMode.IsValid() { return nil, fmt.Errorf("invalid history mode %d", config.HistoryMode) } if config.Miner.GasPrice == nil || config.Miner.GasPrice.Sign() <= 0 { log.Warn("Sanitizing invalid miner gas price", "provided", config.Miner.GasPrice, "updated", ethconfig.Defaults.Miner.GasPrice) config.Miner.GasPrice = new(big.Int).Set(ethconfig.Defaults.Miner.GasPrice) } if config.NoPruning && config.TrieDirtyCache > 0 && config.StateScheme == rawdb.HashScheme { if config.SnapshotCache > 0 { config.TrieCleanCache += config.TrieDirtyCache * 3 / 5 config.SnapshotCache += config.TrieDirtyCache * 2 / 5 } else { config.TrieCleanCache += config.TrieDirtyCache } config.TrieDirtyCache = 0 } log.Info("Allocated trie memory caches", "clean", common.StorageSize(config.TrieCleanCache)*1024*1024, "dirty", common.StorageSize(config.TrieDirtyCache)*1024*1024) dbOptions := node.DatabaseOptions{ Cache: config.DatabaseCache, Handles: config.DatabaseHandles, AncientsDirectory: config.DatabaseFreezer, EraDirectory: config.DatabaseEra, MetricsNamespace: "eth/db/chaindata/", } chainDb, err := stack.OpenDatabaseWithOptions("chaindata", dbOptions) if err != nil { return nil, err } scheme, err := rawdb.ParseStateScheme(config.StateScheme, chainDb) if err != nil { return nil, err } // Try to recover offline state pruning only in hash-based. if scheme == rawdb.HashScheme { if err := pruner.RecoverPruning(stack.ResolvePath(""), chainDb); err != nil { log.Error("Failed to recover state", "error", err) } } // Here we determine genesis hash and active ChainConfig. // We need these to figure out the consensus parameters and to set up history pruning. chainConfig, _, err := core.LoadChainConfig(chainDb, config.Genesis) if err != nil { return nil, err } engine, err := ethconfig.CreateConsensusEngine(chainConfig, chainDb) if err != nil { return nil, err } // Set networkID to chainID by default. networkID := config.NetworkId if networkID == 0 { networkID = chainConfig.ChainID.Uint64() } // Assemble the Ethereum object. eth := &Ethereum{ config: config, chainDb: chainDb, eventMux: stack.EventMux(), accountManager: stack.AccountManager(), engine: engine, networkID: networkID, gasPrice: config.Miner.GasPrice, p2pServer: stack.Server(), discmix: enode.NewFairMix(discmixTimeout), shutdownTracker: shutdowncheck.NewShutdownTracker(chainDb), } bcVersion := rawdb.ReadDatabaseVersion(chainDb) var dbVer = "" if bcVersion != nil { dbVer = fmt.Sprintf("%d", *bcVersion) } log.Info("Initialising Ethereum protocol", "network", networkID, "dbversion", dbVer) // Create BlockChain object. if !config.SkipBcVersionCheck { if bcVersion != nil && *bcVersion > core.BlockChainVersion { return nil, fmt.Errorf("database version is v%d, Geth %s only supports v%d", *bcVersion, version.WithMeta, core.BlockChainVersion) } else if bcVersion == nil || *bcVersion < core.BlockChainVersion { if bcVersion != nil { // only print warning on upgrade, not on init log.Warn("Upgrade blockchain database version", "from", dbVer, "to", core.BlockChainVersion) } rawdb.WriteDatabaseVersion(chainDb, core.BlockChainVersion) } } var ( options = &core.BlockChainConfig{ TrieCleanLimit: config.TrieCleanCache, NoPrefetch: config.NoPrefetch, TrieDirtyLimit: config.TrieDirtyCache, ArchiveMode: config.NoPruning, TrieTimeLimit: config.TrieTimeout, SnapshotLimit: config.SnapshotCache, Preimages: config.Preimages, StateHistory: config.StateHistory, TrienodeHistory: config.TrienodeHistory, NodeFullValueCheckpoint: config.NodeFullValueCheckpoint, StateScheme: scheme, ChainHistoryMode: config.HistoryMode, TxLookupLimit: int64(min(config.TransactionHistory, math.MaxInt64)), VmConfig: vm.Config{ EnablePreimageRecording: config.EnablePreimageRecording, EnableWitnessStats: config.EnableWitnessStats, StatelessSelfValidation: config.StatelessSelfValidation, }, // Enables file journaling for the trie database. The journal files will be stored // within the data directory. The corresponding paths will be either: // - DATADIR/triedb/merkle.journal // - DATADIR/triedb/verkle.journal TrieJournalDirectory: stack.ResolvePath("triedb"), StateSizeTracking: config.EnableStateSizeTracking, SlowBlockThreshold: config.SlowBlockThreshold, } ) if config.VMTrace != "" { traceConfig := json.RawMessage("{}") if config.VMTraceJsonConfig != "" { traceConfig = json.RawMessage(config.VMTraceJsonConfig) } t, err := tracers.LiveDirectory.New(config.VMTrace, traceConfig) if err != nil { return nil, fmt.Errorf("failed to create tracer %s: %v", config.VMTrace, err) } options.VmConfig.Tracer = t } // Override the chain config with provided settings. var overrides core.ChainOverrides if config.OverrideOsaka != nil { overrides.OverrideOsaka = config.OverrideOsaka } if config.OverrideBPO1 != nil { overrides.OverrideBPO1 = config.OverrideBPO1 } if config.OverrideBPO2 != nil { overrides.OverrideBPO2 = config.OverrideBPO2 } if config.OverrideVerkle != nil { overrides.OverrideVerkle = config.OverrideVerkle } options.Overrides = &overrides eth.blockchain, err = core.NewBlockChain(chainDb, config.Genesis, eth.engine, options) if err != nil { return nil, err } // Initialize filtermaps log index. fmConfig := filtermaps.Config{ History: config.LogHistory, Disabled: config.LogNoHistory, ExportFileName: config.LogExportCheckpoints, HashScheme: scheme == rawdb.HashScheme, } chainView := eth.newChainView(eth.blockchain.CurrentBlock()) historyCutoff, _ := eth.blockchain.HistoryPruningCutoff() var finalBlock uint64 if fb := eth.blockchain.CurrentFinalBlock(); fb != nil { finalBlock = fb.Number.Uint64() } filterMaps, err := filtermaps.NewFilterMaps(chainDb, chainView, historyCutoff, finalBlock, filtermaps.DefaultParams, fmConfig) if err != nil { return nil, err } eth.filterMaps = filterMaps eth.closeFilterMaps = make(chan chan struct{}) // TxPool if config.TxPool.Journal != "" { config.TxPool.Journal = stack.ResolvePath(config.TxPool.Journal) } legacyPool := legacypool.New(config.TxPool, eth.blockchain) if config.BlobPool.Datadir != "" { config.BlobPool.Datadir = stack.ResolvePath(config.BlobPool.Datadir) } eth.blobTxPool = blobpool.New(config.BlobPool, eth.blockchain, legacyPool.HasPendingAuth) eth.txPool, err = txpool.New(config.TxPool.PriceLimit, eth.blockchain, []txpool.SubPool{legacyPool, eth.blobTxPool}) if err != nil { return nil, err } if !config.TxPool.NoLocals { rejournal := config.TxPool.Rejournal if rejournal < time.Second { log.Warn("Sanitizing invalid txpool journal time", "provided", rejournal, "updated", time.Second) rejournal = time.Second } eth.localTxTracker = locals.New(config.TxPool.Journal, rejournal, eth.blockchain.Config(), eth.txPool) stack.RegisterLifecycle(eth.localTxTracker) } // Permit the downloader to use the trie cache allowance during fast sync cacheLimit := options.TrieCleanLimit + options.TrieDirtyLimit + options.SnapshotLimit if eth.handler, err = newHandler(&handlerConfig{ NodeID: eth.p2pServer.Self().ID(), Database: chainDb, Chain: eth.blockchain, TxPool: eth.txPool, Network: networkID, Sync: config.SyncMode, BloomCache: uint64(cacheLimit), EventMux: eth.eventMux, RequiredBlocks: config.RequiredBlocks, }); err != nil { return nil, err } eth.dropper = newDropper(eth.p2pServer.MaxDialedConns(), eth.p2pServer.MaxInboundConns()) eth.miner = miner.New(eth, config.Miner, eth.engine) eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData)) eth.miner.SetPrioAddresses(config.TxPool.Locals) eth.APIBackend = &EthAPIBackend{stack.Config().ExtRPCEnabled(), stack.Config().AllowUnprotectedTxs, eth, nil} if eth.APIBackend.allowUnprotectedTxs { log.Info("Unprotected transactions allowed") } eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, config.GPO, config.Miner.GasPrice) // Start the RPC service eth.netRPCService = ethapi.NewNetAPI(eth.p2pServer, networkID) // Register the backend on the node stack.RegisterAPIs(eth.APIs()) stack.RegisterProtocols(eth.Protocols()) stack.RegisterLifecycle(eth) // Successful startup; push a marker and check previous unclean shutdowns. eth.shutdownTracker.MarkStartup() return eth, nil } func makeExtraData(extra []byte) []byte { if len(extra) == 0 { // create default extradata extra, _ = rlp.EncodeToBytes([]interface{}{ uint(gethversion.Major<<16 | gethversion.Minor<<8 | gethversion.Patch), "geth", runtime.Version(), runtime.GOOS, }) } if uint64(len(extra)) > params.MaximumExtraDataSize { log.Warn("Miner extra data exceed limit", "extra", hexutil.Bytes(extra), "limit", params.MaximumExtraDataSize) extra = nil } return extra } // APIs return the collection of RPC services the ethereum package offers. // NOTE, some of these services probably need to be moved to somewhere else. func (s *Ethereum) APIs() []rpc.API { apis := ethapi.GetAPIs(s.APIBackend) // Append all the local APIs and return return append(apis, []rpc.API{ { Namespace: "miner", Service: NewMinerAPI(s), }, { Namespace: "eth", Service: downloader.NewDownloaderAPI(s.handler.downloader, s.blockchain, s.eventMux), }, { Namespace: "admin", Service: NewAdminAPI(s), }, { Namespace: "debug", Service: NewDebugAPI(s), }, { Namespace: "net", Service: s.netRPCService, }, }...) } func (s *Ethereum) ResetWithGenesisBlock(gb *types.Block) { s.blockchain.ResetWithGenesisBlock(gb) } func (s *Ethereum) Miner() *miner.Miner { return s.miner } func (s *Ethereum) AccountManager() *accounts.Manager { return s.accountManager } func (s *Ethereum) BlockChain() *core.BlockChain { return s.blockchain } func (s *Ethereum) TxPool() *txpool.TxPool { return s.txPool } func (s *Ethereum) BlobTxPool() *blobpool.BlobPool { return s.blobTxPool } func (s *Ethereum) Engine() consensus.Engine { return s.engine } func (s *Ethereum) ChainDb() ethdb.Database { return s.chainDb } func (s *Ethereum) IsListening() bool { return true } // Always listening func (s *Ethereum) Downloader() *downloader.Downloader { return s.handler.downloader } func (s *Ethereum) Synced() bool { return s.handler.synced.Load() } func (s *Ethereum) SetSynced() { s.handler.enableSyncedFeatures() } func (s *Ethereum) ArchiveMode() bool { return s.config.NoPruning } // Protocols returns all the currently configured // network protocols to start. func (s *Ethereum) Protocols() []p2p.Protocol { protos := eth.MakeProtocols((*ethHandler)(s.handler), s.networkID, s.discmix) if s.config.SnapshotCache > 0 { protos = append(protos, snap.MakeProtocols((*snapHandler)(s.handler))...) } return protos } // Start implements node.Lifecycle, starting all internal goroutines needed by the // Ethereum protocol implementation. func (s *Ethereum) Start() error { if err := s.setupDiscovery(); err != nil { return err } // Regularly update shutdown marker s.shutdownTracker.Start() // Start the networking layer s.handler.Start(s.p2pServer.MaxPeers) // Start the connection manager s.dropper.Start(s.p2pServer, func() bool { return !s.Synced() }) // start log indexer s.filterMaps.Start() go s.updateFilterMapsHeads() return nil } func (s *Ethereum) newChainView(head *types.Header) *filtermaps.ChainView { if head == nil { return nil } return filtermaps.NewChainView(s.blockchain, head.Number.Uint64(), head.Hash()) } func (s *Ethereum) updateFilterMapsHeads() { headEventCh := make(chan core.ChainEvent, 10) blockProcCh := make(chan bool, 10) sub := s.blockchain.SubscribeChainEvent(headEventCh) sub2 := s.blockchain.SubscribeBlockProcessingEvent(blockProcCh) defer func() { sub.Unsubscribe() sub2.Unsubscribe() for { select { case <-headEventCh: case <-blockProcCh: default: return } } }() var head *types.Header setHead := func(newHead *types.Header) { if newHead == nil { return } if head == nil || newHead.Hash() != head.Hash() { head = newHead chainView := s.newChainView(head) if chainView == nil { return } historyCutoff, _ := s.blockchain.HistoryPruningCutoff() var finalBlock uint64 if fb := s.blockchain.CurrentFinalBlock(); fb != nil { finalBlock = fb.Number.Uint64() } s.filterMaps.SetTarget(chainView, historyCutoff, finalBlock) } } setHead(s.blockchain.CurrentBlock()) for { select { case ev := <-headEventCh: setHead(ev.Header) case blockProc := <-blockProcCh: s.filterMaps.SetBlockProcessing(blockProc) case <-time.After(time.Second * 10): setHead(s.blockchain.CurrentBlock()) case ch := <-s.closeFilterMaps: close(ch) return } } } func (s *Ethereum) setupDiscovery() error { eth.StartENRUpdater(s.blockchain, s.p2pServer.LocalNode()) // Add eth nodes from DNS. dnsclient := dnsdisc.NewClient(dnsdisc.Config{}) if len(s.config.EthDiscoveryURLs) > 0 { iter, err := dnsclient.NewIterator(s.config.EthDiscoveryURLs...) if err != nil { return err } s.discmix.AddSource(iter) } // Add snap nodes from DNS. if len(s.config.SnapDiscoveryURLs) > 0 { iter, err := dnsclient.NewIterator(s.config.SnapDiscoveryURLs...) if err != nil { return err } s.discmix.AddSource(iter) } // Add DHT nodes from discv4. if s.p2pServer.DiscoveryV4() != nil { iter := s.p2pServer.DiscoveryV4().RandomNodes() resolverFunc := func(ctx context.Context, enr *enode.Node) *enode.Node { // RequestENR does not yet support context. It will simply time out. // If the ENR can't be resolved, RequestENR will return nil. We don't // care about the specific error here, so we ignore it. nn, _ := s.p2pServer.DiscoveryV4().RequestENR(enr) return nn } iter = enode.AsyncFilter(iter, resolverFunc, maxParallelENRRequests) iter = enode.Filter(iter, eth.NewNodeFilter(s.blockchain)) iter = enode.NewBufferIter(iter, discoveryPrefetchBuffer) s.discmix.AddSource(iter) } // Add DHT nodes from discv5. if s.p2pServer.DiscoveryV5() != nil { filter := eth.NewNodeFilter(s.blockchain) iter := enode.Filter(s.p2pServer.DiscoveryV5().RandomNodes(), filter) iter = enode.NewBufferIter(iter, discoveryPrefetchBuffer) s.discmix.AddSource(iter) } return nil } // Stop implements node.Lifecycle, terminating all internal goroutines used by the // Ethereum protocol. func (s *Ethereum) Stop() error { // Stop all the peer-related stuff first. s.discmix.Close() s.dropper.Stop() s.handler.Stop() // Then stop everything else. ch := make(chan struct{}) s.closeFilterMaps <- ch <-ch s.filterMaps.Stop() s.txPool.Close() s.blockchain.Stop() s.engine.Close() // Clean shutdown marker as the last thing before closing db s.shutdownTracker.Stop() s.chainDb.Close() s.eventMux.Stop() return nil }