* clean up the pool old round

* add unit test to cover the vote key format

* add gapNumber to the vote pool key

* fix race condition in pool

* remove verify gap number in vote handler
This commit is contained in:
Jerome 2022-04-01 14:59:16 +11:00 committed by GitHub
parent b98005a8dd
commit cb67e8e26a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 300 additions and 26 deletions

View file

@ -59,16 +59,16 @@ type XDPoS_v2 struct {
}
func New(config *params.XDPoSConfig, db ethdb.Database, waitPeriodCh chan int) *XDPoS_v2 {
// Setup Timer
// Setup timeoutTimer
duration := time.Duration(config.V2.TimeoutPeriod) * time.Second
timer := countdown.NewCountDown(duration)
timeoutPool := utils.NewPool(config.V2.CertThreshold)
timeoutTimer := countdown.NewCountDown(duration)
snapshots, _ := lru.NewARC(utils.InmemorySnapshots)
signatures, _ := lru.NewARC(utils.InmemorySnapshots)
epochSwitches, _ := lru.NewARC(int(utils.InmemoryEpochs))
verifiedHeaders, _ := lru.NewARC(utils.InmemorySnapshots)
timeoutPool := utils.NewPool(config.V2.CertThreshold)
votePool := utils.NewPool(config.V2.CertThreshold)
engine := &XDPoS_v2{
config: config,
@ -80,7 +80,7 @@ func New(config *params.XDPoSConfig, db ethdb.Database, waitPeriodCh chan int) *
verifiedHeaders: verifiedHeaders,
snapshots: snapshots,
epochSwitches: epochSwitches,
timeoutWorker: timer,
timeoutWorker: timeoutTimer,
BroadcastCh: make(chan interface{}),
waitPeriodCh: waitPeriodCh,
@ -104,8 +104,9 @@ func New(config *params.XDPoSConfig, db ethdb.Database, waitPeriodCh chan int) *
highestCommitBlock: nil,
}
// Add callback to the timer
timer.OnTimeoutFn = engine.OnCountdownTimeout
timeoutTimer.OnTimeoutFn = engine.OnCountdownTimeout
engine.periodicJob()
return engine
}
@ -1042,3 +1043,14 @@ func (x *XDPoS_v2) allowedToSend(chain consensus.ChainReader, blockHeader *types
}
return nil
}
// Periodlly execution(Attached to engine initialisation during "new"). Used for pool cleaning etc
func (x *XDPoS_v2) periodicJob() {
go func() {
for {
<-time.After(utils.PeriodicJobPeriod * time.Second)
x.hygieneVotePool()
x.hygieneTimeoutPool()
}
}()
}

View file

@ -57,3 +57,19 @@ func (x *XDPoS_v2) SetPropertiesFaker(highestQC *utils.QuorumCert, highestTC *ut
x.highestQuorumCert = highestQC
x.highestTimeoutCert = highestTC
}
func (x *XDPoS_v2) HygieneVotePoolFaker() {
x.hygieneVotePool()
}
func (x *XDPoS_v2) GetVotePoolKeyListFaker() []string {
return x.votePool.PoolObjKeysList()
}
func (x *XDPoS_v2) HygieneTimeoutPoolFaker() {
x.hygieneTimeoutPool()
}
func (x *XDPoS_v2) GetTimeoutPoolKeyListFaker() []string {
return x.timeoutPool.PoolObjKeysList()
}

View file

