eth,node: replace the deprecated TypeMux with Feed (#32585)
Some checks are pending
/ Linux Build (push) Waiting to run
/ Linux Build (arm) (push) Waiting to run
/ Keeper Build (push) Waiting to run
/ Windows Build (push) Waiting to run
/ Docker Image (push) Waiting to run

replace the not used event.Typemux to event.Feed

---------

Co-authored-by: Felix Lange <fjl@twurst.com>
This commit is contained in:
Delweng 2026-05-08 10:12:46 +08:00 committed by GitHub
parent e71098ba4e
commit 1abbae239d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 140 additions and 141 deletions

View file

@ -38,6 +38,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth/catalyst"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/eth/syncer"
"github.com/ethereum/go-ethereum/internal/flags"
"github.com/ethereum/go-ethereum/internal/telemetry/tracesetup"
"github.com/ethereum/go-ethereum/internal/version"
@ -276,16 +277,19 @@ func makeFullNode(ctx *cli.Context) *node.Node {
if cfg.Ethstats.URL != "" {
utils.RegisterEthStatsService(stack, backend, cfg.Ethstats.URL)
}
// Configure synchronization override service
var synctarget common.Hash
syncConfig := syncer.Config{
ExitWhenSynced: ctx.Bool(utils.ExitWhenSyncedFlag.Name),
}
if ctx.IsSet(utils.SyncTargetFlag.Name) {
target := ctx.String(utils.SyncTargetFlag.Name)
if !common.IsHexHash(target) {
utils.Fatalf("sync target hash is not a valid hex hash: %s", target)
}
synctarget = common.HexToHash(target)
syncConfig.TargetBlock = common.HexToHash(target)
}
utils.RegisterSyncOverrideService(stack, eth, synctarget, ctx.Bool(utils.ExitWhenSyncedFlag.Name))
utils.RegisterSyncOverrideService(stack, eth, syncConfig)
if ctx.IsSet(utils.DeveloperFlag.Name) {
// Start dev mode.

View file

@ -22,13 +22,10 @@ import (
"os"
"slices"
"sort"
"time"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/console/prompt"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/internal/debug"
"github.com/ethereum/go-ethereum/internal/flags"
@ -387,28 +384,4 @@ func startNode(ctx *cli.Context, stack *node.Node, isConsole bool) {
}
}
}()
// Spawn a standalone goroutine for status synchronization monitoring,
// close the node when synchronization is complete if user required.
if ctx.Bool(utils.ExitWhenSyncedFlag.Name) {
go func() {
sub := stack.EventMux().Subscribe(downloader.DoneEvent{})
defer sub.Unsubscribe()
for {
event := <-sub.Chan()
if event == nil {
continue
}
done, ok := event.Data.(downloader.DoneEvent)
if !ok {
continue
}
if timestamp := time.Unix(int64(done.Latest.Time), 0); time.Since(timestamp) < 10*time.Minute {
log.Info("Synchronisation completed", "latestnum", done.Latest.Number, "latesthash", done.Latest.Hash(),
"age", common.PrettyAge(timestamp))
stack.Close()
}
}
}()
}
}

View file

@ -2237,13 +2237,13 @@ func RegisterFilterAPI(stack *node.Node, backend ethapi.Backend, ethcfg *ethconf
}
// RegisterSyncOverrideService adds the synchronization override service into node.
func RegisterSyncOverrideService(stack *node.Node, eth *eth.Ethereum, target common.Hash, exitWhenSynced bool) {
if target != (common.Hash{}) {
log.Info("Registered sync override service", "hash", target, "exitWhenSynced", exitWhenSynced)
func RegisterSyncOverrideService(stack *node.Node, eth *eth.Ethereum, config syncer.Config) {
if config.TargetBlock != (common.Hash{}) {
log.Info("Registered sync override service", "hash", config.TargetBlock, "exitWhenSynced", config.ExitWhenSynced)
} else {
log.Info("Registered sync override service")
}
syncer.Register(stack, eth, target, exitWhenSynced)
syncer.Register(stack, eth, config)
}
// SetupMetrics configures the metrics system.

View file

@ -49,7 +49,6 @@ import (
"github.com/ethereum/go-ethereum/eth/protocols/snap"
"github.com/ethereum/go-ethereum/eth/tracers"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/internal/shutdowncheck"
"github.com/ethereum/go-ethereum/internal/version"
@ -105,7 +104,6 @@ type Ethereum struct {
// DB interfaces
chainDb ethdb.Database // Block chain database
eventMux *event.TypeMux
engine consensus.Engine
accountManager *accounts.Manager
@ -194,7 +192,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
eth := &Ethereum{
config: config,
chainDb: chainDb,
eventMux: stack.EventMux(),
accountManager: stack.AccountManager(),
engine: engine,
networkID: networkID,
@ -344,7 +341,6 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
Network: networkID,
Sync: config.SyncMode,
BloomCache: uint64(cacheLimit),
EventMux: eth.eventMux,
RequiredBlocks: config.RequiredBlocks,
}); err != nil {
return nil, err
@ -405,7 +401,7 @@ func (s *Ethereum) APIs() []rpc.API {
Service: NewMinerAPI(s),
}, {
Namespace: "eth",
Service: downloader.NewDownloaderAPI(s.handler.downloader, s.blockchain, s.eventMux),
Service: downloader.NewDownloaderAPI(s.handler.downloader, s.blockchain),
}, {
Namespace: "admin",
Service: NewAdminAPI(s),
@ -600,7 +596,6 @@ func (s *Ethereum) Stop() error {
s.shutdownTracker.Stop()
s.chainDb.Close()
s.eventMux.Stop()
return nil
}

View file

@ -23,7 +23,6 @@ import (
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
)
@ -33,20 +32,18 @@ import (
type DownloaderAPI struct {
d *Downloader
chain *core.BlockChain
mux *event.TypeMux
installSyncSubscription chan chan interface{}
uninstallSyncSubscription chan *uninstallSyncSubscriptionRequest
}
// NewDownloaderAPI creates a new DownloaderAPI. The API has an internal event loop that
// listens for events from the downloader through the global event mux. In case it receives one of
// listens for events from the downloader through the event feed. In case it receives one of
// these events it broadcasts it to all syncing subscriptions that are installed through the
// installSyncSubscription channel.
func NewDownloaderAPI(d *Downloader, chain *core.BlockChain, m *event.TypeMux) *DownloaderAPI {
func NewDownloaderAPI(d *Downloader, chain *core.BlockChain) *DownloaderAPI {
api := &DownloaderAPI{
d: d,
chain: chain,
mux: m,
installSyncSubscription: make(chan chan interface{}),
uninstallSyncSubscription: make(chan *uninstallSyncSubscriptionRequest),
}
@ -66,7 +63,8 @@ func NewDownloaderAPI(d *Downloader, chain *core.BlockChain, m *event.TypeMux) *
// receive is {false}.
func (api *DownloaderAPI) eventLoop() {
var (
sub = api.mux.Subscribe(StartEvent{})
events = make(chan SyncEvent, 16)
sub = api.d.SubscribeSyncEvents(events)
syncSubscriptions = make(map[chan interface{}]struct{})
checkInterval = time.Second * 60
checkTimer = time.NewTimer(checkInterval)
@ -90,6 +88,7 @@ func (api *DownloaderAPI) eventLoop() {
}
)
defer checkTimer.Stop()
defer sub.Unsubscribe()
for {
select {
@ -101,14 +100,13 @@ func (api *DownloaderAPI) eventLoop() {
case u := <-api.uninstallSyncSubscription:
delete(syncSubscriptions, u.c)
close(u.uninstalled)
case event := <-sub.Chan():
if event == nil {
return
}
switch event.Data.(type) {
case StartEvent:
case ev := <-events:
if ev.Type == SyncStarted {
started = true
}
case <-sub.Err():
// The downloader is terminated or other internal error occurs
return
case <-checkTimer.C:
if !started {
checkTimer.Reset(checkInterval)

View file

@ -97,9 +97,12 @@ type headerTask struct {
}
type Downloader struct {
mode atomic.Uint32 // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode
moder *syncModer // Sync mode management, deliver the appropriate sync mode choice for each cycle
mux *event.TypeMux // Event multiplexer to announce sync operation events
mode atomic.Uint32 // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode
moder *syncModer // Sync mode management, deliver the appropriate sync mode choice for each cycle
// Event feed for downloader events
feed event.FeedOf[SyncEvent]
scope event.SubscriptionScope
queue *queue // Scheduler for selecting the hashes to download
peers *peerSet // Set of active peers from which download can proceed
@ -229,12 +232,11 @@ type BlockChain interface {
}
// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(stateDb ethdb.Database, mode ethconfig.SyncMode, mux *event.TypeMux, chain BlockChain, dropPeer peerDropFn, success func()) *Downloader {
func New(stateDb ethdb.Database, mode ethconfig.SyncMode, chain BlockChain, dropPeer peerDropFn, success func()) *Downloader {
cutoffNumber, cutoffHash := chain.HistoryPruningCutoff()
dl := &Downloader{
stateDB: stateDb,
moder: newSyncModer(mode, chain, stateDb),
mux: mux,
queue: newQueue(blockCacheMaxItems, blockCacheInitialItems),
peers: newPeerSet(),
blockchain: chain,
@ -427,20 +429,25 @@ func (d *Downloader) ConfigSyncMode() SyncMode {
return d.moder.get(false)
}
// SubscribeSyncEvents creates a subscription for downloader sync events
func (d *Downloader) SubscribeSyncEvents(ch chan<- SyncEvent) event.Subscription {
return d.scope.Track(d.feed.Subscribe(ch))
}
// syncToHead starts a block synchronization based on the hash chain from
// the specified head hash.
func (d *Downloader) syncToHead() (err error) {
d.mux.Post(StartEvent{})
mode := d.getMode()
d.feed.Send(SyncEvent{Type: SyncStarted, Mode: mode})
defer func() {
// reset on error
if err != nil {
d.mux.Post(FailedEvent{err})
d.feed.Send(SyncEvent{Type: SyncFailed, Mode: mode, Err: err})
} else {
latest := d.blockchain.CurrentHeader()
d.mux.Post(DoneEvent{latest})
d.feed.Send(SyncEvent{Type: SyncCompleted, Mode: mode, Latest: latest})
}
}()
mode := d.getMode()
log.Debug("Backfilling with the network", "mode", mode)
defer func(start time.Time) {
@ -662,6 +669,9 @@ func (d *Downloader) Cancel() {
// Terminate interrupts the downloader, canceling all pending operations.
// The downloader cannot be reused after calling Terminate.
func (d *Downloader) Terminate() {
// Unsubscribe all subscriptions registered from downloader
d.scope.Close()
// Close the termination channel (make sure double close is allowed)
d.quitLock.Lock()
select {

View file

@ -32,7 +32,6 @@ import (
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/eth/protocols/snap"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
@ -75,7 +74,7 @@ func newTesterWithNotification(t *testing.T, mode ethconfig.SyncMode, success fu
chain: chain,
peers: make(map[string]*downloadTesterPeer),
}
tester.downloader = New(db, mode, new(event.TypeMux), tester.chain, tester.dropPeer, success)
tester.downloader = New(db, mode, tester.chain, tester.dropPeer, success)
return tester
}

View file

@ -16,10 +16,24 @@
package downloader
import "github.com/ethereum/go-ethereum/core/types"
import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/ethconfig"
)
type DoneEvent struct {
Latest *types.Header
// SyncEventType represents the type of sync event
type SyncEventType int
const (
SyncStarted SyncEventType = iota
SyncFailed
SyncCompleted
)
// SyncEvent represents a downloader synchronization event
type SyncEvent struct {
Type SyncEventType
Mode ethconfig.SyncMode
Err error // Set when Type is SyncFailed
Latest *types.Header // Set when Type is SyncCompleted
}
type StartEvent struct{}
type FailedEvent struct{ Err error }

View file

@ -107,7 +107,6 @@ type handlerConfig struct {
Network uint64 // Network identifier to advertise
Sync ethconfig.SyncMode // Whether to snap or full sync
BloomCache uint64 // Megabytes to alloc for snap sync bloom
EventMux *event.TypeMux // Legacy event mux, deprecate for `feed`
RequiredBlocks map[uint64]common.Hash // Hard coded map of required block hashes for sync challenges
}
@ -126,7 +125,6 @@ type handler struct {
peers *peerSet
txBroadcastKey [16]byte
eventMux *event.TypeMux
txsCh chan core.NewTxsEvent
txsSub event.Subscription
blockRange *blockRangeState
@ -144,14 +142,9 @@ type handler struct {
// newHandler returns a handler for all Ethereum chain management protocol.
func newHandler(config *handlerConfig) (*handler, error) {
// Create the protocol manager with the base fields
if config.EventMux == nil {
config.EventMux = new(event.TypeMux) // Nicety initialization for tests
}
h := &handler{
nodeID: config.NodeID,
networkID: config.Network,
eventMux: config.EventMux,
database: config.Database,
txpool: config.TxPool,
chain: config.Chain,
@ -163,7 +156,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
handlerStartCh: make(chan struct{}),
}
// Construct the downloader (long sync)
h.downloader = downloader.New(config.Database, config.Sync, h.eventMux, h.chain, h.removePeer, h.enableSyncedFeatures)
h.downloader = downloader.New(config.Database, config.Sync, h.chain, h.removePeer, h.enableSyncedFeatures)
// If snap sync is requested but snapshots are disabled, fail loudly
if h.downloader.ConfigSyncMode() == ethconfig.SnapSync && (config.Chain.Snapshots() == nil && config.Chain.TrieDB().Scheme() == rawdb.HashScheme) {
@ -420,7 +413,7 @@ func (h *handler) Start(maxPeers int) {
// broadcast block range
h.wg.Add(1)
h.blockRange = newBlockRangeState(h.chain, h.eventMux)
h.blockRange = newBlockRangeState(h.chain, h.downloader)
go h.blockRangeLoop(h.blockRange)
// start sync handlers
@ -536,16 +529,19 @@ type blockRangeState struct {
next atomic.Pointer[eth.BlockRangeUpdatePacket]
headCh chan core.ChainHeadEvent
headSub event.Subscription
syncSub *event.TypeMuxSubscription
syncCh chan downloader.SyncEvent
syncSub event.Subscription
}
func newBlockRangeState(chain *core.BlockChain, typeMux *event.TypeMux) *blockRangeState {
func newBlockRangeState(chain *core.BlockChain, dl *downloader.Downloader) *blockRangeState {
headCh := make(chan core.ChainHeadEvent, chainHeadChanSize)
headSub := chain.SubscribeChainHeadEvent(headCh)
syncSub := typeMux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
syncCh := make(chan downloader.SyncEvent, 16)
syncSub := dl.SubscribeSyncEvents(syncCh)
st := &blockRangeState{
headCh: headCh,
headSub: headSub,
syncCh: syncCh,
syncSub: syncSub,
}
st.update(chain, chain.CurrentBlock())
@ -561,11 +557,8 @@ func (h *handler) blockRangeLoop(st *blockRangeState) {
for {
select {
case ev := <-st.syncSub.Chan():
if ev == nil {
continue
}
if _, ok := ev.Data.(downloader.StartEvent); ok && h.downloader.ConfigSyncMode() == ethconfig.SnapSync {
case ev := <-st.syncCh:
if ev.Type == downloader.SyncStarted && ev.Mode == ethconfig.SnapSync {
h.blockRangeWhileSnapSyncing(st)
}
case <-st.headCh:
@ -593,12 +586,8 @@ func (h *handler) blockRangeWhileSnapSyncing(st *blockRangeState) {
h.broadcastBlockRange(st)
}
// back to processing head block updates when sync is done
case ev := <-st.syncSub.Chan():
if ev == nil {
continue
}
switch ev.Data.(type) {
case downloader.FailedEvent, downloader.DoneEvent:
case ev := <-st.syncCh:
if ev.Type == downloader.SyncFailed || ev.Type == downloader.SyncCompleted {
return
}
// ignore head updates, but exit when the subscription ends

View file

@ -26,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
@ -37,32 +38,40 @@ type syncReq struct {
errc chan error
}
type Config struct {
TargetBlock common.Hash // if set, sync is triggered at startup
ExitWhenSynced bool // if true, the node shuts down after sync has finished
}
// Syncer is an auxiliary service that allows Geth to perform full sync
// alone without consensus-layer attached. Users must specify a valid block hash
// as the sync target.
//
// Additionally, the syncer can be used to monitor state synchronization.
// It will exit once the specified target has been reached or when the
// most recent chain head is caught up.
//
// This tool can be applied to different networks, no matter it's pre-merge or
// post-merge, but only for full-sync.
type Syncer struct {
stack *node.Node
backend *eth.Ethereum
target common.Hash
request chan *syncReq
closed chan struct{}
wg sync.WaitGroup
exitWhenSynced bool
stack *node.Node
backend *eth.Ethereum
request chan *syncReq
closed chan struct{}
wg sync.WaitGroup
config Config
}
// Register registers the synchronization override service into the node
// stack for launching and stopping the service controlled by node.
func Register(stack *node.Node, backend *eth.Ethereum, target common.Hash, exitWhenSynced bool) (*Syncer, error) {
func Register(stack *node.Node, backend *eth.Ethereum, cfg Config) (*Syncer, error) {
s := &Syncer{
stack: stack,
backend: backend,
target: target,
request: make(chan *syncReq),
closed: make(chan struct{}),
exitWhenSynced: exitWhenSynced,
stack: stack,
backend: backend,
request: make(chan *syncReq),
closed: make(chan struct{}),
config: cfg,
}
stack.RegisterAPIs(s.APIs())
stack.RegisterLifecycle(s)
@ -88,9 +97,11 @@ func (s *Syncer) run() {
var (
target *types.Header
ticker = time.NewTicker(time.Second * 5)
syncCh = make(chan downloader.SyncEvent, 10)
)
defer ticker.Stop()
sub := s.backend.Downloader().SubscribeSyncEvents(syncCh)
defer sub.Unsubscribe()
for {
select {
case req := <-s.request:
@ -137,35 +148,50 @@ func (s *Syncer) run() {
}
}
case <-ticker.C:
if target == nil {
case ev := <-syncCh:
if ev.Type == downloader.SyncStarted {
log.Debug("Synchronization started")
continue
}
if ev.Type == downloader.SyncFailed {
log.Debug("Synchronization failed", "err", ev.Err)
continue
}
head := s.backend.BlockChain().CurrentHeader()
if head != nil {
// Set the finalized and safe markers relative to the current head.
// The finalized marker is set two epochs behind the target,
// and the safe marker is set one epoch behind the target.
if header := s.backend.BlockChain().GetHeaderByNumber(head.Number.Uint64() - params.EpochLength*2); header != nil {
if final := s.backend.BlockChain().CurrentFinalBlock(); final == nil || final.Number.Cmp(header.Number) < 0 {
s.backend.BlockChain().SetFinalized(header)
}
}
if header := s.backend.BlockChain().GetHeaderByNumber(head.Number.Uint64() - params.EpochLength); header != nil {
if safe := s.backend.BlockChain().CurrentSafeBlock(); safe == nil || safe.Number.Cmp(header.Number) < 0 {
s.backend.BlockChain().SetSafe(header)
}
}
}
// Terminate the node if the target has been reached
if s.exitWhenSynced {
if block := s.backend.BlockChain().GetBlockByHash(target.Hash()); block != nil {
log.Info("Sync target reached", "number", block.NumberU64(), "hash", block.Hash())
go s.stack.Close() // async since we need to close ourselves
return
if s.config.ExitWhenSynced {
var synced bool
var block *types.Header
if target != nil {
tb := s.backend.BlockChain().GetBlockByHash(target.Hash())
synced = tb != nil
block = tb.Header()
} else {
timestamp := time.Unix(int64(ev.Latest.Time), 0)
synced = time.Since(timestamp) < 10*time.Minute
block = ev.Latest
}
}
// Set the finalized and safe markers relative to the current head.
// The finalized marker is set two epochs behind the target,
// and the safe marker is set one epoch behind the target.
head := s.backend.BlockChain().CurrentHeader()
if head == nil {
continue
}
if header := s.backend.BlockChain().GetHeaderByNumber(head.Number.Uint64() - params.EpochLength*2); header != nil {
if final := s.backend.BlockChain().CurrentFinalBlock(); final == nil || final.Number.Cmp(header.Number) < 0 {
s.backend.BlockChain().SetFinalized(header)
}
}
if header := s.backend.BlockChain().GetHeaderByNumber(head.Number.Uint64() - params.EpochLength); header != nil {
if safe := s.backend.BlockChain().CurrentSafeBlock(); safe == nil || safe.Number.Cmp(header.Number) < 0 {
s.backend.BlockChain().SetSafe(header)
if synced {
log.Info("Sync target reached", "number", block.Number.Uint64(), "hash", block.Hash())
go s.stack.Close() // async since we need to close ourselves
}
}
@ -179,10 +205,10 @@ func (s *Syncer) run() {
func (s *Syncer) Start() error {
s.wg.Add(1)
go s.run()
if s.target == (common.Hash{}) {
if s.config.TargetBlock == (common.Hash{}) {
return nil
}
return s.Sync(s.target)
return s.Sync(s.config.TargetBlock)
}
// Stop terminates the synchronization service and stop all background activities.

View file

@ -35,7 +35,6 @@ import (
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/memorydb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc"
@ -44,7 +43,6 @@ import (
// Node is a container on which services can be registered.
type Node struct {
eventmux *event.TypeMux
config *Config
accman *accounts.Manager
log log.Logger
@ -108,7 +106,6 @@ func New(conf *Config) (*Node, error) {
node := &Node{
config: conf,
inprocHandler: server,
eventmux: new(event.TypeMux),
log: conf.Logger,
stop: make(chan struct{}),
server: &p2p.Server{Config: conf.P2P},
@ -692,12 +689,6 @@ func (n *Node) WSAuthEndpoint() string {
return "ws://" + n.wsAuth.listenAddr() + n.wsAuth.wsConfig.prefix
}
// EventMux retrieves the event multiplexer used by all the network services in
// the current protocol stack.
func (n *Node) EventMux() *event.TypeMux {
return n.eventmux
}
// OpenDatabaseWithOptions opens an existing database with the given name (or creates one if no
// previous can be found) from within the node's instance directory. If the node has no
// data directory, an in-memory database is returned.