From d47d9a2a9978bb33431a3159ffdd15b456e1688a Mon Sep 17 00:00:00 2001 From: wgr523 Date: Wed, 10 Nov 2021 16:19:30 +0800 Subject: [PATCH] Consensus V2 variable, timeout pool (#19) * fill in XDPoS_v2 variables and processQC/TC * add timeout pool, refine engine variables * refactor type functions * solve a small pointer bug * create general pool and its test, refine engine * refine pool, add xdpos v2 config cert threshold * refine config --- consensus/XDPoS/engines/engine_v2/engine.go | 95 +++++++++++++------- consensus/XDPoS/utils/pool.go | 55 ++++++++++++ consensus/XDPoS/utils/pool_test.go | 96 +++++++++++++++++++++ consensus/XDPoS/utils/types.go | 39 +++++++++ consensus/XDPoS/utils/types_test.go | 68 +++++++++++++++ consensus/XDPoS/utils/utils.go | 32 +------ consensus/XDPoS/utils/utils_test.go | 62 +------------ params/config.go | 3 + 8 files changed, 329 insertions(+), 121 deletions(-) create mode 100644 consensus/XDPoS/utils/pool.go create mode 100644 consensus/XDPoS/utils/pool_test.go create mode 100644 consensus/XDPoS/utils/types_test.go diff --git a/consensus/XDPoS/engines/engine_v2/engine.go b/consensus/XDPoS/engines/engine_v2/engine.go index 49e9e5de65..381edf5afd 100644 --- a/consensus/XDPoS/engines/engine_v2/engine.go +++ b/consensus/XDPoS/engines/engine_v2/engine.go @@ -30,20 +30,28 @@ type XDPoS_v2 struct { BFTQueue chan interface{} timeoutWorker *countdown.CountdownTimer // Timer to generate broadcast timeout msg if threashold reached - currentRound utils.Round + timeoutPool *utils.Pool + currentRound utils.Round + highestVotedRound utils.Round + highestQuorumCert *utils.QuorumCert + // LockQC in XDPoS Consensus 2.0, used in voting rule + lockQuorumCert *utils.QuorumCert + highestTimeoutCert *utils.TimeoutCert + highestCommitBlock *utils.BlockInfo } func New(config *params.XDPoSConfig, db ethdb.Database) *XDPoS_v2 { // Setup Timer duration := time.Duration(config.V2.TimeoutWorkerDuration) * time.Millisecond timer := countdown.NewCountDown(duration) - + timeoutPool := utils.NewPool(config.V2.CertThreshold) engine := &XDPoS_v2{ config: config, db: db, timeoutWorker: timer, BroadcastCh: make(chan interface{}), BFTQueue: make(chan interface{}), + timeoutPool: timeoutPool, } // Add callback to the timer timer.OnTimeoutFn = engine.onCountdownTimeout @@ -62,6 +70,7 @@ func NewFaker(db ethdb.Database, config *params.XDPoSConfig) *XDPoS_v2 { // Setup Timer duration := time.Duration(config.V2.TimeoutWorkerDuration) * time.Millisecond timer := countdown.NewCountDown(duration) + timeoutPool := utils.NewPool(2) // Allocate the snapshot caches and create the engine fakeEngine = &XDPoS_v2{ @@ -70,6 +79,7 @@ func NewFaker(db ethdb.Database, config *params.XDPoSConfig) *XDPoS_v2 { timeoutWorker: timer, BroadcastCh: make(chan interface{}), BFTQueue: make(chan interface{}), + timeoutPool: timeoutPool, } // Add callback to the timer timer.OnTimeoutFn = fakeEngine.onCountdownTimeout @@ -171,7 +181,7 @@ func (x *XDPoS_v2) VoteHandler() { */ func (x *XDPoS_v2) VerifyTimeoutMessage(timeoutMsg utils.Timeout) (bool, error) { // Recover the public key and the Ethereum address - pubkey, err := crypto.Ecrecover(utils.TimeoutSigHash(timeoutMsg.Round).Bytes(), timeoutMsg.Signature) + pubkey, err := crypto.Ecrecover(utils.TimeoutSigHash(&timeoutMsg.Round).Bytes(), timeoutMsg.Signature) if err != nil { return false, fmt.Errorf("Error while verifying time out message: %v", err) } @@ -187,14 +197,20 @@ func (x *XDPoS_v2) VerifyTimeoutMessage(timeoutMsg utils.Timeout) (bool, error) return false, fmt.Errorf("Masternodes does not contain signer address. Master node list %v, Signer address: %v", masternodes, signerAddress) } -func (x *XDPoS_v2) TimeoutHandler() { - /* - 1. checkRoundNumber() - 2. Collect timeout (TODO) - 3. Genrate TC (TODO) - 4. processTC() - 5. generateSyncInfo() - */ +/* + 1. checkRoundNumber() + 2. Collect timeout (TODO) + 3. Genrate TC (TODO) + 4. processTC() + 5. generateSyncInfo() +*/ +func (x *XDPoS_v2) TimeoutHandler(timeout *utils.Timeout) { + // Collect timeout, generate TC + timeoutCert := x.timeoutPool.Add(timeout) + // If TC is generated + if timeoutCert != nil { + //TODO: processTC(),generateSyncInfo() + } } /* @@ -245,29 +261,50 @@ func (x *XDPoS_v2) verifyTC(header *types.Header) error { } // Update local QC variables including highestQC & lockQC, as well as update commit blockInfo before call -func (x *XDPoS_v2) processQC(header *types.Header) error { - /* - 1. Update HighestQC and LockQC - 2. Update commit block info (TODO) - 3. Check QC round >= node's currentRound. If yes, call setNewRound - */ +/* + 1. Update HighestQC and LockQC + 2. Update commit block info (TODO) + 3. Check QC round >= node's currentRound. If yes, call setNewRound +*/ +func (x *XDPoS_v2) processQC(quorumCert *utils.QuorumCert) error { + if x.highestQuorumCert == nil || quorumCert.ProposedBlockInfo.Round > x.highestQuorumCert.ProposedBlockInfo.Round { + x.highestQuorumCert = quorumCert + //TODO: do I need a clone? + } + //TODO: x.blockchain.getBlock(quorumCert.ProposedBlockInfo.Hash) then get the QC inside that block header + //TODO: update lockQC + //TODO: find parent and grandparent and grandgrandparent block, check round number, if so, commit grandgrandparent + if quorumCert.ProposedBlockInfo.Round >= x.currentRound { + x.setNewRound(quorumCert.ProposedBlockInfo.Round + 1) + } return nil } -func (x *XDPoS_v2) processTC(header *types.Header) error { - /* - 1. Update highestTC - 2. Check TC round >= node's currentRound. If yes, call setNewRound - */ +/* + 1. Update highestTC + 2. Check TC round >= node's currentRound. If yes, call setNewRound +*/ +func (x *XDPoS_v2) processTC(timeoutCert *utils.TimeoutCert) error { + if x.highestTimeoutCert == nil || timeoutCert.Round > x.highestTimeoutCert.Round { + x.highestTimeoutCert = timeoutCert + } + if timeoutCert.Round >= x.currentRound { + x.setNewRound(timeoutCert.Round + 1) + } return nil } -func (x *XDPoS_v2) setNewRound() error { - /* - 1. Set currentRound = QC round + 1 (or TC round +1) - 2. Reset timer - 3. Reset vote and timeout Pools - */ +/* + 1. Set currentRound = QC round + 1 (or TC round +1) + 2. Reset timer + 3. Reset vote and timeout Pools +*/ +func (x *XDPoS_v2) setNewRound(round utils.Round) error { + x.currentRound = round + //TODO: tell miner now it's a new round and start mine if it's leader + //TODO: reset timer + //TODO: vote pools + x.timeoutPool.Clear() return nil } @@ -308,7 +345,7 @@ func (x *XDPoS_v2) sendTimeout() error { signer, signFn := x.signer, x.signFn x.lock.RUnlock() - signedHash, err := signFn(accounts.Account{Address: signer}, utils.TimeoutSigHash(x.currentRound).Bytes()) + signedHash, err := signFn(accounts.Account{Address: signer}, utils.TimeoutSigHash(&x.currentRound).Bytes()) if err != nil { return fmt.Errorf("Error while signing for timeout message") } diff --git a/consensus/XDPoS/utils/pool.go b/consensus/XDPoS/utils/pool.go new file mode 100644 index 0000000000..d9484c428c --- /dev/null +++ b/consensus/XDPoS/utils/pool.go @@ -0,0 +1,55 @@ +package utils + +import ( + "fmt" + + "github.com/XinFinOrg/XDPoSChain/common" +) + +type PoolObj interface { + Hash() common.Hash + PoolKey() string +} +type Pool struct { + objList map[string]map[common.Hash]PoolObj + threshold int + onThresholdFn func(map[common.Hash]PoolObj) error +} + +func NewPool(threshold int) *Pool { + return &Pool{ + objList: make(map[string]map[common.Hash]PoolObj), + threshold: threshold, + } +} + +func (p *Pool) Add(obj PoolObj) error { + poolKey := obj.PoolKey() + objListKeyed, ok := p.objList[poolKey] + if !ok { + p.objList[poolKey] = make(map[common.Hash]PoolObj) + objListKeyed = p.objList[poolKey] + } + objListKeyed[obj.Hash()] = obj + if len(objListKeyed) >= p.threshold { + delete(p.objList, poolKey) + if p.onThresholdFn != nil { + return p.onThresholdFn(objListKeyed) + } else { + return fmt.Errorf("no call back function for pool") + } + } + return nil +} + +func (p *Pool) Clear() { + p.objList = make(map[string]map[common.Hash]PoolObj) +} + +func (p *Pool) SetThreshold(t int) { + p.threshold = t +} + +func (p *Pool) SetOnThresholdFn(f func(map[common.Hash]PoolObj) error) { + p.onThresholdFn = f +} diff --git a/consensus/XDPoS/utils/pool_test.go b/consensus/XDPoS/utils/pool_test.go new file mode 100644 index 0000000000..ec867ee659 --- /dev/null +++ b/consensus/XDPoS/utils/pool_test.go @@ -0,0 +1,96 @@ +package utils + +import ( + "math/big" + "testing" + + "github.com/XinFinOrg/XDPoSChain/common" + "github.com/stretchr/testify/assert" +) + +func TestPoolWithTimeout(t *testing.T) { + assert := assert.New(t) + var ret int + onThresholdFn := func(po map[common.Hash]PoolObj) error { + for _, m := range po { + if _, ok := m.(*Timeout); ok { + ret += 1 + } else { + t.Fatalf("wrong type passed into pool: %v", m) + } + } + return nil + } + + pool := NewPool(2) // 2 is the cert threshold + ret = 0 + pool.SetOnThresholdFn(onThresholdFn) + timeout1 := Timeout{Round: 1, Signature: []byte{1}} + timeout2 := Timeout{Round: 1, Signature: []byte{2}} + timeout3 := Timeout{Round: 1, Signature: []byte{3}} + assert.Nil(pool.Add(&timeout1)) + assert.Nil(pool.Add(&timeout1)) + assert.Equal(ret, 0) + assert.Nil(pool.Add(&timeout2)) + assert.Equal(ret, 2) + assert.Nil(pool.Add(&timeout3)) + assert.Equal(ret, 2) + pool = NewPool(3) // 3 is the cert size + ret = 0 + pool.SetOnThresholdFn(onThresholdFn) + assert.Nil(pool.Add(&timeout1)) + assert.Nil(pool.Add(&timeout2)) + assert.Equal(ret, 0) + pool.Clear() + assert.Nil(pool.Add(&timeout3)) + assert.Equal(ret, 0) +} + +func TestPoolWithVote(t *testing.T) { + assert := assert.New(t) + var ret int + onThresholdFn := func(po map[common.Hash]PoolObj) error { + for _, m := range po { + if _, ok := m.(*Vote); ok { + ret += 1 + } else { + t.Fatalf("wrong type passed into pool: %v", m) + } + } + return nil + } + + pool := NewPool(2) // 2 is the cert threshold + ret = 0 + pool.SetOnThresholdFn(onThresholdFn) + blockInfo1 := BlockInfo{Hash: common.BigToHash(big.NewInt(2047)), Round: 1, Number: big.NewInt(1)} + blockInfo2 := BlockInfo{Hash: common.BigToHash(big.NewInt(4095)), Round: 1, Number: big.NewInt(1)} + vote1 := Vote{ProposedBlockInfo: blockInfo1, Signature: []byte{1}} + vote2 := Vote{ProposedBlockInfo: blockInfo2, Signature: []byte{2}} + vote3 := Vote{ProposedBlockInfo: blockInfo1, Signature: []byte{3}} + assert.Nil(pool.Add(&vote1)) + assert.Nil(pool.Add(&vote1)) + assert.Equal(ret, 0) + assert.Nil(pool.Add(&vote2)) + assert.Equal(ret, 0) + assert.Nil(pool.Add(&vote3)) + assert.Equal(ret, 2) + pool = NewPool(3) // 3 is the cert size + ret = 0 + pool.SetOnThresholdFn(onThresholdFn) + assert.Nil(pool.Add(&vote1)) + assert.Nil(pool.Add(&vote2)) + assert.Nil(pool.Add(&vote3)) + assert.Equal(ret, 0) + pool.Clear() + assert.Empty(pool.objList) + pool = NewPool(2) // 2 is the cert size + ret = 0 + pool.SetOnThresholdFn(onThresholdFn) + assert.Nil(pool.Add(&vote1)) + assert.Nil(pool.Add(&vote2)) + assert.Nil(pool.Add(&vote3)) + assert.Equal(len(pool.objList), 1) //vote for one hash is cleared, but another remains + pool.Clear() + assert.Empty(pool.objList) +} diff --git a/consensus/XDPoS/utils/types.go b/consensus/XDPoS/utils/types.go index 2e0802e733..509f2ae7a1 100644 --- a/consensus/XDPoS/utils/types.go +++ b/consensus/XDPoS/utils/types.go @@ -11,6 +11,8 @@ import ( "github.com/XinFinOrg/XDPoSChain/consensus/clique" "github.com/XinFinOrg/XDPoSChain/core/state" "github.com/XinFinOrg/XDPoSChain/core/types" + "github.com/XinFinOrg/XDPoSChain/crypto/sha3" + "github.com/XinFinOrg/XDPoSChain/rlp" "gopkg.in/karalabe/cookiejar.v2/collections/prque" ) @@ -104,3 +106,40 @@ type ExtraFields_v2 struct { Round Round QuorumCert QuorumCert } + +func rlpHash(x interface{}) (h common.Hash) { + hw := sha3.NewKeccak256() + rlp.Encode(hw, x) + hw.Sum(h[:0]) + return h +} + +func (m *Vote) Hash() common.Hash { + return rlpHash(m) +} + +func (m *Timeout) Hash() common.Hash { + return rlpHash(m) +} + +func (m *SyncInfo) Hash() common.Hash { + return rlpHash(m) +} + +func VoteSigHash(m *BlockInfo) common.Hash { + return rlpHash(m) +} + +func TimeoutSigHash(m *Round) common.Hash { + return rlpHash(m) +} + +func (m *Vote) PoolKey() string { + // return the voted block hash + return m.ProposedBlockInfo.Hash.Hex() +} + +func (m *Timeout) PoolKey() string { + // return a default pool key string + return "0" +} diff --git a/consensus/XDPoS/utils/types_test.go b/consensus/XDPoS/utils/types_test.go new file mode 100644 index 0000000000..08d12a3cd9 --- /dev/null +++ b/consensus/XDPoS/utils/types_test.go @@ -0,0 +1,68 @@ +package utils + +import ( + "math/big" + "reflect" + "testing" + + "github.com/XinFinOrg/XDPoSChain/common" +) + +func toyExtraFields() *ExtraFields_v2 { + round := Round(307) + blockInfo := BlockInfo{Hash: common.BigToHash(big.NewInt(2047)), Round: round - 1, Number: big.NewInt(1)} + signature := []byte{1, 2, 3, 4, 5, 6, 7, 8} + signatures := [][]byte{signature} + quorumCert := QuorumCert{ProposedBlockInfo: blockInfo, Signatures: signatures} + e := &ExtraFields_v2{Round: round, QuorumCert: quorumCert} + return e +} +func TestExtraFieldsEncodeDecode(t *testing.T) { + extraFields := toyExtraFields() + encoded, err := extraFields.EncodeToBytes() + if err != nil { + t.Errorf("Error when encoding extra fields") + } + var decoded ExtraFields_v2 + err = DecodeBytesExtraFields(encoded, &decoded) + if err != nil { + t.Errorf("Error when decoding extra fields") + } + if !reflect.DeepEqual(*extraFields, decoded) { + t.Fatalf("Decoded not equal to original extra field, original: %v; decoded: %v", extraFields, decoded) + } +} + +func TestHashAndSigHash(t *testing.T) { + round := Round(307) + blockInfo1 := BlockInfo{Hash: common.BigToHash(big.NewInt(2047)), Round: round - 1, Number: big.NewInt(1)} + blockInfo2 := BlockInfo{Hash: common.BigToHash(big.NewInt(4095)), Round: round - 1, Number: big.NewInt(1)} + signature1 := []byte{1, 2, 3, 4, 5, 6, 7, 8} + signature2 := []byte{1, 2, 3, 4, 5, 6, 7, 7} + signatures1 := [][]byte{signature1} + quorumCert1 := QuorumCert{ProposedBlockInfo: blockInfo1, Signatures: signatures1} + signatures2 := [][]byte{signature2} + quorumCert2 := QuorumCert{ProposedBlockInfo: blockInfo1, Signatures: signatures2} + vote1 := Vote{ProposedBlockInfo: blockInfo1, Signature: signature1} + vote2 := Vote{ProposedBlockInfo: blockInfo1, Signature: signature2} + if vote1.Hash() == vote2.Hash() { + t.Fatalf("Hash of two votes shouldn't equal") + } + timeout1 := Timeout{Round: 10, Signature: signature1} + timeout2 := Timeout{Round: 10, Signature: signature2} + if timeout1.Hash() == timeout2.Hash() { + t.Fatalf("Hash of two timeouts shouldn't equal") + } + syncInfo1 := SyncInfo{HighestQuorumCert: quorumCert1} + syncInfo2 := SyncInfo{HighestQuorumCert: quorumCert2} + if syncInfo1.Hash() == syncInfo2.Hash() { + t.Fatalf("Hash of two sync info shouldn't equal") + } + if VoteSigHash(&blockInfo1) == VoteSigHash(&blockInfo2) { + t.Fatalf("SigHash of two block info shouldn't equal") + } + round2 := Round(999) + if TimeoutSigHash(&round) == TimeoutSigHash(&round2) { + t.Fatalf("SigHash of two round shouldn't equal") + } +} diff --git a/consensus/XDPoS/utils/utils.go b/consensus/XDPoS/utils/utils.go index 4a073d4f50..4dfae64093 100644 --- a/consensus/XDPoS/utils/utils.go +++ b/consensus/XDPoS/utils/utils.go @@ -174,34 +174,4 @@ func DecodeBytesExtraFields(b []byte, val interface{}) error { default: return fmt.Errorf("consensus version %d is not defined", b[0]) } -} - -func rlpHash(x interface{}) (h common.Hash) { - hw := sha3.NewKeccak256() - err := rlp.Encode(hw, x) - if err != nil { - log.Error("rlpHash failed", err) - } - hw.Sum(h[:0]) - return h -} - -func (m *Vote) Hash() common.Hash { - return rlpHash(m) -} - -func (m *Timeout) Hash() common.Hash { - return rlpHash(m) -} - -func (m *SyncInfo) Hash() common.Hash { - return rlpHash(m) -} - -func VoteSigHash(m BlockInfo) common.Hash { - return rlpHash(m) -} - -func TimeoutSigHash(m Round) common.Hash { - return rlpHash(m) -} +} \ No newline at end of file diff --git a/consensus/XDPoS/utils/utils_test.go b/consensus/XDPoS/utils/utils_test.go index 242aaf4501..6624f410af 100644 --- a/consensus/XDPoS/utils/utils_test.go +++ b/consensus/XDPoS/utils/utils_test.go @@ -3,7 +3,6 @@ package utils import ( "fmt" "math/big" - "reflect" "testing" "github.com/XinFinOrg/XDPoSChain/common" @@ -82,63 +81,4 @@ func TestCompareSignersLists(t *testing.T) { if CompareSignersLists([]common.Address{common.StringToAddress("aaaaaaaaaaaaaaaa")}, []common.Address{common.StringToAddress("cccccccccccccccccccccccccccccccccccccccc")}) { t.Error("Failed with list has only one signer") } -} - -func toyExtraFields() *ExtraFields_v2 { - round := Round(307) - blockInfo := BlockInfo{Hash: common.BigToHash(big.NewInt(2047)), Round: round - 1, Number: big.NewInt(1)} - signature := []byte{1, 2, 3, 4, 5, 6, 7, 8} - signatures := [][]byte{signature} - quorumCert := QuorumCert{ProposedBlockInfo: blockInfo, Signatures: signatures} - e := &ExtraFields_v2{Round: round, QuorumCert: quorumCert} - return e -} -func TestExtraFieldsEncodeDecode(t *testing.T) { - extraFields := toyExtraFields() - encoded, err := extraFields.EncodeToBytes() - if err != nil { - t.Errorf("Error when encoding extra fields") - } - var decoded ExtraFields_v2 - err = DecodeBytesExtraFields(encoded, &decoded) - if err != nil { - t.Errorf("Error when decoding extra fields") - } - if !reflect.DeepEqual(*extraFields, decoded) { - t.Fatalf("Decoded not equal to original extra field, original: %v; decoded: %v", extraFields, decoded) - } -} - -func TestHashAndSigHash(t *testing.T) { - round := Round(307) - blockInfo1 := BlockInfo{Hash: common.BigToHash(big.NewInt(2047)), Round: round - 1, Number: big.NewInt(1)} - blockInfo2 := BlockInfo{Hash: common.BigToHash(big.NewInt(4095)), Round: round - 1, Number: big.NewInt(1)} - signature1 := []byte{1, 2, 3, 4, 5, 6, 7, 8} - signature2 := []byte{1, 2, 3, 4, 5, 6, 7, 7} - signatures1 := [][]byte{signature1} - quorumCert1 := QuorumCert{ProposedBlockInfo: blockInfo1, Signatures: signatures1} - signatures2 := [][]byte{signature2} - quorumCert2 := QuorumCert{ProposedBlockInfo: blockInfo1, Signatures: signatures2} - vote1 := Vote{ProposedBlockInfo: blockInfo1, Signature: signature1} - vote2 := Vote{ProposedBlockInfo: blockInfo1, Signature: signature2} - if vote1.Hash() == vote2.Hash() { - t.Fatalf("Hash of two votes shouldn't equal") - } - timeout1 := Timeout{Round: 10, Signature: signature1} - timeout2 := Timeout{Round: 10, Signature: signature2} - if timeout1.Hash() == timeout2.Hash() { - t.Fatalf("Hash of two timeouts shouldn't equal") - } - syncInfo1 := SyncInfo{HighestQuorumCert: quorumCert1} - syncInfo2 := SyncInfo{HighestQuorumCert: quorumCert2} - if syncInfo1.Hash() == syncInfo2.Hash() { - t.Fatalf("Hash of two sync info shouldn't equal") - } - if VoteSigHash(blockInfo1) == VoteSigHash(blockInfo2) { - t.Fatalf("SigHash of two block info shouldn't equal") - } - round2 := Round(999) - if TimeoutSigHash(round) == TimeoutSigHash(round2) { - t.Fatalf("SigHash of two round shouldn't equal") - } -} +} \ No newline at end of file diff --git a/params/config.go b/params/config.go index 1bf0c24e62..19af952115 100644 --- a/params/config.go +++ b/params/config.go @@ -37,9 +37,11 @@ var ( var ( XDPoSV2Config = &V2{ TimeoutWorkerDuration: 50000, + CertThreshold: common.MaxMasternodesV2*2/3 + 1, } TestXDPoSV2Config = &V2{ TimeoutWorkerDuration: 5000, + CertThreshold: 2, } // XDPoSChain mainnet config @@ -199,6 +201,7 @@ type XDPoSConfig struct { type V2 struct { TimeoutWorkerDuration int64 `json:"TimeoutWorkerDuration"` // Duration in ms + CertThreshold int `json:"certificateThreshold"` // Necessary number of messages from master nodes to form a certificate } // String implements the stringer interface, returning the consensus engine details.