fix err download block on masternode

This commit is contained in:
parmarrushabh 2018-11-14 11:29:48 +05:30
parent 52ae30023a
commit abd494055b
7 changed files with 85 additions and 56 deletions

View file

@ -120,6 +120,7 @@ var (
//utils.GpoPercentileFlag,
//utils.ExtraDataFlag,
configFileFlag,
utils.CommitTxWhenNotMiningFlag,
}
rpcFlags = []cli.Flag{

View file

@ -74,10 +74,8 @@ SUBCOMMANDS:
func init() {
cli.AppHelpTemplate = `{{.Name}} {{if .Flags}}[global options] {{end}}command{{if .Flags}} [command options]{{end}} [arguments...]
VERSION:
{{.Version}}
COMMANDS:
{{range .Commands}}{{.Name}}{{with .ShortName}}, {{.}}{{end}}{{ "\t" }}{{.Usage}}
{{end}}{{if .Flags}}
@ -113,6 +111,11 @@ func NewApp(gitCommit, usage string) *cli.App {
var (
// General settings
CommitTxWhenNotMiningFlag = DirectoryFlag{
Name: "committxwhennotmining",
Usage: "Always commit transactions",
Value: DirectoryString{node.DefaultDataDir()},
}
DataDirFlag = DirectoryFlag{
Name: "datadir",
Usage: "Data directory for the databases and keystore",
@ -128,7 +131,7 @@ var (
}
NetworkIdFlag = cli.Uint64Flag{
Name: "networkid",
Usage: "Network identifier (integer, 1=Frontier, 2=Morden (disused), 3=Ropsten, 4=Rinkeby)",
Usage: "Network identifier (integer, 89=XDCchain)",
Value: eth.DefaultConfig.NetworkId,
}
TestnetFlag = cli.BoolFlag{
@ -311,11 +314,11 @@ var (
Value: int(state.MaxTrieCacheGen),
}
// Miner settings
StakingEnabledFlag = cli.BoolFlag{
StakingEnabledFlag = cli.BoolFlag{
Name: "mine",
Usage: "Enable staking",
}
StakerThreadsFlag = cli.IntFlag{
StakerThreadsFlag = cli.IntFlag{
Name: "minerthreads",
Usage: "Number of CPU threads to use for staking",
Value: runtime.NumCPU(),
@ -897,6 +900,9 @@ func SetNodeConfig(ctx *cli.Context, cfg *node.Config) {
if ctx.GlobalIsSet(NoUSBFlag.Name) {
cfg.NoUSB = ctx.GlobalBool(NoUSBFlag.Name)
}
if ctx.GlobalIsSet(CommitTxWhenNotMiningFlag.Name) {
cfg.CommitTxWhenNotMining = ctx.GlobalBool(CommitTxWhenNotMiningFlag.Name)
}
}
func setGPO(ctx *cli.Context, cfg *gasprice.Config) {
@ -1295,4 +1301,4 @@ func MigrateFlags(action func(ctx *cli.Context) error) func(*cli.Context) error
}
return action(ctx)
}
}
}

View file

@ -173,7 +173,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil {
return nil, err
}
eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine)
eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine, ctx.GetConfig().CommitTxWhenNotMining)
eth.miner.SetExtra(makeExtraData(config.ExtraData))
eth.ApiBackend = &EthApiBackend{eth, nil}

View file

@ -146,9 +146,7 @@ func (q *queue) Reset() {
// Close marks the end of the sync, unblocking WaitResults.
// It may be called even if the queue is already closed.
func (q *queue) Close() {
q.lock.Lock()
q.closed = true
q.lock.Unlock()
q.active.Broadcast()
}

View file

@ -57,12 +57,12 @@ type Miner struct {
shouldStart int32 // should start indicates whether we should start after sync
}
func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine) *Miner {
func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, commitTxWhenNotMining bool) *Miner {
miner := &Miner{
eth: eth,
mux: mux,
engine: engine,
worker: newWorker(config, engine, common.Address{}, eth, mux),
worker: newWorker(config, engine, common.Address{}, eth, mux, commitTxWhenNotMining),
canStart: 1,
}
miner.Register(NewCpuAgent(eth.BlockChain(), engine))
@ -77,7 +77,6 @@ func New(eth Backend, config *params.ChainConfig, mux *event.TypeMux, engine con
// and halt your mining operation for as long as the DOS continues.
func (self *Miner) update() {
events := self.mux.Subscribe(downloader.StartEvent{}, downloader.DoneEvent{}, downloader.FailedEvent{})
out:
for ev := range events.Chan() {
switch ev.Data.(type) {
case downloader.StartEvent:
@ -95,10 +94,7 @@ out:
if shouldStart {
self.Start(self.coinbase)
}
// unsubscribe. we're only interested in this event once
events.Unsubscribe()
// stop immediately and ignore all further pending events
break out
}
}
}

View file

@ -55,7 +55,7 @@ const (
// timeout waiting for M1
waitPeriod = 10
// timeout for checkpoint.
waitPeriodCheckpoint = 30
waitPeriodCheckpoint = 60
)
// Agent can register themself with the worker
@ -130,30 +130,35 @@ type worker struct {
unconfirmed *unconfirmedBlocks // set of locally mined blocks pending canonicalness confirmations
// atomic status counters
mining int32
atWork int32
mining int32
atWork int32
commitTxWhenNotMining bool
lastParentBlockCommit string
}
func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase common.Address, eth Backend, mux *event.TypeMux) *worker {
func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase common.Address, eth Backend, mux *event.TypeMux, commitTxWhenNotMining bool) *worker {
worker := &worker{
config: config,
engine: engine,
eth: eth,
mux: mux,
txCh: make(chan core.TxPreEvent, txChanSize),
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
chainDb: eth.ChainDb(),
recv: make(chan *Result, resultQueueSize),
chain: eth.BlockChain(),
proc: eth.BlockChain().Validator(),
possibleUncles: make(map[common.Hash]*types.Block),
coinbase: coinbase,
agents: make(map[Agent]struct{}),
unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
config: config,
engine: engine,
eth: eth,
mux: mux,
txCh: make(chan core.TxPreEvent, txChanSize),
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
chainDb: eth.ChainDb(),
recv: make(chan *Result, resultQueueSize),
chain: eth.BlockChain(),
proc: eth.BlockChain().Validator(),
possibleUncles: make(map[common.Hash]*types.Block),
coinbase: coinbase,
agents: make(map[Agent]struct{}),
unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
commitTxWhenNotMining: commitTxWhenNotMining,
}
if worker.commitTxWhenNotMining {
// Subscribe TxPreEvent for tx pool
worker.txSub = eth.TxPool().SubscribeTxPreEvent(worker.txCh)
}
// Subscribe TxPreEvent for tx pool
worker.txSub = eth.TxPool().SubscribeTxPreEvent(worker.txCh)
// Subscribe events for blockchain
worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh)
@ -248,16 +253,39 @@ func (self *worker) unregister(agent Agent) {
}
func (self *worker) update() {
defer self.txSub.Unsubscribe()
if self.commitTxWhenNotMining {
defer self.txSub.Unsubscribe()
}
defer self.chainHeadSub.Unsubscribe()
defer self.chainSideSub.Unsubscribe()
timeout := time.NewTimer(waitPeriod * time.Second)
c := make(chan struct{})
finish := make(chan struct{})
defer close(finish)
defer timeout.Stop()
go func() {
for {
// A real event arrived, process interesting content
select {
case <-timeout.C:
c <- struct{}{}
case <-finish:
return
}
}
}()
for {
// A real event arrived, process interesting content
select {
// Handle ChainHeadEvent
case <-c:
if atomic.LoadInt32(&self.mining) == 1 {
self.commitNewWork()
}
timeout.Reset(waitPeriod * time.Second)
// Handle ChainHeadEvent
case <-self.chainHeadCh:
self.commitNewWork()
timeout.Reset(waitPeriod * time.Second)
// Handle ChainSideEvent
case ev := <-self.chainSideCh:
@ -283,8 +311,6 @@ func (self *worker) update() {
}
}
// System stopped
case <-self.txSub.Err():
return
case <-self.chainHeadSub.Err():
return
case <-self.chainSideSub.Err():
@ -466,6 +492,13 @@ func (self *worker) commitNewWork() {
tstart := time.Now()
parent := self.chain.CurrentBlock()
var signers map[common.Address]struct{}
if parent.Hash().Hex() == self.lastParentBlockCommit {
return
}
if !self.commitTxWhenNotMining && atomic.LoadInt32(&self.mining) == 0 {
return
}
// Only try to commit new work if we are mining
if atomic.LoadInt32(&self.mining) == 1 {
// check if we are right after parent's coinbase in the list
@ -499,24 +532,16 @@ func (self *worker) commitNewWork() {
h := hop(len(masternodes), preIndex, curIndex)
gap := waitPeriod * int64(h)
// Check nearest checkpoint block in hop range.
nearest := self.config.XDPoA.Epoch - (parent.Header().Number.Uint64() % self.config.XDPoS.Epoch)
nearest := self.config.XDPoS.Epoch - (parent.Header().Number.Uint64() % self.config.XDPoS.Epoch)
if uint64(h) >= nearest {
gap = waitPeriodCheckpoint * int64(h)
gap += waitPeriodCheckpoint
}
log.Info("Distance from the parent block", "seconds", gap, "hops", h)
L:
select {
case newBlock := <-self.chainHeadCh:
self.chainHeadCh <- newBlock
if newBlock.Block.NumberU64() > parent.NumberU64() {
log.Info("New block has came already. Skip this turn", "new block", newBlock.Block.NumberU64(), "current block", parent.NumberU64())
return
}
case <-time.After(time.Duration(gap) * time.Second):
// wait enough. It's my turn
log.Info("Wait enough. It's my turn", "waited seconds", gap)
break L
waitedTime := time.Now().Unix() - parent.Header().Time.Int64()
if gap > waitedTime {
return
}
log.Info("Wait enough. It's my turn", "waited seconds", waitedTime)
}
}
}
@ -611,6 +636,7 @@ func (self *worker) commitNewWork() {
if atomic.LoadInt32(&self.mining) == 1 {
log.Info("Commit new mining work", "number", work.Block.Number(), "txs", work.tcount, "special txs", len(specialTxs), "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart)))
self.unconfirmed.Shift(work.Block.NumberU64() - 1)
self.lastParentBlockCommit = parent.Hash().Hex()
}
self.push(work)
}

View file

@ -147,6 +147,8 @@ type Config struct {
// Logger is a custom logger to use with the p2p.Server.
Logger log.Logger `toml:",omitempty"`
CommitTxWhenNotMining bool `toml:",omitempty"`
}
// IPCEndpoint resolves an IPC endpoint based on a configured value, taking into