add syncinfo pool (#1236)

* add syncinfo message into pool for process later

* add missing file back

---------

Co-authored-by: liam.lai <liam.lai@us>
This commit is contained in:
benjamin202410 2025-07-28 01:43:41 -07:00 committed by GitHub
parent 85f08c7732
commit db9c3de1dc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 429 additions and 222 deletions

View file

@ -119,7 +119,19 @@ const (
statusObservernode AccountRewardStatus = "ObserverNode"
)
type MessageStatus map[string]map[string]SignerTypes
type MessageStatus map[string]map[string]interface{}
type SyncInfoTypes struct {
Hash common.Hash `json:"hash"`
QCSigners int `json:"qcSigners"`
TCSigners int `json:"tcSigners"`
}
type PoolStatus struct {
Vote map[string]SignerTypes `json:"vote"`
Timeout map[string]SignerTypes `json:"timeout"`
SyncInfo map[string]SyncInfoTypes `json:"syncInfo"`
}
// GetSnapshot retrieves the state snapshot at a given block.
func (api *API) GetSnapshot(number *rpc.BlockNumber) (*utils.PublicApiSnapshot, error) {
@ -210,18 +222,40 @@ func (api *API) GetMasternodesByNumber(number *rpc.BlockNumber) MasternodesStatu
}
// Get current vote pool and timeout pool content and missing messages
func (api *API) GetLatestPoolStatus() MessageStatus {
func (api *API) GetLatestPoolStatus() PoolStatus {
header := api.chain.CurrentHeader()
masternodes := api.XDPoS.EngineV2.GetMasternodes(api.chain, header)
receivedVotes := api.XDPoS.EngineV2.ReceivedVotes()
receivedTimeouts := api.XDPoS.EngineV2.ReceivedTimeouts()
info := make(MessageStatus)
info["vote"] = make(map[string]SignerTypes)
info["timeout"] = make(map[string]SignerTypes)
receivedSyncInfo := api.XDPoS.EngineV2.ReceivedSyncInfo()
calculateSigners(info["vote"], receivedVotes, masternodes)
calculateSigners(info["timeout"], receivedTimeouts, masternodes)
info := PoolStatus{}
info.Vote = make(map[string]SignerTypes)
info.Timeout = make(map[string]SignerTypes)
info.SyncInfo = make(map[string]SyncInfoTypes)
calculateSigners(info.Vote, receivedVotes, masternodes)
calculateSigners(info.Timeout, receivedTimeouts, masternodes)
for name, objs := range receivedSyncInfo {
for _, obj := range objs {
syncInfo := obj.(*types.SyncInfo)
hash := syncInfo.Hash()
key := name + ":" + hash.Hex()
qcSigners := len(syncInfo.HighestQuorumCert.Signatures)
tcSigners := 0
if syncInfo.HighestTimeoutCert != nil {
tcSigners = len(syncInfo.HighestTimeoutCert.Signatures)
}
info.SyncInfo[key] = SyncInfoTypes{
Hash: hash,
QCSigners: qcSigners,
TCSigners: tcSigners,
}
}
}
return info
}

View file

@ -56,6 +56,7 @@ type XDPoS_v2 struct {
timeoutPool *utils.Pool
votePool *utils.Pool
syncInfoPool *utils.Pool
currentRound types.Round
highestSelfMinedRound types.Round
highestVotedRound types.Round
@ -84,6 +85,7 @@ func New(chainConfig *params.ChainConfig, db ethdb.Database, minePeriodCh chan i
timeoutPool := utils.NewPool()
votePool := utils.NewPool()
syncInfoPool := utils.NewPool()
engine := &XDPoS_v2{
chainConfig: chainConfig,
@ -103,8 +105,9 @@ func New(chainConfig *params.ChainConfig, db ethdb.Database, minePeriodCh chan i
round2epochBlockInfo: lru.NewCache[types.Round, *types.BlockInfo](utils.InmemoryRound2Epochs),
timeoutPool: timeoutPool,
votePool: votePool,
timeoutPool: timeoutPool,
votePool: votePool,
syncInfoPool: syncInfoPool,
highestSelfMinedRound: types.Round(0),
@ -566,147 +569,6 @@ func (x *XDPoS_v2) VerifyHeaders(chain consensus.ChainReader, headers []*types.H
}()
}
/*
SyncInfo workflow
*/
// Verify syncInfo and trigger process QC or TC if successful
func (x *XDPoS_v2) VerifySyncInfoMessage(chain consensus.ChainReader, syncInfo *types.SyncInfo) (bool, error) {
/*
1. Check QC and TC against highest QC TC. Skip if none of them need to be updated
2. Verify items including:
- verifyQC
- verifyTC
3. Broadcast(Not part of consensus)
*/
if (x.highestQuorumCert.ProposedBlockInfo.Round >= syncInfo.HighestQuorumCert.ProposedBlockInfo.Round) && (x.highestTimeoutCert.Round >= syncInfo.HighestTimeoutCert.Round) {
log.Debug("[VerifySyncInfoMessage] Round from incoming syncInfo message is no longer qualified", "Highest QC Round", x.highestQuorumCert.ProposedBlockInfo.Round, "Incoming SyncInfo QC Round", syncInfo.HighestQuorumCert.ProposedBlockInfo.Round, "highestTimeoutCert Round", x.highestTimeoutCert.Round, "Incoming syncInfo TC Round", syncInfo.HighestTimeoutCert.Round)
return false, nil
}
err := x.verifyQC(chain, syncInfo.HighestQuorumCert, nil)
if err != nil {
log.Warn("[VerifySyncInfoMessage] SyncInfo message verification failed due to QC", "blockNum", syncInfo.HighestQuorumCert.ProposedBlockInfo.Number, "round", syncInfo.HighestQuorumCert.ProposedBlockInfo.Round, "error", err)
return false, err
}
err = x.verifyTC(chain, syncInfo.HighestTimeoutCert)
if err != nil {
log.Warn("[VerifySyncInfoMessage] SyncInfo message verification failed due to TC", "gapNum", syncInfo.HighestTimeoutCert.GapNumber, "round", syncInfo.HighestTimeoutCert.Round, "error", err)
return false, err
}
return true, nil
}
func (x *XDPoS_v2) SyncInfoHandler(chain consensus.ChainReader, syncInfo *types.SyncInfo) error {
x.lock.Lock()
defer x.lock.Unlock()
/*
1. processQC
2. processTC
*/
err := x.processQC(chain, syncInfo.HighestQuorumCert)
if err != nil {
return err
}
return x.processTC(chain, syncInfo.HighestTimeoutCert)
}
/*
Vote workflow
*/
func (x *XDPoS_v2) VerifyVoteMessage(chain consensus.ChainReader, vote *types.Vote) (bool, error) {
/*
1. Check vote round with current round for fast fail(disqualifed)
2. Get masterNode list from snapshot by using vote.GapNumber
3. Check signature:
- Use ecRecover to get the public key
- Use the above public key to find out the xdc address
- Use the above xdc address to check against the master node list from step 1(For the running epoch)
4. Broadcast(Not part of consensus)
*/
if vote.ProposedBlockInfo.Round < x.currentRound {
log.Debug("[VerifyVoteMessage] Disqualified vote message as the proposed round does not match currentRound", "voteHash", vote.Hash(), "voteProposedBlockInfoRound", vote.ProposedBlockInfo.Round, "currentRound", x.currentRound)
return false, nil
}
snapshot, err := x.getSnapshot(chain, vote.GapNumber, true)
if err != nil {
log.Error("[VerifyVoteMessage] fail to get snapshot for a vote message", "blockNum", vote.ProposedBlockInfo.Number, "blockHash", vote.ProposedBlockInfo.Hash, "voteHash", vote.Hash(), "error", err.Error())
return false, err
}
verified, signer, err := x.verifyMsgSignature(types.VoteSigHash(&types.VoteForSign{
ProposedBlockInfo: vote.ProposedBlockInfo,
GapNumber: vote.GapNumber,
}), vote.Signature, snapshot.NextEpochCandidates)
if err != nil {
for i, mn := range snapshot.NextEpochCandidates {
log.Warn("[VerifyVoteMessage] Master node list item", "index", i, "Master node", mn.Hex())
}
log.Warn("[VerifyVoteMessage] Error while verifying vote message", "votedBlockNum", vote.ProposedBlockInfo.Number.Uint64(), "votedBlockHash", vote.ProposedBlockInfo.Hash.Hex(), "voteHash", vote.Hash(), "error", err.Error())
return false, err
}
vote.SetSigner(signer)
return verified, nil
}
// Consensus entry point for processing vote message to produce QC
func (x *XDPoS_v2) VoteHandler(chain consensus.ChainReader, voteMsg *types.Vote) error {
x.lock.Lock()
defer x.lock.Unlock()
return x.voteHandler(chain, voteMsg)
}
/*
Timeout workflow
*/
// Verify timeout message type from peers in bft.go
/*
1. Get master node list by timeout msg round
2. Check signature:
- Use ecRecover to get the public key
- Use the above public key to find out the xdc address
- Use the above xdc address to check against the master node list from step 1(For the running epoch)
3. Broadcast(Not part of consensus)
*/
func (x *XDPoS_v2) VerifyTimeoutMessage(chain consensus.ChainReader, timeoutMsg *types.Timeout) (bool, error) {
if timeoutMsg.Round < x.currentRound {
log.Debug("[VerifyTimeoutMessage] Disqualified timeout message as the proposed round does not match currentRound", "timeoutHash", timeoutMsg.Hash(), "timeoutRound", timeoutMsg.Round, "currentRound", x.currentRound)
return false, nil
}
snap, err := x.getSnapshot(chain, timeoutMsg.GapNumber, true)
if err != nil || snap == nil {
log.Error("[VerifyTimeoutMessage] Fail to get snapshot when verifying timeout message!", "messageGapNumber", timeoutMsg.GapNumber, "err", err)
return false, err
}
if len(snap.NextEpochCandidates) == 0 {
log.Error("[VerifyTimeoutMessage] cannot find NextEpochCandidates from snapshot", "messageGapNumber", timeoutMsg.GapNumber)
return false, errors.New("empty master node lists from snapshot")
}
verified, signer, err := x.verifyMsgSignature(types.TimeoutSigHash(&types.TimeoutForSign{
Round: timeoutMsg.Round,
GapNumber: timeoutMsg.GapNumber,
}), timeoutMsg.Signature, snap.NextEpochCandidates)
if err != nil {
log.Warn("[VerifyTimeoutMessage] cannot verify timeout signature", "err", err)
return false, err
}
timeoutMsg.SetSigner(signer)
return verified, nil
}
/*
Entry point for handling timeout message to process below:
*/
func (x *XDPoS_v2) TimeoutHandler(blockChainReader consensus.ChainReader, timeout *types.Timeout) error {
x.lock.Lock()
defer x.lock.Unlock()
return x.timeoutHandler(blockChainReader, timeout)
}
/*
Proposed Block workflow
*/
@ -873,7 +735,7 @@ func (x *XDPoS_v2) verifyQC(blockChainReader consensus.ChainReader, quorumCert *
// Update local QC variables including highestQC & lockQuorumCert, as well as commit the blocks that satisfy the algorithm requirements
func (x *XDPoS_v2) processQC(blockChainReader consensus.ChainReader, incomingQuorumCert *types.QuorumCert) error {
log.Trace("[processQC][Before]", "HighQC", x.highestQuorumCert)
log.Debug("[processQC][Before]", "HighQC", x.highestQuorumCert.ProposedBlockInfo.Round)
// 1. Update HighestQC
if incomingQuorumCert.ProposedBlockInfo.Round > x.highestQuorumCert.ProposedBlockInfo.Round {
log.Debug("[processQC] update x.highestQuorumCert", "blockNum", incomingQuorumCert.ProposedBlockInfo.Number, "round", incomingQuorumCert.ProposedBlockInfo.Round, "hash", incomingQuorumCert.ProposedBlockInfo.Hash)
@ -907,7 +769,7 @@ func (x *XDPoS_v2) processQC(blockChainReader consensus.ChainReader, incomingQuo
if incomingQuorumCert.ProposedBlockInfo.Round >= x.currentRound {
x.setNewRound(blockChainReader, incomingQuorumCert.ProposedBlockInfo.Round+1)
}
log.Trace("[processQC][After]", "HighQC", x.highestQuorumCert)
log.Debug("[processQC][After]", "HighQC", x.highestQuorumCert.ProposedBlockInfo.Round)
return nil
}
@ -922,8 +784,7 @@ func (x *XDPoS_v2) setNewRound(blockChainReader consensus.ChainReader, round typ
x.currentRound = round
x.timeoutCount = 0
x.timeoutWorker.Reset(blockChainReader, x.currentRound, x.highestQuorumCert.ProposedBlockInfo.Round)
x.timeoutPool.Clear()
// don't need to clean vote pool, we have other process to clean and it's not good to clean here, some edge case may break
// don't need to clean pool, we have other process to clean and it's not good to clean here, some edge case may break
// for example round gets bump during collecting vote, so we have to keep vote.
// send signal to newRoundCh, but if full don't send
@ -1148,6 +1009,7 @@ func (x *XDPoS_v2) periodicJob() {
<-ticker.C
x.hygieneVotePool()
x.hygieneTimeoutPool()
x.hygieneSyncInfoPool()
}
}()
}

View file

@ -0,0 +1,193 @@
package engine_v2
import (
"errors"
"fmt"
"strconv"
"strings"
"sync"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/utils"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/log"
)
// Verify syncInfo and trigger process QC or TC if successful
func (x *XDPoS_v2) VerifySyncInfoMessage(chain consensus.ChainReader, syncInfo *types.SyncInfo) (bool, error) {
qc := syncInfo.HighestQuorumCert
if qc == nil {
log.Warn("[VerifySyncInfoMessage] SyncInfo message is missing QC", "highestQC", qc)
return false, nil
}
if x.highestQuorumCert.ProposedBlockInfo.Round >= qc.ProposedBlockInfo.Round {
log.Debug("[VerifySyncInfoMessage] Round from incoming syncInfo message is equal or smaller then local round", "highestQCRound", x.highestQuorumCert.ProposedBlockInfo.Round, "incomingSyncInfoQCRound", qc.ProposedBlockInfo.Round)
return false, nil
}
snapshot, err := x.getSnapshot(chain, qc.GapNumber, true)
if err != nil {
log.Error("[VerifySyncInfoMessage] fail to get snapshot for a syncInfo message", "blockNum", qc.ProposedBlockInfo.Number, "blockHash", qc.ProposedBlockInfo.Hash, "error", err)
return false, err
}
voteSigHash := types.VoteSigHash(&types.VoteForSign{
ProposedBlockInfo: qc.ProposedBlockInfo,
GapNumber: qc.GapNumber,
})
if err := x.verifySignatures(voteSigHash, qc.Signatures, snapshot.NextEpochCandidates); err != nil {
log.Warn("[VerifySyncInfoMessage] SyncInfo message verification failed due to QC", "blockNum", qc.ProposedBlockInfo.Number, "gapNum", qc.GapNumber, "round", qc.ProposedBlockInfo.Round, "error", err)
return false, err
}
tc := syncInfo.HighestTimeoutCert
if tc != nil { // tc is optional, when the node is starting up there is no TC at the memory
snapshot, err = x.getSnapshot(chain, tc.GapNumber, true)
if err != nil {
log.Error("[VerifySyncInfoMessage] Fail to get snapshot when verifying TC!", "tcGapNumber", tc.GapNumber)
return false, fmt.Errorf("[VerifySyncInfoMessage] Unable to get snapshot, %s", err)
}
signedTimeoutObj := types.TimeoutSigHash(&types.TimeoutForSign{
Round: tc.Round,
GapNumber: tc.GapNumber,
})
if err := x.verifySignatures(signedTimeoutObj, tc.Signatures, snapshot.NextEpochCandidates); err != nil {
log.Warn("[VerifySyncInfoMessage] SyncInfo message verification failed due to TC", "gapNum", tc.GapNumber, "round", tc.Round, "error", err)
return false, err
}
}
return true, nil
}
func (x *XDPoS_v2) SyncInfoHandler(chain consensus.ChainReader, syncInfo *types.SyncInfo) error {
x.lock.Lock()
defer x.lock.Unlock()
x.syncInfoPool.Add(syncInfo) // Add syncInfo to the pool, in case this is valid syncInfo but chain is not sync to latest height
return x.syncInfoHandler(chain, syncInfo)
}
func (x *XDPoS_v2) syncInfoHandler(chain consensus.ChainReader, syncInfo *types.SyncInfo) error {
if x.highestQuorumCert.ProposedBlockInfo.Round >= syncInfo.HighestQuorumCert.ProposedBlockInfo.Round {
log.Debug("[syncInfoHandler] Round from incoming syncInfo message is equal or smaller then local round, skip process message", "highestQCRound", x.highestQuorumCert.ProposedBlockInfo.Round, "incomingSyncInfoQCRound", syncInfo.HighestQuorumCert.ProposedBlockInfo.Round)
return nil
}
if err := x.verifyQC(chain, syncInfo.HighestQuorumCert, nil); err != nil {
return fmt.Errorf("[syncInfoHandler] Failed to verify QC, err %s", err)
}
if err := x.processQC(chain, syncInfo.HighestQuorumCert); err != nil {
return fmt.Errorf("[syncInfoHandler] Failed to process QC, err %s", err)
}
if syncInfo.HighestTimeoutCert != nil {
if x.highestTimeoutCert.Round >= syncInfo.HighestTimeoutCert.Round {
log.Debug("[syncInfoHandler] Round from incoming syncInfo message is equal or smaller then local TC round, skip process message", "highestTCRound", x.highestTimeoutCert.Round, "incomingSyncInfoTCRound", syncInfo.HighestTimeoutCert.Round)
return nil
}
if err := x.verifyTC(chain, syncInfo.HighestTimeoutCert); err != nil {
return fmt.Errorf("[syncInfoHandler] Failed to verify TC, err %s", err)
}
if err := x.processTC(chain, syncInfo.HighestTimeoutCert); err != nil {
return fmt.Errorf("[syncInfoHandler] Failed to process TC, err %s", err)
}
}
return nil
}
func (x *XDPoS_v2) processSyncInfoPool(chain consensus.ChainReader) {
syncInfo := x.syncInfoPool.PoolObjKeysList()
for _, key := range syncInfo {
// Key format: qcRound:qcGapNum:qcBlockNum:timeoutRound:timeoutGapNum:qcBlockHash
// Get QC Round and needs to lower or equal to x.currentRound
qcRound, qcErr := strconv.ParseInt(strings.Split(key, ":")[0], 10, 64)
if qcErr != nil {
log.Warn("[processSyncInfoPool] Failed to parse QC round", "key", key, "error", qcErr)
continue
}
if int64(x.currentRound) < qcRound {
log.Info("[processSyncInfoPool] Sync QC round is higher than current round, need to sync from other nodes", "qcRound", qcRound, "currentRound", x.currentRound)
continue
}
// Optimize TODO: Check TC Round
log.Debug("[processSyncInfoPool] Processing syncInfo message from pool", "key", key)
for _, obj := range x.syncInfoPool.Get()[key] {
if syncInfoObj, ok := obj.(*types.SyncInfo); ok {
if err := x.syncInfoHandler(chain, syncInfoObj); err != nil {
log.Error("[processSyncInfoPool] Failed to handle sync info", "error", err, "currenBlock", chain.CurrentHeader().Number.Uint64(), "x.currentRound", x.currentRound, "key", key)
// must be something wrong with this message, so continue process next object in the pool for same round
continue
}
} else {
log.Error("[processSyncInfoPool] Object in sync info pool is not of type SyncInfo", "objectType", fmt.Sprintf("%T", obj), "key", key)
continue
}
break // We only need to process the first object in the pool ideally
}
}
}
func (x *XDPoS_v2) verifySignatures(messageHash common.Hash, signatures []types.Signature, candidates []common.Address) error {
var wg sync.WaitGroup
wg.Add(len(signatures))
var haveError error
for _, signature := range signatures {
go func(sig types.Signature) {
defer wg.Done()
verified, _, err := x.verifyMsgSignature(messageHash, sig, candidates)
if err != nil {
log.Error("[verifySignatures] Error while verfying QC message signatures", "error", err)
haveError = errors.New("error while verfying QC message signatures")
return
}
if !verified {
log.Error("[verifySignatures] Signature not verified doing signature verification")
haveError = errors.New("fail to verify QC due to signature mismatch")
return
}
}(signature)
}
wg.Wait()
if haveError != nil {
return haveError
}
return nil
}
func (x *XDPoS_v2) hygieneSyncInfoPool() {
x.lock.RLock()
round := x.currentRound
x.lock.RUnlock()
syncInfoPoolKeys := x.syncInfoPool.PoolObjKeysList()
// Extract round number
for _, k := range syncInfoPoolKeys {
// Key format: qcRound:qcGapNum:qcBlockNum:timeoutRound:timeoutGapNum:qcBlockHash
qcRound, qcErr := strconv.ParseInt(strings.Split(k, ":")[0], 10, 64)
tcRound, tcErr := strconv.ParseInt(strings.Split(k, ":")[3], 10, 64)
if qcErr != nil || tcErr != nil {
log.Error("[hygieneSyncInfoPool] Error while trying to get keyedRound inside pool", "Error", qcErr, "tcError", tcErr, "Key", k)
continue
}
lowerBoundRound := int64(round) - utils.PoolHygieneRound
// Clean up any sync info round that is 10 rounds older
if qcRound < lowerBoundRound && (tcRound == 0 || tcRound < lowerBoundRound) {
log.Debug("[hygieneSyncInfoPool] Cleaned sync info pool at round", "Round", qcRound, "currentRound", round, "Key", k)
x.syncInfoPool.ClearByPoolKey(k)
}
}
}
func (x *XDPoS_v2) ReceivedSyncInfo() map[string]map[common.Hash]utils.PoolObj {
return x.syncInfoPool.Get()
}

View file

@ -15,6 +15,44 @@ import (
"github.com/XinFinOrg/XDPoSChain/log"
)
func (x *XDPoS_v2) VerifyTimeoutMessage(chain consensus.ChainReader, timeoutMsg *types.Timeout) (bool, error) {
if timeoutMsg.Round < x.currentRound {
log.Debug("[VerifyTimeoutMessage] Disqualified timeout message as the proposed round does not match currentRound", "timeoutHash", timeoutMsg.Hash(), "timeoutRound", timeoutMsg.Round, "currentRound", x.currentRound)
return false, nil
}
snap, err := x.getSnapshot(chain, timeoutMsg.GapNumber, true)
if err != nil || snap == nil {
log.Error("[VerifyTimeoutMessage] Fail to get snapshot when verifying timeout message!", "messageGapNumber", timeoutMsg.GapNumber, "err", err)
return false, err
}
if len(snap.NextEpochCandidates) == 0 {
log.Error("[VerifyTimeoutMessage] cannot find NextEpochCandidates from snapshot", "messageGapNumber", timeoutMsg.GapNumber)
return false, errors.New("empty master node lists from snapshot")
}
verified, signer, err := x.verifyMsgSignature(types.TimeoutSigHash(&types.TimeoutForSign{
Round: timeoutMsg.Round,
GapNumber: timeoutMsg.GapNumber,
}), timeoutMsg.Signature, snap.NextEpochCandidates)
if err != nil {
log.Warn("[VerifyTimeoutMessage] cannot verify timeout signature", "err", err)
return false, err
}
timeoutMsg.SetSigner(signer)
return verified, nil
}
/*
Entry point for handling timeout message to process below:
*/
func (x *XDPoS_v2) TimeoutHandler(blockChainReader consensus.ChainReader, timeout *types.Timeout) error {
x.lock.Lock()
defer x.lock.Unlock()
return x.timeoutHandler(blockChainReader, timeout)
}
func (x *XDPoS_v2) timeoutHandler(blockChainReader consensus.ChainReader, timeout *types.Timeout) error {
// checkRoundNumber
if timeout.Round != x.currentRound {
@ -68,14 +106,11 @@ func (x *XDPoS_v2) onTimeoutPoolThresholdReached(blockChainReader consensus.Chai
// Process TC
err := x.processTC(blockChainReader, timeoutCert)
if err != nil {
log.Error("Error while processing TC in the Timeout handler after reaching pool threshold", "TcRound", timeoutCert.Round, "NumberOfTcSig", len(timeoutCert.Signatures), "GapNumber", gapNumber, "Error", err)
log.Error("[onTimeoutPoolThresholdReached] Error while processing TC in the Timeout handler after reaching pool threshold", "TcRound", timeoutCert.Round, "NumberOfTcSig", len(timeoutCert.Signatures), "GapNumber", gapNumber, "Error", err)
return err
}
// Generate and broadcast syncInfo
syncInfo := x.getSyncInfo()
x.broadcastToBftChannel(syncInfo)
log.Info("Successfully processed the timeout message and produced TC & SyncInfo!", "QcRound", syncInfo.HighestQuorumCert.ProposedBlockInfo.Round, "QcBlockNum", syncInfo.HighestQuorumCert.ProposedBlockInfo.Number, "TcRound", timeoutCert.Round, "NumberOfTcSig", len(timeoutCert.Signatures))
log.Info("[onTimeoutPoolThresholdReached] Successfully processed the timeout message and produced TC!", "TcRound", timeoutCert.Round, "NumberOfTcSig", len(timeoutCert.Signatures))
return nil
}
@ -117,14 +152,6 @@ func (x *XDPoS_v2) getTCEpochInfo(chain consensus.ChainReader, timeoutCert *type
}
func (x *XDPoS_v2) verifyTC(chain consensus.ChainReader, timeoutCert *types.TimeoutCert) error {
/*
1. Get epoch master node list by gapNumber
2. Check number of signatures > threshold, as well as it's format. (Same as verifyQC)
2. Verify signer signature: (List of signatures)
- Use ecRecover to get the public key
- Use the above public key to find out the xdc address
- Use the above xdc address to check against the master node list from step 1(For the received TC epoch)
*/
if timeoutCert == nil || timeoutCert.Signatures == nil {
log.Warn("[verifyTC] TC or TC signatures is Nil")
return utils.ErrInvalidTC
@ -143,7 +170,7 @@ func (x *XDPoS_v2) verifyTC(chain consensus.ChainReader, timeoutCert *types.Time
signatures, duplicates := UniqueSignatures(timeoutCert.Signatures)
if len(duplicates) != 0 {
for _, d := range duplicates {
log.Warn("[verifyQC] duplicated signature in QC", "duplicate", common.Bytes2Hex(d))
log.Warn("[verifyTC] duplicated signature in QC", "duplicate", common.Bytes2Hex(d))
}
}
@ -201,12 +228,11 @@ func (x *XDPoS_v2) verifyTC(chain consensus.ChainReader, timeoutCert *types.Time
2. Check TC round >= node's currentRound. If yes, call setNewRound
*/
func (x *XDPoS_v2) processTC(blockChainReader consensus.ChainReader, timeoutCert *types.TimeoutCert) error {
if timeoutCert.Round > x.highestTimeoutCert.Round {
if x.highestTimeoutCert.Round < timeoutCert.Round {
x.highestTimeoutCert = timeoutCert
}
if timeoutCert.Round >= x.currentRound {
x.setNewRound(blockChainReader, timeoutCert.Round+1)
}
return nil
}
@ -288,6 +314,7 @@ func (x *XDPoS_v2) OnCountdownTimeout(time time.Time, chain interface{}) error {
if !allow {
return nil
}
x.processSyncInfoPool(chain.(consensus.ChainReader))
err := x.sendTimeout(chain.(consensus.ChainReader))
if err != nil {

View file

@ -16,6 +16,40 @@ import (
"github.com/XinFinOrg/XDPoSChain/log"
)
func (x *XDPoS_v2) VerifyVoteMessage(chain consensus.ChainReader, vote *types.Vote) (bool, error) {
if vote.ProposedBlockInfo.Round < x.currentRound {
log.Debug("[VerifyVoteMessage] Disqualified vote message as the proposed round does not match currentRound", "voteHash", vote.Hash(), "voteProposedBlockInfoRound", vote.ProposedBlockInfo.Round, "currentRound", x.currentRound)
return false, nil
}
snapshot, err := x.getSnapshot(chain, vote.GapNumber, true)
if err != nil {
log.Error("[VerifyVoteMessage] fail to get snapshot for a vote message", "blockNum", vote.ProposedBlockInfo.Number, "blockHash", vote.ProposedBlockInfo.Hash, "voteHash", vote.Hash(), "error", err.Error())
return false, err
}
verified, signer, err := x.verifyMsgSignature(types.VoteSigHash(&types.VoteForSign{
ProposedBlockInfo: vote.ProposedBlockInfo,
GapNumber: vote.GapNumber,
}), vote.Signature, snapshot.NextEpochCandidates)
if err != nil {
for i, mn := range snapshot.NextEpochCandidates {
log.Warn("[VerifyVoteMessage] Master node list item", "index", i, "Master node", mn.Hex())
}
log.Warn("[VerifyVoteMessage] Error while verifying vote message", "votedBlockNum", vote.ProposedBlockInfo.Number.Uint64(), "votedBlockHash", vote.ProposedBlockInfo.Hash.Hex(), "voteHash", vote.Hash(), "error", err.Error())
return false, err
}
vote.SetSigner(signer)
return verified, nil
}
// Consensus entry point for processing vote message to produce QC
func (x *XDPoS_v2) VoteHandler(chain consensus.ChainReader, voteMsg *types.Vote) error {
x.lock.Lock()
defer x.lock.Unlock()
return x.voteHandler(chain, voteMsg)
}
// Once Hot stuff voting rule has verified, this node can then send vote
func (x *XDPoS_v2) sendVote(chainReader consensus.ChainReader, blockInfo *types.BlockInfo) error {
// First step: Update the highest Voted round

View file

@ -25,7 +25,6 @@ func (p *Pool) Get() map[string]map[common.Hash]PoolObj {
return p.objList
}
// return true if it has reached threshold
func (p *Pool) Add(obj PoolObj) (int, map[common.Hash]PoolObj) {
p.lock.Lock()
defer p.lock.Unlock()

View file

@ -24,11 +24,26 @@ func TestSyncInfoShouldSuccessfullyUpdateByQC(t *testing.T) {
t.Fatal("Fail to decode extra data", err)
}
timeoutForSign := &types.TimeoutForSign{
Round: types.Round(2),
GapNumber: 450,
}
// Sign from acc 1, 2, 3 and voter
acc1SignedHash := SignHashByPK(acc1Key, types.TimeoutSigHash(timeoutForSign).Bytes())
acc2SignedHash := SignHashByPK(acc2Key, types.TimeoutSigHash(timeoutForSign).Bytes())
acc3SignedHash := SignHashByPK(acc3Key, types.TimeoutSigHash(timeoutForSign).Bytes())
voterSignedHash := SignHashByPK(voterKey, types.TimeoutSigHash(timeoutForSign).Bytes())
var signatures []types.Signature
signatures = append(signatures, acc1SignedHash, acc2SignedHash, acc3SignedHash, voterSignedHash)
syncInfoMsg := &types.SyncInfo{
HighestQuorumCert: extraField.QuorumCert,
HighestTimeoutCert: &types.TimeoutCert{
Round: types.Round(2),
Signatures: []types.Signature{},
Signatures: signatures,
GapNumber: 450,
},
}
@ -55,9 +70,24 @@ func TestSyncInfoShouldSuccessfullyUpdateByTC(t *testing.T) {
t.Fatal("Fail to decode extra data", err)
}
timeoutForSign := &types.TimeoutForSign{
Round: types.Round(6),
GapNumber: 450,
}
// Sign from acc 1, 2, 3 and voter
acc1SignedHash := SignHashByPK(acc1Key, types.TimeoutSigHash(timeoutForSign).Bytes())
acc2SignedHash := SignHashByPK(acc2Key, types.TimeoutSigHash(timeoutForSign).Bytes())
acc3SignedHash := SignHashByPK(acc3Key, types.TimeoutSigHash(timeoutForSign).Bytes())
voterSignedHash := SignHashByPK(voterKey, types.TimeoutSigHash(timeoutForSign).Bytes())
var signatures []types.Signature
signatures = append(signatures, acc1SignedHash, acc2SignedHash, acc3SignedHash, voterSignedHash)
highestTC := &types.TimeoutCert{
Round: types.Round(6),
Signatures: []types.Signature{},
Signatures: signatures,
GapNumber: 450,
}
syncInfoMsg := &types.SyncInfo{
@ -115,11 +145,6 @@ func TestVerifySyncInfoIfTCRoundIsAtNextEpoch(t *testing.T) {
t.Fatal("Fail to decode extra data", err)
}
highestTC := &types.TimeoutCert{
Round: types.Round(899),
Signatures: []types.Signature{},
}
timeoutForSign := &types.TimeoutForSign{
Round: types.Round(900),
GapNumber: 450,
@ -145,8 +170,6 @@ func TestVerifySyncInfoIfTCRoundIsAtNextEpoch(t *testing.T) {
HighestTimeoutCert: syncInfoTC,
}
engineV2.SetPropertiesFaker(syncInfoMsg.HighestQuorumCert, highestTC)
verified, err := engineV2.VerifySyncInfoMessage(blockchain, syncInfoMsg)
assert.True(t, verified)
assert.Nil(t, err)
@ -266,12 +289,7 @@ func TestVerifySyncInfoIfTcUseDifferentEpoch(t *testing.T) {
HighestTimeoutCert: newTC,
}
x.SetPropertiesFaker(syncInfoMsg.HighestQuorumCert, &types.TimeoutCert{
Round: types.Round(898),
Signatures: []types.Signature{},
})
verified, err := x.VerifySyncInfoMessage(blockchain, syncInfoMsg)
assert.True(t, verified)
assert.Nil(t, err)
assert.True(t, verified)
}

View file

@ -139,7 +139,7 @@ func TestTimeoutPeriodAndThreadholdConfigChange(t *testing.T) {
}
// Timeout handler
func TestTimeoutMessageHandlerSuccessfullyGenerateTCandSyncInfo(t *testing.T) {
func TestTimeoutMessageHandlerSuccessfullyGenerateTCandSyncInfoAfterReachingThreshold(t *testing.T) {
blockchain, _, _, _, _, _ := PrepareXDCTestBlockChainForV2Engine(t, 905, params.TestXDPoSMockChainConfig, nil)
engineV2 := blockchain.Engine().(*XDPoS.XDPoS).EngineV2
@ -194,17 +194,30 @@ func TestTimeoutMessageHandlerSuccessfullyGenerateTCandSyncInfo(t *testing.T) {
err = engineV2.TimeoutHandler(blockchain, timeoutMsg)
assert.Nil(t, err)
syncInfoMsg := <-engineV2.BroadcastCh
var syncInfoMsg *types.SyncInfo
for {
msg := <-engineV2.BroadcastCh
// Try to type assert
if s, ok := msg.(*types.SyncInfo); ok {
syncInfoMsg = s
break
}
// Optionally: log or handle other types
t.Logf("Received unexpected message type: %T", msg)
}
currentRound, _, _, _, _, _ = engineV2.GetPropertiesFaker()
assert.NotNil(t, syncInfoMsg)
// Shouldn't have QC, however, we did not inilise it, hence will show default empty value
qc := syncInfoMsg.(*types.SyncInfo).HighestQuorumCert
qc := syncInfoMsg.HighestQuorumCert
assert.Equal(t, types.Round(0), qc.ProposedBlockInfo.Round)
tc := syncInfoMsg.(*types.SyncInfo).HighestTimeoutCert
tc := syncInfoMsg.HighestTimeoutCert
assert.NotNil(t, tc)
assert.Equal(t, tc.Round, types.Round(5))
assert.Equal(t, uint64(450), tc.GapNumber)

View file

@ -347,23 +347,6 @@ func TestProcessVoteMsgThenTimeoutMsg(t *testing.T) {
err = engineV2.TimeoutHandler(blockchain, timeoutMsg)
assert.Nil(t, err)
syncInfoMsg := <-engineV2.BroadcastCh
assert.NotNil(t, syncInfoMsg)
// Should have HighestQuorumCert from previous round votes
qc := syncInfoMsg.(*types.SyncInfo).HighestQuorumCert
assert.NotNil(t, qc)
assert.Equal(t, types.Round(5), qc.ProposedBlockInfo.Round)
tc := syncInfoMsg.(*types.SyncInfo).HighestTimeoutCert
assert.NotNil(t, tc)
assert.Equal(t, types.Round(6), tc.Round)
sigatures := []types.Signature{[]byte{1}, []byte{2}, []byte{3}, []byte{4}}
assert.ElementsMatch(t, tc.Signatures, sigatures)
// Round shall be +1 now
currentRound, _, _, _, _, _ = engineV2.GetPropertiesFaker()
assert.Equal(t, types.Round(7), currentRound)
}
func TestVoteMessageShallNotThrowErrorIfBlockNotYetExist(t *testing.T) {

View file

@ -79,6 +79,27 @@ func (s *SyncInfo) Hash() common.Hash {
return rlpHash(s)
}
// Key format: qcRound:qcGapNum:qcBlockNum:timeoutRound:timeoutGapNum:qcBlockHash
func (s *SyncInfo) PoolKey() string {
qcRound := s.HighestQuorumCert.ProposedBlockInfo.Round
qcGapNum := s.HighestQuorumCert.GapNumber
qcBlockNum := s.HighestQuorumCert.ProposedBlockInfo.Number
qcBlockHash := s.HighestQuorumCert.ProposedBlockInfo.Hash
timeoutRound := Round(0)
timeoutGapNum := uint64(0)
if s.HighestTimeoutCert != nil {
timeoutRound = s.HighestTimeoutCert.Round
timeoutGapNum = s.HighestTimeoutCert.GapNumber
}
return fmt.Sprint(qcRound, ":", qcGapNum, ":", qcBlockNum, ":", timeoutRound, ":", timeoutGapNum, ":", qcBlockHash.Hex())
}
func (s *SyncInfo) GetSigner() common.Address {
// SyncInfo does not have a signer, so we return an empty address
return common.Address{}
}
// Quorum Certificate struct in XDPoS 2.0
type QuorumCert struct {
ProposedBlockInfo *BlockInfo `json:"proposedBlockInfo"`

View file

@ -78,18 +78,18 @@ func (b *Bfter) SetConsensusFuns(engine consensus.Engine) {
}
func (b *Bfter) Vote(peer string, vote *types.Vote) error {
log.Trace("Receive Vote", "hash", vote.Hash().Hex(), "voted block hash", vote.ProposedBlockInfo.Hash.Hex(), "number", vote.ProposedBlockInfo.Number, "round", vote.ProposedBlockInfo.Round)
log.Trace("[Vote] Receive Vote", "hash", vote.Hash().Hex(), "voted block hash", vote.ProposedBlockInfo.Hash.Hex(), "number", vote.ProposedBlockInfo.Number, "round", vote.ProposedBlockInfo.Round)
voteBlockNum := vote.ProposedBlockInfo.Number.Int64()
if dist := voteBlockNum - int64(b.chainHeight()); dist < -maxBlockDist || dist > maxBlockDist {
log.Debug("Discarded propagated vote, too far away", "peer", peer, "number", voteBlockNum, "hash", vote.ProposedBlockInfo.Hash, "distance", dist)
log.Debug("[Vote] Discarded propagated vote, too far away", "peer", peer, "number", voteBlockNum, "hash", vote.ProposedBlockInfo.Hash, "distance", dist)
return nil
}
verified, err := b.consensus.verifyVote(b.blockChainReader, vote)
if err != nil {
log.Error("Verify BFT Vote", "error", err)
log.Error("[Vote] Verify BFT Vote", "error", err)
return err
}
@ -98,14 +98,14 @@ func (b *Bfter) Vote(peer string, vote *types.Vote) error {
err = b.consensus.voteHandler(b.blockChainReader, vote)
if err != nil {
if _, ok := err.(*utils.ErrIncomingMessageRoundTooFarFromCurrentRound); ok {
log.Debug("vote round not equal", "error", err, "vote", vote.Hash())
log.Debug("[Vote] vote round not equal", "error", err, "vote", vote.Hash())
return err
}
if _, ok := err.(*utils.ErrIncomingMessageBlockNotFound); ok {
log.Debug("vote proposed block not found", "error", err, "vote", vote.Hash())
log.Debug("[Vote] vote proposed block not found", "error", err, "vote", vote.Hash())
return err
}
log.Error("handle BFT Vote", "error", err)
log.Error("[Vote] handle BFT Vote", "error", err)
return err
}
}
@ -117,26 +117,26 @@ func (b *Bfter) Timeout(peer string, timeout *types.Timeout) error {
// dist times 3, ex: timeout message's gap number is based on block and find out it's epoch switch number, then mod 900 then minus 450
if dist := int64(gapNum) - int64(b.chainHeight()); dist < -int64(b.epoch)*3 || dist > int64(b.epoch)*3 {
log.Debug("Discarded propagated timeout, too far away", "peer", peer, "gapNumber", gapNum, "hash", timeout.Hash, "distance", dist)
log.Debug("[Timeout] Discarded propagated timeout, too far away", "peer", peer, "gapNumber", gapNum, "hash", timeout.Hash, "distance", dist)
return nil
}
verified, err := b.consensus.verifyTimeout(b.blockChainReader, timeout)
if err != nil {
log.Error("Verify BFT Timeout", "timeoutRound", timeout.Round, "timeoutGapNum", gapNum, "error", err)
log.Error("[Timeout] Verify BFT Timeout", "timeoutRound", timeout.Round, "timeoutGapNum", gapNum, "error", err)
return err
}
log.Debug("Receive Timeout", "gap", gapNum, "hash", timeout.Hash().Hex(), "round", timeout.Round, "signer", timeout.GetSigner().Hex()) //get signer after verifyTimeout
log.Debug("[Timeout] Receive Timeout", "gap", gapNum, "hash", timeout.Hash().Hex(), "round", timeout.Round, "signer", timeout.GetSigner().Hex()) //get signer after verifyTimeout
if verified {
b.broadcastCh <- timeout
err = b.consensus.timeoutHandler(b.blockChainReader, timeout)
if err != nil {
if _, ok := err.(*utils.ErrIncomingMessageRoundNotEqualCurrentRound); ok {
log.Debug("timeout round not equal", "error", err)
log.Debug("[Timeout] timeout round not equal", "error", err)
return err
}
log.Error("handle BFT Timeout", "error", err)
log.Error("[Timeout] handle BFT Timeout", "error", err)
return err
}
}
@ -144,17 +144,22 @@ func (b *Bfter) Timeout(peer string, timeout *types.Timeout) error {
return nil
}
func (b *Bfter) SyncInfo(peer string, syncInfo *types.SyncInfo) error {
log.Debug("Receive SyncInfo", "syncInfo", syncInfo)
log.Debug("[SyncInfo] Receive SyncInfo")
if syncInfo == nil || syncInfo.HighestQuorumCert == nil {
log.Warn("[SyncInfo] Received nil SyncInfo or missing QC", "syncInfo", syncInfo)
return nil
}
qcBlockNum := syncInfo.HighestQuorumCert.ProposedBlockInfo.Number.Int64()
if dist := qcBlockNum - int64(b.chainHeight()); dist < -maxBlockDist || dist > maxBlockDist {
log.Debug("Discarded propagated syncInfo, too far away", "peer", peer, "blockNum", qcBlockNum, "hash", syncInfo.Hash, "distance", dist)
log.Debug("[SyncInfo] Discarded propagated syncInfo, too far away", "peer", peer, "blockNum", syncInfo.HighestQuorumCert.ProposedBlockInfo.Number, "hash", syncInfo.Hash, "qcRound", syncInfo.HighestQuorumCert.ProposedBlockInfo.Round, "distance", dist)
return nil
}
verified, err := b.consensus.verifySyncInfo(b.blockChainReader, syncInfo)
if err != nil {
log.Error("Verify BFT SyncInfo", "error", err)
log.Error("[SyncInfo] Verify BFT SyncInfo", "error", err)
return err
}
@ -163,7 +168,7 @@ func (b *Bfter) SyncInfo(peer string, syncInfo *types.SyncInfo) error {
b.broadcastCh <- syncInfo
err = b.consensus.syncInfoHandler(b.blockChainReader, syncInfo)
if err != nil {
log.Error("handle BFT SyncInfo", "error", err)
log.Error("[SyncInfo] handle BFT SyncInfo", "error", err)
return err
}
}

View file

@ -85,6 +85,15 @@ var (
MinePeriod: 2,
ExpTimeoutConfig: ExpTimeoutConfig{Base: 1.0, MaxExponent: 0},
},
3200000: {
MaxMasternodes: 108,
SwitchRound: 3200000,
CertThreshold: 0.667,
TimeoutSyncThreshold: 3,
TimeoutPeriod: 10,
MinePeriod: 2,
ExpTimeoutConfig: ExpTimeoutConfig{Base: 1.0, MaxExponent: 0},
},
}
TestnetV2Configs = map[uint64]*V2Config{
@ -106,6 +115,15 @@ var (
MinePeriod: 2,
ExpTimeoutConfig: ExpTimeoutConfig{Base: 1.0, MaxExponent: 0},
},
15000000: {
MaxMasternodes: 108,
SwitchRound: 15000000,
CertThreshold: 0.667,
TimeoutSyncThreshold: 3,
TimeoutPeriod: 10,
MinePeriod: 2,
ExpTimeoutConfig: ExpTimeoutConfig{Base: 1.0, MaxExponent: 0},
},
}
DevnetV2Configs = map[uint64]*V2Config{