mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-06-19 21:31:37 +00:00
vote and timeout handlers
This commit is contained in:
parent
d47d9a2a99
commit
a39612e540
8 changed files with 462 additions and 125 deletions
|
|
@ -100,7 +100,7 @@ func NewFaker(db ethdb.Database, chainConfig *params.ChainConfig) *XDPoS {
|
|||
|
||||
signingTxsCache: signingTxsCache,
|
||||
EngineV1: engine_v1.NewFaker(db, conf),
|
||||
EngineV2: engine_v2.NewFaker(db, conf),
|
||||
EngineV2: engine_v2.New(conf, db),
|
||||
}
|
||||
return fakeEngine
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,18 +22,19 @@ type XDPoS_v2 struct {
|
|||
config *params.XDPoSConfig // Consensus engine configuration parameters
|
||||
db ethdb.Database // Database to store and retrieve snapshot checkpoints
|
||||
|
||||
signer common.Address // Ethereum address of the signing key
|
||||
signFn clique.SignerFn // Signer function to authorize hashes with
|
||||
lock sync.RWMutex // Protects the signer fields
|
||||
signer common.Address // Ethereum address of the signing key
|
||||
signFn clique.SignerFn // Signer function to authorize hashes with
|
||||
signLock sync.RWMutex // Protects the signer fields
|
||||
|
||||
BroadcastCh chan interface{}
|
||||
BFTQueue chan interface{}
|
||||
timeoutWorker *countdown.CountdownTimer // Timer to generate broadcast timeout msg if threashold reached
|
||||
|
||||
timeoutPool *utils.Pool
|
||||
currentRound utils.Round
|
||||
highestVotedRound utils.Round
|
||||
highestQuorumCert *utils.QuorumCert
|
||||
lock sync.RWMutex // Protects the currentRound fields etc
|
||||
timeoutPool *utils.Pool
|
||||
votePool *utils.Pool
|
||||
currentRound utils.Round
|
||||
highestVotedRound utils.Round
|
||||
highestQuorumCert *utils.QuorumCert
|
||||
// LockQC in XDPoS Consensus 2.0, used in voting rule
|
||||
lockQuorumCert *utils.QuorumCert
|
||||
highestTimeoutCert *utils.TimeoutCert
|
||||
|
|
@ -45,16 +46,22 @@ func New(config *params.XDPoSConfig, db ethdb.Database) *XDPoS_v2 {
|
|||
duration := time.Duration(config.V2.TimeoutWorkerDuration) * time.Millisecond
|
||||
timer := countdown.NewCountDown(duration)
|
||||
timeoutPool := utils.NewPool(config.V2.CertThreshold)
|
||||
votePool := utils.NewPool(config.V2.CertThreshold)
|
||||
engine := &XDPoS_v2{
|
||||
config: config,
|
||||
db: db,
|
||||
timeoutWorker: timer,
|
||||
BroadcastCh: make(chan interface{}),
|
||||
BFTQueue: make(chan interface{}),
|
||||
timeoutPool: timeoutPool,
|
||||
config: config,
|
||||
db: db,
|
||||
timeoutWorker: timer,
|
||||
BroadcastCh: make(chan interface{}),
|
||||
timeoutPool: timeoutPool,
|
||||
votePool: votePool,
|
||||
highestTimeoutCert: &utils.TimeoutCert{},
|
||||
highestQuorumCert: &utils.QuorumCert{},
|
||||
}
|
||||
// Add callback to the timer
|
||||
timer.OnTimeoutFn = engine.onCountdownTimeout
|
||||
// Attach vote & timeout pool callback function when it reached threshold
|
||||
votePool.SetOnThresholdFn(engine.onVotePoolThresholdReached)
|
||||
timeoutPool.SetOnThresholdFn(engine.onTimeoutPoolThresholdReached)
|
||||
|
||||
return engine
|
||||
}
|
||||
|
|
@ -62,41 +69,23 @@ func New(config *params.XDPoSConfig, db ethdb.Database) *XDPoS_v2 {
|
|||
/*
|
||||
Testing tools
|
||||
*/
|
||||
// Test only. Never to be used for mainnet implementation
|
||||
func NewFaker(db ethdb.Database, config *params.XDPoSConfig) *XDPoS_v2 {
|
||||
var fakeEngine *XDPoS_v2
|
||||
// Set any missing consensus parameters to their defaults
|
||||
conf := config
|
||||
// Setup Timer
|
||||
duration := time.Duration(config.V2.TimeoutWorkerDuration) * time.Millisecond
|
||||
timer := countdown.NewCountDown(duration)
|
||||
timeoutPool := utils.NewPool(2)
|
||||
|
||||
// Allocate the snapshot caches and create the engine
|
||||
fakeEngine = &XDPoS_v2{
|
||||
config: conf,
|
||||
db: db,
|
||||
timeoutWorker: timer,
|
||||
BroadcastCh: make(chan interface{}),
|
||||
BFTQueue: make(chan interface{}),
|
||||
timeoutPool: timeoutPool,
|
||||
func (x *XDPoS_v2) SetNewRoundFaker(newRound utils.Round, resetTimer bool) {
|
||||
// Reset a bunch of things
|
||||
if resetTimer {
|
||||
x.timeoutWorker.Reset()
|
||||
}
|
||||
// Add callback to the timer
|
||||
timer.OnTimeoutFn = fakeEngine.onCountdownTimeout
|
||||
return fakeEngine
|
||||
x.currentRound = newRound
|
||||
}
|
||||
|
||||
// Test only.
|
||||
func (x *XDPoS_v2) SetNewRoundFaker(newRound utils.Round) {
|
||||
// Reset a bunch of things
|
||||
x.timeoutWorker.Reset()
|
||||
x.currentRound = newRound
|
||||
// Utils for test to check currentRound value
|
||||
func (x *XDPoS_v2) GetCurrentRound() utils.Round {
|
||||
return x.currentRound
|
||||
}
|
||||
|
||||
// Authorize injects a private key into the consensus engine to mint new blocks with.
|
||||
func (x *XDPoS_v2) Authorize(signer common.Address, signFn clique.SignerFn) {
|
||||
x.lock.Lock()
|
||||
defer x.lock.Unlock()
|
||||
x.signLock.Lock()
|
||||
defer x.signLock.Unlock()
|
||||
|
||||
x.signer = signer
|
||||
x.signFn = signFn
|
||||
|
|
@ -110,22 +99,10 @@ func (x *XDPoS_v2) VerifyHeader(chain consensus.ChainReader, header *types.Heade
|
|||
return nil
|
||||
}
|
||||
|
||||
// Push mesages(i.e vote, sync info & timeout) into BFTQueue. This funciton shall be called by BFT protocal manager
|
||||
func (x *XDPoS_v2) Enqueue() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Main function for the v2 consensus.
|
||||
func (x *XDPoS_v2) Dispatcher() error {
|
||||
// 1. Pull message from the BFTQueue and call the relevant handler by message type, such as vote, timeout or syncInfo
|
||||
// 2. Only 1 message processing at the time
|
||||
return nil
|
||||
}
|
||||
|
||||
/*
|
||||
SyncInfo workflow
|
||||
*/
|
||||
// Verify syncInfo and trigger trigger process QC or TC if successful
|
||||
// Verify syncInfo and trigger process QC or TC if successful
|
||||
func (x *XDPoS_v2) VerifySyncInfoMessage(syncInfo utils.SyncInfo) error {
|
||||
/*
|
||||
1. Verify items including:
|
||||
|
|
@ -133,6 +110,16 @@ func (x *XDPoS_v2) VerifySyncInfoMessage(syncInfo utils.SyncInfo) error {
|
|||
- verifyTC
|
||||
2. Broadcast(Not part of consensus)
|
||||
*/
|
||||
err := x.verifyQC(&syncInfo.HighestQuorumCert)
|
||||
if err != nil {
|
||||
log.Warn("SyncInfo message verification failed due to QC", err)
|
||||
return err
|
||||
}
|
||||
err = x.verifyTC(&syncInfo.HighestTimeoutCert)
|
||||
if err != nil {
|
||||
log.Warn("SyncInfo message verification failed due to TC", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -147,7 +134,7 @@ func (x *XDPoS_v2) SyncInfoHandler(header *types.Header) error {
|
|||
/*
|
||||
Vote workflow
|
||||
*/
|
||||
func (x *XDPoS_v2) VerifyVoteMessage(vote utils.Vote) error {
|
||||
func (x *XDPoS_v2) VerifyVoteMessage(vote utils.Vote) (bool, error) {
|
||||
/*
|
||||
1. Check signature:
|
||||
- Use ecRecover to get the public key
|
||||
|
|
@ -156,16 +143,51 @@ func (x *XDPoS_v2) VerifyVoteMessage(vote utils.Vote) error {
|
|||
2. Verify blockInfo
|
||||
3. Broadcast(Not part of consensus)
|
||||
*/
|
||||
return x.verifyMsgSignature(utils.VoteSigHash(&vote.ProposedBlockInfo), vote.Signature)
|
||||
}
|
||||
|
||||
// Consensus entry point for processing vote message to produce QC
|
||||
func (x *XDPoS_v2) VoteHandler(voteMsg utils.Vote) error {
|
||||
x.lock.Lock()
|
||||
defer x.lock.Unlock()
|
||||
|
||||
// 1. checkRoundNumber
|
||||
if voteMsg.ProposedBlockInfo.Round != x.currentRound {
|
||||
return fmt.Errorf("Vote message round number: %v does not match currentRound: %v", voteMsg.ProposedBlockInfo.Round, x.currentRound)
|
||||
}
|
||||
|
||||
// Collect vote
|
||||
thresholdReached, numberOfVotesInPool, hookError := x.votePool.Add(&voteMsg)
|
||||
if hookError != nil {
|
||||
log.Error("Error while adding vote message to the pool, ", hookError)
|
||||
return hookError
|
||||
}
|
||||
|
||||
log.Debug("Vote pool threashold reached: %v, number of items in the pool: %v", thresholdReached, numberOfVotesInPool)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *XDPoS_v2) VoteHandler() {
|
||||
/*
|
||||
1. checkRoundNumber
|
||||
3. Collect vote (TODO)
|
||||
4. Genrate QC (TODO)
|
||||
5. processQC
|
||||
*/
|
||||
/*
|
||||
Function that will be called by votePool when it reached threshold.
|
||||
In the engine v2, we will need to generate and process QC
|
||||
*/
|
||||
func (x *XDPoS_v2) onVotePoolThresholdReached(pooledVotes map[common.Hash]utils.PoolObj, currentVoteMsg utils.PoolObj) error {
|
||||
signatures := []utils.Signature{}
|
||||
for _, v := range pooledVotes {
|
||||
signatures = append(signatures, v.(*utils.Vote).Signature)
|
||||
}
|
||||
// Genrate QC
|
||||
quorumCert := &utils.QuorumCert{
|
||||
ProposedBlockInfo: currentVoteMsg.(*utils.Vote).ProposedBlockInfo,
|
||||
Signatures: signatures,
|
||||
}
|
||||
err := x.processQC(quorumCert)
|
||||
if err != nil {
|
||||
log.Error("Error while processing QC in the Vote handler after reaching pool threshold, ", err)
|
||||
return err
|
||||
}
|
||||
log.Info("🗳 Successfully processed the vote and produced QC!")
|
||||
return nil
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
@ -180,37 +202,62 @@ func (x *XDPoS_v2) VoteHandler() {
|
|||
2. Broadcast(Not part of consensus)
|
||||
*/
|
||||
func (x *XDPoS_v2) VerifyTimeoutMessage(timeoutMsg utils.Timeout) (bool, error) {
|
||||
// Recover the public key and the Ethereum address
|
||||
pubkey, err := crypto.Ecrecover(utils.TimeoutSigHash(&timeoutMsg.Round).Bytes(), timeoutMsg.Signature)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("Error while verifying time out message: %v", err)
|
||||
}
|
||||
var signerAddress common.Address
|
||||
copy(signerAddress[:], crypto.Keccak256(pubkey[1:])[12:])
|
||||
masternodes := x.getCurrentRoundMasterNodes()
|
||||
for _, mn := range masternodes {
|
||||
if mn == signerAddress {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
return false, fmt.Errorf("Masternodes does not contain signer address. Master node list %v, Signer address: %v", masternodes, signerAddress)
|
||||
return x.verifyMsgSignature(utils.TimeoutSigHash(&timeoutMsg.Round), timeoutMsg.Signature)
|
||||
}
|
||||
|
||||
/*
|
||||
Entry point for handling timeout message to process below:
|
||||
1. checkRoundNumber()
|
||||
2. Collect timeout (TODO)
|
||||
3. Genrate TC (TODO)
|
||||
4. processTC()
|
||||
5. generateSyncInfo()
|
||||
2. Collect timeout
|
||||
Once timeout pool reached threshold, it will trigger the call to the hook function "onTimeoutPoolThresholdReached"
|
||||
*/
|
||||
func (x *XDPoS_v2) TimeoutHandler(timeout *utils.Timeout) {
|
||||
// Collect timeout, generate TC
|
||||
timeoutCert := x.timeoutPool.Add(timeout)
|
||||
// If TC is generated
|
||||
if timeoutCert != nil {
|
||||
//TODO: processTC(),generateSyncInfo()
|
||||
func (x *XDPoS_v2) TimeoutHandler(timeout *utils.Timeout) error {
|
||||
x.lock.Lock()
|
||||
defer x.lock.Unlock()
|
||||
|
||||
// 1. checkRoundNumber
|
||||
if timeout.Round != x.currentRound {
|
||||
return fmt.Errorf("Timeout message round number: %v does not match currentRound: %v", timeout.Round, x.currentRound)
|
||||
}
|
||||
// Collect timeout, generate TC
|
||||
isThresholdReached, numberOfTimeoutsInPool, hookError := x.timeoutPool.Add(timeout)
|
||||
if hookError != nil {
|
||||
log.Error("Error adding timeout to the pool, ", hookError.Error())
|
||||
return hookError
|
||||
}
|
||||
log.Debug("Timeout pool threashold reached: %v, number of items in the pool: %v", isThresholdReached, numberOfTimeoutsInPool)
|
||||
return nil
|
||||
}
|
||||
|
||||
/*
|
||||
Function that will be called by timeoutPool when it reached threshold.
|
||||
In the engine v2, we will need to:
|
||||
1. Genrate TC
|
||||
2. processTC()
|
||||
3. generateSyncInfo()
|
||||
*/
|
||||
func (x *XDPoS_v2) onTimeoutPoolThresholdReached(pooledTimeouts map[common.Hash]utils.PoolObj, currentTimeoutMsg utils.PoolObj) error {
|
||||
signatures := []utils.Signature{}
|
||||
for _, v := range pooledTimeouts {
|
||||
signatures = append(signatures, v.(*utils.Timeout).Signature)
|
||||
}
|
||||
// Genrate TC
|
||||
timeoutCert := &utils.TimeoutCert{
|
||||
Round: currentTimeoutMsg.(*utils.Timeout).Round,
|
||||
Signatures: signatures,
|
||||
}
|
||||
// Process TC
|
||||
err := x.processTC(timeoutCert)
|
||||
if err != nil {
|
||||
log.Error("Error while processing TC in the Timeout handler after reaching pool threshold, ", err.Error())
|
||||
return err
|
||||
}
|
||||
// Generate and broadcast syncInfo
|
||||
syncInfo := x.getSyncInfo()
|
||||
x.broadcastToBftChannel(syncInfo)
|
||||
|
||||
log.Info("⏰ Successfully processed the timeout message and produced TC & SyncInfo!")
|
||||
return nil
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
@ -239,7 +286,7 @@ func (x *XDPoS_v2) VerifyBlockInfo(blockInfo utils.BlockInfo) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (x *XDPoS_v2) verifyQC(header *types.Header) error {
|
||||
func (x *XDPoS_v2) verifyQC(quorumCert *utils.QuorumCert) error {
|
||||
/*
|
||||
1. Verify signer signatures: (List of signatures)
|
||||
- Use ecRecover to get the public key
|
||||
|
|
@ -250,7 +297,7 @@ func (x *XDPoS_v2) verifyQC(header *types.Header) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (x *XDPoS_v2) verifyTC(header *types.Header) error {
|
||||
func (x *XDPoS_v2) verifyTC(timeoutCert *utils.TimeoutCert) error {
|
||||
/*
|
||||
1. Verify signer signature: (List of signatures)
|
||||
- Use ecRecover to get the public key
|
||||
|
|
@ -289,7 +336,10 @@ func (x *XDPoS_v2) processTC(timeoutCert *utils.TimeoutCert) error {
|
|||
x.highestTimeoutCert = timeoutCert
|
||||
}
|
||||
if timeoutCert.Round >= x.currentRound {
|
||||
x.setNewRound(timeoutCert.Round + 1)
|
||||
err := x.setNewRound(timeoutCert.Round + 1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -302,17 +352,12 @@ func (x *XDPoS_v2) processTC(timeoutCert *utils.TimeoutCert) error {
|
|||
func (x *XDPoS_v2) setNewRound(round utils.Round) error {
|
||||
x.currentRound = round
|
||||
//TODO: tell miner now it's a new round and start mine if it's leader
|
||||
//TODO: reset timer
|
||||
x.timeoutWorker.Reset()
|
||||
//TODO: vote pools
|
||||
x.timeoutPool.Clear()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Verify round number against node's local round number(Should be equal)
|
||||
func (x *XDPoS_v2) checkRoundNumber(header *types.Header) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Hot stuff rule to decide whether this node is eligible to vote for the received block
|
||||
func (x *XDPoS_v2) verifyVotingRule(header *types.Header) error {
|
||||
/*
|
||||
|
|
@ -326,10 +371,19 @@ func (x *XDPoS_v2) verifyVotingRule(header *types.Header) error {
|
|||
}
|
||||
|
||||
// Once Hot stuff voting rule has verified, this node can then send vote
|
||||
func (x *XDPoS_v2) sendVote(header *types.Header) error {
|
||||
func (x *XDPoS_v2) sendVote(blockInfo utils.BlockInfo) error {
|
||||
// First step: Generate the signature by using node's private key(The signature is the blockInfo signature)
|
||||
// Second step: Construct the vote struct with the above signature & blockinfo struct
|
||||
// Third step: Send the vote to broadcast channel
|
||||
signedHash, err := x.signSignature(utils.VoteSigHash(&blockInfo))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
voteMsg := &utils.Vote{
|
||||
ProposedBlockInfo: blockInfo,
|
||||
Signature: signedHash,
|
||||
}
|
||||
x.broadcastToBftChannel(voteMsg)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -340,14 +394,9 @@ func (x *XDPoS_v2) sendVote(header *types.Header) error {
|
|||
3. send to broadcast channel
|
||||
*/
|
||||
func (x *XDPoS_v2) sendTimeout() error {
|
||||
// Don't hold the signer fields for the entire sealing procedure
|
||||
x.lock.RLock()
|
||||
signer, signFn := x.signer, x.signFn
|
||||
x.lock.RUnlock()
|
||||
|
||||
signedHash, err := signFn(accounts.Account{Address: signer}, utils.TimeoutSigHash(&x.currentRound).Bytes())
|
||||
signedHash, err := x.signSignature(utils.TimeoutSigHash(&x.currentRound))
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error while signing for timeout message")
|
||||
return err
|
||||
}
|
||||
timeoutMsg := &utils.Timeout{
|
||||
Round: x.currentRound,
|
||||
|
|
@ -362,11 +411,45 @@ func (x *XDPoS_v2) sendSyncInfo() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (x *XDPoS_v2) signSignature(signingHash common.Hash) (utils.Signature, error) {
|
||||
// Don't hold the signFn for the whole signing operation
|
||||
x.signLock.RLock()
|
||||
signer, signFn := x.signer, x.signFn
|
||||
x.signLock.RUnlock()
|
||||
|
||||
signedHash, err := signFn(accounts.Account{Address: signer}, signingHash.Bytes())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Error while signing hash")
|
||||
}
|
||||
return signedHash, nil
|
||||
}
|
||||
|
||||
func (x *XDPoS_v2) verifyMsgSignature(signedHashToBeVerified common.Hash, signature utils.Signature) (bool, error) {
|
||||
// Recover the public key and the Ethereum address
|
||||
pubkey, err := crypto.Ecrecover(signedHashToBeVerified.Bytes(), signature)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("Error while verifying message: %v", err)
|
||||
}
|
||||
var signerAddress common.Address
|
||||
copy(signerAddress[:], crypto.Keccak256(pubkey[1:])[12:])
|
||||
masternodes := x.getCurrentRoundMasterNodes()
|
||||
for _, mn := range masternodes {
|
||||
if mn == signerAddress {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
return false, fmt.Errorf("Masternodes does not contain signer address. Master node list %v, Signer address: %v", masternodes, signerAddress)
|
||||
}
|
||||
|
||||
/*
|
||||
Function that will be called by timer when countdown reaches its threshold.
|
||||
In the engine v2, we would need to broadcast timeout messages to other peers
|
||||
*/
|
||||
func (x *XDPoS_v2) onCountdownTimeout(time time.Time) error {
|
||||
x.lock.Lock()
|
||||
defer x.lock.Unlock()
|
||||
|
||||
err := x.sendTimeout()
|
||||
if err != nil {
|
||||
log.Error("Error while sending out timeout message at time: ", time)
|
||||
|
|
@ -376,9 +459,18 @@ func (x *XDPoS_v2) onCountdownTimeout(time time.Time) error {
|
|||
}
|
||||
|
||||
func (x *XDPoS_v2) broadcastToBftChannel(msg interface{}) {
|
||||
x.BroadcastCh <- msg
|
||||
go func() {
|
||||
x.BroadcastCh <- msg
|
||||
}()
|
||||
}
|
||||
|
||||
func (x *XDPoS_v2) getCurrentRoundMasterNodes() []common.Address {
|
||||
return []common.Address{}
|
||||
}
|
||||
|
||||
func (x *XDPoS_v2) getSyncInfo() utils.SyncInfo {
|
||||
return utils.SyncInfo{
|
||||
HighestQuorumCert: *x.highestQuorumCert,
|
||||
HighestTimeoutCert: *x.highestTimeoutCert,
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ type PoolObj interface {
|
|||
type Pool struct {
|
||||
objList map[string]map[common.Hash]PoolObj
|
||||
threshold int
|
||||
onThresholdFn func(map[common.Hash]PoolObj) error
|
||||
onThresholdFn func(objsInPool map[common.Hash]PoolObj, currentObj PoolObj) error
|
||||
}
|
||||
|
||||
func NewPool(threshold int) *Pool {
|
||||
|
|
@ -23,7 +23,8 @@ func NewPool(threshold int) *Pool {
|
|||
}
|
||||
}
|
||||
|
||||
func (p *Pool) Add(obj PoolObj) error {
|
||||
// call the hook function onThresholdFn if reached threshold and return boolean to indicate whether pool has reached threshold
|
||||
func (p *Pool) Add(obj PoolObj) (bool, int, error) {
|
||||
poolKey := obj.PoolKey()
|
||||
objListKeyed, ok := p.objList[poolKey]
|
||||
if !ok {
|
||||
|
|
@ -31,15 +32,16 @@ func (p *Pool) Add(obj PoolObj) error {
|
|||
objListKeyed = p.objList[poolKey]
|
||||
}
|
||||
objListKeyed[obj.Hash()] = obj
|
||||
if len(objListKeyed) >= p.threshold {
|
||||
numOfItems := len(objListKeyed)
|
||||
if numOfItems >= p.threshold {
|
||||
delete(p.objList, poolKey)
|
||||
if p.onThresholdFn != nil {
|
||||
return p.onThresholdFn(objListKeyed)
|
||||
return true, numOfItems, p.onThresholdFn(objListKeyed, obj)
|
||||
} else {
|
||||
return fmt.Errorf("no call back function for pool")
|
||||
return true, numOfItems, fmt.Errorf("no call back function for pool")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return false, numOfItems, nil
|
||||
}
|
||||
|
||||
func (p *Pool) Clear() {
|
||||
|
|
@ -50,6 +52,6 @@ func (p *Pool) SetThreshold(t int) {
|
|||
p.threshold = t
|
||||
}
|
||||
|
||||
func (p *Pool) SetOnThresholdFn(f func(map[common.Hash]PoolObj) error) {
|
||||
func (p *Pool) SetOnThresholdFn(f func(objsInPool map[common.Hash]PoolObj, currentObj PoolObj) error) {
|
||||
p.onThresholdFn = f
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ import (
|
|||
func TestPoolWithTimeout(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
var ret int
|
||||
onThresholdFn := func(po map[common.Hash]PoolObj) error {
|
||||
onThresholdFn := func(po map[common.Hash]PoolObj, currentPoolObj PoolObj) error {
|
||||
for _, m := range po {
|
||||
if _, ok := m.(*Timeout); ok {
|
||||
ret += 1
|
||||
|
|
@ -49,7 +49,7 @@ func TestPoolWithTimeout(t *testing.T) {
|
|||
func TestPoolWithVote(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
var ret int
|
||||
onThresholdFn := func(po map[common.Hash]PoolObj) error {
|
||||
onThresholdFn := func(po map[common.Hash]PoolObj, currentPoolObj PoolObj) error {
|
||||
for _, m := range po {
|
||||
if _, ok := m.(*Vote); ok {
|
||||
ret += 1
|
||||
|
|
|
|||
|
|
@ -62,6 +62,7 @@ type PublicApiSnapshot struct {
|
|||
|
||||
// Round number type in XDPoS 2.0
|
||||
type Round uint64
|
||||
type Signature []byte
|
||||
|
||||
// Block Info struct in XDPoS 2.0, used for vote message, etc.
|
||||
type BlockInfo struct {
|
||||
|
|
@ -73,13 +74,13 @@ type BlockInfo struct {
|
|||
// Vote message in XDPoS 2.0
|
||||
type Vote struct {
|
||||
ProposedBlockInfo BlockInfo
|
||||
Signature []byte
|
||||
Signature Signature
|
||||
}
|
||||
|
||||
// Timeout message in XDPoS 2.0
|
||||
type Timeout struct {
|
||||
Round Round
|
||||
Signature []byte
|
||||
Signature Signature
|
||||
}
|
||||
|
||||
// BFT Sync Info message in XDPoS 2.0
|
||||
|
|
@ -91,13 +92,13 @@ type SyncInfo struct {
|
|||
// Quorum Certificate struct in XDPoS 2.0
|
||||
type QuorumCert struct {
|
||||
ProposedBlockInfo BlockInfo
|
||||
Signatures [][]byte
|
||||
Signatures []Signature
|
||||
}
|
||||
|
||||
// Timeout Certificate struct in XDPoS 2.0
|
||||
type TimeoutCert struct {
|
||||
Round Round
|
||||
Signatures [][]byte
|
||||
Signatures []Signature
|
||||
}
|
||||
|
||||
// The parsed extra fields in block header in XDPoS 2.0 (excluding the version byte)
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ func toyExtraFields() *ExtraFields_v2 {
|
|||
round := Round(307)
|
||||
blockInfo := BlockInfo{Hash: common.BigToHash(big.NewInt(2047)), Round: round - 1, Number: big.NewInt(1)}
|
||||
signature := []byte{1, 2, 3, 4, 5, 6, 7, 8}
|
||||
signatures := [][]byte{signature}
|
||||
signatures := []Signature{signature}
|
||||
quorumCert := QuorumCert{ProposedBlockInfo: blockInfo, Signatures: signatures}
|
||||
e := &ExtraFields_v2{Round: round, QuorumCert: quorumCert}
|
||||
return e
|
||||
|
|
@ -39,9 +39,9 @@ func TestHashAndSigHash(t *testing.T) {
|
|||
blockInfo2 := BlockInfo{Hash: common.BigToHash(big.NewInt(4095)), Round: round - 1, Number: big.NewInt(1)}
|
||||
signature1 := []byte{1, 2, 3, 4, 5, 6, 7, 8}
|
||||
signature2 := []byte{1, 2, 3, 4, 5, 6, 7, 7}
|
||||
signatures1 := [][]byte{signature1}
|
||||
signatures1 := []Signature{signature1}
|
||||
quorumCert1 := QuorumCert{ProposedBlockInfo: blockInfo1, Signatures: signatures1}
|
||||
signatures2 := [][]byte{signature2}
|
||||
signatures2 := []Signature{signature2}
|
||||
quorumCert2 := QuorumCert{ProposedBlockInfo: blockInfo1, Signatures: signatures2}
|
||||
vote1 := Vote{ProposedBlockInfo: blockInfo1, Signature: signature1}
|
||||
vote2 := Vote{ProposedBlockInfo: blockInfo1, Signature: signature2}
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@ var (
|
|||
}
|
||||
TestXDPoSV2Config = &V2{
|
||||
TimeoutWorkerDuration: 5000,
|
||||
CertThreshold: 2,
|
||||
CertThreshold: 3,
|
||||
}
|
||||
|
||||
// XDPoSChain mainnet config
|
||||
|
|
|
|||
|
|
@ -1,8 +1,10 @@
|
|||
package consensus
|
||||
|
||||
import (
|
||||
"math/big"
|
||||
"testing"
|
||||
|
||||
"github.com/XinFinOrg/XDPoSChain/common"
|
||||
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS"
|
||||
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/utils"
|
||||
"github.com/XinFinOrg/XDPoSChain/params"
|
||||
|
|
@ -13,7 +15,7 @@ func TestCountdownTimeoutToSendTimeoutMessage(t *testing.T) {
|
|||
blockchain, _, _, _ := PrepareXDCTestBlockChain(t, 11, params.TestXDPoSMockChainConfigWithV2Engine)
|
||||
engineV2 := blockchain.Engine().(*XDPoS.XDPoS).EngineV2
|
||||
|
||||
engineV2.SetNewRoundFaker(utils.Round(1))
|
||||
engineV2.SetNewRoundFaker(utils.Round(1), true)
|
||||
|
||||
timeoutMsg := <-engineV2.BroadcastCh
|
||||
assert.NotNil(t, timeoutMsg)
|
||||
|
|
@ -24,3 +26,243 @@ func TestCountdownTimeoutToSendTimeoutMessage(t *testing.T) {
|
|||
// This shows we are able to decode the timeout message, which is what this test is all about
|
||||
assert.Regexp(t, "^Masternodes does not contain signer addres.*", err.Error())
|
||||
}
|
||||
|
||||
// Timeout handler
|
||||
func TestTimeoutMessageHandlerSuccessfullyGenerateTCandSyncInfo(t *testing.T) {
|
||||
blockchain, _, _, _ := PrepareXDCTestBlockChain(t, 11, params.TestXDPoSMockChainConfigWithV2Engine)
|
||||
engineV2 := blockchain.Engine().(*XDPoS.XDPoS).EngineV2
|
||||
|
||||
// Set round to 1
|
||||
engineV2.SetNewRoundFaker(utils.Round(1), false)
|
||||
// Create two timeout message which will not reach timeout pool threshold
|
||||
timeoutMsg := &utils.Timeout{
|
||||
Round: utils.Round(1),
|
||||
Signature: []byte{1},
|
||||
}
|
||||
|
||||
err := engineV2.TimeoutHandler(timeoutMsg)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, utils.Round(1), engineV2.GetCurrentRound())
|
||||
timeoutMsg = &utils.Timeout{
|
||||
Round: utils.Round(1),
|
||||
Signature: []byte{2},
|
||||
}
|
||||
err = engineV2.TimeoutHandler(timeoutMsg)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, utils.Round(1), engineV2.GetCurrentRound())
|
||||
// Create a timeout message that should trigger timeout pool hook
|
||||
timeoutMsg = &utils.Timeout{
|
||||
Round: utils.Round(1),
|
||||
Signature: []byte{3},
|
||||
}
|
||||
|
||||
err = engineV2.TimeoutHandler(timeoutMsg)
|
||||
assert.Nil(t, err)
|
||||
|
||||
syncInfoMsg := <-engineV2.BroadcastCh
|
||||
assert.NotNil(t, syncInfoMsg)
|
||||
|
||||
// Should have QC, however, we did not inilise it, hence will show default empty value
|
||||
qc := syncInfoMsg.(utils.SyncInfo).HighestQuorumCert
|
||||
assert.NotNil(t, qc)
|
||||
|
||||
tc := syncInfoMsg.(utils.SyncInfo).HighestTimeoutCert
|
||||
assert.NotNil(t, tc)
|
||||
assert.Equal(t, tc.Round, utils.Round(1))
|
||||
sigatures := []utils.Signature{[]byte{1}, []byte{2}, []byte{3}}
|
||||
assert.ElementsMatch(t, tc.Signatures, sigatures)
|
||||
assert.Equal(t, utils.Round(2), engineV2.GetCurrentRound())
|
||||
}
|
||||
|
||||
func TestThrowErrorIfTimeoutMsgRoundLessThanCurrentRound(t *testing.T) {
|
||||
blockchain, _, _, _ := PrepareXDCTestBlockChain(t, 11, params.TestXDPoSMockChainConfigWithV2Engine)
|
||||
engineV2 := blockchain.Engine().(*XDPoS.XDPoS).EngineV2
|
||||
|
||||
// Set round to 3
|
||||
engineV2.SetNewRoundFaker(utils.Round(3), false)
|
||||
// Create two timeout message which will not reach timeout pool threshold
|
||||
timeoutMsg := &utils.Timeout{
|
||||
Round: utils.Round(2),
|
||||
Signature: []byte{1},
|
||||
}
|
||||
|
||||
err := engineV2.TimeoutHandler(timeoutMsg)
|
||||
assert.NotNil(t, err)
|
||||
// Timeout msg round > currentRound
|
||||
assert.Equal(t, "Timeout message round number: 2 does not match currentRound: 3", err.Error())
|
||||
|
||||
// Set round to 1
|
||||
engineV2.SetNewRoundFaker(utils.Round(1), false)
|
||||
err = engineV2.TimeoutHandler(timeoutMsg)
|
||||
assert.NotNil(t, err)
|
||||
// Timeout msg round < currentRound
|
||||
assert.Equal(t, "Timeout message round number: 2 does not match currentRound: 1", err.Error())
|
||||
}
|
||||
|
||||
// VoteHandler
|
||||
func TestVoteMessageHandlerSuccessfullyGenerateTCandSyncInfo(t *testing.T) {
|
||||
blockchain, _, _, _ := PrepareXDCTestBlockChain(t, 11, params.TestXDPoSMockChainConfigWithV2Engine)
|
||||
engineV2 := blockchain.Engine().(*XDPoS.XDPoS).EngineV2
|
||||
|
||||
blockInfo := &utils.BlockInfo{
|
||||
Hash: common.HexToHash("0x1"),
|
||||
Round: utils.Round(1),
|
||||
Number: big.NewInt(999),
|
||||
}
|
||||
|
||||
// Set round to 1
|
||||
engineV2.SetNewRoundFaker(utils.Round(1), false)
|
||||
// Create two timeout message which will not reach vote pool threshold
|
||||
voteMsg := &utils.Vote{
|
||||
ProposedBlockInfo: *blockInfo,
|
||||
Signature: []byte{1},
|
||||
}
|
||||
|
||||
err := engineV2.VoteHandler(*voteMsg)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, utils.Round(1), engineV2.GetCurrentRound())
|
||||
voteMsg = &utils.Vote{
|
||||
ProposedBlockInfo: *blockInfo,
|
||||
Signature: []byte{2},
|
||||
}
|
||||
err = engineV2.VoteHandler(*voteMsg)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, utils.Round(1), engineV2.GetCurrentRound())
|
||||
|
||||
// Create a vote message that should trigger vite pool hook
|
||||
voteMsg = &utils.Vote{
|
||||
ProposedBlockInfo: *blockInfo,
|
||||
Signature: []byte{3},
|
||||
}
|
||||
|
||||
err = engineV2.VoteHandler(*voteMsg)
|
||||
assert.Nil(t, err)
|
||||
// Check round has now changed from 1 to 2
|
||||
assert.Equal(t, utils.Round(2), engineV2.GetCurrentRound())
|
||||
}
|
||||
|
||||
func TestThrowErrorIfVoteMsgRoundLessThanCurrentRound(t *testing.T) {
|
||||
blockchain, _, _, _ := PrepareXDCTestBlockChain(t, 11, params.TestXDPoSMockChainConfigWithV2Engine)
|
||||
engineV2 := blockchain.Engine().(*XDPoS.XDPoS).EngineV2
|
||||
|
||||
blockInfo := &utils.BlockInfo{
|
||||
Hash: common.HexToHash("0x1"),
|
||||
Round: utils.Round(2),
|
||||
Number: big.NewInt(999),
|
||||
}
|
||||
|
||||
// Set round to 3
|
||||
engineV2.SetNewRoundFaker(utils.Round(3), false)
|
||||
// Create two timeout message which will not reach timeout pool threshold
|
||||
voteMsg := &utils.Vote{
|
||||
ProposedBlockInfo: *blockInfo,
|
||||
Signature: []byte{1},
|
||||
}
|
||||
|
||||
// voteRound > currentRound
|
||||
err := engineV2.VoteHandler(*voteMsg)
|
||||
assert.NotNil(t, err)
|
||||
assert.Equal(t, "Vote message round number: 2 does not match currentRound: 3", err.Error())
|
||||
|
||||
// Set round to 1
|
||||
engineV2.SetNewRoundFaker(utils.Round(1), false)
|
||||
err = engineV2.VoteHandler(*voteMsg)
|
||||
assert.NotNil(t, err)
|
||||
// voteRound < currentRound
|
||||
assert.Equal(t, "Vote message round number: 2 does not match currentRound: 1", err.Error())
|
||||
}
|
||||
|
||||
func TestProcessVoteMsgThenTimeoutMsg(t *testing.T) {
|
||||
blockchain, _, _, _ := PrepareXDCTestBlockChain(t, 11, params.TestXDPoSMockChainConfigWithV2Engine)
|
||||
engineV2 := blockchain.Engine().(*XDPoS.XDPoS).EngineV2
|
||||
|
||||
// Set round to 1
|
||||
engineV2.SetNewRoundFaker(utils.Round(1), false)
|
||||
|
||||
// Start with vote messages
|
||||
blockInfo := &utils.BlockInfo{
|
||||
Hash: common.HexToHash("0x1"),
|
||||
Round: utils.Round(1),
|
||||
Number: big.NewInt(999),
|
||||
}
|
||||
// Create two timeout message which will not reach vote pool threshold
|
||||
voteMsg := &utils.Vote{
|
||||
ProposedBlockInfo: *blockInfo,
|
||||
Signature: []byte{1},
|
||||
}
|
||||
|
||||
err := engineV2.VoteHandler(*voteMsg)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, utils.Round(1), engineV2.GetCurrentRound())
|
||||
voteMsg = &utils.Vote{
|
||||
ProposedBlockInfo: *blockInfo,
|
||||
Signature: []byte{2},
|
||||
}
|
||||
err = engineV2.VoteHandler(*voteMsg)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, utils.Round(1), engineV2.GetCurrentRound())
|
||||
|
||||
// Create a vote message that should trigger vite pool hook
|
||||
voteMsg = &utils.Vote{
|
||||
ProposedBlockInfo: *blockInfo,
|
||||
Signature: []byte{3},
|
||||
}
|
||||
|
||||
err = engineV2.VoteHandler(*voteMsg)
|
||||
assert.Nil(t, err)
|
||||
// Check round has now changed from 1 to 2
|
||||
assert.Equal(t, utils.Round(2), engineV2.GetCurrentRound())
|
||||
|
||||
// We shall have highestQuorumCert in engine now, let's do timeout msg to see if we can broadcast SyncInfo which contains both highestQuorumCert and HighestTimeoutCert
|
||||
|
||||
// First, all incoming old timeout msg shall not be processed
|
||||
timeoutMsg := &utils.Timeout{
|
||||
Round: utils.Round(1),
|
||||
Signature: []byte{1},
|
||||
}
|
||||
|
||||
err = engineV2.TimeoutHandler(timeoutMsg)
|
||||
assert.NotNil(t, err)
|
||||
assert.Equal(t, "Timeout message round number: 1 does not match currentRound: 2", err.Error())
|
||||
|
||||
// Ok, let's do the timeout msg which is on the same round as the current round by creating two timeout message which will not reach timeout pool threshold
|
||||
timeoutMsg = &utils.Timeout{
|
||||
Round: utils.Round(2),
|
||||
Signature: []byte{1},
|
||||
}
|
||||
|
||||
err = engineV2.TimeoutHandler(timeoutMsg)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, utils.Round(2), engineV2.GetCurrentRound())
|
||||
timeoutMsg = &utils.Timeout{
|
||||
Round: utils.Round(2),
|
||||
Signature: []byte{2},
|
||||
}
|
||||
err = engineV2.TimeoutHandler(timeoutMsg)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, utils.Round(2), engineV2.GetCurrentRound())
|
||||
// Create a timeout message that should trigger timeout pool hook
|
||||
timeoutMsg = &utils.Timeout{
|
||||
Round: utils.Round(2),
|
||||
Signature: []byte{3},
|
||||
}
|
||||
|
||||
err = engineV2.TimeoutHandler(timeoutMsg)
|
||||
assert.Nil(t, err)
|
||||
|
||||
syncInfoMsg := <-engineV2.BroadcastCh
|
||||
assert.NotNil(t, syncInfoMsg)
|
||||
|
||||
// Should have HighestQuorumCert from previous round votes
|
||||
qc := syncInfoMsg.(utils.SyncInfo).HighestQuorumCert
|
||||
assert.NotNil(t, qc)
|
||||
assert.Equal(t, utils.Round(1), qc.ProposedBlockInfo.Round)
|
||||
|
||||
tc := syncInfoMsg.(utils.SyncInfo).HighestTimeoutCert
|
||||
assert.NotNil(t, tc)
|
||||
assert.Equal(t, tc.Round, utils.Round(2))
|
||||
sigatures := []utils.Signature{[]byte{1}, []byte{2}, []byte{3}}
|
||||
assert.ElementsMatch(t, tc.Signatures, sigatures)
|
||||
// Round shall be +1 now
|
||||
assert.Equal(t, utils.Round(3), engineV2.GetCurrentRound())
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue