diff --git a/consensus/XDPoS/api.go b/consensus/XDPoS/api.go index 95e645fbe1..9f3dedf108 100644 --- a/consensus/XDPoS/api.go +++ b/consensus/XDPoS/api.go @@ -119,7 +119,19 @@ const ( statusObservernode AccountRewardStatus = "ObserverNode" ) -type MessageStatus map[string]map[string]SignerTypes +type MessageStatus map[string]map[string]interface{} + +type SyncInfoTypes struct { + Hash common.Hash `json:"hash"` + QCSigners int `json:"qcSigners"` + TCSigners int `json:"tcSigners"` +} + +type PoolStatus struct { + Vote map[string]SignerTypes `json:"vote"` + Timeout map[string]SignerTypes `json:"timeout"` + SyncInfo map[string]SyncInfoTypes `json:"syncInfo"` +} // GetSnapshot retrieves the state snapshot at a given block. func (api *API) GetSnapshot(number *rpc.BlockNumber) (*utils.PublicApiSnapshot, error) { @@ -210,18 +222,40 @@ func (api *API) GetMasternodesByNumber(number *rpc.BlockNumber) MasternodesStatu } // Get current vote pool and timeout pool content and missing messages -func (api *API) GetLatestPoolStatus() MessageStatus { +func (api *API) GetLatestPoolStatus() PoolStatus { header := api.chain.CurrentHeader() masternodes := api.XDPoS.EngineV2.GetMasternodes(api.chain, header) receivedVotes := api.XDPoS.EngineV2.ReceivedVotes() receivedTimeouts := api.XDPoS.EngineV2.ReceivedTimeouts() - info := make(MessageStatus) - info["vote"] = make(map[string]SignerTypes) - info["timeout"] = make(map[string]SignerTypes) + receivedSyncInfo := api.XDPoS.EngineV2.ReceivedSyncInfo() - calculateSigners(info["vote"], receivedVotes, masternodes) - calculateSigners(info["timeout"], receivedTimeouts, masternodes) + info := PoolStatus{} + info.Vote = make(map[string]SignerTypes) + info.Timeout = make(map[string]SignerTypes) + info.SyncInfo = make(map[string]SyncInfoTypes) + + calculateSigners(info.Vote, receivedVotes, masternodes) + calculateSigners(info.Timeout, receivedTimeouts, masternodes) + + for name, objs := range receivedSyncInfo { + for _, obj := range objs { + syncInfo := obj.(*types.SyncInfo) + hash := syncInfo.Hash() + key := name + ":" + hash.Hex() + + qcSigners := len(syncInfo.HighestQuorumCert.Signatures) + tcSigners := 0 + if syncInfo.HighestTimeoutCert != nil { + tcSigners = len(syncInfo.HighestTimeoutCert.Signatures) + } + info.SyncInfo[key] = SyncInfoTypes{ + Hash: hash, + QCSigners: qcSigners, + TCSigners: tcSigners, + } + } + } return info } diff --git a/consensus/XDPoS/engines/engine_v2/engine.go b/consensus/XDPoS/engines/engine_v2/engine.go index 7c59ed1ae4..98c29d92f4 100644 --- a/consensus/XDPoS/engines/engine_v2/engine.go +++ b/consensus/XDPoS/engines/engine_v2/engine.go @@ -56,6 +56,7 @@ type XDPoS_v2 struct { timeoutPool *utils.Pool votePool *utils.Pool + syncInfoPool *utils.Pool currentRound types.Round highestSelfMinedRound types.Round highestVotedRound types.Round @@ -84,6 +85,7 @@ func New(chainConfig *params.ChainConfig, db ethdb.Database, minePeriodCh chan i timeoutPool := utils.NewPool() votePool := utils.NewPool() + syncInfoPool := utils.NewPool() engine := &XDPoS_v2{ chainConfig: chainConfig, @@ -103,8 +105,9 @@ func New(chainConfig *params.ChainConfig, db ethdb.Database, minePeriodCh chan i round2epochBlockInfo: lru.NewCache[types.Round, *types.BlockInfo](utils.InmemoryRound2Epochs), - timeoutPool: timeoutPool, - votePool: votePool, + timeoutPool: timeoutPool, + votePool: votePool, + syncInfoPool: syncInfoPool, highestSelfMinedRound: types.Round(0), @@ -566,147 +569,6 @@ func (x *XDPoS_v2) VerifyHeaders(chain consensus.ChainReader, headers []*types.H }() } -/* - SyncInfo workflow -*/ -// Verify syncInfo and trigger process QC or TC if successful -func (x *XDPoS_v2) VerifySyncInfoMessage(chain consensus.ChainReader, syncInfo *types.SyncInfo) (bool, error) { - /* - 1. Check QC and TC against highest QC TC. Skip if none of them need to be updated - 2. Verify items including: - - verifyQC - - verifyTC - 3. Broadcast(Not part of consensus) - */ - - if (x.highestQuorumCert.ProposedBlockInfo.Round >= syncInfo.HighestQuorumCert.ProposedBlockInfo.Round) && (x.highestTimeoutCert.Round >= syncInfo.HighestTimeoutCert.Round) { - log.Debug("[VerifySyncInfoMessage] Round from incoming syncInfo message is no longer qualified", "Highest QC Round", x.highestQuorumCert.ProposedBlockInfo.Round, "Incoming SyncInfo QC Round", syncInfo.HighestQuorumCert.ProposedBlockInfo.Round, "highestTimeoutCert Round", x.highestTimeoutCert.Round, "Incoming syncInfo TC Round", syncInfo.HighestTimeoutCert.Round) - return false, nil - } - - err := x.verifyQC(chain, syncInfo.HighestQuorumCert, nil) - if err != nil { - log.Warn("[VerifySyncInfoMessage] SyncInfo message verification failed due to QC", "blockNum", syncInfo.HighestQuorumCert.ProposedBlockInfo.Number, "round", syncInfo.HighestQuorumCert.ProposedBlockInfo.Round, "error", err) - return false, err - } - err = x.verifyTC(chain, syncInfo.HighestTimeoutCert) - if err != nil { - log.Warn("[VerifySyncInfoMessage] SyncInfo message verification failed due to TC", "gapNum", syncInfo.HighestTimeoutCert.GapNumber, "round", syncInfo.HighestTimeoutCert.Round, "error", err) - return false, err - } - return true, nil -} - -func (x *XDPoS_v2) SyncInfoHandler(chain consensus.ChainReader, syncInfo *types.SyncInfo) error { - x.lock.Lock() - defer x.lock.Unlock() - /* - 1. processQC - 2. processTC - */ - err := x.processQC(chain, syncInfo.HighestQuorumCert) - if err != nil { - return err - } - return x.processTC(chain, syncInfo.HighestTimeoutCert) -} - -/* -Vote workflow -*/ -func (x *XDPoS_v2) VerifyVoteMessage(chain consensus.ChainReader, vote *types.Vote) (bool, error) { - /* - 1. Check vote round with current round for fast fail(disqualifed) - 2. Get masterNode list from snapshot by using vote.GapNumber - 3. Check signature: - - Use ecRecover to get the public key - - Use the above public key to find out the xdc address - - Use the above xdc address to check against the master node list from step 1(For the running epoch) - 4. Broadcast(Not part of consensus) - */ - if vote.ProposedBlockInfo.Round < x.currentRound { - log.Debug("[VerifyVoteMessage] Disqualified vote message as the proposed round does not match currentRound", "voteHash", vote.Hash(), "voteProposedBlockInfoRound", vote.ProposedBlockInfo.Round, "currentRound", x.currentRound) - return false, nil - } - - snapshot, err := x.getSnapshot(chain, vote.GapNumber, true) - if err != nil { - log.Error("[VerifyVoteMessage] fail to get snapshot for a vote message", "blockNum", vote.ProposedBlockInfo.Number, "blockHash", vote.ProposedBlockInfo.Hash, "voteHash", vote.Hash(), "error", err.Error()) - return false, err - } - verified, signer, err := x.verifyMsgSignature(types.VoteSigHash(&types.VoteForSign{ - ProposedBlockInfo: vote.ProposedBlockInfo, - GapNumber: vote.GapNumber, - }), vote.Signature, snapshot.NextEpochCandidates) - if err != nil { - for i, mn := range snapshot.NextEpochCandidates { - log.Warn("[VerifyVoteMessage] Master node list item", "index", i, "Master node", mn.Hex()) - } - log.Warn("[VerifyVoteMessage] Error while verifying vote message", "votedBlockNum", vote.ProposedBlockInfo.Number.Uint64(), "votedBlockHash", vote.ProposedBlockInfo.Hash.Hex(), "voteHash", vote.Hash(), "error", err.Error()) - return false, err - } - vote.SetSigner(signer) - - return verified, nil -} - -// Consensus entry point for processing vote message to produce QC -func (x *XDPoS_v2) VoteHandler(chain consensus.ChainReader, voteMsg *types.Vote) error { - x.lock.Lock() - defer x.lock.Unlock() - return x.voteHandler(chain, voteMsg) -} - -/* - Timeout workflow -*/ -// Verify timeout message type from peers in bft.go -/* - 1. Get master node list by timeout msg round - 2. Check signature: - - Use ecRecover to get the public key - - Use the above public key to find out the xdc address - - Use the above xdc address to check against the master node list from step 1(For the running epoch) - 3. Broadcast(Not part of consensus) -*/ -func (x *XDPoS_v2) VerifyTimeoutMessage(chain consensus.ChainReader, timeoutMsg *types.Timeout) (bool, error) { - if timeoutMsg.Round < x.currentRound { - log.Debug("[VerifyTimeoutMessage] Disqualified timeout message as the proposed round does not match currentRound", "timeoutHash", timeoutMsg.Hash(), "timeoutRound", timeoutMsg.Round, "currentRound", x.currentRound) - return false, nil - } - snap, err := x.getSnapshot(chain, timeoutMsg.GapNumber, true) - if err != nil || snap == nil { - log.Error("[VerifyTimeoutMessage] Fail to get snapshot when verifying timeout message!", "messageGapNumber", timeoutMsg.GapNumber, "err", err) - return false, err - } - if len(snap.NextEpochCandidates) == 0 { - log.Error("[VerifyTimeoutMessage] cannot find NextEpochCandidates from snapshot", "messageGapNumber", timeoutMsg.GapNumber) - return false, errors.New("empty master node lists from snapshot") - } - - verified, signer, err := x.verifyMsgSignature(types.TimeoutSigHash(&types.TimeoutForSign{ - Round: timeoutMsg.Round, - GapNumber: timeoutMsg.GapNumber, - }), timeoutMsg.Signature, snap.NextEpochCandidates) - - if err != nil { - log.Warn("[VerifyTimeoutMessage] cannot verify timeout signature", "err", err) - return false, err - } - - timeoutMsg.SetSigner(signer) - return verified, nil -} - -/* -Entry point for handling timeout message to process below: -*/ -func (x *XDPoS_v2) TimeoutHandler(blockChainReader consensus.ChainReader, timeout *types.Timeout) error { - x.lock.Lock() - defer x.lock.Unlock() - return x.timeoutHandler(blockChainReader, timeout) -} - /* Proposed Block workflow */ @@ -873,7 +735,7 @@ func (x *XDPoS_v2) verifyQC(blockChainReader consensus.ChainReader, quorumCert * // Update local QC variables including highestQC & lockQuorumCert, as well as commit the blocks that satisfy the algorithm requirements func (x *XDPoS_v2) processQC(blockChainReader consensus.ChainReader, incomingQuorumCert *types.QuorumCert) error { - log.Trace("[processQC][Before]", "HighQC", x.highestQuorumCert) + log.Debug("[processQC][Before]", "HighQC", x.highestQuorumCert.ProposedBlockInfo.Round) // 1. Update HighestQC if incomingQuorumCert.ProposedBlockInfo.Round > x.highestQuorumCert.ProposedBlockInfo.Round { log.Debug("[processQC] update x.highestQuorumCert", "blockNum", incomingQuorumCert.ProposedBlockInfo.Number, "round", incomingQuorumCert.ProposedBlockInfo.Round, "hash", incomingQuorumCert.ProposedBlockInfo.Hash) @@ -907,7 +769,7 @@ func (x *XDPoS_v2) processQC(blockChainReader consensus.ChainReader, incomingQuo if incomingQuorumCert.ProposedBlockInfo.Round >= x.currentRound { x.setNewRound(blockChainReader, incomingQuorumCert.ProposedBlockInfo.Round+1) } - log.Trace("[processQC][After]", "HighQC", x.highestQuorumCert) + log.Debug("[processQC][After]", "HighQC", x.highestQuorumCert.ProposedBlockInfo.Round) return nil } @@ -922,8 +784,7 @@ func (x *XDPoS_v2) setNewRound(blockChainReader consensus.ChainReader, round typ x.currentRound = round x.timeoutCount = 0 x.timeoutWorker.Reset(blockChainReader, x.currentRound, x.highestQuorumCert.ProposedBlockInfo.Round) - x.timeoutPool.Clear() - // don't need to clean vote pool, we have other process to clean and it's not good to clean here, some edge case may break + // don't need to clean pool, we have other process to clean and it's not good to clean here, some edge case may break // for example round gets bump during collecting vote, so we have to keep vote. // send signal to newRoundCh, but if full don't send @@ -1148,6 +1009,7 @@ func (x *XDPoS_v2) periodicJob() { <-ticker.C x.hygieneVotePool() x.hygieneTimeoutPool() + x.hygieneSyncInfoPool() } }() } diff --git a/consensus/XDPoS/engines/engine_v2/syncInfo.go b/consensus/XDPoS/engines/engine_v2/syncInfo.go new file mode 100644 index 0000000000..02bf3cd93c --- /dev/null +++ b/consensus/XDPoS/engines/engine_v2/syncInfo.go @@ -0,0 +1,193 @@ +package engine_v2 + +import ( + "errors" + "fmt" + "strconv" + "strings" + "sync" + + "github.com/XinFinOrg/XDPoSChain/common" + "github.com/XinFinOrg/XDPoSChain/consensus" + "github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/utils" + "github.com/XinFinOrg/XDPoSChain/core/types" + "github.com/XinFinOrg/XDPoSChain/log" +) + +// Verify syncInfo and trigger process QC or TC if successful +func (x *XDPoS_v2) VerifySyncInfoMessage(chain consensus.ChainReader, syncInfo *types.SyncInfo) (bool, error) { + qc := syncInfo.HighestQuorumCert + if qc == nil { + log.Warn("[VerifySyncInfoMessage] SyncInfo message is missing QC", "highestQC", qc) + return false, nil + } + + if x.highestQuorumCert.ProposedBlockInfo.Round >= qc.ProposedBlockInfo.Round { + log.Debug("[VerifySyncInfoMessage] Round from incoming syncInfo message is equal or smaller then local round", "highestQCRound", x.highestQuorumCert.ProposedBlockInfo.Round, "incomingSyncInfoQCRound", qc.ProposedBlockInfo.Round) + return false, nil + } + + snapshot, err := x.getSnapshot(chain, qc.GapNumber, true) + if err != nil { + log.Error("[VerifySyncInfoMessage] fail to get snapshot for a syncInfo message", "blockNum", qc.ProposedBlockInfo.Number, "blockHash", qc.ProposedBlockInfo.Hash, "error", err) + return false, err + } + + voteSigHash := types.VoteSigHash(&types.VoteForSign{ + ProposedBlockInfo: qc.ProposedBlockInfo, + GapNumber: qc.GapNumber, + }) + + if err := x.verifySignatures(voteSigHash, qc.Signatures, snapshot.NextEpochCandidates); err != nil { + log.Warn("[VerifySyncInfoMessage] SyncInfo message verification failed due to QC", "blockNum", qc.ProposedBlockInfo.Number, "gapNum", qc.GapNumber, "round", qc.ProposedBlockInfo.Round, "error", err) + return false, err + } + + tc := syncInfo.HighestTimeoutCert + if tc != nil { // tc is optional, when the node is starting up there is no TC at the memory + snapshot, err = x.getSnapshot(chain, tc.GapNumber, true) + if err != nil { + log.Error("[VerifySyncInfoMessage] Fail to get snapshot when verifying TC!", "tcGapNumber", tc.GapNumber) + return false, fmt.Errorf("[VerifySyncInfoMessage] Unable to get snapshot, %s", err) + } + + signedTimeoutObj := types.TimeoutSigHash(&types.TimeoutForSign{ + Round: tc.Round, + GapNumber: tc.GapNumber, + }) + + if err := x.verifySignatures(signedTimeoutObj, tc.Signatures, snapshot.NextEpochCandidates); err != nil { + log.Warn("[VerifySyncInfoMessage] SyncInfo message verification failed due to TC", "gapNum", tc.GapNumber, "round", tc.Round, "error", err) + return false, err + } + } + + return true, nil +} + +func (x *XDPoS_v2) SyncInfoHandler(chain consensus.ChainReader, syncInfo *types.SyncInfo) error { + x.lock.Lock() + defer x.lock.Unlock() + x.syncInfoPool.Add(syncInfo) // Add syncInfo to the pool, in case this is valid syncInfo but chain is not sync to latest height + return x.syncInfoHandler(chain, syncInfo) +} + +func (x *XDPoS_v2) syncInfoHandler(chain consensus.ChainReader, syncInfo *types.SyncInfo) error { + if x.highestQuorumCert.ProposedBlockInfo.Round >= syncInfo.HighestQuorumCert.ProposedBlockInfo.Round { + log.Debug("[syncInfoHandler] Round from incoming syncInfo message is equal or smaller then local round, skip process message", "highestQCRound", x.highestQuorumCert.ProposedBlockInfo.Round, "incomingSyncInfoQCRound", syncInfo.HighestQuorumCert.ProposedBlockInfo.Round) + return nil + } + + if err := x.verifyQC(chain, syncInfo.HighestQuorumCert, nil); err != nil { + return fmt.Errorf("[syncInfoHandler] Failed to verify QC, err %s", err) + } + if err := x.processQC(chain, syncInfo.HighestQuorumCert); err != nil { + return fmt.Errorf("[syncInfoHandler] Failed to process QC, err %s", err) + } + + if syncInfo.HighestTimeoutCert != nil { + if x.highestTimeoutCert.Round >= syncInfo.HighestTimeoutCert.Round { + log.Debug("[syncInfoHandler] Round from incoming syncInfo message is equal or smaller then local TC round, skip process message", "highestTCRound", x.highestTimeoutCert.Round, "incomingSyncInfoTCRound", syncInfo.HighestTimeoutCert.Round) + return nil + } + if err := x.verifyTC(chain, syncInfo.HighestTimeoutCert); err != nil { + return fmt.Errorf("[syncInfoHandler] Failed to verify TC, err %s", err) + } + + if err := x.processTC(chain, syncInfo.HighestTimeoutCert); err != nil { + return fmt.Errorf("[syncInfoHandler] Failed to process TC, err %s", err) + } + } + + return nil +} + +func (x *XDPoS_v2) processSyncInfoPool(chain consensus.ChainReader) { + syncInfo := x.syncInfoPool.PoolObjKeysList() + for _, key := range syncInfo { + // Key format: qcRound:qcGapNum:qcBlockNum:timeoutRound:timeoutGapNum:qcBlockHash + // Get QC Round and needs to lower or equal to x.currentRound + qcRound, qcErr := strconv.ParseInt(strings.Split(key, ":")[0], 10, 64) + if qcErr != nil { + log.Warn("[processSyncInfoPool] Failed to parse QC round", "key", key, "error", qcErr) + continue + } + if int64(x.currentRound) < qcRound { + log.Info("[processSyncInfoPool] Sync QC round is higher than current round, need to sync from other nodes", "qcRound", qcRound, "currentRound", x.currentRound) + continue + } + + // Optimize TODO: Check TC Round + log.Debug("[processSyncInfoPool] Processing syncInfo message from pool", "key", key) + for _, obj := range x.syncInfoPool.Get()[key] { + if syncInfoObj, ok := obj.(*types.SyncInfo); ok { + if err := x.syncInfoHandler(chain, syncInfoObj); err != nil { + log.Error("[processSyncInfoPool] Failed to handle sync info", "error", err, "currenBlock", chain.CurrentHeader().Number.Uint64(), "x.currentRound", x.currentRound, "key", key) + // must be something wrong with this message, so continue process next object in the pool for same round + continue + } + } else { + log.Error("[processSyncInfoPool] Object in sync info pool is not of type SyncInfo", "objectType", fmt.Sprintf("%T", obj), "key", key) + continue + } + break // We only need to process the first object in the pool ideally + } + } +} + +func (x *XDPoS_v2) verifySignatures(messageHash common.Hash, signatures []types.Signature, candidates []common.Address) error { + + var wg sync.WaitGroup + wg.Add(len(signatures)) + var haveError error + + for _, signature := range signatures { + go func(sig types.Signature) { + defer wg.Done() + verified, _, err := x.verifyMsgSignature(messageHash, sig, candidates) + if err != nil { + log.Error("[verifySignatures] Error while verfying QC message signatures", "error", err) + haveError = errors.New("error while verfying QC message signatures") + return + } + if !verified { + log.Error("[verifySignatures] Signature not verified doing signature verification") + haveError = errors.New("fail to verify QC due to signature mismatch") + return + } + }(signature) + } + wg.Wait() + if haveError != nil { + return haveError + } + return nil +} + +func (x *XDPoS_v2) hygieneSyncInfoPool() { + x.lock.RLock() + round := x.currentRound + x.lock.RUnlock() + syncInfoPoolKeys := x.syncInfoPool.PoolObjKeysList() + + // Extract round number + for _, k := range syncInfoPoolKeys { + // Key format: qcRound:qcGapNum:qcBlockNum:timeoutRound:timeoutGapNum:qcBlockHash + qcRound, qcErr := strconv.ParseInt(strings.Split(k, ":")[0], 10, 64) + tcRound, tcErr := strconv.ParseInt(strings.Split(k, ":")[3], 10, 64) + if qcErr != nil || tcErr != nil { + log.Error("[hygieneSyncInfoPool] Error while trying to get keyedRound inside pool", "Error", qcErr, "tcError", tcErr, "Key", k) + continue + } + lowerBoundRound := int64(round) - utils.PoolHygieneRound + // Clean up any sync info round that is 10 rounds older + if qcRound < lowerBoundRound && (tcRound == 0 || tcRound < lowerBoundRound) { + log.Debug("[hygieneSyncInfoPool] Cleaned sync info pool at round", "Round", qcRound, "currentRound", round, "Key", k) + x.syncInfoPool.ClearByPoolKey(k) + } + } +} + +func (x *XDPoS_v2) ReceivedSyncInfo() map[string]map[common.Hash]utils.PoolObj { + return x.syncInfoPool.Get() +} diff --git a/consensus/XDPoS/engines/engine_v2/timeout.go b/consensus/XDPoS/engines/engine_v2/timeout.go index ff689f45ef..51bc211777 100644 --- a/consensus/XDPoS/engines/engine_v2/timeout.go +++ b/consensus/XDPoS/engines/engine_v2/timeout.go @@ -15,6 +15,44 @@ import ( "github.com/XinFinOrg/XDPoSChain/log" ) +func (x *XDPoS_v2) VerifyTimeoutMessage(chain consensus.ChainReader, timeoutMsg *types.Timeout) (bool, error) { + if timeoutMsg.Round < x.currentRound { + log.Debug("[VerifyTimeoutMessage] Disqualified timeout message as the proposed round does not match currentRound", "timeoutHash", timeoutMsg.Hash(), "timeoutRound", timeoutMsg.Round, "currentRound", x.currentRound) + return false, nil + } + snap, err := x.getSnapshot(chain, timeoutMsg.GapNumber, true) + if err != nil || snap == nil { + log.Error("[VerifyTimeoutMessage] Fail to get snapshot when verifying timeout message!", "messageGapNumber", timeoutMsg.GapNumber, "err", err) + return false, err + } + if len(snap.NextEpochCandidates) == 0 { + log.Error("[VerifyTimeoutMessage] cannot find NextEpochCandidates from snapshot", "messageGapNumber", timeoutMsg.GapNumber) + return false, errors.New("empty master node lists from snapshot") + } + + verified, signer, err := x.verifyMsgSignature(types.TimeoutSigHash(&types.TimeoutForSign{ + Round: timeoutMsg.Round, + GapNumber: timeoutMsg.GapNumber, + }), timeoutMsg.Signature, snap.NextEpochCandidates) + + if err != nil { + log.Warn("[VerifyTimeoutMessage] cannot verify timeout signature", "err", err) + return false, err + } + + timeoutMsg.SetSigner(signer) + return verified, nil +} + +/* +Entry point for handling timeout message to process below: +*/ +func (x *XDPoS_v2) TimeoutHandler(blockChainReader consensus.ChainReader, timeout *types.Timeout) error { + x.lock.Lock() + defer x.lock.Unlock() + return x.timeoutHandler(blockChainReader, timeout) +} + func (x *XDPoS_v2) timeoutHandler(blockChainReader consensus.ChainReader, timeout *types.Timeout) error { // checkRoundNumber if timeout.Round != x.currentRound { @@ -68,14 +106,11 @@ func (x *XDPoS_v2) onTimeoutPoolThresholdReached(blockChainReader consensus.Chai // Process TC err := x.processTC(blockChainReader, timeoutCert) if err != nil { - log.Error("Error while processing TC in the Timeout handler after reaching pool threshold", "TcRound", timeoutCert.Round, "NumberOfTcSig", len(timeoutCert.Signatures), "GapNumber", gapNumber, "Error", err) + log.Error("[onTimeoutPoolThresholdReached] Error while processing TC in the Timeout handler after reaching pool threshold", "TcRound", timeoutCert.Round, "NumberOfTcSig", len(timeoutCert.Signatures), "GapNumber", gapNumber, "Error", err) return err } - // Generate and broadcast syncInfo - syncInfo := x.getSyncInfo() - x.broadcastToBftChannel(syncInfo) - log.Info("Successfully processed the timeout message and produced TC & SyncInfo!", "QcRound", syncInfo.HighestQuorumCert.ProposedBlockInfo.Round, "QcBlockNum", syncInfo.HighestQuorumCert.ProposedBlockInfo.Number, "TcRound", timeoutCert.Round, "NumberOfTcSig", len(timeoutCert.Signatures)) + log.Info("[onTimeoutPoolThresholdReached] Successfully processed the timeout message and produced TC!", "TcRound", timeoutCert.Round, "NumberOfTcSig", len(timeoutCert.Signatures)) return nil } @@ -117,14 +152,6 @@ func (x *XDPoS_v2) getTCEpochInfo(chain consensus.ChainReader, timeoutCert *type } func (x *XDPoS_v2) verifyTC(chain consensus.ChainReader, timeoutCert *types.TimeoutCert) error { - /* - 1. Get epoch master node list by gapNumber - 2. Check number of signatures > threshold, as well as it's format. (Same as verifyQC) - 2. Verify signer signature: (List of signatures) - - Use ecRecover to get the public key - - Use the above public key to find out the xdc address - - Use the above xdc address to check against the master node list from step 1(For the received TC epoch) - */ if timeoutCert == nil || timeoutCert.Signatures == nil { log.Warn("[verifyTC] TC or TC signatures is Nil") return utils.ErrInvalidTC @@ -143,7 +170,7 @@ func (x *XDPoS_v2) verifyTC(chain consensus.ChainReader, timeoutCert *types.Time signatures, duplicates := UniqueSignatures(timeoutCert.Signatures) if len(duplicates) != 0 { for _, d := range duplicates { - log.Warn("[verifyQC] duplicated signature in QC", "duplicate", common.Bytes2Hex(d)) + log.Warn("[verifyTC] duplicated signature in QC", "duplicate", common.Bytes2Hex(d)) } } @@ -201,12 +228,11 @@ func (x *XDPoS_v2) verifyTC(chain consensus.ChainReader, timeoutCert *types.Time 2. Check TC round >= node's currentRound. If yes, call setNewRound */ func (x *XDPoS_v2) processTC(blockChainReader consensus.ChainReader, timeoutCert *types.TimeoutCert) error { - if timeoutCert.Round > x.highestTimeoutCert.Round { + if x.highestTimeoutCert.Round < timeoutCert.Round { x.highestTimeoutCert = timeoutCert } if timeoutCert.Round >= x.currentRound { x.setNewRound(blockChainReader, timeoutCert.Round+1) - } return nil } @@ -288,6 +314,7 @@ func (x *XDPoS_v2) OnCountdownTimeout(time time.Time, chain interface{}) error { if !allow { return nil } + x.processSyncInfoPool(chain.(consensus.ChainReader)) err := x.sendTimeout(chain.(consensus.ChainReader)) if err != nil { diff --git a/consensus/XDPoS/engines/engine_v2/vote.go b/consensus/XDPoS/engines/engine_v2/vote.go index 675b0ca2c9..2715d861c5 100644 --- a/consensus/XDPoS/engines/engine_v2/vote.go +++ b/consensus/XDPoS/engines/engine_v2/vote.go @@ -16,6 +16,40 @@ import ( "github.com/XinFinOrg/XDPoSChain/log" ) +func (x *XDPoS_v2) VerifyVoteMessage(chain consensus.ChainReader, vote *types.Vote) (bool, error) { + if vote.ProposedBlockInfo.Round < x.currentRound { + log.Debug("[VerifyVoteMessage] Disqualified vote message as the proposed round does not match currentRound", "voteHash", vote.Hash(), "voteProposedBlockInfoRound", vote.ProposedBlockInfo.Round, "currentRound", x.currentRound) + return false, nil + } + + snapshot, err := x.getSnapshot(chain, vote.GapNumber, true) + if err != nil { + log.Error("[VerifyVoteMessage] fail to get snapshot for a vote message", "blockNum", vote.ProposedBlockInfo.Number, "blockHash", vote.ProposedBlockInfo.Hash, "voteHash", vote.Hash(), "error", err.Error()) + return false, err + } + verified, signer, err := x.verifyMsgSignature(types.VoteSigHash(&types.VoteForSign{ + ProposedBlockInfo: vote.ProposedBlockInfo, + GapNumber: vote.GapNumber, + }), vote.Signature, snapshot.NextEpochCandidates) + if err != nil { + for i, mn := range snapshot.NextEpochCandidates { + log.Warn("[VerifyVoteMessage] Master node list item", "index", i, "Master node", mn.Hex()) + } + log.Warn("[VerifyVoteMessage] Error while verifying vote message", "votedBlockNum", vote.ProposedBlockInfo.Number.Uint64(), "votedBlockHash", vote.ProposedBlockInfo.Hash.Hex(), "voteHash", vote.Hash(), "error", err.Error()) + return false, err + } + vote.SetSigner(signer) + + return verified, nil +} + +// Consensus entry point for processing vote message to produce QC +func (x *XDPoS_v2) VoteHandler(chain consensus.ChainReader, voteMsg *types.Vote) error { + x.lock.Lock() + defer x.lock.Unlock() + return x.voteHandler(chain, voteMsg) +} + // Once Hot stuff voting rule has verified, this node can then send vote func (x *XDPoS_v2) sendVote(chainReader consensus.ChainReader, blockInfo *types.BlockInfo) error { // First step: Update the highest Voted round diff --git a/consensus/XDPoS/utils/pool.go b/consensus/XDPoS/utils/pool.go index 3c35d37d8c..9c09a8ba53 100644 --- a/consensus/XDPoS/utils/pool.go +++ b/consensus/XDPoS/utils/pool.go @@ -25,7 +25,6 @@ func (p *Pool) Get() map[string]map[common.Hash]PoolObj { return p.objList } -// return true if it has reached threshold func (p *Pool) Add(obj PoolObj) (int, map[common.Hash]PoolObj) { p.lock.Lock() defer p.lock.Unlock() diff --git a/consensus/tests/engine_v2_tests/sync_info_test.go b/consensus/tests/engine_v2_tests/sync_info_test.go index ebe2e4db68..a04d1d8b42 100644 --- a/consensus/tests/engine_v2_tests/sync_info_test.go +++ b/consensus/tests/engine_v2_tests/sync_info_test.go @@ -24,11 +24,26 @@ func TestSyncInfoShouldSuccessfullyUpdateByQC(t *testing.T) { t.Fatal("Fail to decode extra data", err) } + timeoutForSign := &types.TimeoutForSign{ + Round: types.Round(2), + GapNumber: 450, + } + + // Sign from acc 1, 2, 3 and voter + acc1SignedHash := SignHashByPK(acc1Key, types.TimeoutSigHash(timeoutForSign).Bytes()) + acc2SignedHash := SignHashByPK(acc2Key, types.TimeoutSigHash(timeoutForSign).Bytes()) + acc3SignedHash := SignHashByPK(acc3Key, types.TimeoutSigHash(timeoutForSign).Bytes()) + voterSignedHash := SignHashByPK(voterKey, types.TimeoutSigHash(timeoutForSign).Bytes()) + + var signatures []types.Signature + signatures = append(signatures, acc1SignedHash, acc2SignedHash, acc3SignedHash, voterSignedHash) + syncInfoMsg := &types.SyncInfo{ HighestQuorumCert: extraField.QuorumCert, HighestTimeoutCert: &types.TimeoutCert{ Round: types.Round(2), - Signatures: []types.Signature{}, + Signatures: signatures, + GapNumber: 450, }, } @@ -55,9 +70,24 @@ func TestSyncInfoShouldSuccessfullyUpdateByTC(t *testing.T) { t.Fatal("Fail to decode extra data", err) } + timeoutForSign := &types.TimeoutForSign{ + Round: types.Round(6), + GapNumber: 450, + } + + // Sign from acc 1, 2, 3 and voter + acc1SignedHash := SignHashByPK(acc1Key, types.TimeoutSigHash(timeoutForSign).Bytes()) + acc2SignedHash := SignHashByPK(acc2Key, types.TimeoutSigHash(timeoutForSign).Bytes()) + acc3SignedHash := SignHashByPK(acc3Key, types.TimeoutSigHash(timeoutForSign).Bytes()) + voterSignedHash := SignHashByPK(voterKey, types.TimeoutSigHash(timeoutForSign).Bytes()) + + var signatures []types.Signature + signatures = append(signatures, acc1SignedHash, acc2SignedHash, acc3SignedHash, voterSignedHash) + highestTC := &types.TimeoutCert{ Round: types.Round(6), - Signatures: []types.Signature{}, + Signatures: signatures, + GapNumber: 450, } syncInfoMsg := &types.SyncInfo{ @@ -115,11 +145,6 @@ func TestVerifySyncInfoIfTCRoundIsAtNextEpoch(t *testing.T) { t.Fatal("Fail to decode extra data", err) } - highestTC := &types.TimeoutCert{ - Round: types.Round(899), - Signatures: []types.Signature{}, - } - timeoutForSign := &types.TimeoutForSign{ Round: types.Round(900), GapNumber: 450, @@ -145,8 +170,6 @@ func TestVerifySyncInfoIfTCRoundIsAtNextEpoch(t *testing.T) { HighestTimeoutCert: syncInfoTC, } - engineV2.SetPropertiesFaker(syncInfoMsg.HighestQuorumCert, highestTC) - verified, err := engineV2.VerifySyncInfoMessage(blockchain, syncInfoMsg) assert.True(t, verified) assert.Nil(t, err) @@ -266,12 +289,7 @@ func TestVerifySyncInfoIfTcUseDifferentEpoch(t *testing.T) { HighestTimeoutCert: newTC, } - x.SetPropertiesFaker(syncInfoMsg.HighestQuorumCert, &types.TimeoutCert{ - Round: types.Round(898), - Signatures: []types.Signature{}, - }) - verified, err := x.VerifySyncInfoMessage(blockchain, syncInfoMsg) - assert.True(t, verified) assert.Nil(t, err) + assert.True(t, verified) } diff --git a/consensus/tests/engine_v2_tests/timeout_test.go b/consensus/tests/engine_v2_tests/timeout_test.go index 0b01f54342..8ce3edbc91 100644 --- a/consensus/tests/engine_v2_tests/timeout_test.go +++ b/consensus/tests/engine_v2_tests/timeout_test.go @@ -139,7 +139,7 @@ func TestTimeoutPeriodAndThreadholdConfigChange(t *testing.T) { } // Timeout handler -func TestTimeoutMessageHandlerSuccessfullyGenerateTCandSyncInfo(t *testing.T) { +func TestTimeoutMessageHandlerSuccessfullyGenerateTCandSyncInfoAfterReachingThreshold(t *testing.T) { blockchain, _, _, _, _, _ := PrepareXDCTestBlockChainForV2Engine(t, 905, params.TestXDPoSMockChainConfig, nil) engineV2 := blockchain.Engine().(*XDPoS.XDPoS).EngineV2 @@ -194,17 +194,30 @@ func TestTimeoutMessageHandlerSuccessfullyGenerateTCandSyncInfo(t *testing.T) { err = engineV2.TimeoutHandler(blockchain, timeoutMsg) assert.Nil(t, err) - syncInfoMsg := <-engineV2.BroadcastCh + var syncInfoMsg *types.SyncInfo + + for { + msg := <-engineV2.BroadcastCh + + // Try to type assert + if s, ok := msg.(*types.SyncInfo); ok { + syncInfoMsg = s + break + } + + // Optionally: log or handle other types + t.Logf("Received unexpected message type: %T", msg) + } currentRound, _, _, _, _, _ = engineV2.GetPropertiesFaker() assert.NotNil(t, syncInfoMsg) // Shouldn't have QC, however, we did not inilise it, hence will show default empty value - qc := syncInfoMsg.(*types.SyncInfo).HighestQuorumCert + qc := syncInfoMsg.HighestQuorumCert assert.Equal(t, types.Round(0), qc.ProposedBlockInfo.Round) - tc := syncInfoMsg.(*types.SyncInfo).HighestTimeoutCert + tc := syncInfoMsg.HighestTimeoutCert assert.NotNil(t, tc) assert.Equal(t, tc.Round, types.Round(5)) assert.Equal(t, uint64(450), tc.GapNumber) diff --git a/consensus/tests/engine_v2_tests/vote_test.go b/consensus/tests/engine_v2_tests/vote_test.go index afaf5e818d..7febbd8cc7 100644 --- a/consensus/tests/engine_v2_tests/vote_test.go +++ b/consensus/tests/engine_v2_tests/vote_test.go @@ -347,23 +347,6 @@ func TestProcessVoteMsgThenTimeoutMsg(t *testing.T) { err = engineV2.TimeoutHandler(blockchain, timeoutMsg) assert.Nil(t, err) - - syncInfoMsg := <-engineV2.BroadcastCh - assert.NotNil(t, syncInfoMsg) - - // Should have HighestQuorumCert from previous round votes - qc := syncInfoMsg.(*types.SyncInfo).HighestQuorumCert - assert.NotNil(t, qc) - assert.Equal(t, types.Round(5), qc.ProposedBlockInfo.Round) - - tc := syncInfoMsg.(*types.SyncInfo).HighestTimeoutCert - assert.NotNil(t, tc) - assert.Equal(t, types.Round(6), tc.Round) - sigatures := []types.Signature{[]byte{1}, []byte{2}, []byte{3}, []byte{4}} - assert.ElementsMatch(t, tc.Signatures, sigatures) - // Round shall be +1 now - currentRound, _, _, _, _, _ = engineV2.GetPropertiesFaker() - assert.Equal(t, types.Round(7), currentRound) } func TestVoteMessageShallNotThrowErrorIfBlockNotYetExist(t *testing.T) { diff --git a/core/types/consensus_v2.go b/core/types/consensus_v2.go index f69892532d..2f573ca354 100644 --- a/core/types/consensus_v2.go +++ b/core/types/consensus_v2.go @@ -79,6 +79,27 @@ func (s *SyncInfo) Hash() common.Hash { return rlpHash(s) } +// Key format: qcRound:qcGapNum:qcBlockNum:timeoutRound:timeoutGapNum:qcBlockHash +func (s *SyncInfo) PoolKey() string { + qcRound := s.HighestQuorumCert.ProposedBlockInfo.Round + qcGapNum := s.HighestQuorumCert.GapNumber + qcBlockNum := s.HighestQuorumCert.ProposedBlockInfo.Number + qcBlockHash := s.HighestQuorumCert.ProposedBlockInfo.Hash + timeoutRound := Round(0) + timeoutGapNum := uint64(0) + if s.HighestTimeoutCert != nil { + timeoutRound = s.HighestTimeoutCert.Round + timeoutGapNum = s.HighestTimeoutCert.GapNumber + } + + return fmt.Sprint(qcRound, ":", qcGapNum, ":", qcBlockNum, ":", timeoutRound, ":", timeoutGapNum, ":", qcBlockHash.Hex()) +} + +func (s *SyncInfo) GetSigner() common.Address { + // SyncInfo does not have a signer, so we return an empty address + return common.Address{} +} + // Quorum Certificate struct in XDPoS 2.0 type QuorumCert struct { ProposedBlockInfo *BlockInfo `json:"proposedBlockInfo"` diff --git a/eth/bft/bft_handler.go b/eth/bft/bft_handler.go index 75663004c9..ae3d190cac 100644 --- a/eth/bft/bft_handler.go +++ b/eth/bft/bft_handler.go @@ -78,18 +78,18 @@ func (b *Bfter) SetConsensusFuns(engine consensus.Engine) { } func (b *Bfter) Vote(peer string, vote *types.Vote) error { - log.Trace("Receive Vote", "hash", vote.Hash().Hex(), "voted block hash", vote.ProposedBlockInfo.Hash.Hex(), "number", vote.ProposedBlockInfo.Number, "round", vote.ProposedBlockInfo.Round) + log.Trace("[Vote] Receive Vote", "hash", vote.Hash().Hex(), "voted block hash", vote.ProposedBlockInfo.Hash.Hex(), "number", vote.ProposedBlockInfo.Number, "round", vote.ProposedBlockInfo.Round) voteBlockNum := vote.ProposedBlockInfo.Number.Int64() if dist := voteBlockNum - int64(b.chainHeight()); dist < -maxBlockDist || dist > maxBlockDist { - log.Debug("Discarded propagated vote, too far away", "peer", peer, "number", voteBlockNum, "hash", vote.ProposedBlockInfo.Hash, "distance", dist) + log.Debug("[Vote] Discarded propagated vote, too far away", "peer", peer, "number", voteBlockNum, "hash", vote.ProposedBlockInfo.Hash, "distance", dist) return nil } verified, err := b.consensus.verifyVote(b.blockChainReader, vote) if err != nil { - log.Error("Verify BFT Vote", "error", err) + log.Error("[Vote] Verify BFT Vote", "error", err) return err } @@ -98,14 +98,14 @@ func (b *Bfter) Vote(peer string, vote *types.Vote) error { err = b.consensus.voteHandler(b.blockChainReader, vote) if err != nil { if _, ok := err.(*utils.ErrIncomingMessageRoundTooFarFromCurrentRound); ok { - log.Debug("vote round not equal", "error", err, "vote", vote.Hash()) + log.Debug("[Vote] vote round not equal", "error", err, "vote", vote.Hash()) return err } if _, ok := err.(*utils.ErrIncomingMessageBlockNotFound); ok { - log.Debug("vote proposed block not found", "error", err, "vote", vote.Hash()) + log.Debug("[Vote] vote proposed block not found", "error", err, "vote", vote.Hash()) return err } - log.Error("handle BFT Vote", "error", err) + log.Error("[Vote] handle BFT Vote", "error", err) return err } } @@ -117,26 +117,26 @@ func (b *Bfter) Timeout(peer string, timeout *types.Timeout) error { // dist times 3, ex: timeout message's gap number is based on block and find out it's epoch switch number, then mod 900 then minus 450 if dist := int64(gapNum) - int64(b.chainHeight()); dist < -int64(b.epoch)*3 || dist > int64(b.epoch)*3 { - log.Debug("Discarded propagated timeout, too far away", "peer", peer, "gapNumber", gapNum, "hash", timeout.Hash, "distance", dist) + log.Debug("[Timeout] Discarded propagated timeout, too far away", "peer", peer, "gapNumber", gapNum, "hash", timeout.Hash, "distance", dist) return nil } verified, err := b.consensus.verifyTimeout(b.blockChainReader, timeout) if err != nil { - log.Error("Verify BFT Timeout", "timeoutRound", timeout.Round, "timeoutGapNum", gapNum, "error", err) + log.Error("[Timeout] Verify BFT Timeout", "timeoutRound", timeout.Round, "timeoutGapNum", gapNum, "error", err) return err } - log.Debug("Receive Timeout", "gap", gapNum, "hash", timeout.Hash().Hex(), "round", timeout.Round, "signer", timeout.GetSigner().Hex()) //get signer after verifyTimeout + log.Debug("[Timeout] Receive Timeout", "gap", gapNum, "hash", timeout.Hash().Hex(), "round", timeout.Round, "signer", timeout.GetSigner().Hex()) //get signer after verifyTimeout if verified { b.broadcastCh <- timeout err = b.consensus.timeoutHandler(b.blockChainReader, timeout) if err != nil { if _, ok := err.(*utils.ErrIncomingMessageRoundNotEqualCurrentRound); ok { - log.Debug("timeout round not equal", "error", err) + log.Debug("[Timeout] timeout round not equal", "error", err) return err } - log.Error("handle BFT Timeout", "error", err) + log.Error("[Timeout] handle BFT Timeout", "error", err) return err } } @@ -144,17 +144,22 @@ func (b *Bfter) Timeout(peer string, timeout *types.Timeout) error { return nil } func (b *Bfter) SyncInfo(peer string, syncInfo *types.SyncInfo) error { - log.Debug("Receive SyncInfo", "syncInfo", syncInfo) + log.Debug("[SyncInfo] Receive SyncInfo") + + if syncInfo == nil || syncInfo.HighestQuorumCert == nil { + log.Warn("[SyncInfo] Received nil SyncInfo or missing QC", "syncInfo", syncInfo) + return nil + } qcBlockNum := syncInfo.HighestQuorumCert.ProposedBlockInfo.Number.Int64() if dist := qcBlockNum - int64(b.chainHeight()); dist < -maxBlockDist || dist > maxBlockDist { - log.Debug("Discarded propagated syncInfo, too far away", "peer", peer, "blockNum", qcBlockNum, "hash", syncInfo.Hash, "distance", dist) + log.Debug("[SyncInfo] Discarded propagated syncInfo, too far away", "peer", peer, "blockNum", syncInfo.HighestQuorumCert.ProposedBlockInfo.Number, "hash", syncInfo.Hash, "qcRound", syncInfo.HighestQuorumCert.ProposedBlockInfo.Round, "distance", dist) return nil } verified, err := b.consensus.verifySyncInfo(b.blockChainReader, syncInfo) if err != nil { - log.Error("Verify BFT SyncInfo", "error", err) + log.Error("[SyncInfo] Verify BFT SyncInfo", "error", err) return err } @@ -163,7 +168,7 @@ func (b *Bfter) SyncInfo(peer string, syncInfo *types.SyncInfo) error { b.broadcastCh <- syncInfo err = b.consensus.syncInfoHandler(b.blockChainReader, syncInfo) if err != nil { - log.Error("handle BFT SyncInfo", "error", err) + log.Error("[SyncInfo] handle BFT SyncInfo", "error", err) return err } } diff --git a/params/config.go b/params/config.go index 468c1ba36a..a43db22748 100644 --- a/params/config.go +++ b/params/config.go @@ -85,6 +85,15 @@ var ( MinePeriod: 2, ExpTimeoutConfig: ExpTimeoutConfig{Base: 1.0, MaxExponent: 0}, }, + 3200000: { + MaxMasternodes: 108, + SwitchRound: 3200000, + CertThreshold: 0.667, + TimeoutSyncThreshold: 3, + TimeoutPeriod: 10, + MinePeriod: 2, + ExpTimeoutConfig: ExpTimeoutConfig{Base: 1.0, MaxExponent: 0}, + }, } TestnetV2Configs = map[uint64]*V2Config{ @@ -106,6 +115,15 @@ var ( MinePeriod: 2, ExpTimeoutConfig: ExpTimeoutConfig{Base: 1.0, MaxExponent: 0}, }, + 15000000: { + MaxMasternodes: 108, + SwitchRound: 15000000, + CertThreshold: 0.667, + TimeoutSyncThreshold: 3, + TimeoutPeriod: 10, + MinePeriod: 2, + ExpTimeoutConfig: ExpTimeoutConfig{Base: 1.0, MaxExponent: 0}, + }, } DevnetV2Configs = map[uint64]*V2Config{