add countdown timer

This commit is contained in:
Jianrong 2021-10-24 15:18:47 +11:00
parent 8891cd167b
commit bcb1fea280
3 changed files with 210 additions and 7 deletions

View file

@ -0,0 +1,83 @@
// A countdown timer that will mostly be used by XDPoS v2 consensus engine
package countdown
import (
"sync"
"time"
"github.com/XinFinOrg/XDPoSChain/log"
)
type CountdownTimer struct {
lock sync.RWMutex // Protects the Initilised field
resetc chan int
quitc chan chan struct{}
Initilised bool
timeoutDuration time.Duration
// Triggered when the countdown timer timeout for the `timeoutDuration` period, it will pass current timestamp to the callback function
OnTimeoutFn func(time time.Time) error
}
func NewCountDown(duration time.Duration) *CountdownTimer {
return &CountdownTimer{
resetc: make(chan int),
quitc: make(chan chan struct{}),
Initilised: false,
timeoutDuration: duration,
}
}
// Completely stop the countdown timer from running.
func (t *CountdownTimer) StopTimer() {
q := make(chan struct{})
t.quitc <- q
<-q
}
// Reset will start the countdown timer if it's already stopped, or simply reset the countdown time back to the defual `duration`
func (t *CountdownTimer) Reset() {
if !t.getInitilisedValue() {
t.setInitilised(true)
go t.startTimer()
} else {
t.resetc <- 0
}
}
// A long running process that
func (t *CountdownTimer) startTimer() {
// Make sure we mark Initilised to false when we quit the countdown
defer t.setInitilised(false)
timer := time.NewTimer(t.timeoutDuration)
// We start with a inf loop
for {
select {
case q := <-t.quitc:
log.Debug("Quit countdown timer")
close(q)
return
case <-timer.C:
log.Debug("Countdown time reached!")
err := t.OnTimeoutFn(time.Now())
if err != nil {
log.Error("OnTimeoutFn error", err)
}
case <-t.resetc:
log.Debug("Reset countdown timer")
timer.Reset(t.timeoutDuration)
}
}
}
// Set the desired value to Initilised with lock to avoid race condition
func (t *CountdownTimer) setInitilised(value bool) {
t.lock.Lock()
defer t.lock.Unlock()
t.Initilised = value
}
func (t *CountdownTimer) getInitilisedValue() bool {
t.lock.Lock()
defer t.lock.Unlock()
return t.Initilised
}

View file

@ -0,0 +1,93 @@
package countdown
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestCountdownWillCallback(t *testing.T) {
called := make(chan int)
OnTimeoutFn := func(time time.Time) error {
called <- 1
return nil
}
countdown := NewCountDown(1000 * time.Millisecond)
countdown.OnTimeoutFn = OnTimeoutFn
countdown.Reset()
<-called
fmt.Println("Times up, successfully called OnTimeoutFn")
}
func TestCountdownShouldReset(t *testing.T) {
called := make(chan int)
OnTimeoutFn := func(time time.Time) error {
called <- 1
return nil
}
countdown := NewCountDown(5000 * time.Millisecond)
countdown.OnTimeoutFn = OnTimeoutFn
// Check countdown did not start
assert.False(t, countdown.Initilised)
countdown.Reset()
// Now the countdown should already started
assert.True(t, countdown.Initilised)
expectedCalledTime := time.Now().Add(9000 * time.Millisecond)
resetTimer := time.NewTimer(4000 * time.Millisecond)
firstReset:
for {
select {
case <-called:
if time.Now().After(expectedCalledTime) {
// Make sure the countdown runs forever
assert.True(t, countdown.Initilised)
fmt.Println("Correctly reset the countdown once")
} else {
t.Fatalf("Countdown did not reset correctly first time")
}
break firstReset
case <-resetTimer.C:
countdown.Reset()
}
}
// Now the countdown is paused after calling the callback function, let's reset it again
assert.True(t, countdown.Initilised)
expectedTimeAfterReset := time.Now().Add(5000 * time.Millisecond)
countdown.Reset()
<-called
// Always initilised
assert.True(t, countdown.Initilised)
if time.Now().After(expectedTimeAfterReset) {
fmt.Println("Correctly reset the countdown second time")
} else {
t.Fatalf("Countdown did not reset correctly second time")
}
}
func TestCountdownShouldBeAbleToStop(t *testing.T) {
called := make(chan int)
OnTimeoutFn := func(time time.Time) error {
called <- 1
return nil
}
countdown := NewCountDown(5000 * time.Millisecond)
countdown.OnTimeoutFn = OnTimeoutFn
// Check countdown did not start
assert.False(t, countdown.Initilised)
countdown.Reset()
// Now the countdown should already started
assert.True(t, countdown.Initilised)
// Try manually stop the timer before it triggers the callback
stopTimer := time.NewTimer(4000 * time.Millisecond)
<-stopTimer.C
countdown.StopTimer()
assert.False(t, countdown.Initilised)
}

View file

@ -1,25 +1,40 @@
package engine_v2
import (
"time"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/countdown"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/ethdb"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/params"
)
type XDPoS_v2 struct {
config *params.XDPoSConfig // Consensus engine configuration parameters
db ethdb.Database // Database to store and retrieve snapshot checkpoints
BroadcastCh chan interface{}
BFTQueue chan interface{}
config *params.XDPoSConfig // Consensus engine configuration parameters
db ethdb.Database // Database to store and retrieve snapshot checkpoints
BroadcastCh chan interface{}
BFTQueue chan interface{}
timeoutWorker *countdown.CountdownTimer // Timer to generate broadcast timeout msg if threashold reached
}
func New(config *params.XDPoSConfig, db ethdb.Database) *XDPoS_v2 {
return &XDPoS_v2{
config: config,
db: db,
// Setup Timer
// TODO: (hashlab) Introduce consensus v2 engine specific config under the main config struct
duration := 50000 * time.Millisecond // Hardcoded value until we move it to a config (XIN-72)
timer := countdown.NewCountDown(duration)
engine := &XDPoS_v2{
config: config,
db: db,
timeoutWorker: timer,
}
// Add callback to the timer
timer.OnTimeoutFn = engine.onCountdownTimeout
return engine
}
func NewFaker(db ethdb.Database, config *params.XDPoSConfig) *XDPoS_v2 {
@ -239,3 +254,15 @@ func (consensus *XDPoS_v2) sendTimeout() error {
func (consensus *XDPoS_v2) sendSyncInfo() error {
return nil
}
/*
Function that will be called by timer when countdown reaches its threshold.
In the engine v2, we would need to broadcast timeout messages to other peers
*/
func (consensus *XDPoS_v2) onCountdownTimeout(time time.Time) error {
err := consensus.sendTimeout()
if err != nil {
log.Error("Error while sending out timeout message at time: ", time)
}
return nil
}