@ -2,6 +2,8 @@ package engine_v2
import (
"fmt"
"strconv"
"strings"
"sync"
"time"
@ -227,3 +229,24 @@ func (x *XDPoS_v2) OnCountdownTimeout(time time.Time, chain interface{}) error {
return nil
}
func (x *XDPoS_v2) hygieneTimeoutPool() {
x.lock.RLock()
currentRound := x.currentRound
x.lock.RUnlock()
timeoutPoolKeys := x.timeoutPool.PoolObjKeysList()
// Extract round number
for _, k := range timeoutPoolKeys {
keyedRound, err := strconv.ParseInt(strings.Split(k, ":")[0], 10, 64)
if err != nil {
log.Error("[hygieneTimeoutPool] Error while trying to get keyedRound inside pool", "Error", err)
continue
}
// Clean up any timeouts round that is 10 rounds older
if keyedRound < int64(currentRound)-utils.PoolHygieneRound {
log.Debug("[hygieneTimeoutPool] Cleaned timeout pool at round", "Round", keyedRound, "CurrentRound", currentRound, "Key", k)
x.timeoutPool.ClearByPoolKey(k)
}
}
}

View file

@ -3,6 +3,8 @@ package engine_v2
import (
"fmt"
"math/big"
"strconv"
"strings"
"sync"
"github.com/XinFinOrg/XDPoSChain/common"
@ -77,22 +79,8 @@ func (x *XDPoS_v2) voteHandler(chain consensus.ChainReader, voteMsg *utils.Vote)
err := x.VerifyBlockInfo(chain, voteMsg.ProposedBlockInfo)
if err != nil {
x.votePool.ClearPoolKeyByObj(voteMsg)
return err
}
// verify vote.GapNumber
epochSwitchInfo, err := x.getEpochSwitchInfo(chain, nil, voteMsg.ProposedBlockInfo.Hash)
if err != nil {
log.Error("getEpochSwitchInfo when handle Vote", "BlockInfoHash", voteMsg.ProposedBlockInfo.Hash, "Error", err)
return err
}
epochSwitchNumber := epochSwitchInfo.EpochSwitchBlockInfo.Number.Uint64()
gapNumber := epochSwitchNumber - epochSwitchNumber%x.config.Epoch - x.config.Gap
if gapNumber != voteMsg.GapNumber {
log.Error("[voteHandler] gap number mismatch", "BlockInfoHash", voteMsg.ProposedBlockInfo.Hash, "Gap", voteMsg.GapNumber, "GapShouldBe", gapNumber)
return fmt.Errorf("gap number mismatch %v", voteMsg)
}
err = x.onVotePoolThresholdReached(chain, pooledVotes, voteMsg, proposedBlockHeader)
if err != nil {
return err
@ -157,8 +145,6 @@ func (x *XDPoS_v2) onVotePoolThresholdReached(chain consensus.ChainReader, poole
return err
}
log.Info("Successfully processed the vote and produced QC!", "QcRound", quorumCert.ProposedBlockInfo.Round, "QcNumOfSig", len(quorumCert.Signatures), "QcHash", quorumCert.ProposedBlockInfo.Hash, "QcNumber", quorumCert.ProposedBlockInfo.Number.Uint64())
// clean up vote at the same poolKey. and pookKey is proposed block hash
x.votePool.ClearPoolKeyByObj(currentVoteMsg)
return nil
}
@ -216,3 +202,24 @@ func (x *XDPoS_v2) isExtendingFromAncestor(blockChainReader consensus.ChainReade
}
return false, nil
}
func (x *XDPoS_v2) hygieneVotePool() {
x.lock.RLock()
round := x.currentRound
x.lock.RUnlock()
votePoolKeys := x.votePool.PoolObjKeysList()
// Extract round number
for _, k := range votePoolKeys {
keyedRound, err := strconv.ParseInt(strings.Split(k, ":")[0], 10, 64)
if err != nil {
log.Error("[hygieneVotePool] Error while trying to get keyedRound inside pool", "Error", err)
continue
}
// Clean up any votes round that is 10 rounds older
if keyedRound < int64(round)-utils.PoolHygieneRound {
log.Debug("[hygieneVotePool] Cleaned vote poll at round", "Round", keyedRound, "currentRound", round, "Key", k)
x.votePool.ClearByPoolKey(k)
}
}
}

View file

@ -24,3 +24,8 @@ const (
BlockSignersCacheLimit = 9000
M2ByteLength = 4
)
const (
PeriodicJobPeriod = 60
PoolHygieneRound = 10
)

View file

@ -1,6 +1,8 @@
package utils
import (
"sync"
"github.com/XinFinOrg/XDPoSChain/common"
)
@ -11,6 +13,7 @@ type PoolObj interface {
type Pool struct {
objList map[string]map[common.Hash]PoolObj
threshold int
lock sync.RWMutex // Protects the pool fields
}
func NewPool(threshold int) *Pool {
@ -22,6 +25,8 @@ func NewPool(threshold int) *Pool {
// return true if it has reached threshold
func (p *Pool) Add(obj PoolObj) (bool, int, map[common.Hash]PoolObj) {
p.lock.Lock()
defer p.lock.Unlock()
poolKey := obj.PoolKey()
objListKeyed, ok := p.objList[poolKey]
if !ok {
@ -45,16 +50,44 @@ func (p *Pool) Size(obj PoolObj) int {
return len(objListKeyed)
}
func (p *Pool) PoolObjKeysList() []string {
p.lock.RLock()
defer p.lock.RUnlock()
var keyList []string
for key := range p.objList {
keyList = append(keyList, key)
}
return keyList
}
// Given the pool object, clear all object under the same pool key
func (p *Pool) ClearPoolKeyByObj(obj PoolObj) {
p.lock.Lock()
defer p.lock.Unlock()
poolKey := obj.PoolKey()
delete(p.objList, poolKey)
}
// Given the pool key, clean its content
func (p *Pool) ClearByPoolKey(poolKey string) {
p.lock.Lock()
defer p.lock.Unlock()
delete(p.objList, poolKey)
}
func (p *Pool) Clear() {
p.lock.Lock()
defer p.lock.Unlock()
p.objList = make(map[string]map[common.Hash]PoolObj)
}
func (p *Pool) SetThreshold(t int) {
p.lock.Lock()
defer p.lock.Unlock()
p.threshold = t
}

View file

@ -13,6 +13,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/crypto/sha3"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/rlp"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
@ -131,7 +132,10 @@ func (e *ExtraFields_v2) EncodeToBytes() ([]byte, error) {
func rlpHash(x interface{}) (h common.Hash) {
hw := sha3.NewKeccak256()
rlp.Encode(hw, x)
err := rlp.Encode(hw, x)
if err != nil {
log.Error("[rlpHash] Fail to hash item", "Error", err)
}
hw.Sum(h[:0])
return h
}
@ -168,7 +172,7 @@ func TimeoutSigHash(m *TimeoutForSign) common.Hash {
func (m *Vote) PoolKey() string {
// return the voted block hash
return m.ProposedBlockInfo.Hash.Hex()
return fmt.Sprint(m.ProposedBlockInfo.Round, ":", m.GapNumber, ":", m.ProposedBlockInfo.Number, ":", m.ProposedBlockInfo.Hash.Hex())
}
func (m *Timeout) PoolKey() string {

View file

@ -3,9 +3,11 @@ package utils
import (
"math/big"
"reflect"
"strings"
"testing"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/stretchr/testify/assert"
)
func toyExtraFields() *ExtraFields_v2 {
@ -75,3 +77,21 @@ func TestHashAndSigHash(t *testing.T) {
t.Fatalf("SigHash of two round shouldn't equal")
}
}
func TestPoolKeyFormat(t *testing.T) {
voteMsg := &Vote{
ProposedBlockInfo: &BlockInfo{
Hash: common.Hash{1},
Round: 5,
Number: big.NewInt(4),
},
Signature: []byte{},
GapNumber: 450,
}
voteKey := strings.Split(voteMsg.PoolKey(), ":")
assert.Equal(t, "5", voteKey[0])
assert.Equal(t, "450", voteKey[1])
assert.Equal(t, "4", voteKey[2])
assert.Equal(t, common.Hash{1}.String(), voteKey[3])
}

View file

@ -1,7 +1,8 @@
package engine_v2_tests
import (
"fmt"
"strconv"
"strings"
"testing"
"time"
@ -23,7 +24,6 @@ func TestCountdownTimeoutToSendTimeoutMessage(t *testing.T) {
assert.Equal(t, poolSize, 1)
assert.NotNil(t, timeoutMsg)
assert.Equal(t, uint64(1350), timeoutMsg.(*utils.Timeout).GapNumber)
fmt.Println(timeoutMsg.(*utils.Timeout).GapNumber)
assert.Equal(t, utils.Round(1), timeoutMsg.(*utils.Timeout).Round)
}
@ -229,3 +229,66 @@ func TestShouldVerifyTimeoutMessage(t *testing.T) {
assert.Nil(t, err)
assert.True(t, verified)
}
func TestTimeoutPoolKeeyGoodHygiene(t *testing.T) {
blockchain, _, _, signer, signFn, _ := PrepareXDCTestBlockChainForV2Engine(t, 905, params.TestXDPoSMockChainConfig, 0)
engineV2 := blockchain.Engine().(*XDPoS.XDPoS).EngineV2
// Set round to 5
engineV2.SetNewRoundFaker(blockchain, utils.Round(5), false)
// Inject the first timeout with round 5
signedHash, _ := signFn(accounts.Account{Address: signer}, utils.TimeoutSigHash(&utils.TimeoutForSign{
Round: utils.Round(5),
GapNumber: 450,
}).Bytes())
timeoutMsg := &utils.Timeout{
Round: utils.Round(5),
GapNumber: 450,
Signature: signedHash,
}
engineV2.TimeoutHandler(blockchain, timeoutMsg)
// Inject a second timeout with round 16
signedHash, _ = signFn(accounts.Account{Address: signer}, utils.TimeoutSigHash(&utils.TimeoutForSign{
Round: utils.Round(16),
GapNumber: 450,
}).Bytes())
timeoutMsg = &utils.Timeout{
Round: utils.Round(16),
GapNumber: 450,
Signature: signedHash,
}
// Set round to 16
engineV2.SetNewRoundFaker(blockchain, utils.Round(16), false)
engineV2.TimeoutHandler(blockchain, timeoutMsg)
// Inject a third timeout with round 17
signedHash, _ = signFn(accounts.Account{Address: signer}, utils.TimeoutSigHash(&utils.TimeoutForSign{
Round: utils.Round(17),
GapNumber: 450,
}).Bytes())
timeoutMsg = &utils.Timeout{
Round: utils.Round(17),
GapNumber: 450,
Signature: signedHash,
}
// Set round to 16
engineV2.SetNewRoundFaker(blockchain, utils.Round(17), false)
engineV2.TimeoutHandler(blockchain, timeoutMsg)
// Let's keep good Hygiene
engineV2.HygieneTimeoutPoolFaker()
// Let's wait for 5 second for the goroutine
<-time.After(5 * time.Second)
keyList := engineV2.GetTimeoutPoolKeyListFaker()
assert.Equal(t, 2, len(keyList))
for _, k := range keyList {
keyedRound, err := strconv.ParseInt(strings.Split(k, ":")[0], 10, 64)
assert.Nil(t, err)
if keyedRound < 25-10 {
assert.Fail(t, "Did not clean up the timeout pool")
}
}
}

View file

@ -3,8 +3,10 @@ package engine_v2_tests
import (
"fmt"
"math/big"
"strconv"
"strings"
"testing"
"time"
"github.com/XinFinOrg/XDPoSChain/accounts"
"github.com/XinFinOrg/XDPoSChain/accounts/abi/bind/backends"
@ -555,5 +557,94 @@ func TestVoteMessageHandlerWrongGapNumber(t *testing.T) {
}
err := engineV2.VoteHandler(blockchain, voteMsg)
assert.True(t, strings.Contains(err.Error(), "gap number mismatch"))
// Shall not even trigger the vote threashold as vote pool key also contains the gapNumber
assert.Nil(t, err)
}
func TestVotePoolKeepGoodHygiene(t *testing.T) {
blockchain, _, currentBlock, signer, signFn, _ := PrepareXDCTestBlockChainForV2Engine(t, 905, params.TestXDPoSMockChainConfig, 0)
engineV2 := blockchain.Engine().(*XDPoS.XDPoS).EngineV2
blockInfo := &utils.BlockInfo{
Hash: currentBlock.Hash(),
Round: utils.Round(5),
Number: big.NewInt(905),
}
voteForSign := &utils.VoteForSign{
ProposedBlockInfo: blockInfo,
GapNumber: 450,
}
voteSigningHash := utils.VoteSigHash(voteForSign)
// Set round to 5
engineV2.SetNewRoundFaker(blockchain, utils.Round(5), false)
// Create two vote messages which will not reach vote pool threshold
signedHash, _ := signFn(accounts.Account{Address: signer}, voteSigningHash.Bytes())
voteMsg := &utils.Vote{
ProposedBlockInfo: blockInfo,
Signature: signedHash,
GapNumber: 450,
}
engineV2.VoteHandler(blockchain, voteMsg)
// Inject a second vote with round 16
blockInfo = &utils.BlockInfo{
Hash: currentBlock.Hash(),
Round: utils.Round(16),
Number: big.NewInt(906),
}
voteForSign = &utils.VoteForSign{
ProposedBlockInfo: blockInfo,
GapNumber: 450,
}
voteSigningHash = utils.VoteSigHash(voteForSign)
// Set round to 16
engineV2.SetNewRoundFaker(blockchain, utils.Round(16), false)
// Create two vote messages which will not reach vote pool threshold
signedHash, _ = signFn(accounts.Account{Address: signer}, voteSigningHash.Bytes())
voteMsg = &utils.Vote{
ProposedBlockInfo: blockInfo,
Signature: signedHash,
GapNumber: 450,
}
engineV2.VoteHandler(blockchain, voteMsg)
// Inject a second vote with round 25, which is less than 10 rounds difference to the last vote round
blockInfo = &utils.BlockInfo{
Hash: currentBlock.Hash(),
Round: utils.Round(25),
Number: big.NewInt(907),
}
voteForSign = &utils.VoteForSign{
ProposedBlockInfo: blockInfo,
GapNumber: 450,
}
voteSigningHash = utils.VoteSigHash(voteForSign)
// Set round to 25
engineV2.SetNewRoundFaker(blockchain, utils.Round(25), false)
// Create two vote messages which will not reach vote pool threshold
signedHash, _ = signFn(accounts.Account{Address: signer}, voteSigningHash.Bytes())
voteMsg = &utils.Vote{
ProposedBlockInfo: blockInfo,
Signature: signedHash,
GapNumber: 450,
}
engineV2.VoteHandler(blockchain, voteMsg)
// Let's keep good Hygiene
engineV2.HygieneVotePoolFaker()
// Let's wait for 5 second for the goroutine
<-time.After(5 * time.Second)
keyList := engineV2.GetVotePoolKeyListFaker()
assert.Equal(t, 2, len(keyList))
for _, k := range keyList {
keyedRound, err := strconv.ParseInt(strings.Split(k, ":")[0], 10, 64)
assert.Nil(t, err)
if keyedRound < 25-10 {
assert.Fail(t, "Did not clean up the vote pool")
}
}
}