From abd494055b1de0ad93ee4a7237634ae6189d209d Mon Sep 17 00:00:00 2001 From: parmarrushabh Date: Wed, 14 Nov 2018 11:29:48 +0530 Subject: [PATCH] fix err download block on masternode --- cmd/XDC/main.go | 1 + cmd/utils/flags.go | 18 ++++--- eth/backend.go | 2 +- eth/downloader/queue.go | 2 - miner/miner.go | 10 ++-- miner/worker.go | 106 +++++++++++++++++++++++++--------------- node/config.go | 2 + 7 files changed, 85 insertions(+), 56 deletions(-) diff --git a/cmd/XDC/main.go b/cmd/XDC/main.go index d15efa91f8..9b4b08c13c 100644 --- a/cmd/XDC/main.go +++ b/cmd/XDC/main.go @@ -120,6 +120,7 @@ var ( //utils.GpoPercentileFlag, //utils.ExtraDataFlag, configFileFlag, + utils.CommitTxWhenNotMiningFlag, } rpcFlags = []cli.Flag{ diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 144b46e959..04125eabd5 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -74,10 +74,8 @@ SUBCOMMANDS: func init() { cli.AppHelpTemplate = `{{.Name}} {{if .Flags}}[global options] {{end}}command{{if .Flags}} [command options]{{end}} [arguments...] - VERSION: {{.Version}} - COMMANDS: {{range .Commands}}{{.Name}}{{with .ShortName}}, {{.}}{{end}}{{ "\t" }}{{.Usage}} {{end}}{{if .Flags}} @@ -113,6 +111,11 @@ func NewApp(gitCommit, usage string) *cli.App { var ( // General settings + CommitTxWhenNotMiningFlag = DirectoryFlag{ + Name: "committxwhennotmining", + Usage: "Always commit transactions", + Value: DirectoryString{node.DefaultDataDir()}, + } DataDirFlag = DirectoryFlag{ Name: "datadir", Usage: "Data directory for the databases and keystore", @@ -128,7 +131,7 @@ var ( } NetworkIdFlag = cli.Uint64Flag{ Name: "networkid", - Usage: "Network identifier (integer, 1=Frontier, 2=Morden (disused), 3=Ropsten, 4=Rinkeby)", + Usage: "Network identifier (integer, 89=XDCchain)", Value: eth.DefaultConfig.NetworkId, } TestnetFlag = cli.BoolFlag{ @@ -311,11 +314,11 @@ var ( Value: int(state.MaxTrieCacheGen), } // Miner settings - StakingEnabledFlag = cli.BoolFlag{ + StakingEnabledFlag = cli.BoolFlag{ Name: "mine", Usage: "Enable staking", } - StakerThreadsFlag = cli.IntFlag{ + StakerThreadsFlag = cli.IntFlag{ Name: "minerthreads", Usage: "Number of CPU threads to use for staking", Value: runtime.NumCPU(), @@ -897,6 +900,9 @@ func SetNodeConfig(ctx *cli.Context, cfg *node.Config) { if ctx.GlobalIsSet(NoUSBFlag.Name) { cfg.NoUSB = ctx.GlobalBool(NoUSBFlag.Name) } + if ctx.GlobalIsSet(CommitTxWhenNotMiningFlag.Name) { + cfg.CommitTxWhenNotMining = ctx.GlobalBool(CommitTxWhenNotMiningFlag.Name) + } } func setGPO(ctx *cli.Context, cfg *gasprice.Config) { @@ -1295,4 +1301,4 @@ func MigrateFlags(action func(ctx *cli.Context) error) func(*cli.Context) error } return action(ctx) } -} +} \ No newline at end of file diff --git a/eth/backend.go b/eth/backend.go index 6dd25b5384..e8a5e30739 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -173,7 +173,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil { return nil, err } - eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine) + eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine, ctx.GetConfig().CommitTxWhenNotMining) eth.miner.SetExtra(makeExtraData(config.ExtraData)) eth.ApiBackend = &EthApiBackend{eth, nil} diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 359cce54b5..8e6c91166e 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -146,9 +146,7 @@ func (q *queue) Reset() { // Close marks the end of the sync, unblocking WaitResults. // It may be called even if the queue is already closed. func (q *queue) Close() { - q.lock.Lock() q.closed = true - q.lock.Unlock() q.active.Broadcast() } diff --git a/miner/miner.go b/miner/miner.go index d9256e9787..160880cb51 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -57,12 +57,12 @@ type Miner struct { shouldStart int32 // should start indicates whether we should start after sync } -func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine) *Miner { +func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, commitTxWhenNotMining bool) *Miner { miner := &Miner{ eth: eth, mux: mux, engine: engine, - worker: newWorker(config, engine, common.Address{}, eth, mux), + worker: newWorker(config, engine, common.Address{}, eth, mux, commitTxWhenNotMining), canStart: 1, } miner.Register(NewCpuAgent(eth.BlockChain(), engine)) @@ -77,7 +77,6 @@ func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine con // and halt your mining operation for as long as the DOS continues. func (self *Miner) update() { events := self.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{}) -out: for ev := range events.Chan() { switch ev.Data.(type) { case downloader.StartEvent: @@ -95,10 +94,7 @@ out: if shouldStart { self.Start(self.coinbase) } - // unsubscribe. we're only interested in this event once - events.Unsubscribe() - // stop immediately and ignore all further pending events - break out + } } } diff --git a/miner/worker.go b/miner/worker.go index b42ec17e1e..a7d924305a 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -55,7 +55,7 @@ const ( // timeout waiting for M1 waitPeriod = 10 // timeout for checkpoint. - waitPeriodCheckpoint = 30 + waitPeriodCheckpoint = 60 ) // Agent can register themself with the worker @@ -130,30 +130,35 @@ type worker struct { unconfirmed *unconfirmedBlocks // set of locally mined blocks pending canonicalness confirmations // atomic status counters - mining int32 - atWork int32 + mining int32 + atWork int32 + commitTxWhenNotMining bool + lastParentBlockCommit string } -func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase common.Address, eth Backend, mux *event.TypeMux) *worker { +func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase common.Address, eth Backend, mux *event.TypeMux, commitTxWhenNotMining bool) *worker { worker := &worker{ - config: config, - engine: engine, - eth: eth, - mux: mux, - txCh: make(chan core.TxPreEvent, txChanSize), - chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), - chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), - chainDb: eth.ChainDb(), - recv: make(chan *Result, resultQueueSize), - chain: eth.BlockChain(), - proc: eth.BlockChain().Validator(), - possibleUncles: make(map[common.Hash]*types.Block), - coinbase: coinbase, - agents: make(map[Agent]struct{}), - unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth), + config: config, + engine: engine, + eth: eth, + mux: mux, + txCh: make(chan core.TxPreEvent, txChanSize), + chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), + chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), + chainDb: eth.ChainDb(), + recv: make(chan *Result, resultQueueSize), + chain: eth.BlockChain(), + proc: eth.BlockChain().Validator(), + possibleUncles: make(map[common.Hash]*types.Block), + coinbase: coinbase, + agents: make(map[Agent]struct{}), + unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth), + commitTxWhenNotMining: commitTxWhenNotMining, + } + if worker.commitTxWhenNotMining { + // Subscribe TxPreEvent for tx pool + worker.txSub = eth.TxPool().SubscribeTxPreEvent(worker.txCh) } - // Subscribe TxPreEvent for tx pool - worker.txSub = eth.TxPool().SubscribeTxPreEvent(worker.txCh) // Subscribe events for blockchain worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh) worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh) @@ -248,16 +253,39 @@ func (self *worker) unregister(agent Agent) { } func (self *worker) update() { - defer self.txSub.Unsubscribe() + if self.commitTxWhenNotMining { + defer self.txSub.Unsubscribe() + } defer self.chainHeadSub.Unsubscribe() defer self.chainSideSub.Unsubscribe() - + timeout := time.NewTimer(waitPeriod * time.Second) + c := make(chan struct{}) + finish := make(chan struct{}) + defer close(finish) + defer timeout.Stop() + go func() { + for { + // A real event arrived, process interesting content + select { + case <-timeout.C: + c <- struct{}{} + case <-finish: + return + } + } + }() for { // A real event arrived, process interesting content select { - // Handle ChainHeadEvent + case <-c: + if atomic.LoadInt32(&self.mining) == 1 { + self.commitNewWork() + } + timeout.Reset(waitPeriod * time.Second) + // Handle ChainHeadEvent case <-self.chainHeadCh: self.commitNewWork() + timeout.Reset(waitPeriod * time.Second) // Handle ChainSideEvent case ev := <-self.chainSideCh: @@ -283,8 +311,6 @@ func (self *worker) update() { } } // System stopped - case <-self.txSub.Err(): - return case <-self.chainHeadSub.Err(): return case <-self.chainSideSub.Err(): @@ -466,6 +492,13 @@ func (self *worker) commitNewWork() { tstart := time.Now() parent := self.chain.CurrentBlock() var signers map[common.Address]struct{} + if parent.Hash().Hex() == self.lastParentBlockCommit { + return + } + if !self.commitTxWhenNotMining && atomic.LoadInt32(&self.mining) == 0 { + return + } + // Only try to commit new work if we are mining if atomic.LoadInt32(&self.mining) == 1 { // check if we are right after parent's coinbase in the list @@ -499,24 +532,16 @@ func (self *worker) commitNewWork() { h := hop(len(masternodes), preIndex, curIndex) gap := waitPeriod * int64(h) // Check nearest checkpoint block in hop range. - nearest := self.config.XDPoA.Epoch - (parent.Header().Number.Uint64() % self.config.XDPoS.Epoch) + nearest := self.config.XDPoS.Epoch - (parent.Header().Number.Uint64() % self.config.XDPoS.Epoch) if uint64(h) >= nearest { - gap = waitPeriodCheckpoint * int64(h) + gap += waitPeriodCheckpoint } log.Info("Distance from the parent block", "seconds", gap, "hops", h) - L: - select { - case newBlock := <-self.chainHeadCh: - self.chainHeadCh <- newBlock - if newBlock.Block.NumberU64() > parent.NumberU64() { - log.Info("New block has came already. Skip this turn", "new block", newBlock.Block.NumberU64(), "current block", parent.NumberU64()) - return - } - case <-time.After(time.Duration(gap) * time.Second): - // wait enough. It's my turn - log.Info("Wait enough. It's my turn", "waited seconds", gap) - break L + waitedTime := time.Now().Unix() - parent.Header().Time.Int64() + if gap > waitedTime { + return } + log.Info("Wait enough. It's my turn", "waited seconds", waitedTime) } } } @@ -611,6 +636,7 @@ func (self *worker) commitNewWork() { if atomic.LoadInt32(&self.mining) == 1 { log.Info("Commit new mining work", "number", work.Block.Number(), "txs", work.tcount, "special txs", len(specialTxs), "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart))) self.unconfirmed.Shift(work.Block.NumberU64() - 1) + self.lastParentBlockCommit = parent.Hash().Hex() } self.push(work) } diff --git a/node/config.go b/node/config.go index d1b2432e18..08ef10b69c 100644 --- a/node/config.go +++ b/node/config.go @@ -147,6 +147,8 @@ type Config struct { // Logger is a custom logger to use with the p2p.Server. Logger log.Logger `toml:",omitempty"` + + CommitTxWhenNotMining bool `toml:",omitempty"` } // IPCEndpoint resolves an IPC endpoint based on a configured value, taking into