Merge from master mining time patch (#767)

* merge from master

* close channel

* close channel

---------

Co-authored-by: liam.lai <liam.lai@us>
This commit is contained in:
benjamin202410 2024-12-19 01:17:29 -08:00 committed by GitHub
parent 0f8abf531c
commit 21b05243b6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 64 additions and 9 deletions

View file

@ -41,6 +41,7 @@ import (
const (
ExtraFieldCheck = true
SkipExtraFieldCheck = false
newRoundChanSize = 1
)
func (x *XDPoS) SigHash(header *types.Header) (hash common.Hash) {
@ -64,6 +65,8 @@ type XDPoS struct {
// Share Channel
MinePeriodCh chan int // Miner wait Period Channel
NewRoundCh chan types.Round // Miner use this channel to trigger worker to commitNewWork
// Trading and lending service
GetXDCXService func() utils.TradingService
GetLendingService func() utils.LendingService
@ -104,6 +107,7 @@ func New(chainConfig *params.ChainConfig, db ethdb.Database) *XDPoS {
log.Info("xdc config loading", "v2 config", config.V2)
minePeriodCh := make(chan int)
newRoundCh := make(chan types.Round, newRoundChanSize)
// Allocate the snapshot caches and create the engine
signingTxsCache, _ := lru.New(utils.BlockSignersCacheLimit)
@ -113,10 +117,11 @@ func New(chainConfig *params.ChainConfig, db ethdb.Database) *XDPoS {
db: db,
MinePeriodCh: minePeriodCh,
NewRoundCh: newRoundCh,
signingTxsCache: signingTxsCache,
EngineV1: engine_v1.New(chainConfig, db),
EngineV2: engine_v2.New(chainConfig, db, minePeriodCh),
EngineV2: engine_v2.New(chainConfig, db, minePeriodCh, newRoundCh),
}
}
@ -131,6 +136,7 @@ func NewFaker(db ethdb.Database, chainConfig *params.ChainConfig) *XDPoS {
}
minePeriodCh := make(chan int)
newRoundCh := make(chan types.Round, newRoundChanSize)
// Allocate the snapshot caches and create the engine
signingTxsCache, _ := lru.New(utils.BlockSignersCacheLimit)
@ -140,13 +146,14 @@ func NewFaker(db ethdb.Database, chainConfig *params.ChainConfig) *XDPoS {
db: db,
MinePeriodCh: minePeriodCh,
NewRoundCh: newRoundCh,
GetXDCXService: func() utils.TradingService { return nil },
GetLendingService: func() utils.LendingService { return nil },
signingTxsCache: signingTxsCache,
EngineV1: engine_v1.NewFaker(db, chainConfig),
EngineV2: engine_v2.New(chainConfig, db, minePeriodCh),
EngineV2: engine_v2.New(chainConfig, db, minePeriodCh, newRoundCh),
}
return fakeEngine
}

View file

@ -48,6 +48,7 @@ type XDPoS_v2 struct {
BroadcastCh chan interface{}
minePeriodCh chan int
newRoundCh chan types.Round
timeoutWorker *countdown.CountdownTimer // Timer to generate broadcast timeout msg if threashold reached
timeoutCount int // number of timeout being sent
@ -71,7 +72,7 @@ type XDPoS_v2 struct {
votePoolCollectionTime time.Time
}
func New(chainConfig *params.ChainConfig, db ethdb.Database, minePeriodCh chan int) *XDPoS_v2 {
func New(chainConfig *params.ChainConfig, db ethdb.Database, minePeriodCh chan int, newRoundCh chan types.Round) *XDPoS_v2 {
config := chainConfig.XDPoS
// Setup timeoutTimer
duration := time.Duration(config.V2.CurrentConfig.TimeoutPeriod) * time.Second
@ -100,6 +101,7 @@ func New(chainConfig *params.ChainConfig, db ethdb.Database, minePeriodCh chan i
timeoutWorker: timeoutTimer,
BroadcastCh: make(chan interface{}),
minePeriodCh: minePeriodCh,
newRoundCh: newRoundCh,
round2epochBlockInfo: round2epochBlockInfo,
@ -902,6 +904,7 @@ func (x *XDPoS_v2) processQC(blockChainReader consensus.ChainReader, incomingQuo
1. Set currentRound = QC round + 1 (or TC round +1)
2. Reset timer
3. Reset vote and timeout Pools
4. Send signal to miner
*/
func (x *XDPoS_v2) setNewRound(blockChainReader consensus.ChainReader, round types.Round) {
log.Info("[setNewRound] new round and reset pools and workers", "round", round)
@ -911,6 +914,12 @@ func (x *XDPoS_v2) setNewRound(blockChainReader consensus.ChainReader, round typ
x.timeoutPool.Clear()
// don't need to clean vote pool, we have other process to clean and it's not good to clean here, some edge case may break
// for example round gets bump during collecting vote, so we have to keep vote.
// send signal to newRoundCh, but if full don't send
select {
case x.newRoundCh <- round:
default:
}
}
func (x *XDPoS_v2) broadcastToBftChannel(msg interface{}) {

View file

@ -20,6 +20,7 @@ import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"math/big"
"sync"
"sync/atomic"
@ -116,7 +117,9 @@ type worker struct {
chainHeadSub event.Subscription
chainSideCh chan core.ChainSideEvent
chainSideSub event.Subscription
wg sync.WaitGroup
resetCh chan time.Duration // Channel to request timer resets
wg sync.WaitGroup
agents map[Agent]struct{}
recv chan *Result
@ -158,6 +161,7 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase com
txsCh: make(chan core.NewTxsEvent, txChanSize),
chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize),
resetCh: make(chan time.Duration, 1),
chainDb: eth.ChainDb(),
recv: make(chan *Result, resultQueueSize),
chain: eth.BlockChain(),
@ -273,16 +277,30 @@ func (w *worker) update() {
minePeriod := 2
MinePeriodCh := w.engine.(*XDPoS.XDPoS).MinePeriodCh
defer close(MinePeriodCh)
NewRoundCh := w.engine.(*XDPoS.XDPoS).NewRoundCh
defer close(NewRoundCh)
timeout := time.NewTimer(time.Duration(minePeriod) * time.Second)
c := make(chan struct{})
defer timeout.Stop()
c := make(chan struct{}, 1)
defer close(c)
finish := make(chan struct{})
defer close(finish)
defer timeout.Stop()
go func() {
for {
// A real event arrived, process interesting content
select {
case d := <-w.resetCh:
// Reset the timer to the new duration.
if !timeout.Stop() {
// Drain the timer channel if it had already expired.
select {
case <-timeout.C:
default:
}
}
timeout.Reset(d)
case <-timeout.C:
c <- struct{}{}
case <-finish:
@ -296,18 +314,26 @@ func (w *worker) update() {
case v := <-MinePeriodCh:
log.Info("[worker] update wait period", "period", v)
minePeriod = v
timeout.Reset(time.Duration(minePeriod) * time.Second)
w.resetCh <- time.Duration(minePeriod) * time.Second
case <-c:
if atomic.LoadInt32(&w.mining) == 1 {
w.commitNewWork()
}
timeout.Reset(time.Duration(minePeriod) * time.Second)
resetTime := getResetTime(w.chain, minePeriod)
w.resetCh <- resetTime
// Handle ChainHeadEvent
case <-w.chainHeadCh:
w.commitNewWork()
timeout.Reset(time.Duration(minePeriod) * time.Second)
resetTime := getResetTime(w.chain, minePeriod)
w.resetCh <- resetTime
// Handle new round
case <-NewRoundCh:
w.commitNewWork()
resetTime := getResetTime(w.chain, minePeriod)
w.resetCh <- resetTime
// Handle ChainSideEvent
case <-w.chainSideCh:
@ -354,6 +380,19 @@ func (w *worker) update() {
}
}
func getResetTime(chain *core.BlockChain, minePeriod int) time.Duration {
minePeriodDuration := time.Duration(minePeriod) * time.Second
currentBlockTime := chain.CurrentBlock().Time().Int64()
nowTime := time.Now().UnixMilli()
resetTime := time.Duration(currentBlockTime)*time.Second + minePeriodDuration - time.Duration(nowTime)*time.Millisecond
// in case the current block time is not very accurate
if resetTime > minePeriodDuration || resetTime <= 0 {
resetTime = minePeriodDuration
}
log.Debug("[update] Miner worker timer reset", "resetMilliseconds", resetTime.Milliseconds(), "minePeriodSec", minePeriod, "currentBlockTimeSec", fmt.Sprintf("%d", currentBlockTime), "currentSystemTimeSec", fmt.Sprintf("%d.%03d", nowTime/1000, nowTime%1000))
return resetTime
}
func (w *worker) wait() {
for {
mustCommitNewWork := true