go-ethereum/consensus/XDPoS/engines/engine_v2/vote.go
Daniel Liu 84ac794e22
consensus: fix missing block header error in vote verification, fix #1850 (#1891)
Previously, vote verification would log errors and fail when the referenced block header was not yet available locally, especially when nodes in the same round processed votes before receiving the block header. This commit changes the logic to defer verification and log at debug level if the header is missing, preventing unnecessary error logs and aligning with upstream geth behavior for out-of-order message arrival during consensus voting.
2026-01-05 15:40:49 +05:30

341 lines
14 KiB
Go

package engine_v2
import (
"errors"
"fmt"
"math/big"
"strconv"
"strings"
"sync"
"time"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/utils"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/log"
)
func (x *XDPoS_v2) VerifyVoteMessage(chain consensus.ChainReader, vote *types.Vote) (bool, error) {
if vote.ProposedBlockInfo.Round < x.currentRound {
log.Debug("[VerifyVoteMessage] Disqualified vote message as the proposed round does not match currentRound", "voteHash", vote.Hash(), "voteProposedBlockInfoRound", vote.ProposedBlockInfo.Round, "currentRound", x.currentRound)
return false, nil
}
// If we don't yet have the referenced header locally, defer verification.
// Votes may arrive before the block header itself; avoid logging an error
// for that normal timing condition and let the vote be retried later.
if chain.GetHeaderByHash(vote.ProposedBlockInfo.Hash) == nil {
log.Debug("[VerifyVoteMessage] referenced header not present yet, defer verification", "blockNum", vote.ProposedBlockInfo.Number, "blockHash", vote.ProposedBlockInfo.Hash)
return false, nil
}
epochInfo, err := x.getEpochSwitchInfo(chain, nil, vote.ProposedBlockInfo.Hash)
if err != nil {
log.Error("[VerifyVoteMessage] Fail to get epochInfo when verifying vote message!", "blockNum", vote.ProposedBlockInfo.Number, "blockHash", vote.ProposedBlockInfo.Hash, "voteHash", vote.Hash(), "voteGapNumber", vote.GapNumber, "err", err)
return false, err
}
verified, signer, err := x.verifyMsgSignature(types.VoteSigHash(&types.VoteForSign{
ProposedBlockInfo: vote.ProposedBlockInfo,
GapNumber: vote.GapNumber,
}), vote.Signature, epochInfo.Masternodes)
if err != nil {
for i, mn := range epochInfo.Masternodes {
log.Warn("[VerifyVoteMessage] Master node list item", "index", i, "Master node", mn.Hex())
}
log.Warn("[VerifyVoteMessage] Error while verifying vote message", "votedBlockNum", vote.ProposedBlockInfo.Number.Uint64(), "votedBlockHash", vote.ProposedBlockInfo.Hash.Hex(), "voteHash", vote.Hash(), "error", err.Error())
return false, err
}
vote.SetSigner(signer)
return verified, nil
}
// Consensus entry point for processing vote message to produce QC
func (x *XDPoS_v2) VoteHandler(chain consensus.ChainReader, voteMsg *types.Vote) error {
x.lock.Lock()
defer x.lock.Unlock()
return x.voteHandler(chain, voteMsg)
}
// Once Hot stuff voting rule has verified, this node can then send vote
func (x *XDPoS_v2) sendVote(chainReader consensus.ChainReader, blockInfo *types.BlockInfo) error {
// First step: Update the highest Voted round
// Second step: Generate the signature by using node's private key(The signature is the blockInfo signature)
// Third step: Construct the vote struct with the above signature & blockinfo struct
// Forth step: Send the vote to broadcast channel
epochSwitchInfo, err := x.getEpochSwitchInfo(chainReader, nil, blockInfo.Hash)
if err != nil {
log.Error("getEpochSwitchInfo when sending out Vote", "BlockInfoHash", blockInfo.Hash, "Error", err)
return err
}
epochSwitchNumber := epochSwitchInfo.EpochSwitchBlockInfo.Number.Uint64()
gapNumber := epochSwitchNumber - epochSwitchNumber%x.config.Epoch
if gapNumber > x.config.Gap {
gapNumber -= x.config.Gap
} else {
gapNumber = 0
}
signedHash, err := x.signSignature(types.VoteSigHash(&types.VoteForSign{
ProposedBlockInfo: blockInfo,
GapNumber: gapNumber,
}))
if err != nil {
log.Error("signSignature when sending out Vote", "BlockInfoHash", blockInfo.Hash, "Error", err)
return err
}
x.highestVotedRound = x.currentRound
voteMsg := &types.Vote{
ProposedBlockInfo: blockInfo,
Signature: signedHash,
GapNumber: gapNumber,
}
err = x.voteHandler(chainReader, voteMsg)
if err != nil {
log.Error("sendVote error", "BlockInfoHash", blockInfo.Hash, "Error", err)
return err
}
x.broadcastToBftChannel(voteMsg)
return nil
}
func (x *XDPoS_v2) voteHandler(chain consensus.ChainReader, voteMsg *types.Vote) error {
// checkRoundNumber
if (voteMsg.ProposedBlockInfo.Round != x.currentRound) && (voteMsg.ProposedBlockInfo.Round != x.currentRound+1) {
return &utils.ErrIncomingMessageRoundTooFarFromCurrentRound{
Type: "vote",
IncomingRound: voteMsg.ProposedBlockInfo.Round,
CurrentRound: x.currentRound,
}
}
if x.votePoolCollectionTime.IsZero() {
log.Info("[voteHandler] set vote pool time", "round", x.currentRound)
x.votePoolCollectionTime = time.Now()
}
// Collect vote
numberOfVotesInPool, pooledVotes := x.votePool.Add(voteMsg)
log.Trace("[voteHandler] New vote", "signer", voteMsg.GetSigner().Hex(), "proposedBlockInfoRound", voteMsg.ProposedBlockInfo.Round, "proposedBlockInfoNumber", voteMsg.ProposedBlockInfo.Number.Uint64(), "proposedBlockInfoHash", voteMsg.ProposedBlockInfo.Hash.Hex())
log.Debug("[voteHandler] collect votes", "number", numberOfVotesInPool)
go x.ForensicsProcessor.DetectEquivocationInVotePool(voteMsg, x.votePool)
go x.ForensicsProcessor.ProcessVoteEquivocation(chain, x, voteMsg)
epochInfo, err := x.getEpochSwitchInfo(chain, nil, voteMsg.ProposedBlockInfo.Hash)
if err != nil {
return &utils.ErrIncomingMessageBlockNotFound{
Type: "vote",
IncomingBlockHash: voteMsg.ProposedBlockInfo.Hash,
IncomingBlockNumber: voteMsg.ProposedBlockInfo.Number,
Err: err,
}
}
certThreshold := x.config.V2.Config(uint64(voteMsg.ProposedBlockInfo.Round)).CertThreshold
thresholdReached := float64(numberOfVotesInPool) >= float64(epochInfo.MasternodesLen)*certThreshold
if thresholdReached {
log.Info(fmt.Sprintf("[voteHandler] Vote pool threashold reached: %v, number of items in the pool: %v", thresholdReached, numberOfVotesInPool))
// Check if the block already exist, otherwise we try luck with the next vote
proposedBlockHeader := chain.GetHeaderByHash(voteMsg.ProposedBlockInfo.Hash)
if proposedBlockHeader == nil {
log.Info("[voteHandler] The proposed block from vote message does not exist yet, wait for the next vote to try again", "blockNum", voteMsg.ProposedBlockInfo.Number, "Hash", voteMsg.ProposedBlockInfo.Hash, "Round", voteMsg.ProposedBlockInfo.Round)
return nil
}
err := x.VerifyBlockInfo(chain, voteMsg.ProposedBlockInfo, nil)
if err != nil {
return err
}
x.verifyVotes(chain, pooledVotes, proposedBlockHeader)
err = x.onVotePoolThresholdReached(chain, pooledVotes, voteMsg, proposedBlockHeader)
if err != nil {
return err
}
elapsed := time.Since(x.votePoolCollectionTime)
log.Info("[voteHandler] time cost from receive first vote under QC create", "elapsed", elapsed)
x.votePoolCollectionTime = time.Time{}
}
return nil
}
func (x *XDPoS_v2) verifyVotes(chain consensus.ChainReader, votes map[common.Hash]utils.PoolObj, header *types.Header) {
masternodes := x.GetMasternodes(chain, header)
start := time.Now()
emptySigner := common.Address{}
// Filter out non-Master nodes signatures
var wg sync.WaitGroup
for _, vote := range votes {
wg.Go(func() {
v := vote.(*types.Vote)
signerAddress := v.GetSigner()
if signerAddress != emptySigner {
// verify that signer belongs to the final masternodes, we have not do so in previous steps
if len(masternodes) == 0 {
log.Error("[verifyVotes] empty masternode list detected when verifying message signatures")
}
for _, mn := range masternodes {
if mn == signerAddress {
return
}
}
// if signer does not belong to final masternodes, we remove the signer
v.SetSigner(emptySigner)
log.Debug("[verifyVotes] find a vote does not belong to final masternodes", "signer", signerAddress)
return
}
signedVote := types.VoteSigHash(&types.VoteForSign{
ProposedBlockInfo: v.ProposedBlockInfo,
GapNumber: v.GapNumber,
})
verified, masterNode, err := x.verifyMsgSignature(signedVote, v.Signature, masternodes)
if err != nil {
log.Warn("[verifyVotes] error while verifying vote signature", "error", err.Error())
return
}
if !verified {
log.Warn("[verifyVotes] non-verified vote signature", "verified", verified)
return
}
v.SetSigner(masterNode)
})
}
wg.Wait()
elapsed := time.Since(start)
log.Debug("[verifyVotes] verify message signatures of vote pool took", "elapsed", elapsed)
}
/*
Function that will be called by votePool when it reached threshold.
In the engine v2, we will need to generate and process QC
*/
func (x *XDPoS_v2) onVotePoolThresholdReached(chain consensus.ChainReader, pooledVotes map[common.Hash]utils.PoolObj, currentVoteMsg utils.PoolObj, proposedBlockHeader *types.Header) error {
// The signature list may contain empty entey. we only care the ones with values
var validSignatures []types.Signature
emptySigner := common.Address{}
for _, vote := range pooledVotes {
if vote.GetSigner() != emptySigner {
validSignatures = append(validSignatures, vote.(*types.Vote).Signature)
}
}
epochInfo, err := x.getEpochSwitchInfo(chain, nil, currentVoteMsg.(*types.Vote).ProposedBlockInfo.Hash)
if err != nil {
log.Error("[voteHandler] Error when getting epoch switch Info", "error", err)
return errors.New("fail on voteHandler due to failure in getting epoch switch info")
}
// Skip and wait for the next vote to process again if valid votes is less than what we required
certThreshold := x.config.V2.Config(uint64(currentVoteMsg.(*types.Vote).ProposedBlockInfo.Round)).CertThreshold
if float64(len(validSignatures)) < float64(epochInfo.MasternodesLen)*certThreshold {
log.Warn("[onVotePoolThresholdReached] Not enough valid signatures to generate QC", "VotesSignaturesAfterFilter", validSignatures, "NumberOfValidVotes", len(validSignatures), "NumberOfVotes", len(pooledVotes))
return nil
}
// Genrate QC
quorumCert := &types.QuorumCert{
ProposedBlockInfo: currentVoteMsg.(*types.Vote).ProposedBlockInfo,
Signatures: validSignatures,
GapNumber: currentVoteMsg.(*types.Vote).GapNumber,
}
err = x.processQC(chain, quorumCert)
if err != nil {
log.Error("Error while processing QC in the Vote handler after reaching pool threshold, ", err)
return err
}
log.Info("Successfully processed the vote and produced QC!", "QcRound", quorumCert.ProposedBlockInfo.Round, "QcNumOfSig", len(quorumCert.Signatures), "QcHash", quorumCert.ProposedBlockInfo.Hash, "QcNumber", quorumCert.ProposedBlockInfo.Number.Uint64())
return nil
}
// Hot stuff rule to decide whether this node is eligible to vote for the received block
func (x *XDPoS_v2) verifyVotingRule(blockChainReader consensus.ChainReader, blockInfo *types.BlockInfo, quorumCert *types.QuorumCert) (bool, error) {
// Make sure this node has not voted for this round.
if x.currentRound <= x.highestVotedRound {
log.Info("Failed to pass the voting rule verification, currentRound is not large then highestVoteRound", "x.currentRound", x.currentRound, "x.highestVotedRound", x.highestVotedRound)
return false, nil
}
/*
HotStuff Voting rule:
header's round == local current round, AND (one of the following two:)
header's block extends lockQuorumCert's ProposedBlockInfo (we need a isExtending(block_a, block_b) function), OR
header's QC's ProposedBlockInfo.Round > lockQuorumCert's ProposedBlockInfo.Round
*/
if blockInfo.Round != x.currentRound {
log.Info("Failed to pass the voting rule verification, blockRound is not equal currentRound", "x.currentRound", x.currentRound, "blockInfo.Round", blockInfo.Round)
return false, nil
}
// XDPoS v1.0 switch to v2.0, the proposed block can always pass voting rule
if x.lockQuorumCert == nil {
return true, nil
}
if quorumCert.ProposedBlockInfo.Round > x.lockQuorumCert.ProposedBlockInfo.Round {
return true, nil
}
isExtended, err := x.isExtendingFromAncestor(blockChainReader, blockInfo, x.lockQuorumCert.ProposedBlockInfo)
if err != nil {
log.Error("Failed to pass the voting rule verification, error on isExtendingFromAncestor", "err", err, "blockInfo", blockInfo, "x.lockQuorumCert.ProposedBlockInfo", x.lockQuorumCert.ProposedBlockInfo)
return false, err
}
if !isExtended {
log.Warn("Failed to pass the voting rule verification, block is not extended from ancestor", "blockInfo", blockInfo, "x.lockQuorumCert.ProposedBlockInfo", x.lockQuorumCert.ProposedBlockInfo)
return false, nil
}
return true, nil
}
func (x *XDPoS_v2) isExtendingFromAncestor(blockChainReader consensus.ChainReader, currentBlock *types.BlockInfo, ancestorBlock *types.BlockInfo) (bool, error) {
blockNumDiff := int(big.NewInt(0).Sub(currentBlock.Number, ancestorBlock.Number).Int64())
nextBlockHash := currentBlock.Hash
for i := 0; i < blockNumDiff; i++ {
parentBlock := blockChainReader.GetHeaderByHash(nextBlockHash)
if parentBlock == nil {
return false, fmt.Errorf("could not find its parent block when checking whether currentBlock %v with hash %v is extending from the ancestorBlock %v", currentBlock.Number, currentBlock.Hash, ancestorBlock.Number)
} else {
nextBlockHash = parentBlock.ParentHash
}
log.Debug("[isExtendingFromAncestor] Found parent block", "CurrentBlockHash", currentBlock.Hash, "ParentHash", nextBlockHash)
}
if nextBlockHash == ancestorBlock.Hash {
return true, nil
}
return false, nil
}
func (x *XDPoS_v2) hygieneVotePool() {
x.lock.RLock()
round := x.currentRound
x.lock.RUnlock()
votePoolKeys := x.votePool.PoolObjKeysList()
// Extract round number
for _, k := range votePoolKeys {
keyedRound, err := strconv.ParseInt(strings.Split(k, ":")[0], 10, 64)
if err != nil {
log.Error("[hygieneVotePool] Error while trying to get keyedRound inside pool", "Error", err)
continue
}
// Clean up any votes round that is 10 rounds older
if keyedRound < int64(round)-utils.PoolHygieneRound {
log.Debug("[hygieneVotePool] Cleaned vote pool at round", "Round", keyedRound, "currentRound", round, "Key", k)
x.votePool.ClearByPoolKey(k)
}
}
}
func (x *XDPoS_v2) ReceivedVotes() map[string]map[common.Hash]utils.PoolObj {
return x.votePool.Get()
}