From 21b05243b61acb92ee53a807fe4f68c34a50a3cf Mon Sep 17 00:00:00 2001 From: benjamin202410 Date: Thu, 19 Dec 2024 01:17:29 -0800 Subject: [PATCH] Merge from master mining time patch (#767) * merge from master * close channel * close channel --------- Co-authored-by: liam.lai --- consensus/XDPoS/XDPoS.go | 11 ++++- consensus/XDPoS/engines/engine_v2/engine.go | 11 ++++- miner/worker.go | 51 ++++++++++++++++++--- 3 files changed, 64 insertions(+), 9 deletions(-) diff --git a/consensus/XDPoS/XDPoS.go b/consensus/XDPoS/XDPoS.go index d6e532fc12..5001abe4e8 100644 --- a/consensus/XDPoS/XDPoS.go +++ b/consensus/XDPoS/XDPoS.go @@ -41,6 +41,7 @@ import ( const ( ExtraFieldCheck = true SkipExtraFieldCheck = false + newRoundChanSize = 1 ) func (x *XDPoS) SigHash(header *types.Header) (hash common.Hash) { @@ -64,6 +65,8 @@ type XDPoS struct { // Share Channel MinePeriodCh chan int // Miner wait Period Channel + NewRoundCh chan types.Round // Miner use this channel to trigger worker to commitNewWork + // Trading and lending service GetXDCXService func() utils.TradingService GetLendingService func() utils.LendingService @@ -104,6 +107,7 @@ func New(chainConfig *params.ChainConfig, db ethdb.Database) *XDPoS { log.Info("xdc config loading", "v2 config", config.V2) minePeriodCh := make(chan int) + newRoundCh := make(chan types.Round, newRoundChanSize) // Allocate the snapshot caches and create the engine signingTxsCache, _ := lru.New(utils.BlockSignersCacheLimit) @@ -113,10 +117,11 @@ func New(chainConfig *params.ChainConfig, db ethdb.Database) *XDPoS { db: db, MinePeriodCh: minePeriodCh, + NewRoundCh: newRoundCh, signingTxsCache: signingTxsCache, EngineV1: engine_v1.New(chainConfig, db), - EngineV2: engine_v2.New(chainConfig, db, minePeriodCh), + EngineV2: engine_v2.New(chainConfig, db, minePeriodCh, newRoundCh), } } @@ -131,6 +136,7 @@ func NewFaker(db ethdb.Database, chainConfig *params.ChainConfig) *XDPoS { } minePeriodCh := make(chan int) + newRoundCh := make(chan types.Round, newRoundChanSize) // Allocate the snapshot caches and create the engine signingTxsCache, _ := lru.New(utils.BlockSignersCacheLimit) @@ -140,13 +146,14 @@ func NewFaker(db ethdb.Database, chainConfig *params.ChainConfig) *XDPoS { db: db, MinePeriodCh: minePeriodCh, + NewRoundCh: newRoundCh, GetXDCXService: func() utils.TradingService { return nil }, GetLendingService: func() utils.LendingService { return nil }, signingTxsCache: signingTxsCache, EngineV1: engine_v1.NewFaker(db, chainConfig), - EngineV2: engine_v2.New(chainConfig, db, minePeriodCh), + EngineV2: engine_v2.New(chainConfig, db, minePeriodCh, newRoundCh), } return fakeEngine } diff --git a/consensus/XDPoS/engines/engine_v2/engine.go b/consensus/XDPoS/engines/engine_v2/engine.go index b808a56236..25515c3f7a 100644 --- a/consensus/XDPoS/engines/engine_v2/engine.go +++ b/consensus/XDPoS/engines/engine_v2/engine.go @@ -48,6 +48,7 @@ type XDPoS_v2 struct { BroadcastCh chan interface{} minePeriodCh chan int + newRoundCh chan types.Round timeoutWorker *countdown.CountdownTimer // Timer to generate broadcast timeout msg if threashold reached timeoutCount int // number of timeout being sent @@ -71,7 +72,7 @@ type XDPoS_v2 struct { votePoolCollectionTime time.Time } -func New(chainConfig *params.ChainConfig, db ethdb.Database, minePeriodCh chan int) *XDPoS_v2 { +func New(chainConfig *params.ChainConfig, db ethdb.Database, minePeriodCh chan int, newRoundCh chan types.Round) *XDPoS_v2 { config := chainConfig.XDPoS // Setup timeoutTimer duration := time.Duration(config.V2.CurrentConfig.TimeoutPeriod) * time.Second @@ -100,6 +101,7 @@ func New(chainConfig *params.ChainConfig, db ethdb.Database, minePeriodCh chan i timeoutWorker: timeoutTimer, BroadcastCh: make(chan interface{}), minePeriodCh: minePeriodCh, + newRoundCh: newRoundCh, round2epochBlockInfo: round2epochBlockInfo, @@ -902,6 +904,7 @@ func (x *XDPoS_v2) processQC(blockChainReader consensus.ChainReader, incomingQuo 1. Set currentRound = QC round + 1 (or TC round +1) 2. Reset timer 3. Reset vote and timeout Pools +4. Send signal to miner */ func (x *XDPoS_v2) setNewRound(blockChainReader consensus.ChainReader, round types.Round) { log.Info("[setNewRound] new round and reset pools and workers", "round", round) @@ -911,6 +914,12 @@ func (x *XDPoS_v2) setNewRound(blockChainReader consensus.ChainReader, round typ x.timeoutPool.Clear() // don't need to clean vote pool, we have other process to clean and it's not good to clean here, some edge case may break // for example round gets bump during collecting vote, so we have to keep vote. + + // send signal to newRoundCh, but if full don't send + select { + case x.newRoundCh <- round: + default: + } } func (x *XDPoS_v2) broadcastToBftChannel(msg interface{}) { diff --git a/miner/worker.go b/miner/worker.go index 9aa9d4589e..6514b48811 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -20,6 +20,7 @@ import ( "bytes" "encoding/binary" "errors" + "fmt" "math/big" "sync" "sync/atomic" @@ -116,7 +117,9 @@ type worker struct { chainHeadSub event.Subscription chainSideCh chan core.ChainSideEvent chainSideSub event.Subscription - wg sync.WaitGroup + resetCh chan time.Duration // Channel to request timer resets + + wg sync.WaitGroup agents map[Agent]struct{} recv chan *Result @@ -158,6 +161,7 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com txsCh: make(chan core.NewTxsEvent, txChanSize), chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), + resetCh: make(chan time.Duration, 1), chainDb: eth.ChainDb(), recv: make(chan *Result, resultQueueSize), chain: eth.BlockChain(), @@ -273,16 +277,30 @@ func (w *worker) update() { minePeriod := 2 MinePeriodCh := w.engine.(*XDPoS.XDPoS).MinePeriodCh defer close(MinePeriodCh) + NewRoundCh := w.engine.(*XDPoS.XDPoS).NewRoundCh + defer close(NewRoundCh) timeout := time.NewTimer(time.Duration(minePeriod) * time.Second) - c := make(chan struct{}) + defer timeout.Stop() + c := make(chan struct{}, 1) + defer close(c) finish := make(chan struct{}) defer close(finish) - defer timeout.Stop() + go func() { for { // A real event arrived, process interesting content select { + case d := <-w.resetCh: + // Reset the timer to the new duration. + if !timeout.Stop() { + // Drain the timer channel if it had already expired. + select { + case <-timeout.C: + default: + } + } + timeout.Reset(d) case <-timeout.C: c <- struct{}{} case <-finish: @@ -296,18 +314,26 @@ func (w *worker) update() { case v := <-MinePeriodCh: log.Info("[worker] update wait period", "period", v) minePeriod = v - timeout.Reset(time.Duration(minePeriod) * time.Second) + w.resetCh <- time.Duration(minePeriod) * time.Second case <-c: if atomic.LoadInt32(&w.mining) == 1 { w.commitNewWork() } - timeout.Reset(time.Duration(minePeriod) * time.Second) + resetTime := getResetTime(w.chain, minePeriod) + w.resetCh <- resetTime // Handle ChainHeadEvent case <-w.chainHeadCh: w.commitNewWork() - timeout.Reset(time.Duration(minePeriod) * time.Second) + resetTime := getResetTime(w.chain, minePeriod) + w.resetCh <- resetTime + + // Handle new round + case <-NewRoundCh: + w.commitNewWork() + resetTime := getResetTime(w.chain, minePeriod) + w.resetCh <- resetTime // Handle ChainSideEvent case <-w.chainSideCh: @@ -354,6 +380,19 @@ func (w *worker) update() { } } +func getResetTime(chain *core.BlockChain, minePeriod int) time.Duration { + minePeriodDuration := time.Duration(minePeriod) * time.Second + currentBlockTime := chain.CurrentBlock().Time().Int64() + nowTime := time.Now().UnixMilli() + resetTime := time.Duration(currentBlockTime)*time.Second + minePeriodDuration - time.Duration(nowTime)*time.Millisecond + // in case the current block time is not very accurate + if resetTime > minePeriodDuration || resetTime <= 0 { + resetTime = minePeriodDuration + } + log.Debug("[update] Miner worker timer reset", "resetMilliseconds", resetTime.Milliseconds(), "minePeriodSec", minePeriod, "currentBlockTimeSec", fmt.Sprintf("%d", currentBlockTime), "currentSystemTimeSec", fmt.Sprintf("%d.%03d", nowTime/1000, nowTime%1000)) + return resetTime +} + func (w *worker) wait() { for { mustCommitNewWork := true