mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-06-19 21:31:37 +00:00
* move masternode in v2 config * update number to meet 7 vote for current setup * add test * update all failed test * fix test * remove comment * remove comment * fix test
277 lines
11 KiB
Go
277 lines
11 KiB
Go
package engine_v2
|
|
|
|
import (
|
|
"fmt"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/XinFinOrg/XDPoSChain/common"
|
|
"github.com/XinFinOrg/XDPoSChain/consensus"
|
|
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/utils"
|
|
"github.com/XinFinOrg/XDPoSChain/core/types"
|
|
"github.com/XinFinOrg/XDPoSChain/log"
|
|
)
|
|
|
|
func (x *XDPoS_v2) timeoutHandler(blockChainReader consensus.ChainReader, timeout *types.Timeout) error {
|
|
// checkRoundNumber
|
|
if timeout.Round != x.currentRound {
|
|
return &utils.ErrIncomingMessageRoundNotEqualCurrentRound{
|
|
Type: "timeout",
|
|
IncomingRound: timeout.Round,
|
|
CurrentRound: x.currentRound,
|
|
}
|
|
}
|
|
// Collect timeout, generate TC
|
|
numberOfTimeoutsInPool, pooledTimeouts := x.timeoutPool.Add(timeout)
|
|
log.Debug("[timeoutHandler] collect timeout", "number", numberOfTimeoutsInPool)
|
|
|
|
epochInfo, err := x.getEpochSwitchInfo(blockChainReader, blockChainReader.CurrentHeader(), blockChainReader.CurrentHeader().Hash())
|
|
if err != nil {
|
|
log.Error("[timeoutHandler] Error when getting epoch switch Info", "error", err)
|
|
return fmt.Errorf("fail on timeoutHandler due to failure in getting epoch switch info, %s", err)
|
|
}
|
|
|
|
// Threshold reached
|
|
certThreshold := x.config.V2.Config(uint64(timeout.Round)).CertThreshold
|
|
isThresholdReached := float64(numberOfTimeoutsInPool) >= float64(epochInfo.MasternodesLen)*certThreshold
|
|
if isThresholdReached {
|
|
log.Info(fmt.Sprintf("Timeout pool threashold reached: %v, number of items in the pool: %v", isThresholdReached, numberOfTimeoutsInPool))
|
|
err := x.onTimeoutPoolThresholdReached(blockChainReader, pooledTimeouts, timeout, timeout.GapNumber)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
/*
|
|
Function that will be called by timeoutPool when it reached threshold.
|
|
In the engine v2, we will need to:
|
|
1. Genrate TC
|
|
2. processTC()
|
|
3. generateSyncInfo()
|
|
*/
|
|
func (x *XDPoS_v2) onTimeoutPoolThresholdReached(blockChainReader consensus.ChainReader, pooledTimeouts map[common.Hash]utils.PoolObj, currentTimeoutMsg utils.PoolObj, gapNumber uint64) error {
|
|
signatures := []types.Signature{}
|
|
for _, v := range pooledTimeouts {
|
|
signatures = append(signatures, v.(*types.Timeout).Signature)
|
|
}
|
|
// Genrate TC
|
|
timeoutCert := &types.TimeoutCert{
|
|
Round: currentTimeoutMsg.(*types.Timeout).Round,
|
|
Signatures: signatures,
|
|
GapNumber: gapNumber,
|
|
}
|
|
// Process TC
|
|
err := x.processTC(blockChainReader, timeoutCert)
|
|
if err != nil {
|
|
log.Error("Error while processing TC in the Timeout handler after reaching pool threshold", "TcRound", timeoutCert.Round, "NumberOfTcSig", len(timeoutCert.Signatures), "GapNumber", gapNumber, "Error", err)
|
|
return err
|
|
}
|
|
// Generate and broadcast syncInfo
|
|
syncInfo := x.getSyncInfo()
|
|
x.broadcastToBftChannel(syncInfo)
|
|
|
|
log.Info("Successfully processed the timeout message and produced TC & SyncInfo!", "QcRound", syncInfo.HighestQuorumCert.ProposedBlockInfo.Round, "QcBlockNum", syncInfo.HighestQuorumCert.ProposedBlockInfo.Number, "TcRound", timeoutCert.Round, "NumberOfTcSig", len(timeoutCert.Signatures))
|
|
return nil
|
|
}
|
|
|
|
func (x *XDPoS_v2) verifyTC(chain consensus.ChainReader, timeoutCert *types.TimeoutCert) error {
|
|
/*
|
|
1. Get epoch master node list by gapNumber
|
|
2. Check number of signatures > threshold, as well as it's format. (Same as verifyQC)
|
|
2. Verify signer signature: (List of signatures)
|
|
- Use ecRecover to get the public key
|
|
- Use the above public key to find out the xdc address
|
|
- Use the above xdc address to check against the master node list from step 1(For the received TC epoch)
|
|
*/
|
|
if timeoutCert == nil || timeoutCert.Signatures == nil {
|
|
log.Warn("[verifyTC] TC or TC signatures is Nil")
|
|
return utils.ErrInvalidTC
|
|
}
|
|
|
|
snap, err := x.getSnapshot(chain, timeoutCert.GapNumber, true)
|
|
if err != nil {
|
|
log.Error("[verifyTC] Fail to get snapshot when verifying TC!", "TCGapNumber", timeoutCert.GapNumber)
|
|
return fmt.Errorf("[verifyTC] Unable to get snapshot, %s", err)
|
|
}
|
|
if snap == nil || len(snap.NextEpochMasterNodes) == 0 {
|
|
log.Error("[verifyTC] Something wrong with the snapshot from gapNumber", "messageGapNumber", timeoutCert.GapNumber, "snapshot", snap)
|
|
return fmt.Errorf("empty master node lists from snapshot")
|
|
}
|
|
|
|
signatures, duplicates := UniqueSignatures(timeoutCert.Signatures)
|
|
if len(duplicates) != 0 {
|
|
for _, d := range duplicates {
|
|
log.Warn("[verifyQC] duplicated signature in QC", "duplicate", common.Bytes2Hex(d))
|
|
}
|
|
}
|
|
|
|
epochInfo, err := x.getEpochSwitchInfo(chain, chain.CurrentHeader(), chain.CurrentHeader().Hash())
|
|
if err != nil {
|
|
log.Error("[verifyTC] Error when getting epoch switch Info", "error", err)
|
|
return fmt.Errorf("fail on verifyTC due to failure in getting epoch switch info, %s", err)
|
|
}
|
|
|
|
certThreshold := x.config.V2.Config(uint64(timeoutCert.Round)).CertThreshold
|
|
if float64(len(signatures)) < float64(epochInfo.MasternodesLen)*certThreshold {
|
|
log.Warn("[verifyTC] Invalid TC Signature is nil or empty", "timeoutCert.Round", timeoutCert.Round, "timeoutCert.GapNumber", timeoutCert.GapNumber, "Signatures len", len(timeoutCert.Signatures), "CertThreshold", float64(epochInfo.MasternodesLen)*certThreshold)
|
|
return utils.ErrInvalidTCSignatures
|
|
}
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(len(signatures))
|
|
var haveError error
|
|
|
|
signedTimeoutObj := types.TimeoutSigHash(&types.TimeoutForSign{
|
|
Round: timeoutCert.Round,
|
|
GapNumber: timeoutCert.GapNumber,
|
|
})
|
|
|
|
for _, signature := range signatures {
|
|
go func(sig types.Signature) {
|
|
defer wg.Done()
|
|
verified, _, err := x.verifyMsgSignature(signedTimeoutObj, sig, snap.NextEpochMasterNodes)
|
|
if err != nil {
|
|
log.Error("[verifyTC] Error while verfying TC message signatures", "timeoutCert.Round", timeoutCert.Round, "timeoutCert.GapNumber", timeoutCert.GapNumber, "Signatures len", len(signatures), "Error", err)
|
|
haveError = fmt.Errorf("error while verfying TC message signatures, %s", err)
|
|
return
|
|
}
|
|
if !verified {
|
|
log.Warn("[verifyTC] Signature not verified doing TC verification", "timeoutCert.Round", timeoutCert.Round, "timeoutCert.GapNumber", timeoutCert.GapNumber, "Signatures len", len(signatures))
|
|
haveError = fmt.Errorf("fail to verify TC due to signature mis-match")
|
|
return
|
|
}
|
|
}(signature)
|
|
}
|
|
wg.Wait()
|
|
if haveError != nil {
|
|
return haveError
|
|
}
|
|
return nil
|
|
}
|
|
|
|
/*
|
|
1. Update highestTC
|
|
2. Check TC round >= node's currentRound. If yes, call setNewRound
|
|
*/
|
|
func (x *XDPoS_v2) processTC(blockChainReader consensus.ChainReader, timeoutCert *types.TimeoutCert) error {
|
|
if timeoutCert.Round > x.highestTimeoutCert.Round {
|
|
x.highestTimeoutCert = timeoutCert
|
|
}
|
|
if timeoutCert.Round >= x.currentRound {
|
|
x.setNewRound(blockChainReader, timeoutCert.Round+1)
|
|
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Generate and send timeout into BFT channel.
|
|
/*
|
|
1. timeout.round = currentRound
|
|
2. Sign the signature
|
|
3. send to broadcast channel
|
|
*/
|
|
func (x *XDPoS_v2) sendTimeout(chain consensus.ChainReader) error {
|
|
// Construct the gapNumber
|
|
var gapNumber uint64
|
|
currentBlockHeader := chain.CurrentHeader()
|
|
isEpochSwitch, epochNum, err := x.isEpochSwitchAtRound(x.currentRound, currentBlockHeader)
|
|
if err != nil {
|
|
log.Error("[sendTimeout] Error while checking if the currentBlock is epoch switch", "currentRound", x.currentRound, "currentBlockNum", currentBlockHeader.Number, "currentBlockHash", currentBlockHeader.Hash(), "epochNum", epochNum)
|
|
return err
|
|
}
|
|
|
|
if isEpochSwitch {
|
|
// Notice this +1 is because we expect a block whos is the child of currentHeader
|
|
currentNumber := currentBlockHeader.Number.Uint64() + 1
|
|
gapNumber = currentNumber - currentNumber%x.config.Epoch - x.config.Gap
|
|
log.Debug("[sendTimeout] is epoch switch when sending out timeout message", "currentNumber", currentNumber, "gapNumber", gapNumber)
|
|
} else {
|
|
epochSwitchInfo, err := x.getEpochSwitchInfo(chain, currentBlockHeader, currentBlockHeader.Hash())
|
|
if err != nil {
|
|
log.Error("[sendTimeout] Error when trying to get current epoch switch info for a non-epoch block", "currentRound", x.currentRound, "currentBlockNum", currentBlockHeader.Number, "currentBlockHash", currentBlockHeader.Hash(), "epochNum", epochNum)
|
|
}
|
|
gapNumber = epochSwitchInfo.EpochSwitchBlockInfo.Number.Uint64() - epochSwitchInfo.EpochSwitchBlockInfo.Number.Uint64()%x.config.Epoch - x.config.Gap
|
|
log.Debug("[sendTimeout] non-epoch-switch block found its epoch block and calculated the gapNumber", "epochSwitchInfo.EpochSwitchBlockInfo.Number", epochSwitchInfo.EpochSwitchBlockInfo.Number.Uint64(), "gapNumber", gapNumber)
|
|
}
|
|
|
|
signedHash, err := x.signSignature(types.TimeoutSigHash(&types.TimeoutForSign{
|
|
Round: x.currentRound,
|
|
GapNumber: gapNumber,
|
|
}))
|
|
if err != nil {
|
|
log.Error("[sendTimeout] signSignature when sending out TC", "Error", err, "round", x.currentRound, "gap", gapNumber)
|
|
return err
|
|
}
|
|
timeoutMsg := &types.Timeout{
|
|
Round: x.currentRound,
|
|
Signature: signedHash,
|
|
GapNumber: gapNumber,
|
|
}
|
|
log.Warn("[sendTimeout] Timeout message generated, ready to send!", "timeoutMsgRound", timeoutMsg.Round, "timeoutMsgGapNumber", timeoutMsg.GapNumber, "whosTurn", x.whosTurn)
|
|
err = x.timeoutHandler(chain, timeoutMsg)
|
|
if err != nil {
|
|
log.Error("TimeoutHandler error", "TimeoutRound", timeoutMsg.Round, "Error", err)
|
|
return err
|
|
}
|
|
x.broadcastToBftChannel(timeoutMsg)
|
|
return nil
|
|
}
|
|
|
|
/*
|
|
Function that will be called by timer when countdown reaches its threshold.
|
|
In the engine v2, we would need to broadcast timeout messages to other peers
|
|
*/
|
|
func (x *XDPoS_v2) OnCountdownTimeout(time time.Time, chain interface{}) error {
|
|
x.lock.Lock()
|
|
defer x.lock.Unlock()
|
|
|
|
// Check if we are within the master node list
|
|
allow := x.allowedToSend(chain.(consensus.ChainReader), chain.(consensus.ChainReader).CurrentHeader(), "timeout")
|
|
if !allow {
|
|
return nil
|
|
}
|
|
|
|
err := x.sendTimeout(chain.(consensus.ChainReader))
|
|
if err != nil {
|
|
log.Error("Error while sending out timeout message at time: ", "time", time, "err", err)
|
|
return err
|
|
}
|
|
|
|
x.timeoutCount++
|
|
if x.timeoutCount%x.config.V2.CurrentConfig.TimeoutSyncThreshold == 0 {
|
|
log.Warn("[OnCountdownTimeout] timeout sync threadhold reached, send syncInfo message")
|
|
syncInfo := x.getSyncInfo()
|
|
x.broadcastToBftChannel(syncInfo)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (x *XDPoS_v2) hygieneTimeoutPool() {
|
|
x.lock.RLock()
|
|
currentRound := x.currentRound
|
|
x.lock.RUnlock()
|
|
timeoutPoolKeys := x.timeoutPool.PoolObjKeysList()
|
|
|
|
// Extract round number
|
|
for _, k := range timeoutPoolKeys {
|
|
keyedRound, err := strconv.ParseInt(strings.Split(k, ":")[0], 10, 64)
|
|
if err != nil {
|
|
log.Error("[hygieneTimeoutPool] Error while trying to get keyedRound inside pool", "Error", err)
|
|
continue
|
|
}
|
|
// Clean up any timeouts round that is 10 rounds older
|
|
if keyedRound < int64(currentRound)-utils.PoolHygieneRound {
|
|
log.Debug("[hygieneTimeoutPool] Cleaned timeout pool at round", "Round", keyedRound, "CurrentRound", currentRound, "Key", k)
|
|
x.timeoutPool.ClearByPoolKey(k)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (x *XDPoS_v2) ReceivedTimeouts() map[string]map[common.Hash]utils.PoolObj {
|
|
return x.timeoutPool.Get()
|
|
}
|