xin-106 add generated message into its pool (#32)

* add debug log and change to contain or add for cache

* add generated message into its pool
This commit is contained in:
Liam 2021-12-15 19:25:40 +03:00 committed by Jianrong
parent e0d66d4ea8
commit 35eebabae0
6 changed files with 50 additions and 19 deletions

View file

@ -120,7 +120,7 @@ func (x *XDPoS_v2) Prepare(chain consensus.ChainReader, header *types.Header) er
currentRound := x.currentRound
highestQC := x.highestQuorumCert
x.lock.Unlock()
//parentRound := highestQC.ProposedBlockInfo.Round
if (highestQC == nil) || (header.ParentHash != highestQC.ProposedBlockInfo.Hash) {
return consensus.ErrNotReadyToPropose
}
@ -447,6 +447,16 @@ func (x *XDPoS_v2) VerifyHeader(chain consensus.ChainReader, header *types.Heade
return nil
}
// Utils for test to get current Pool size
func (x *XDPoS_v2) GetVotePoolSize(vote *utils.Vote) int {
return x.votePool.Size(vote)
}
// Utils for test to get Timeout Pool Size
func (x *XDPoS_v2) GetTimeoutPoolSize(timeout *utils.Timeout) int {
return x.timeoutPool.Size(timeout)
}
/*
SyncInfo workflow
*/
@ -504,6 +514,10 @@ func (x *XDPoS_v2) VerifyVoteMessage(vote *utils.Vote) (bool, error) {
func (x *XDPoS_v2) VoteHandler(chain consensus.ChainReader, voteMsg *utils.Vote) error {
x.lock.Lock()
defer x.lock.Unlock()
return x.voteHandler(chain, voteMsg)
}
func (x *XDPoS_v2) voteHandler(chain consensus.ChainReader, voteMsg *utils.Vote) error {
// 1. checkRoundNumber
if voteMsg.ProposedBlockInfo.Round != x.currentRound {
@ -516,7 +530,7 @@ func (x *XDPoS_v2) VoteHandler(chain consensus.ChainReader, voteMsg *utils.Vote)
log.Debug("Vote pool threashold reached: %v, number of items in the pool: %v", thresholdReached, numberOfVotesInPool)
err := x.onVotePoolThresholdReached(chain, pooledVotes, voteMsg)
if err != nil {
return nil
return err
}
}
@ -570,7 +584,10 @@ func (x *XDPoS_v2) VerifyTimeoutMessage(timeoutMsg *utils.Timeout) (bool, error)
func (x *XDPoS_v2) TimeoutHandler(timeout *utils.Timeout) error {
x.lock.Lock()
defer x.lock.Unlock()
return x.timeoutHandler(timeout)
}
func (x *XDPoS_v2) timeoutHandler(timeout *utils.Timeout) error {
// 1. checkRoundNumber
if timeout.Round != x.currentRound {
return &utils.ErrIncomingMessageRoundNotEqualCurrentRound{
@ -666,7 +683,7 @@ func (x *XDPoS_v2) ProposedBlockHandler(blockChainReader consensus.ChainReader,
return err
}
if verified {
return x.sendVote(blockInfo)
return x.sendVote(blockChainReader, blockInfo)
} else {
log.Info("Failed to pass the voting rule verification", "ProposeBlockHash", blockInfo.Hash)
}
@ -806,7 +823,7 @@ func (x *XDPoS_v2) verifyVotingRule(blockChainReader consensus.ChainReader, bloc
}
// Once Hot stuff voting rule has verified, this node can then send vote
func (x *XDPoS_v2) sendVote(blockInfo *utils.BlockInfo) error {
func (x *XDPoS_v2) sendVote(chainReader consensus.ChainReader, blockInfo *utils.BlockInfo) error {
// First step: Update the highest Voted round
// Second step: Generate the signature by using node's private key(The signature is the blockInfo signature)
// Third step: Construct the vote struct with the above signature & blockinfo struct
@ -822,6 +839,8 @@ func (x *XDPoS_v2) sendVote(blockInfo *utils.BlockInfo) error {
ProposedBlockInfo: blockInfo,
Signature: signedHash,
}
x.voteHandler(chainReader, voteMsg)
x.broadcastToBftChannel(voteMsg)
return nil
}
@ -841,6 +860,8 @@ func (x *XDPoS_v2) sendTimeout() error {
Round: x.currentRound,
Signature: signedHash,
}
x.timeoutHandler(timeoutMsg)
x.broadcastToBftChannel(timeoutMsg)
return nil
}

View file

@ -36,6 +36,14 @@ func (p *Pool) Add(obj PoolObj) (bool, int, map[common.Hash]PoolObj) {
}
return false, numOfItems, objListKeyed
}
func (p *Pool) Size(obj PoolObj) int {
poolKey := obj.PoolKey()
objListKeyed, ok := p.objList[poolKey]
if !ok {
return 0
}
return len(objListKeyed)
}
func (p *Pool) Clear() {
p.objList = make(map[string]map[common.Hash]PoolObj)

View file

@ -16,6 +16,8 @@ func TestCountdownTimeoutToSendTimeoutMessage(t *testing.T) {
engineV2.SetNewRoundFaker(utils.Round(1), true)
timeoutMsg := <-engineV2.BroadcastCh
poolSize := engineV2.GetTimeoutPoolSize(timeoutMsg.(*utils.Timeout))
assert.Equal(t, poolSize, 1)
assert.NotNil(t, timeoutMsg)
valid, err := engineV2.VerifyTimeoutMessage(timeoutMsg.(*utils.Timeout))

View file

@ -27,6 +27,9 @@ func TestProcessFirstV2BlockAndSendVoteMsg(t *testing.T) {
}
voteMsg := <-engineV2.BroadcastCh
poolSize := engineV2.GetVotePoolSize(voteMsg.(*utils.Vote))
assert.Equal(t, poolSize, 1)
assert.NotNil(t, voteMsg)
assert.Equal(t, currentBlock.Hash(), voteMsg.(*utils.Vote).ProposedBlockInfo.Hash)

View file

@ -26,9 +26,9 @@ type Bfter struct {
broadcast BroadcastFns
// Message Cache
knownVotes *lru.ARCCache
knownSyncInfos *lru.ARCCache
knownTimeouts *lru.ARCCache
knownVotes *lru.Cache
knownSyncInfos *lru.Cache
knownTimeouts *lru.Cache
}
type ConsensusFns struct {
@ -49,9 +49,9 @@ type BroadcastFns struct {
}
func New(broadcasts BroadcastFns, blockCahinReader *core.BlockChain) *Bfter {
knownVotes, _ := lru.NewARC(messageLimit)
knownSyncInfos, _ := lru.NewARC(messageLimit)
knownTimeouts, _ := lru.NewARC(messageLimit)
knownVotes, _ := lru.New(messageLimit)
knownSyncInfos, _ := lru.New(messageLimit)
knownTimeouts, _ := lru.New(messageLimit)
return &Bfter{
quit: make(chan struct{}),
broadcastCh: make(chan interface{}),
@ -79,9 +79,9 @@ func (b *Bfter) SetConsensusFuns(engine consensus.Engine) {
// TODO: rename
func (b *Bfter) Vote(vote *utils.Vote) error {
log.Info("Receive Vote", "voted block hash", vote.ProposedBlockInfo.Hash.Hex(), "number", vote.ProposedBlockInfo.Number, "round", vote.ProposedBlockInfo.Round)
if b.knownVotes.Contains(vote.Hash()) {
log.Info("Discarded vote, known vote", "voted block hash", vote.ProposedBlockInfo.Hash.Hex(), "number", vote.ProposedBlockInfo.Number, "round", vote.ProposedBlockInfo.Round)
log.Trace("Receive Vote", "vote hash", vote.Hash(), "voted block hash", vote.ProposedBlockInfo.Hash.Hex(), "number", vote.ProposedBlockInfo.Number, "round", vote.ProposedBlockInfo.Round, "signature", vote.Signature)
if exist, _ := b.knownVotes.ContainsOrAdd(vote.Hash(), true); exist {
log.Info("Discarded vote, known vote", "vote hash", vote.Hash(), "voted block hash", vote.ProposedBlockInfo.Hash.Hex(), "number", vote.ProposedBlockInfo.Number, "round", vote.ProposedBlockInfo.Round)
return nil
}
@ -90,7 +90,6 @@ func (b *Bfter) Vote(vote *utils.Vote) error {
log.Error("Verify BFT Vote", "error", err)
return err
}
b.knownVotes.Add(vote.Hash(), true)
b.broadcastCh <- vote
err = b.consensus.voteHandler(b.blockCahinReader, vote)
@ -102,7 +101,7 @@ func (b *Bfter) Vote(vote *utils.Vote) error {
}
func (b *Bfter) Timeout(timeout *utils.Timeout) error {
log.Trace("Receive Timeout", "timeout", timeout)
if b.knownVotes.Contains(timeout.Hash()) {
if exist, _ := b.knownTimeouts.ContainsOrAdd(timeout.Hash(), true); exist {
log.Trace("Discarded Timeout, known Timeout", "Signature", timeout.Signature, "hash", timeout.Hash(), "round", timeout.Round)
return nil
}
@ -111,7 +110,6 @@ func (b *Bfter) Timeout(timeout *utils.Timeout) error {
log.Error("Verify BFT Timeout", "error", err)
return err
}
b.knownTimeouts.Add(timeout.Hash(), true)
b.broadcastCh <- timeout
err = b.consensus.timeoutHandler(timeout)
@ -127,7 +125,7 @@ func (b *Bfter) Timeout(timeout *utils.Timeout) error {
}
func (b *Bfter) SyncInfo(syncInfo *utils.SyncInfo) error {
log.Trace("Receive SyncInfo", "syncInfo", syncInfo)
if b.knownVotes.Contains(syncInfo.Hash()) {
if exist, _ := b.knownSyncInfos.ContainsOrAdd(syncInfo.Hash(), true); exist {
log.Trace("Discarded SyncInfo, known SyncInfo", "hash", syncInfo.Hash())
return nil
}
@ -137,7 +135,6 @@ func (b *Bfter) SyncInfo(syncInfo *utils.SyncInfo) error {
return err
}
b.knownSyncInfos.Add(syncInfo.Hash(), true)
b.broadcastCh <- syncInfo
err = b.consensus.syncInfoHandler(b.blockCahinReader, syncInfo)

View file

@ -909,7 +909,7 @@ func (pm *ProtocolManager) BroadcastVote(vote *utils.Vote) {
for _, peer := range peers {
peer.SendVote(vote)
}
log.Info("Propagated Vote", "voted block hash", vote.ProposedBlockInfo.Hash.Hex(), "number", vote.ProposedBlockInfo.Number, "round", vote.ProposedBlockInfo.Round, "recipients", len(peers))
log.Info("Propagated Vote", "vote hash", vote.Hash(), "voted block hash", vote.ProposedBlockInfo.Hash.Hex(), "number", vote.ProposedBlockInfo.Number, "round", vote.ProposedBlockInfo.Round, "recipients", len(peers))
}
// BroadcastTimeout will propagate a Timeout to all peers which are not known to