Consensus V2 variable, timeout pool (#19)

* fill in XDPoS_v2 variables and processQC/TC

* add timeout pool, refine engine variables

* refactor type functions

* solve a small pointer bug

* create general pool and its test, refine engine

* refine pool, add xdpos v2 config cert threshold

* refine config
This commit is contained in:
wgr523 2021-11-10 16:19:30 +08:00 committed by Jianrong
parent 4addb69561
commit d47d9a2a99
8 changed files with 329 additions and 121 deletions

View file

@ -30,20 +30,28 @@ type XDPoS_v2 struct {
BFTQueue chan interface{}
timeoutWorker *countdown.CountdownTimer // Timer to generate broadcast timeout msg if threashold reached
currentRound utils.Round
timeoutPool *utils.Pool
currentRound utils.Round
highestVotedRound utils.Round
highestQuorumCert *utils.QuorumCert
// LockQC in XDPoS Consensus 2.0, used in voting rule
lockQuorumCert *utils.QuorumCert
highestTimeoutCert *utils.TimeoutCert
highestCommitBlock *utils.BlockInfo
}
func New(config *params.XDPoSConfig, db ethdb.Database) *XDPoS_v2 {
// Setup Timer
duration := time.Duration(config.V2.TimeoutWorkerDuration) * time.Millisecond
timer := countdown.NewCountDown(duration)
timeoutPool := utils.NewPool(config.V2.CertThreshold)
engine := &XDPoS_v2{
config: config,
db: db,
timeoutWorker: timer,
BroadcastCh: make(chan interface{}),
BFTQueue: make(chan interface{}),
timeoutPool: timeoutPool,
}
// Add callback to the timer
timer.OnTimeoutFn = engine.onCountdownTimeout
@ -62,6 +70,7 @@ func NewFaker(db ethdb.Database, config *params.XDPoSConfig) *XDPoS_v2 {
// Setup Timer
duration := time.Duration(config.V2.TimeoutWorkerDuration) * time.Millisecond
timer := countdown.NewCountDown(duration)
timeoutPool := utils.NewPool(2)
// Allocate the snapshot caches and create the engine
fakeEngine = &XDPoS_v2{
@ -70,6 +79,7 @@ func NewFaker(db ethdb.Database, config *params.XDPoSConfig) *XDPoS_v2 {
timeoutWorker: timer,
BroadcastCh: make(chan interface{}),
BFTQueue: make(chan interface{}),
timeoutPool: timeoutPool,
}
// Add callback to the timer
timer.OnTimeoutFn = fakeEngine.onCountdownTimeout
@ -171,7 +181,7 @@ func (x *XDPoS_v2) VoteHandler() {
*/
func (x *XDPoS_v2) VerifyTimeoutMessage(timeoutMsg utils.Timeout) (bool, error) {
// Recover the public key and the Ethereum address
pubkey, err := crypto.Ecrecover(utils.TimeoutSigHash(timeoutMsg.Round).Bytes(), timeoutMsg.Signature)
pubkey, err := crypto.Ecrecover(utils.TimeoutSigHash(&timeoutMsg.Round).Bytes(), timeoutMsg.Signature)
if err != nil {
return false, fmt.Errorf("Error while verifying time out message: %v", err)
}
@ -187,14 +197,20 @@ func (x *XDPoS_v2) VerifyTimeoutMessage(timeoutMsg utils.Timeout) (bool, error)
return false, fmt.Errorf("Masternodes does not contain signer address. Master node list %v, Signer address: %v", masternodes, signerAddress)
}
func (x *XDPoS_v2) TimeoutHandler() {
/*
1. checkRoundNumber()
2. Collect timeout (TODO)
3. Genrate TC (TODO)
4. processTC()
5. generateSyncInfo()
*/
/*
1. checkRoundNumber()
2. Collect timeout (TODO)
3. Genrate TC (TODO)
4. processTC()
5. generateSyncInfo()
*/
func (x *XDPoS_v2) TimeoutHandler(timeout *utils.Timeout) {
// Collect timeout, generate TC
timeoutCert := x.timeoutPool.Add(timeout)
// If TC is generated
if timeoutCert != nil {
//TODO: processTC(),generateSyncInfo()
}
}
/*
@ -245,29 +261,50 @@ func (x *XDPoS_v2) verifyTC(header *types.Header) error {
}
// Update local QC variables including highestQC & lockQC, as well as update commit blockInfo before call
func (x *XDPoS_v2) processQC(header *types.Header) error {
/*
1. Update HighestQC and LockQC
2. Update commit block info (TODO)
3. Check QC round >= node's currentRound. If yes, call setNewRound
*/
/*
1. Update HighestQC and LockQC
2. Update commit block info (TODO)
3. Check QC round >= node's currentRound. If yes, call setNewRound
*/
func (x *XDPoS_v2) processQC(quorumCert *utils.QuorumCert) error {
if x.highestQuorumCert == nil || quorumCert.ProposedBlockInfo.Round > x.highestQuorumCert.ProposedBlockInfo.Round {
x.highestQuorumCert = quorumCert
//TODO: do I need a clone?
}
//TODO: x.blockchain.getBlock(quorumCert.ProposedBlockInfo.Hash) then get the QC inside that block header
//TODO: update lockQC
//TODO: find parent and grandparent and grandgrandparent block, check round number, if so, commit grandgrandparent
if quorumCert.ProposedBlockInfo.Round >= x.currentRound {
x.setNewRound(quorumCert.ProposedBlockInfo.Round + 1)
}
return nil
}
func (x *XDPoS_v2) processTC(header *types.Header) error {
/*
1. Update highestTC
2. Check TC round >= node's currentRound. If yes, call setNewRound
*/
/*
1. Update highestTC
2. Check TC round >= node's currentRound. If yes, call setNewRound
*/
func (x *XDPoS_v2) processTC(timeoutCert *utils.TimeoutCert) error {
if x.highestTimeoutCert == nil || timeoutCert.Round > x.highestTimeoutCert.Round {
x.highestTimeoutCert = timeoutCert
}
if timeoutCert.Round >= x.currentRound {
x.setNewRound(timeoutCert.Round + 1)
}
return nil
}
func (x *XDPoS_v2) setNewRound() error {
/*
1. Set currentRound = QC round + 1 (or TC round +1)
2. Reset timer
3. Reset vote and timeout Pools
*/
/*
1. Set currentRound = QC round + 1 (or TC round +1)
2. Reset timer
3. Reset vote and timeout Pools
*/
func (x *XDPoS_v2) setNewRound(round utils.Round) error {
x.currentRound = round
//TODO: tell miner now it's a new round and start mine if it's leader
//TODO: reset timer
//TODO: vote pools
x.timeoutPool.Clear()
return nil
}
@ -308,7 +345,7 @@ func (x *XDPoS_v2) sendTimeout() error {
signer, signFn := x.signer, x.signFn
x.lock.RUnlock()
signedHash, err := signFn(accounts.Account{Address: signer}, utils.TimeoutSigHash(x.currentRound).Bytes())
signedHash, err := signFn(accounts.Account{Address: signer}, utils.TimeoutSigHash(&x.currentRound).Bytes())
if err != nil {
return fmt.Errorf("Error while signing for timeout message")
}

View file

@ -0,0 +1,55 @@
package utils
import (
"fmt"
"github.com/XinFinOrg/XDPoSChain/common"
)
type PoolObj interface {
Hash() common.Hash
PoolKey() string
}
type Pool struct {
objList map[string]map[common.Hash]PoolObj
threshold int
onThresholdFn func(map[common.Hash]PoolObj) error
}
func NewPool(threshold int) *Pool {
return &Pool{
objList: make(map[string]map[common.Hash]PoolObj),
threshold: threshold,
}
}
func (p *Pool) Add(obj PoolObj) error {
poolKey := obj.PoolKey()
objListKeyed, ok := p.objList[poolKey]
if !ok {
p.objList[poolKey] = make(map[common.Hash]PoolObj)
objListKeyed = p.objList[poolKey]
}
objListKeyed[obj.Hash()] = obj
if len(objListKeyed) >= p.threshold {
delete(p.objList, poolKey)
if p.onThresholdFn != nil {
return p.onThresholdFn(objListKeyed)
} else {
return fmt.Errorf("no call back function for pool")
}
}
return nil
}
func (p *Pool) Clear() {
p.objList = make(map[string]map[common.Hash]PoolObj)
}
func (p *Pool) SetThreshold(t int) {
p.threshold = t
}
func (p *Pool) SetOnThresholdFn(f func(map[common.Hash]PoolObj) error) {
p.onThresholdFn = f
}

View file

@ -0,0 +1,96 @@
package utils
import (
"math/big"
"testing"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/stretchr/testify/assert"
)
func TestPoolWithTimeout(t *testing.T) {
assert := assert.New(t)
var ret int
onThresholdFn := func(po map[common.Hash]PoolObj) error {
for _, m := range po {
if _, ok := m.(*Timeout); ok {
ret += 1
} else {
t.Fatalf("wrong type passed into pool: %v", m)
}
}
return nil
}
pool := NewPool(2) // 2 is the cert threshold
ret = 0
pool.SetOnThresholdFn(onThresholdFn)
timeout1 := Timeout{Round: 1, Signature: []byte{1}}
timeout2 := Timeout{Round: 1, Signature: []byte{2}}
timeout3 := Timeout{Round: 1, Signature: []byte{3}}
assert.Nil(pool.Add(&timeout1))
assert.Nil(pool.Add(&timeout1))
assert.Equal(ret, 0)
assert.Nil(pool.Add(&timeout2))
assert.Equal(ret, 2)
assert.Nil(pool.Add(&timeout3))
assert.Equal(ret, 2)
pool = NewPool(3) // 3 is the cert size
ret = 0
pool.SetOnThresholdFn(onThresholdFn)
assert.Nil(pool.Add(&timeout1))
assert.Nil(pool.Add(&timeout2))
assert.Equal(ret, 0)
pool.Clear()
assert.Nil(pool.Add(&timeout3))
assert.Equal(ret, 0)
}
func TestPoolWithVote(t *testing.T) {
assert := assert.New(t)
var ret int
onThresholdFn := func(po map[common.Hash]PoolObj) error {
for _, m := range po {
if _, ok := m.(*Vote); ok {
ret += 1
} else {
t.Fatalf("wrong type passed into pool: %v", m)
}
}
return nil
}
pool := NewPool(2) // 2 is the cert threshold
ret = 0
pool.SetOnThresholdFn(onThresholdFn)
blockInfo1 := BlockInfo{Hash: common.BigToHash(big.NewInt(2047)), Round: 1, Number: big.NewInt(1)}
blockInfo2 := BlockInfo{Hash: common.BigToHash(big.NewInt(4095)), Round: 1, Number: big.NewInt(1)}
vote1 := Vote{ProposedBlockInfo: blockInfo1, Signature: []byte{1}}
vote2 := Vote{ProposedBlockInfo: blockInfo2, Signature: []byte{2}}
vote3 := Vote{ProposedBlockInfo: blockInfo1, Signature: []byte{3}}
assert.Nil(pool.Add(&vote1))
assert.Nil(pool.Add(&vote1))
assert.Equal(ret, 0)
assert.Nil(pool.Add(&vote2))
assert.Equal(ret, 0)
assert.Nil(pool.Add(&vote3))
assert.Equal(ret, 2)
pool = NewPool(3) // 3 is the cert size
ret = 0
pool.SetOnThresholdFn(onThresholdFn)
assert.Nil(pool.Add(&vote1))
assert.Nil(pool.Add(&vote2))
assert.Nil(pool.Add(&vote3))
assert.Equal(ret, 0)
pool.Clear()
assert.Empty(pool.objList)
pool = NewPool(2) // 2 is the cert size
ret = 0
pool.SetOnThresholdFn(onThresholdFn)
assert.Nil(pool.Add(&vote1))
assert.Nil(pool.Add(&vote2))
assert.Nil(pool.Add(&vote3))
assert.Equal(len(pool.objList), 1) //vote for one hash is cleared, but another remains
pool.Clear()
assert.Empty(pool.objList)
}

View file

@ -11,6 +11,8 @@ import (
"github.com/XinFinOrg/XDPoSChain/consensus/clique"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/crypto/sha3"
"github.com/XinFinOrg/XDPoSChain/rlp"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
@ -104,3 +106,40 @@ type ExtraFields_v2 struct {
Round Round
QuorumCert QuorumCert
}
func rlpHash(x interface{}) (h common.Hash) {
hw := sha3.NewKeccak256()
rlp.Encode(hw, x)
hw.Sum(h[:0])
return h
}
func (m *Vote) Hash() common.Hash {
return rlpHash(m)
}
func (m *Timeout) Hash() common.Hash {
return rlpHash(m)
}
func (m *SyncInfo) Hash() common.Hash {
return rlpHash(m)
}
func VoteSigHash(m *BlockInfo) common.Hash {
return rlpHash(m)
}
func TimeoutSigHash(m *Round) common.Hash {
return rlpHash(m)
}
func (m *Vote) PoolKey() string {
// return the voted block hash
return m.ProposedBlockInfo.Hash.Hex()
}
func (m *Timeout) PoolKey() string {
// return a default pool key string
return "0"
}

View file

@ -0,0 +1,68 @@
package utils
import (
"math/big"
"reflect"
"testing"
"github.com/XinFinOrg/XDPoSChain/common"
)
func toyExtraFields() *ExtraFields_v2 {
round := Round(307)
blockInfo := BlockInfo{Hash: common.BigToHash(big.NewInt(2047)), Round: round - 1, Number: big.NewInt(1)}
signature := []byte{1, 2, 3, 4, 5, 6, 7, 8}
signatures := [][]byte{signature}
quorumCert := QuorumCert{ProposedBlockInfo: blockInfo, Signatures: signatures}
e := &ExtraFields_v2{Round: round, QuorumCert: quorumCert}
return e
}
func TestExtraFieldsEncodeDecode(t *testing.T) {
extraFields := toyExtraFields()
encoded, err := extraFields.EncodeToBytes()
if err != nil {
t.Errorf("Error when encoding extra fields")
}
var decoded ExtraFields_v2
err = DecodeBytesExtraFields(encoded, &decoded)
if err != nil {
t.Errorf("Error when decoding extra fields")
}
if !reflect.DeepEqual(*extraFields, decoded) {
t.Fatalf("Decoded not equal to original extra field, original: %v; decoded: %v", extraFields, decoded)
}
}
func TestHashAndSigHash(t *testing.T) {
round := Round(307)
blockInfo1 := BlockInfo{Hash: common.BigToHash(big.NewInt(2047)), Round: round - 1, Number: big.NewInt(1)}
blockInfo2 := BlockInfo{Hash: common.BigToHash(big.NewInt(4095)), Round: round - 1, Number: big.NewInt(1)}
signature1 := []byte{1, 2, 3, 4, 5, 6, 7, 8}
signature2 := []byte{1, 2, 3, 4, 5, 6, 7, 7}
signatures1 := [][]byte{signature1}
quorumCert1 := QuorumCert{ProposedBlockInfo: blockInfo1, Signatures: signatures1}
signatures2 := [][]byte{signature2}
quorumCert2 := QuorumCert{ProposedBlockInfo: blockInfo1, Signatures: signatures2}
vote1 := Vote{ProposedBlockInfo: blockInfo1, Signature: signature1}
vote2 := Vote{ProposedBlockInfo: blockInfo1, Signature: signature2}
if vote1.Hash() == vote2.Hash() {
t.Fatalf("Hash of two votes shouldn't equal")
}
timeout1 := Timeout{Round: 10, Signature: signature1}
timeout2 := Timeout{Round: 10, Signature: signature2}
if timeout1.Hash() == timeout2.Hash() {
t.Fatalf("Hash of two timeouts shouldn't equal")
}
syncInfo1 := SyncInfo{HighestQuorumCert: quorumCert1}
syncInfo2 := SyncInfo{HighestQuorumCert: quorumCert2}
if syncInfo1.Hash() == syncInfo2.Hash() {
t.Fatalf("Hash of two sync info shouldn't equal")
}
if VoteSigHash(&blockInfo1) == VoteSigHash(&blockInfo2) {
t.Fatalf("SigHash of two block info shouldn't equal")
}
round2 := Round(999)
if TimeoutSigHash(&round) == TimeoutSigHash(&round2) {
t.Fatalf("SigHash of two round shouldn't equal")
}
}

View file

@ -174,34 +174,4 @@ func DecodeBytesExtraFields(b []byte, val interface{}) error {
default:
return fmt.Errorf("consensus version %d is not defined", b[0])
}
}
func rlpHash(x interface{}) (h common.Hash) {
hw := sha3.NewKeccak256()
err := rlp.Encode(hw, x)
if err != nil {
log.Error("rlpHash failed", err)
}
hw.Sum(h[:0])
return h
}
func (m *Vote) Hash() common.Hash {
return rlpHash(m)
}
func (m *Timeout) Hash() common.Hash {
return rlpHash(m)
}
func (m *SyncInfo) Hash() common.Hash {
return rlpHash(m)
}
func VoteSigHash(m BlockInfo) common.Hash {
return rlpHash(m)
}
func TimeoutSigHash(m Round) common.Hash {
return rlpHash(m)
}
}

View file

@ -3,7 +3,6 @@ package utils
import (
"fmt"
"math/big"
"reflect"
"testing"
"github.com/XinFinOrg/XDPoSChain/common"
@ -82,63 +81,4 @@ func TestCompareSignersLists(t *testing.T) {
if CompareSignersLists([]common.Address{common.StringToAddress("aaaaaaaaaaaaaaaa")}, []common.Address{common.StringToAddress("cccccccccccccccccccccccccccccccccccccccc")}) {
t.Error("Failed with list has only one signer")
}
}
func toyExtraFields() *ExtraFields_v2 {
round := Round(307)
blockInfo := BlockInfo{Hash: common.BigToHash(big.NewInt(2047)), Round: round - 1, Number: big.NewInt(1)}
signature := []byte{1, 2, 3, 4, 5, 6, 7, 8}
signatures := [][]byte{signature}
quorumCert := QuorumCert{ProposedBlockInfo: blockInfo, Signatures: signatures}
e := &ExtraFields_v2{Round: round, QuorumCert: quorumCert}
return e
}
func TestExtraFieldsEncodeDecode(t *testing.T) {
extraFields := toyExtraFields()
encoded, err := extraFields.EncodeToBytes()
if err != nil {
t.Errorf("Error when encoding extra fields")
}
var decoded ExtraFields_v2
err = DecodeBytesExtraFields(encoded, &decoded)
if err != nil {
t.Errorf("Error when decoding extra fields")
}
if !reflect.DeepEqual(*extraFields, decoded) {
t.Fatalf("Decoded not equal to original extra field, original: %v; decoded: %v", extraFields, decoded)
}
}
func TestHashAndSigHash(t *testing.T) {
round := Round(307)
blockInfo1 := BlockInfo{Hash: common.BigToHash(big.NewInt(2047)), Round: round - 1, Number: big.NewInt(1)}
blockInfo2 := BlockInfo{Hash: common.BigToHash(big.NewInt(4095)), Round: round - 1, Number: big.NewInt(1)}
signature1 := []byte{1, 2, 3, 4, 5, 6, 7, 8}
signature2 := []byte{1, 2, 3, 4, 5, 6, 7, 7}
signatures1 := [][]byte{signature1}
quorumCert1 := QuorumCert{ProposedBlockInfo: blockInfo1, Signatures: signatures1}
signatures2 := [][]byte{signature2}
quorumCert2 := QuorumCert{ProposedBlockInfo: blockInfo1, Signatures: signatures2}
vote1 := Vote{ProposedBlockInfo: blockInfo1, Signature: signature1}
vote2 := Vote{ProposedBlockInfo: blockInfo1, Signature: signature2}
if vote1.Hash() == vote2.Hash() {
t.Fatalf("Hash of two votes shouldn't equal")
}
timeout1 := Timeout{Round: 10, Signature: signature1}
timeout2 := Timeout{Round: 10, Signature: signature2}
if timeout1.Hash() == timeout2.Hash() {
t.Fatalf("Hash of two timeouts shouldn't equal")
}
syncInfo1 := SyncInfo{HighestQuorumCert: quorumCert1}
syncInfo2 := SyncInfo{HighestQuorumCert: quorumCert2}
if syncInfo1.Hash() == syncInfo2.Hash() {
t.Fatalf("Hash of two sync info shouldn't equal")
}
if VoteSigHash(blockInfo1) == VoteSigHash(blockInfo2) {
t.Fatalf("SigHash of two block info shouldn't equal")
}
round2 := Round(999)
if TimeoutSigHash(round) == TimeoutSigHash(round2) {
t.Fatalf("SigHash of two round shouldn't equal")
}
}
}

View file

@ -37,9 +37,11 @@ var (
var (
XDPoSV2Config = &V2{
TimeoutWorkerDuration: 50000,
CertThreshold: common.MaxMasternodesV2*2/3 + 1,
}
TestXDPoSV2Config = &V2{
TimeoutWorkerDuration: 5000,
CertThreshold: 2,
}
// XDPoSChain mainnet config
@ -199,6 +201,7 @@ type XDPoSConfig struct {
type V2 struct {
TimeoutWorkerDuration int64 `json:"TimeoutWorkerDuration"` // Duration in ms
CertThreshold int `json:"certificateThreshold"` // Necessary number of messages from master nodes to form a certificate
}
// String implements the stringer interface, returning the consensus engine details.