From 35eebabae060dd181f7134d37d7420fc12ee9342 Mon Sep 17 00:00:00 2001 From: Liam Date: Wed, 15 Dec 2021 19:25:40 +0300 Subject: [PATCH] xin-106 add generated message into its pool (#32) * add debug log and change to contain or add for cache * add generated message into its pool --- consensus/XDPoS/engines/engine_v2/engine.go | 29 ++++++++++++++++++--- consensus/XDPoS/utils/pool.go | 8 ++++++ consensus/tests/countdown_test.go | 2 ++ consensus/tests/proposed_block_test.go | 3 +++ eth/bft/bft_handler.go | 25 ++++++++---------- eth/handler.go | 2 +- 6 files changed, 50 insertions(+), 19 deletions(-) diff --git a/consensus/XDPoS/engines/engine_v2/engine.go b/consensus/XDPoS/engines/engine_v2/engine.go index 983f8a5007..05abab71fa 100644 --- a/consensus/XDPoS/engines/engine_v2/engine.go +++ b/consensus/XDPoS/engines/engine_v2/engine.go @@ -120,7 +120,7 @@ func (x *XDPoS_v2) Prepare(chain consensus.ChainReader, header *types.Header) er currentRound := x.currentRound highestQC := x.highestQuorumCert x.lock.Unlock() - //parentRound := highestQC.ProposedBlockInfo.Round + if (highestQC == nil) || (header.ParentHash != highestQC.ProposedBlockInfo.Hash) { return consensus.ErrNotReadyToPropose } @@ -447,6 +447,16 @@ func (x *XDPoS_v2) VerifyHeader(chain consensus.ChainReader, header *types.Heade return nil } +// Utils for test to get current Pool size +func (x *XDPoS_v2) GetVotePoolSize(vote *utils.Vote) int { + return x.votePool.Size(vote) +} + +// Utils for test to get Timeout Pool Size +func (x *XDPoS_v2) GetTimeoutPoolSize(timeout *utils.Timeout) int { + return x.timeoutPool.Size(timeout) +} + /* SyncInfo workflow */ @@ -504,6 +514,10 @@ func (x *XDPoS_v2) VerifyVoteMessage(vote *utils.Vote) (bool, error) { func (x *XDPoS_v2) VoteHandler(chain consensus.ChainReader, voteMsg *utils.Vote) error { x.lock.Lock() defer x.lock.Unlock() + return x.voteHandler(chain, voteMsg) +} + +func (x *XDPoS_v2) voteHandler(chain consensus.ChainReader, voteMsg *utils.Vote) error { // 1. checkRoundNumber if voteMsg.ProposedBlockInfo.Round != x.currentRound { @@ -516,7 +530,7 @@ func (x *XDPoS_v2) VoteHandler(chain consensus.ChainReader, voteMsg *utils.Vote) log.Debug("Vote pool threashold reached: %v, number of items in the pool: %v", thresholdReached, numberOfVotesInPool) err := x.onVotePoolThresholdReached(chain, pooledVotes, voteMsg) if err != nil { - return nil + return err } } @@ -570,7 +584,10 @@ func (x *XDPoS_v2) VerifyTimeoutMessage(timeoutMsg *utils.Timeout) (bool, error) func (x *XDPoS_v2) TimeoutHandler(timeout *utils.Timeout) error { x.lock.Lock() defer x.lock.Unlock() + return x.timeoutHandler(timeout) +} +func (x *XDPoS_v2) timeoutHandler(timeout *utils.Timeout) error { // 1. checkRoundNumber if timeout.Round != x.currentRound { return &utils.ErrIncomingMessageRoundNotEqualCurrentRound{ @@ -666,7 +683,7 @@ func (x *XDPoS_v2) ProposedBlockHandler(blockChainReader consensus.ChainReader, return err } if verified { - return x.sendVote(blockInfo) + return x.sendVote(blockChainReader, blockInfo) } else { log.Info("Failed to pass the voting rule verification", "ProposeBlockHash", blockInfo.Hash) } @@ -806,7 +823,7 @@ func (x *XDPoS_v2) verifyVotingRule(blockChainReader consensus.ChainReader, bloc } // Once Hot stuff voting rule has verified, this node can then send vote -func (x *XDPoS_v2) sendVote(blockInfo *utils.BlockInfo) error { +func (x *XDPoS_v2) sendVote(chainReader consensus.ChainReader, blockInfo *utils.BlockInfo) error { // First step: Update the highest Voted round // Second step: Generate the signature by using node's private key(The signature is the blockInfo signature) // Third step: Construct the vote struct with the above signature & blockinfo struct @@ -822,6 +839,8 @@ func (x *XDPoS_v2) sendVote(blockInfo *utils.BlockInfo) error { ProposedBlockInfo: blockInfo, Signature: signedHash, } + + x.voteHandler(chainReader, voteMsg) x.broadcastToBftChannel(voteMsg) return nil } @@ -841,6 +860,8 @@ func (x *XDPoS_v2) sendTimeout() error { Round: x.currentRound, Signature: signedHash, } + + x.timeoutHandler(timeoutMsg) x.broadcastToBftChannel(timeoutMsg) return nil } diff --git a/consensus/XDPoS/utils/pool.go b/consensus/XDPoS/utils/pool.go index a5ca1af3fb..4bfd91b6fb 100644 --- a/consensus/XDPoS/utils/pool.go +++ b/consensus/XDPoS/utils/pool.go @@ -36,6 +36,14 @@ func (p *Pool) Add(obj PoolObj) (bool, int, map[common.Hash]PoolObj) { } return false, numOfItems, objListKeyed } +func (p *Pool) Size(obj PoolObj) int { + poolKey := obj.PoolKey() + objListKeyed, ok := p.objList[poolKey] + if !ok { + return 0 + } + return len(objListKeyed) +} func (p *Pool) Clear() { p.objList = make(map[string]map[common.Hash]PoolObj) diff --git a/consensus/tests/countdown_test.go b/consensus/tests/countdown_test.go index 3f191a86cb..bd342f7808 100644 --- a/consensus/tests/countdown_test.go +++ b/consensus/tests/countdown_test.go @@ -16,6 +16,8 @@ func TestCountdownTimeoutToSendTimeoutMessage(t *testing.T) { engineV2.SetNewRoundFaker(utils.Round(1), true) timeoutMsg := <-engineV2.BroadcastCh + poolSize := engineV2.GetTimeoutPoolSize(timeoutMsg.(*utils.Timeout)) + assert.Equal(t, poolSize, 1) assert.NotNil(t, timeoutMsg) valid, err := engineV2.VerifyTimeoutMessage(timeoutMsg.(*utils.Timeout)) diff --git a/consensus/tests/proposed_block_test.go b/consensus/tests/proposed_block_test.go index e166e121a7..8e728a8e60 100644 --- a/consensus/tests/proposed_block_test.go +++ b/consensus/tests/proposed_block_test.go @@ -27,6 +27,9 @@ func TestProcessFirstV2BlockAndSendVoteMsg(t *testing.T) { } voteMsg := <-engineV2.BroadcastCh + poolSize := engineV2.GetVotePoolSize(voteMsg.(*utils.Vote)) + + assert.Equal(t, poolSize, 1) assert.NotNil(t, voteMsg) assert.Equal(t, currentBlock.Hash(), voteMsg.(*utils.Vote).ProposedBlockInfo.Hash) diff --git a/eth/bft/bft_handler.go b/eth/bft/bft_handler.go index 4d57bf3d23..632679037f 100644 --- a/eth/bft/bft_handler.go +++ b/eth/bft/bft_handler.go @@ -26,9 +26,9 @@ type Bfter struct { broadcast BroadcastFns // Message Cache - knownVotes *lru.ARCCache - knownSyncInfos *lru.ARCCache - knownTimeouts *lru.ARCCache + knownVotes *lru.Cache + knownSyncInfos *lru.Cache + knownTimeouts *lru.Cache } type ConsensusFns struct { @@ -49,9 +49,9 @@ type BroadcastFns struct { } func New(broadcasts BroadcastFns, blockCahinReader *core.BlockChain) *Bfter { - knownVotes, _ := lru.NewARC(messageLimit) - knownSyncInfos, _ := lru.NewARC(messageLimit) - knownTimeouts, _ := lru.NewARC(messageLimit) + knownVotes, _ := lru.New(messageLimit) + knownSyncInfos, _ := lru.New(messageLimit) + knownTimeouts, _ := lru.New(messageLimit) return &Bfter{ quit: make(chan struct{}), broadcastCh: make(chan interface{}), @@ -79,9 +79,9 @@ func (b *Bfter) SetConsensusFuns(engine consensus.Engine) { // TODO: rename func (b *Bfter) Vote(vote *utils.Vote) error { - log.Info("Receive Vote", "voted block hash", vote.ProposedBlockInfo.Hash.Hex(), "number", vote.ProposedBlockInfo.Number, "round", vote.ProposedBlockInfo.Round) - if b.knownVotes.Contains(vote.Hash()) { - log.Info("Discarded vote, known vote", "voted block hash", vote.ProposedBlockInfo.Hash.Hex(), "number", vote.ProposedBlockInfo.Number, "round", vote.ProposedBlockInfo.Round) + log.Trace("Receive Vote", "vote hash", vote.Hash(), "voted block hash", vote.ProposedBlockInfo.Hash.Hex(), "number", vote.ProposedBlockInfo.Number, "round", vote.ProposedBlockInfo.Round, "signature", vote.Signature) + if exist, _ := b.knownVotes.ContainsOrAdd(vote.Hash(), true); exist { + log.Info("Discarded vote, known vote", "vote hash", vote.Hash(), "voted block hash", vote.ProposedBlockInfo.Hash.Hex(), "number", vote.ProposedBlockInfo.Number, "round", vote.ProposedBlockInfo.Round) return nil } @@ -90,7 +90,6 @@ func (b *Bfter) Vote(vote *utils.Vote) error { log.Error("Verify BFT Vote", "error", err) return err } - b.knownVotes.Add(vote.Hash(), true) b.broadcastCh <- vote err = b.consensus.voteHandler(b.blockCahinReader, vote) @@ -102,7 +101,7 @@ func (b *Bfter) Vote(vote *utils.Vote) error { } func (b *Bfter) Timeout(timeout *utils.Timeout) error { log.Trace("Receive Timeout", "timeout", timeout) - if b.knownVotes.Contains(timeout.Hash()) { + if exist, _ := b.knownTimeouts.ContainsOrAdd(timeout.Hash(), true); exist { log.Trace("Discarded Timeout, known Timeout", "Signature", timeout.Signature, "hash", timeout.Hash(), "round", timeout.Round) return nil } @@ -111,7 +110,6 @@ func (b *Bfter) Timeout(timeout *utils.Timeout) error { log.Error("Verify BFT Timeout", "error", err) return err } - b.knownTimeouts.Add(timeout.Hash(), true) b.broadcastCh <- timeout err = b.consensus.timeoutHandler(timeout) @@ -127,7 +125,7 @@ func (b *Bfter) Timeout(timeout *utils.Timeout) error { } func (b *Bfter) SyncInfo(syncInfo *utils.SyncInfo) error { log.Trace("Receive SyncInfo", "syncInfo", syncInfo) - if b.knownVotes.Contains(syncInfo.Hash()) { + if exist, _ := b.knownSyncInfos.ContainsOrAdd(syncInfo.Hash(), true); exist { log.Trace("Discarded SyncInfo, known SyncInfo", "hash", syncInfo.Hash()) return nil } @@ -137,7 +135,6 @@ func (b *Bfter) SyncInfo(syncInfo *utils.SyncInfo) error { return err } - b.knownSyncInfos.Add(syncInfo.Hash(), true) b.broadcastCh <- syncInfo err = b.consensus.syncInfoHandler(b.blockCahinReader, syncInfo) diff --git a/eth/handler.go b/eth/handler.go index 88e4d1e44f..38450e5c8f 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -909,7 +909,7 @@ func (pm *ProtocolManager) BroadcastVote(vote *utils.Vote) { for _, peer := range peers { peer.SendVote(vote) } - log.Info("Propagated Vote", "voted block hash", vote.ProposedBlockInfo.Hash.Hex(), "number", vote.ProposedBlockInfo.Number, "round", vote.ProposedBlockInfo.Round, "recipients", len(peers)) + log.Info("Propagated Vote", "vote hash", vote.Hash(), "voted block hash", vote.ProposedBlockInfo.Hash.Hex(), "number", vote.ProposedBlockInfo.Number, "round", vote.ProposedBlockInfo.Round, "recipients", len(peers)) } // BroadcastTimeout will propagate a Timeout to all peers which are not known to