diff --git a/common/countdown/countdown.go b/common/countdown/countdown.go new file mode 100644 index 0000000000..6f941adaa8 --- /dev/null +++ b/common/countdown/countdown.go @@ -0,0 +1,83 @@ +// A countdown timer that will mostly be used by XDPoS v2 consensus engine +package countdown + +import ( + "sync" + "time" + + "github.com/XinFinOrg/XDPoSChain/log" +) + +type CountdownTimer struct { + lock sync.RWMutex // Protects the Initilised field + resetc chan int + quitc chan chan struct{} + Initilised bool + timeoutDuration time.Duration + // Triggered when the countdown timer timeout for the `timeoutDuration` period, it will pass current timestamp to the callback function + OnTimeoutFn func(time time.Time) error +} + +func NewCountDown(duration time.Duration) *CountdownTimer { + return &CountdownTimer{ + resetc: make(chan int), + quitc: make(chan chan struct{}), + Initilised: false, + timeoutDuration: duration, + } +} + +// Completely stop the countdown timer from running. +func (t *CountdownTimer) StopTimer() { + q := make(chan struct{}) + t.quitc <- q + <-q +} + +// Reset will start the countdown timer if it's already stopped, or simply reset the countdown time back to the defual `duration` +func (t *CountdownTimer) Reset() { + if !t.getInitilisedValue() { + t.setInitilised(true) + go t.startTimer() + } else { + t.resetc <- 0 + } +} + +// A long running process that +func (t *CountdownTimer) startTimer() { + // Make sure we mark Initilised to false when we quit the countdown + defer t.setInitilised(false) + timer := time.NewTimer(t.timeoutDuration) + // We start with a inf loop + for { + select { + case q := <-t.quitc: + log.Debug("Quit countdown timer") + close(q) + return + case <-timer.C: + log.Debug("Countdown time reached!") + err := t.OnTimeoutFn(time.Now()) + if err != nil { + log.Error("OnTimeoutFn error", err) + } + case <-t.resetc: + log.Debug("Reset countdown timer") + timer.Reset(t.timeoutDuration) + } + } +} + +// Set the desired value to Initilised with lock to avoid race condition +func (t *CountdownTimer) setInitilised(value bool) { + t.lock.Lock() + defer t.lock.Unlock() + t.Initilised = value +} + +func (t *CountdownTimer) getInitilisedValue() bool { + t.lock.Lock() + defer t.lock.Unlock() + return t.Initilised +} diff --git a/common/countdown/countdown_test.go b/common/countdown/countdown_test.go new file mode 100644 index 0000000000..2ca60192e4 --- /dev/null +++ b/common/countdown/countdown_test.go @@ -0,0 +1,93 @@ +package countdown + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestCountdownWillCallback(t *testing.T) { + + called := make(chan int) + OnTimeoutFn := func(time time.Time) error { + called <- 1 + return nil + } + + countdown := NewCountDown(1000 * time.Millisecond) + countdown.OnTimeoutFn = OnTimeoutFn + countdown.Reset() + <-called + fmt.Println("Times up, successfully called OnTimeoutFn") +} + +func TestCountdownShouldReset(t *testing.T) { + called := make(chan int) + OnTimeoutFn := func(time time.Time) error { + called <- 1 + return nil + } + + countdown := NewCountDown(5000 * time.Millisecond) + countdown.OnTimeoutFn = OnTimeoutFn + // Check countdown did not start + assert.False(t, countdown.Initilised) + countdown.Reset() + // Now the countdown should already started + assert.True(t, countdown.Initilised) + expectedCalledTime := time.Now().Add(9000 * time.Millisecond) + resetTimer := time.NewTimer(4000 * time.Millisecond) + +firstReset: + for { + select { + case <-called: + if time.Now().After(expectedCalledTime) { + // Make sure the countdown runs forever + assert.True(t, countdown.Initilised) + fmt.Println("Correctly reset the countdown once") + } else { + t.Fatalf("Countdown did not reset correctly first time") + } + break firstReset + case <-resetTimer.C: + countdown.Reset() + } + } + + // Now the countdown is paused after calling the callback function, let's reset it again + assert.True(t, countdown.Initilised) + expectedTimeAfterReset := time.Now().Add(5000 * time.Millisecond) + countdown.Reset() + <-called + // Always initilised + assert.True(t, countdown.Initilised) + if time.Now().After(expectedTimeAfterReset) { + fmt.Println("Correctly reset the countdown second time") + } else { + t.Fatalf("Countdown did not reset correctly second time") + } +} + +func TestCountdownShouldBeAbleToStop(t *testing.T) { + called := make(chan int) + OnTimeoutFn := func(time time.Time) error { + called <- 1 + return nil + } + + countdown := NewCountDown(5000 * time.Millisecond) + countdown.OnTimeoutFn = OnTimeoutFn + // Check countdown did not start + assert.False(t, countdown.Initilised) + countdown.Reset() + // Now the countdown should already started + assert.True(t, countdown.Initilised) + // Try manually stop the timer before it triggers the callback + stopTimer := time.NewTimer(4000 * time.Millisecond) + <-stopTimer.C + countdown.StopTimer() + assert.False(t, countdown.Initilised) +} diff --git a/consensus/XDPoS/engines/engine_v2/engine.go b/consensus/XDPoS/engines/engine_v2/engine.go index 9e3bf61790..4ac8ca1050 100644 --- a/consensus/XDPoS/engines/engine_v2/engine.go +++ b/consensus/XDPoS/engines/engine_v2/engine.go @@ -1,25 +1,40 @@ package engine_v2 import ( + "time" + "github.com/XinFinOrg/XDPoSChain/common" + "github.com/XinFinOrg/XDPoSChain/common/countdown" "github.com/XinFinOrg/XDPoSChain/consensus" "github.com/XinFinOrg/XDPoSChain/core/types" "github.com/XinFinOrg/XDPoSChain/ethdb" + "github.com/XinFinOrg/XDPoSChain/log" "github.com/XinFinOrg/XDPoSChain/params" ) type XDPoS_v2 struct { - config *params.XDPoSConfig // Consensus engine configuration parameters - db ethdb.Database // Database to store and retrieve snapshot checkpoints - BroadcastCh chan interface{} - BFTQueue chan interface{} + config *params.XDPoSConfig // Consensus engine configuration parameters + db ethdb.Database // Database to store and retrieve snapshot checkpoints + BroadcastCh chan interface{} + BFTQueue chan interface{} + timeoutWorker *countdown.CountdownTimer // Timer to generate broadcast timeout msg if threashold reached } func New(config *params.XDPoSConfig, db ethdb.Database) *XDPoS_v2 { - return &XDPoS_v2{ - config: config, - db: db, + // Setup Timer + // TODO: (hashlab) Introduce consensus v2 engine specific config under the main config struct + duration := 50000 * time.Millisecond // Hardcoded value until we move it to a config (XIN-72) + timer := countdown.NewCountDown(duration) + + engine := &XDPoS_v2{ + config: config, + db: db, + timeoutWorker: timer, } + // Add callback to the timer + timer.OnTimeoutFn = engine.onCountdownTimeout + + return engine } func NewFaker(db ethdb.Database, config *params.XDPoSConfig) *XDPoS_v2 { @@ -239,3 +254,15 @@ func (consensus *XDPoS_v2) sendTimeout() error { func (consensus *XDPoS_v2) sendSyncInfo() error { return nil } + +/* + Function that will be called by timer when countdown reaches its threshold. + In the engine v2, we would need to broadcast timeout messages to other peers +*/ +func (consensus *XDPoS_v2) onCountdownTimeout(time time.Time) error { + err := consensus.sendTimeout() + if err != nil { + log.Error("Error while sending out timeout message at time: ", time) + } + return nil +}