fix vote and block insertion race condition (#51)

* fix vote and block insertion race condition

* fix race condition in the vote handler when using multiple goroutines

* check goroutine race conditions during CI/CD

* remove race check as there is eth code that is failing

* remove unused signature list variable
This commit is contained in:
Jerome 2022-02-03 23:27:50 +11:00 committed by GitHub
parent 7cc2bef2d3
commit 23cbf68307
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 161 additions and 40 deletions

View file

@ -544,19 +544,22 @@ func (x *XDPoS_v2) SyncInfoHandler(chain consensus.ChainReader, syncInfo *utils.
*/
func (x *XDPoS_v2) VerifyVoteMessage(chain consensus.ChainReader, vote *utils.Vote) (bool, error) {
/*
1. Get masterNode list belong to this epoch by hash
1. Get masterNode list from snapshot
2. Check signature:
- Use ecRecover to get the public key
- Use the above public key to find out the xdc address
- Use the above xdc address to check against the master node list from step 1(For the running epoch)
3. Verify blockInfo
4. Broadcast(Not part of consensus)
*/
epochInfo, err := x.getEpochSwitchInfo(chain, nil, vote.ProposedBlockInfo.Hash)
// The snapshot is looked up by the proposed block's number.
snapshot, err := x.getSnapshot(chain, vote.ProposedBlockInfo.Number.Uint64())
if err != nil {
log.Error("[VerifyVoteMessage] Error when getting epoch switch Info to verify vote message", "Error", err)
log.Error("[VerifyVoteMessage] fail to get snapshot for a vote message", "BlockNum", vote.ProposedBlockInfo.Number, "Hash", vote.ProposedBlockInfo.Hash, "Error", err.Error())
}
// NOTE(review): a getSnapshot failure is only logged above; execution then
// continues to snapshot.NextEpochMasterNodes. Presumably snapshot is nil when
// err != nil, which would panic here — confirm, and consider returning
// (false, err) from the error branch.
return x.verifyMsgSignature(utils.VoteSigHash(vote.ProposedBlockInfo), vote.Signature, epochInfo.Masternodes)
verified, err := x.verifyMsgSignature(utils.VoteSigHash(vote.ProposedBlockInfo), vote.Signature, snapshot.NextEpochMasterNodes)
if err != nil {
log.Error("[VerifyVoteMessage] Error while verifying vote message", "Error", err.Error())
}
// Both the verification result and any verification error go back to the caller.
return verified, err
}
// Consensus entry point for processing vote message to produce QC
@ -583,18 +586,16 @@ func (x *XDPoS_v2) voteHandler(chain consensus.ChainReader, voteMsg *utils.Vote)
log.Info(fmt.Sprintf("Vote pool threashold reached: %v, number of items in the pool: %v", thresholdReached, numberOfVotesInPool))
// Check if the block already exist, otherwise we try luck with the next vote
proposedBlock := chain.GetHeaderByHash(voteMsg.ProposedBlockInfo.Hash)
if proposedBlock == nil {
proposedBlockHeader := chain.GetHeaderByHash(voteMsg.ProposedBlockInfo.Hash)
if proposedBlockHeader == nil {
log.Warn("[voteHandler] The proposed block from vote message does not exist yet, wait for the next vote to try again", "Hash", voteMsg.ProposedBlockInfo.Hash, "Round", voteMsg.ProposedBlockInfo.Round)
return nil
}
err := x.onVotePoolThresholdReached(chain, pooledVotes, voteMsg)
err := x.onVotePoolThresholdReached(chain, pooledVotes, voteMsg, proposedBlockHeader)
if err != nil {
return err
}
// clean up vote at the same poolKey. and pookKey is proposed block hash
x.votePool.ClearPoolKeyByObj(voteMsg)
}
return nil
@ -604,15 +605,46 @@ func (x *XDPoS_v2) voteHandler(chain consensus.ChainReader, voteMsg *utils.Vote)
Function that will be called by votePool when it reached threshold.
In the engine v2, we will need to generate and process QC
*/
func (x *XDPoS_v2) onVotePoolThresholdReached(chain consensus.ChainReader, pooledVotes map[common.Hash]utils.PoolObj, currentVoteMsg utils.PoolObj) error {
signatures := []utils.Signature{}
for _, v := range pooledVotes {
signatures = append(signatures, v.(*utils.Vote).Signature)
func (x *XDPoS_v2) onVotePoolThresholdReached(chain consensus.ChainReader, pooledVotes map[common.Hash]utils.PoolObj, currentVoteMsg utils.PoolObj, proposedBlockHeader *types.Header) error {
masternodes := x.GetMasternodes(chain, proposedBlockHeader)
// Filter out non-Master nodes signatures
var wg sync.WaitGroup
wg.Add(len(pooledVotes))
signatureSlice := make([]utils.Signature, len(pooledVotes))
counter := 0
for h, vote := range pooledVotes {
go func(hash common.Hash, v *utils.Vote, i int) {
defer wg.Done()
verified, err := x.verifyMsgSignature(utils.VoteSigHash(v.ProposedBlockInfo), v.Signature, masternodes)
if !verified || err != nil {
log.Warn("[onVotePoolThresholdReached] Skip not verified vote signatures when building QC", "Error", err.Error(), "verified", verified)
} else {
signatureSlice[i] = v.Signature
}
}(h, vote.(*utils.Vote), counter)
counter++
}
wg.Wait()
// The signature list may contain empty entey. we only care the ones with values
var validSignatureSlice []utils.Signature
for _, v := range signatureSlice {
if len(v) != 0 {
validSignatureSlice = append(validSignatureSlice, v)
}
}
// Skip and wait for the next vote to process again if valid votes is less than what we required
if len(validSignatureSlice) < x.config.V2.CertThreshold {
log.Warn("[onVotePoolThresholdReached] Not enough valid signatures to generate QC", "VotesSignaturesAfterFilter", validSignatureSlice, "NumberOfValidVotes", len(validSignatureSlice), "NumberOfVotes", len(pooledVotes))
return nil
}
// Genrate QC
quorumCert := &utils.QuorumCert{
ProposedBlockInfo: currentVoteMsg.(*utils.Vote).ProposedBlockInfo,
Signatures: signatures,
Signatures: validSignatureSlice,
}
err := x.processQC(chain, quorumCert)
if err != nil {
@ -620,6 +652,8 @@ func (x *XDPoS_v2) onVotePoolThresholdReached(chain consensus.ChainReader, poole
return err
}
log.Info("🗳 Successfully processed the vote and produced QC!")
// clean up vote at the same poolKey. and pookKey is proposed block hash
x.votePool.ClearPoolKeyByObj(currentVoteMsg)
return nil
}
@ -1004,6 +1038,9 @@ func (x *XDPoS_v2) signSignature(signingHash common.Hash) (utils.Signature, erro
}
func (x *XDPoS_v2) verifyMsgSignature(signedHashToBeVerified common.Hash, signature utils.Signature, masternodes []common.Address) (bool, error) {
if len(masternodes) == 0 {
return false, fmt.Errorf("Empty masternode list detected when verifying message signatures")
}
// Recover the public key and the Ethereum address
pubkey, err := crypto.Ecrecover(signedHashToBeVerified.Bytes(), signature)
if err != nil {

View file

@ -24,5 +24,5 @@ func TestCountdownTimeoutToSendTimeoutMessage(t *testing.T) {
// We can only test valid = false for now as the implementation for getCurrentRoundMasterNodes is not complete
assert.False(t, valid)
// This shows we are able to decode the timeout message, which is what this test is all about
assert.Regexp(t, "^Masternodes does not contain signer addres.*", err.Error())
assert.Regexp(t, "Empty masternode list detected when verifying message signatures", err.Error())
}

View file

@ -3,9 +3,13 @@ package tests
import (
"bytes"
"context"
"crypto/ecdsa"
"encoding/hex"
"fmt"
"io/ioutil"
"math/big"
"math/rand"
"os"
"strings"
"testing"
"time"
@ -13,6 +17,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/accounts"
"github.com/XinFinOrg/XDPoSChain/accounts/abi/bind"
"github.com/XinFinOrg/XDPoSChain/accounts/abi/bind/backends"
"github.com/XinFinOrg/XDPoSChain/accounts/keystore"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/utils"
@ -60,6 +65,50 @@ func debugMessage(backend *backends.SimulatedBackend, signers signersList, t *te
}
}
// SignHashByPK signs itemToSign with the given private key and returns the
// resulting signature bytes. It is a test helper: any failure while preparing
// the signer or while signing causes a panic rather than an error return.
func SignHashByPK(pk *ecdsa.PrivateKey, itemToSign []byte) []byte {
	addr, sign, err := getSignerAndSignFn(pk)
	if err != nil {
		panic(err)
	}

	// The sign function identifies the key by the account's address.
	account := accounts.Account{Address: addr}
	sig, signErr := sign(account, itemToSign)
	if signErr != nil {
		panic(signErr)
	}
	return sig
}
// letterBytes is the alphabet RandStringBytes draws from: the ASCII letters
// a-z followed by A-Z.
const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

// RandStringBytes returns a string of n characters, each picked from
// letterBytes using the package-level math/rand source.
func RandStringBytes(n int) string {
	buf := make([]byte, n)
	alphabetLen := len(letterBytes)
	for i := 0; i < len(buf); i++ {
		pick := rand.Intn(alphabetLen)
		buf[i] = letterBytes[pick]
	}
	return string(buf)
}
// getSignerAndSignFn imports pk into a throwaway keystore and returns the
// address of the imported account together with the keystore's SignHash
// function, so tests can sign hashes on behalf of that key.
//
// The keystore lives in a temporary directory that is removed when this
// function returns; the returned sign function is the keystore's SignHash
// method value.
//
// Returns:
//   - the imported account's address (zero address if the temporary directory
//     could not be created or the import failed),
//   - the sign function (nil on any error),
//   - the underlying error from creating the directory, importing, or
//     unlocking the key.
func getSignerAndSignFn(pk *ecdsa.PrivateKey) (common.Address, func(account accounts.Account, hash []byte) ([]byte, error), error) {
	// Deliberately weak scrypt parameters: this keystore only exists for the
	// duration of a test, so key-derivation cost is kept to a minimum.
	const (
		veryLightScryptN = 2
		veryLightScryptP = 1
	)

	dir, err := ioutil.TempDir("", fmt.Sprintf("eth-getSignerAndSignFn-test-%v", RandStringBytes(5)))
	if err != nil {
		// Previously this error was discarded and the keystore was created
		// with an empty directory path.
		return common.Address{}, nil, err
	}
	defer os.RemoveAll(dir)

	ks := keystore.NewKeyStore(dir, veryLightScryptN, veryLightScryptP)

	pass := "" // not used but required by API
	a1, err := ks.ImportECDSA(pk, pass)
	if err != nil {
		// Return the error itself instead of re-parsing its text as a format
		// string, which would mangle any '%' in the message.
		return common.Address{}, nil, err
	}
	if err := ks.Unlock(a1, pass); err != nil {
		return a1.Address, nil, err
	}
	return a1.Address, ks.SignHash, nil
}
func getCommonBackend(t *testing.T, chainConfig *params.ChainConfig) *backends.SimulatedBackend {
// initial helper backend
@ -229,12 +278,13 @@ func GetCandidateFromCurrentSmartContract(backend bind.ContractBackend, t *testi
func PrepareXDCTestBlockChain(t *testing.T, numOfBlocks int, chainConfig *params.ChainConfig) (*BlockChain, *backends.SimulatedBackend, *types.Block, common.Address) {
// Preparation
var err error
// Authorise
signer, signFn, err := backends.SimulateWalletAddressAndSignFn()
backend := getCommonBackend(t, chainConfig)
blockchain := backend.GetBlockChain()
blockchain.Client = backend
// Authorise
signer, signFn, err := backends.SimulateWalletAddressAndSignFn()
if err != nil {
panic(fmt.Errorf("Error while creating simulated wallet for generating singer address and signer fn: %v", err))
}
@ -279,15 +329,15 @@ func PrepareXDCTestBlockChain(t *testing.T, numOfBlocks int, chainConfig *params
func PrepareXDCTestBlockChainForV2Engine(t *testing.T, numOfBlocks int, chainConfig *params.ChainConfig, numOfForkedBlocks int) (*BlockChain, *backends.SimulatedBackend, *types.Block, common.Address, func(account accounts.Account, hash []byte) ([]byte, error), *types.Block) {
// Preparation
var err error
signer, signFn, err := backends.SimulateWalletAddressAndSignFn()
if err != nil {
panic(fmt.Errorf("Error while creating simulated wallet for generating singer address and signer fn: %v", err))
}
backend := getCommonBackend(t, chainConfig)
blockchain := backend.GetBlockChain()
blockchain.Client = backend
// Authorise
signer, signFn, err := backends.SimulateWalletAddressAndSignFn()
if err != nil {
panic(fmt.Errorf("Error while creating simulated wallet for generating singer address and signer fn: %v", err))
}
blockchain.Engine().(*XDPoS.XDPoS).Authorize(signer, signFn)
currentBlock := blockchain.Genesis()

View file

@ -5,6 +5,8 @@ import (
"math/big"
"testing"
"github.com/XinFinOrg/XDPoSChain/accounts"
"github.com/XinFinOrg/XDPoSChain/accounts/abi/bind/backends"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/utils"
@ -14,7 +16,7 @@ import (
// VoteHandler
func TestVoteMessageHandlerSuccessfullyGeneratedAndProcessQCForFistV2Round(t *testing.T) {
blockchain, _, currentBlock, _, _, _ := PrepareXDCTestBlockChainForV2Engine(t, 11, params.TestXDPoSMockChainConfigWithV2Engine, 0)
blockchain, _, currentBlock, signer, signFn, _ := PrepareXDCTestBlockChainForV2Engine(t, 11, params.TestXDPoSMockChainConfigWithV2Engine, 0)
engineV2 := blockchain.Engine().(*XDPoS.XDPoS).EngineV2
blockInfo := &utils.BlockInfo{
@ -22,25 +24,31 @@ func TestVoteMessageHandlerSuccessfullyGeneratedAndProcessQCForFistV2Round(t *te
Round: utils.Round(1),
Number: big.NewInt(11),
}
voteSigningHash := utils.VoteSigHash(blockInfo)
// Set round to 5
engineV2.SetNewRoundFaker(utils.Round(1), false)
// Create two vote messages which will not reach vote pool threshold
signedHash, err := signFn(accounts.Account{Address: signer}, voteSigningHash.Bytes())
assert.Nil(t, err)
voteMsg := &utils.Vote{
ProposedBlockInfo: blockInfo,
Signature: []byte{1},
Signature: signedHash,
}
err := engineV2.VoteHandler(blockchain, voteMsg)
err = engineV2.VoteHandler(blockchain, voteMsg)
assert.Nil(t, err)
currentRound, lockQuorumCert, highestQuorumCert, _, _ := engineV2.GetProperties()
// initialised with nil and 0 round
assert.Nil(t, lockQuorumCert)
assert.Equal(t, utils.Round(0), highestQuorumCert.ProposedBlockInfo.Round)
assert.Equal(t, utils.Round(1), currentRound)
signedHash = SignHashByPK(acc2Key, voteSigningHash.Bytes())
voteMsg = &utils.Vote{
ProposedBlockInfo: blockInfo,
Signature: []byte{2},
Signature: signedHash,
}
err = engineV2.VoteHandler(blockchain, voteMsg)
assert.Nil(t, err)
@ -52,9 +60,10 @@ func TestVoteMessageHandlerSuccessfullyGeneratedAndProcessQCForFistV2Round(t *te
assert.Equal(t, utils.Round(1), currentRound)
// Create a vote message that should trigger vote pool hook and increment the round to 6
signedHash = SignHashByPK(acc3Key, voteSigningHash.Bytes())
voteMsg = &utils.Vote{
ProposedBlockInfo: blockInfo,
Signature: []byte{3},
Signature: signedHash,
}
err = engineV2.VoteHandler(blockchain, voteMsg)
@ -69,7 +78,7 @@ func TestVoteMessageHandlerSuccessfullyGeneratedAndProcessQCForFistV2Round(t *te
}
func TestVoteMessageHandlerSuccessfullyGeneratedAndProcessQC(t *testing.T) {
blockchain, _, currentBlock, _, _, _ := PrepareXDCTestBlockChainForV2Engine(t, 15, params.TestXDPoSMockChainConfigWithV2Engine, 0)
blockchain, _, currentBlock, signer, signFn, _ := PrepareXDCTestBlockChainForV2Engine(t, 15, params.TestXDPoSMockChainConfigWithV2Engine, 0)
engineV2 := blockchain.Engine().(*XDPoS.XDPoS).EngineV2
blockInfo := &utils.BlockInfo{
@ -77,25 +86,29 @@ func TestVoteMessageHandlerSuccessfullyGeneratedAndProcessQC(t *testing.T) {
Round: utils.Round(5),
Number: big.NewInt(15),
}
voteSigningHash := utils.VoteSigHash(blockInfo)
// Set round to 5
engineV2.SetNewRoundFaker(utils.Round(5), false)
// Create two vote messages which will not reach vote pool threshold
signedHash, err := signFn(accounts.Account{Address: signer}, voteSigningHash.Bytes())
assert.Nil(t, err)
voteMsg := &utils.Vote{
ProposedBlockInfo: blockInfo,
Signature: []byte{1},
Signature: signedHash,
}
err := engineV2.VoteHandler(blockchain, voteMsg)
err = engineV2.VoteHandler(blockchain, voteMsg)
assert.Nil(t, err)
currentRound, lockQuorumCert, highestQuorumCert, _, _ := engineV2.GetProperties()
// initialised with nil and 0 round
assert.Nil(t, lockQuorumCert)
assert.Equal(t, utils.Round(0), highestQuorumCert.ProposedBlockInfo.Round)
assert.Equal(t, utils.Round(5), currentRound)
signedHash = SignHashByPK(acc1Key, voteSigningHash.Bytes())
voteMsg = &utils.Vote{
ProposedBlockInfo: blockInfo,
Signature: []byte{2},
Signature: signedHash,
}
err = engineV2.VoteHandler(blockchain, voteMsg)
assert.Nil(t, err)
@ -106,10 +119,28 @@ func TestVoteMessageHandlerSuccessfullyGeneratedAndProcessQC(t *testing.T) {
assert.Equal(t, utils.Round(5), currentRound)
// Create a vote message that should trigger vote pool hook and increment the round to 6
// Create another vote which is signed by someone not from the master node list
randomSigner, randomSignFn, err := backends.SimulateWalletAddressAndSignFn()
assert.Nil(t, err)
randomlySignedHash, err := randomSignFn(accounts.Account{Address: randomSigner}, voteSigningHash.Bytes())
assert.Nil(t, err)
voteMsg = &utils.Vote{
ProposedBlockInfo: blockInfo,
Signature: []byte{3},
Signature: randomlySignedHash,
}
err = engineV2.VoteHandler(blockchain, voteMsg)
assert.Nil(t, err)
currentRound, lockQuorumCert, highestQuorumCert, _, _ = engineV2.GetProperties()
// Still using the initlised value because we did not yet go to the next round
assert.Nil(t, lockQuorumCert)
assert.Equal(t, utils.Round(0), highestQuorumCert.ProposedBlockInfo.Round)
assert.Equal(t, utils.Round(5), currentRound)
// Create a vote message that should trigger vote pool hook and increment the round to 6
signedHash = SignHashByPK(acc3Key, voteSigningHash.Bytes())
voteMsg = &utils.Vote{
ProposedBlockInfo: blockInfo,
Signature: signedHash,
}
err = engineV2.VoteHandler(blockchain, voteMsg)
@ -173,10 +204,12 @@ func TestProcessVoteMsgThenTimeoutMsg(t *testing.T) {
Round: utils.Round(5),
Number: big.NewInt(11),
}
voteSigningHash := utils.VoteSigHash(blockInfo)
// Create two vote message which will not reach vote pool threshold
signedHash := SignHashByPK(acc1Key, voteSigningHash.Bytes())
voteMsg := &utils.Vote{
ProposedBlockInfo: blockInfo,
Signature: []byte{1},
Signature: signedHash,
}
err := engineV2.VoteHandler(blockchain, voteMsg)
@ -189,7 +222,7 @@ func TestProcessVoteMsgThenTimeoutMsg(t *testing.T) {
assert.Equal(t, utils.Round(5), currentRound)
voteMsg = &utils.Vote{
ProposedBlockInfo: blockInfo,
Signature: []byte{2},
Signature: SignHashByPK(acc2Key, voteSigningHash.Bytes()),
}
err = engineV2.VoteHandler(blockchain, voteMsg)
assert.Nil(t, err)
@ -199,7 +232,7 @@ func TestProcessVoteMsgThenTimeoutMsg(t *testing.T) {
// Create a vote message that should trigger vote pool hook
voteMsg = &utils.Vote{
ProposedBlockInfo: blockInfo,
Signature: []byte{3},
Signature: SignHashByPK(acc3Key, voteSigningHash.Bytes()),
}
err = engineV2.VoteHandler(blockchain, voteMsg)
@ -285,13 +318,14 @@ func TestVoteMessageShallNotThrowErrorIfBlockNotYetExist(t *testing.T) {
Round: utils.Round(6),
Number: big.NewInt(16),
}
voteSigningHash := utils.VoteSigHash(blockInfo)
// Set round to 6
engineV2.SetNewRoundFaker(utils.Round(6), false)
// Create two vote messages which will not reach vote pool threshold
voteMsg := &utils.Vote{
ProposedBlockInfo: blockInfo,
Signature: []byte{1},
Signature: SignHashByPK(acc1Key, voteSigningHash.Bytes()),
}
err := engineV2.VoteHandler(blockchain, voteMsg)
@ -299,7 +333,7 @@ func TestVoteMessageShallNotThrowErrorIfBlockNotYetExist(t *testing.T) {
voteMsg = &utils.Vote{
ProposedBlockInfo: blockInfo,
Signature: []byte{2},
Signature: SignHashByPK(acc2Key, voteSigningHash.Bytes()),
}
err = engineV2.VoteHandler(blockchain, voteMsg)
assert.Nil(t, err)
@ -307,7 +341,7 @@ func TestVoteMessageShallNotThrowErrorIfBlockNotYetExist(t *testing.T) {
// Create a vote message that should trigger vote pool hook, but it shall not produce any QC yet
voteMsg = &utils.Vote{
ProposedBlockInfo: blockInfo,
Signature: []byte{3},
Signature: SignHashByPK(acc3Key, voteSigningHash.Bytes()),
}
err = engineV2.VoteHandler(blockchain, voteMsg)
@ -324,7 +358,7 @@ func TestVoteMessageShallNotThrowErrorIfBlockNotYetExist(t *testing.T) {
voteMsg = &utils.Vote{
ProposedBlockInfo: blockInfo,
Signature: []byte{4},
Signature: SignHashByPK(voterKey, voteSigningHash.Bytes()),
}
err = engineV2.VoteHandler(blockchain, voteMsg)