update network layer and add handler functions (#23)

* update network layer and add handler functions

* fix test syntax error
This commit is contained in:
Liam 2021-11-24 00:39:32 +11:00 committed by Jianrong
parent 17f6e67f58
commit b9068974f5
8 changed files with 121 additions and 93 deletions

View file

@ -355,18 +355,18 @@ func (x *XDPoS) GetCachedSigningTxs(hash common.Hash) (interface{}, bool) {
}
//V2
func (x *XDPoS) VerifyVote(utils.Vote) error {
func (x *XDPoS) VerifyVote(*utils.Vote) error {
return nil
}
func (x *XDPoS) VerifyTimeout(utils.Timeout) error {
func (x *XDPoS) VerifyTimeout(*utils.Timeout) error {
return nil
}
func (x *XDPoS) VerifySyncInfo(utils.SyncInfo) error {
func (x *XDPoS) VerifySyncInfo(*utils.SyncInfo) error {
return nil
}
func (x *XDPoS) VerifyBlockInfo(utils.BlockInfo) error {
func (x *XDPoS) VerifyBlockInfo(*utils.BlockInfo) error {
return nil
}

View file

@ -103,7 +103,7 @@ func (x *XDPoS_v2) VerifyHeader(chain consensus.ChainReader, header *types.Heade
SyncInfo workflow
*/
// Verify syncInfo and trigger process QC or TC if successful
func (x *XDPoS_v2) VerifySyncInfoMessage(syncInfo utils.SyncInfo) error {
func (x *XDPoS_v2) VerifySyncInfoMessage(syncInfo *utils.SyncInfo) error {
/*
1. Verify items including:
- verifyQC
@ -123,7 +123,7 @@ func (x *XDPoS_v2) VerifySyncInfoMessage(syncInfo utils.SyncInfo) error {
return nil
}
func (x *XDPoS_v2) SyncInfoHandler(header *types.Header) error {
func (x *XDPoS_v2) SyncInfoHandler(syncInfo *utils.SyncInfo) error {
/*
1. processQC
2. processTC
@ -134,7 +134,7 @@ func (x *XDPoS_v2) SyncInfoHandler(header *types.Header) error {
/*
Vote workflow
*/
func (x *XDPoS_v2) VerifyVoteMessage(vote utils.Vote) (bool, error) {
func (x *XDPoS_v2) VerifyVoteMessage(vote *utils.Vote) (bool, error) {
/*
1. Check signature:
- Use ecRecover to get the public key
@ -147,7 +147,7 @@ func (x *XDPoS_v2) VerifyVoteMessage(vote utils.Vote) (bool, error) {
}
// Consensus entry point for processing vote message to produce QC
func (x *XDPoS_v2) VoteHandler(voteMsg utils.Vote) error {
func (x *XDPoS_v2) VoteHandler(voteMsg *utils.Vote) error {
x.lock.Lock()
defer x.lock.Unlock()
@ -157,7 +157,7 @@ func (x *XDPoS_v2) VoteHandler(voteMsg utils.Vote) error {
}
// Collect vote
thresholdReached, numberOfVotesInPool, hookError := x.votePool.Add(&voteMsg)
thresholdReached, numberOfVotesInPool, hookError := x.votePool.Add(voteMsg)
if hookError != nil {
log.Error("Error while adding vote message to the pool, ", hookError)
return hookError
@ -201,7 +201,7 @@ func (x *XDPoS_v2) onVotePoolThresholdReached(pooledVotes map[common.Hash]utils.
- Use the above xdc address to check against the master node(For the running epoch)
2. Broadcast(Not part of consensus)
*/
func (x *XDPoS_v2) VerifyTimeoutMessage(timeoutMsg utils.Timeout) (bool, error) {
func (x *XDPoS_v2) VerifyTimeoutMessage(timeoutMsg *utils.Timeout) (bool, error) {
return x.verifyMsgSignature(utils.TimeoutSigHash(&timeoutMsg.Round), timeoutMsg.Signature)
}
@ -282,7 +282,7 @@ func (x *XDPoS_v2) generateBlockInfo() error {
}
// To be used by different message verification. Verify local DB block info against the received block information(i.e hash, blockNum, round)
func (x *XDPoS_v2) VerifyBlockInfo(blockInfo utils.BlockInfo) error {
func (x *XDPoS_v2) VerifyBlockInfo(blockInfo *utils.BlockInfo) error {
return nil
}

View file

@ -1 +0,0 @@
mode: atomic

View file

@ -12,15 +12,10 @@ const (
messageLimit = 1024
)
//Define Verify Group functions
type VerifySyncInfoFn func(utils.SyncInfo) error
type VerifyVoteFn func(utils.Vote) error
type VerifyTimeoutFn func(utils.Timeout) error
//Define Boradcast Group functions
type broadcastVoteFn func(utils.Vote)
type broadcastTimeoutFn func(utils.Timeout)
type broadcastSyncInfoFn func(utils.SyncInfo)
type broadcastVoteFn func(*utils.Vote)
type broadcastTimeoutFn func(*utils.Timeout)
type broadcastSyncInfoFn func(*utils.SyncInfo)
type Bfter struct {
broadcastCh chan interface{}
@ -35,9 +30,14 @@ type Bfter struct {
}
type ConsensusFns struct {
verifySyncInfo VerifySyncInfoFn
verifyVote VerifyVoteFn
verifyTimeout VerifyTimeoutFn
verifyVote func(*utils.Vote) error
voteHandler func(*utils.Vote) error
verifyTimeout func(*utils.Timeout) error
timeoutHandler func(*utils.Timeout) error
verifySyncInfo func(*utils.SyncInfo) error
syncInfoHandler func(*utils.SyncInfo) error
}
type BroadcastFns struct {
@ -67,13 +67,16 @@ func (b *Bfter) SetConsensusFuns(engine consensus.Engine) {
verifySyncInfo: e.VerifySyncInfo,
verifyVote: e.VerifyVote,
verifyTimeout: e.VerifyTimeout,
voteHandler: e.EngineV2.VoteHandler,
timeoutHandler: e.EngineV2.TimeoutHandler,
syncInfoHandler: e.EngineV2.SyncInfoHandler,
}
}
// TODO: rename
func (b *Bfter) Vote(vote utils.Vote) {
func (b *Bfter) Vote(vote *utils.Vote) {
log.Trace("Receive Vote", "vote", vote)
if b.knownVotes.Contains(vote.Hash()) {
log.Trace("Discarded vote, known vote", "Signature", vote.Signature, "hash", vote.Hash())
return
@ -84,43 +87,49 @@ func (b *Bfter) Vote(vote utils.Vote) {
log.Error("Verify BFT Vote", "error", err)
return
}
err = b.consensus.voteHandler(vote)
if err != nil {
log.Error("handle BFT Vote", "error", err)
return
}
b.knownVotes.Add(vote.Hash(), true)
b.broadcastCh <- vote
}
func (b *Bfter) Timeout(timeout utils.Timeout) {
func (b *Bfter) Timeout(timeout *utils.Timeout) {
log.Trace("Receive Timeout", "timeout", timeout)
if b.knownVotes.Contains(timeout.Hash()) {
log.Trace("Discarded Timeout, known Timeout", "Signature", timeout.Signature, "hash", timeout.Hash(), "round", timeout.Round)
return
}
err := b.consensus.verifyTimeout(timeout)
if err != nil {
log.Error("Verify BFT Timeout", "error", err)
return
}
err = b.consensus.timeoutHandler(timeout)
if err != nil {
log.Error("handle BFT Timeout", "error", err)
return
}
b.knownTimeouts.Add(timeout.Hash(), true)
b.broadcastCh <- timeout
}
func (b *Bfter) SyncInfo(syncInfo utils.SyncInfo) {
func (b *Bfter) SyncInfo(syncInfo *utils.SyncInfo) {
log.Trace("Receive SyncInfo", "syncInfo", syncInfo)
if b.knownVotes.Contains(syncInfo.Hash()) {
log.Trace("Discarded SyncInfo, known SyncInfo", "hash", syncInfo.Hash())
return
}
err := b.consensus.verifySyncInfo(syncInfo)
if err != nil {
log.Error("Verify BFT SyncInfo", "error", err)
return
}
err = b.consensus.syncInfoHandler(syncInfo)
if err != nil {
log.Error("handle BFT SyncInfo", "error", err)
return
}
b.knownSyncInfos.Add(syncInfo.Hash(), true)
b.broadcastCh <- syncInfo
}
@ -129,27 +138,24 @@ func (b *Bfter) SyncInfo(syncInfo utils.SyncInfo) {
func (b *Bfter) Start() {
go b.loop()
}
func (b *Bfter) Stop() {
close(b.quit)
}
func (b *Bfter) loop() {
for {
select {
case <-b.quit:
return
case obj := <-b.broadcastCh:
switch v := obj.(type) {
case utils.Vote:
case *utils.Vote:
go b.broadcast.Vote(v)
case utils.Timeout:
case *utils.Timeout:
go b.broadcast.Timeout(v)
case utils.SyncInfo:
case *utils.SyncInfo:
go b.broadcast.SyncInfo(v)
default:
log.Error("Unknown message type received, value: %v", v)
log.Error("Unknown message type received", "value", v)
}
}
}

View file

@ -43,25 +43,33 @@ func newTester() *bfterTester {
func TestSequentialVotes(t *testing.T) {
tester := newTester()
verifyCounter := uint32(0)
handlerCounter := uint32(0)
broadcastCounter := uint32(0)
targetVotes := 10
tester.bfter.consensus.verifyVote = func(vote utils.Vote) error {
tester.bfter.consensus.verifyVote = func(vote *utils.Vote) error {
atomic.AddUint32(&verifyCounter, 1)
return nil
}
tester.bfter.broadcast.Vote = func(utils.Vote) {
tester.bfter.consensus.voteHandler = func(vote *utils.Vote) error {
atomic.AddUint32(&handlerCounter, 1)
return nil
}
tester.bfter.broadcast.Vote = func(*utils.Vote) {
atomic.AddUint32(&broadcastCounter, 1)
}
votes := makeVotes(targetVotes)
for _, vote := range votes {
tester.bfter.Vote(vote)
tester.bfter.Vote(&vote)
}
time.Sleep(50 * time.Millisecond)
if int(verifyCounter) != targetVotes || int(broadcastCounter) != targetVotes {
t.Fatalf("count mismatch: have %v on verify and have %v on broadcast, want %v", verifyCounter, broadcastCounter, targetVotes)
time.Sleep(100 * time.Millisecond)
if int(verifyCounter) != targetVotes || int(handlerCounter) != targetVotes || int(broadcastCounter) != targetVotes {
t.Fatalf("count mismatch: have %v on verify, %v on handler, %v on broadcast, want %v", verifyCounter, handlerCounter, broadcastCounter, targetVotes)
}
}
@ -69,48 +77,61 @@ func TestSequentialVotes(t *testing.T) {
func TestDuplicateVotes(t *testing.T) {
tester := newTester()
verifyCounter := uint32(0)
handlerCounter := uint32(0)
broadcastCounter := uint32(0)
targetVotes := 1
tester.bfter.consensus.verifyVote = func(vote utils.Vote) error {
tester.bfter.consensus.verifyVote = func(vote *utils.Vote) error {
atomic.AddUint32(&verifyCounter, 1)
return nil
}
tester.bfter.broadcast.Vote = func(utils.Vote) {
tester.bfter.consensus.voteHandler = func(vote *utils.Vote) error {
atomic.AddUint32(&handlerCounter, 1)
return nil
}
tester.bfter.broadcast.Vote = func(*utils.Vote) {
atomic.AddUint32(&broadcastCounter, 1)
}
vote := utils.Vote{}
// send twice
tester.bfter.Vote(vote)
tester.bfter.Vote(vote)
tester.bfter.Vote(&vote)
tester.bfter.Vote(&vote)
time.Sleep(50 * time.Millisecond)
if int(verifyCounter) != targetVotes || int(broadcastCounter) != targetVotes {
t.Fatalf("count mismatch: have %v on verify and have %v on broadcast, want %v", verifyCounter, broadcastCounter, targetVotes)
if int(verifyCounter) != targetVotes || int(handlerCounter) != targetVotes || int(broadcastCounter) != targetVotes {
t.Fatalf("count mismatch: have %v on verify, %v on handler, %v on broadcast, want %v", verifyCounter, handlerCounter, broadcastCounter, targetVotes)
}
}
// Test that avoid boardcast if there is bad vote
func TestNotBoardcastInvalidVote(t *testing.T) {
tester := newTester()
handlerCounter := uint32(0)
broadcastCounter := uint32(0)
targetVotes := 0
tester.bfter.consensus.verifyVote = func(vote utils.Vote) error {
tester.bfter.consensus.verifyVote = func(vote *utils.Vote) error {
return fmt.Errorf("This is invalid vote")
}
tester.bfter.broadcast.Vote = func(utils.Vote) {
tester.bfter.consensus.voteHandler = func(vote *utils.Vote) error {
atomic.AddUint32(&handlerCounter, 1)
return nil
}
tester.bfter.broadcast.Vote = func(*utils.Vote) {
atomic.AddUint32(&broadcastCounter, 1)
}
vote := utils.Vote{}
tester.bfter.Vote(vote)
tester.bfter.Vote(&vote)
time.Sleep(50 * time.Millisecond)
if int(broadcastCounter) != targetVotes {
t.Fatalf("count mismatch: have %v on broadcast, want %v", broadcastCounter, targetVotes)
if int(handlerCounter) != targetVotes || int(broadcastCounter) != targetVotes {
t.Fatalf("count mismatch: have %v on handler, %v on broadcast, want %v", handlerCounter, broadcastCounter, targetVotes)
}
}

View file

@ -826,8 +826,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Mark the peer as owning the vote and process it
p.MarkVote(vote)
pm.bfter.Vote(vote)
p.MarkVote(vote.Hash())
pm.bfter.Vote(&vote)
case msg.Code == TimeoutMsg:
var timeout utils.Timeout
if err := msg.Decode(&timeout); err != nil {
@ -835,16 +835,16 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
// Mark the peer as owning the timeout and process it
p.MarkTimeout(timeout)
pm.bfter.Timeout(timeout)
p.MarkTimeout(timeout.Hash())
pm.bfter.Timeout(&timeout)
case msg.Code == SyncInfoMsg:
var syncInfo utils.SyncInfo
if err := msg.Decode(&syncInfo); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Mark the peer as owning the syncInfo and process it
p.MarkSyncInfo(syncInfo)
pm.bfter.SyncInfo(syncInfo)
p.MarkSyncInfo(syncInfo.Hash())
pm.bfter.SyncInfo(&syncInfo)
default:
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
@ -898,9 +898,8 @@ func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction)
// BroadcastVote will propagate a Vote to all peers which are not known to
// already have the given vote.
func (pm *ProtocolManager) BroadcastVote(vote utils.Vote) {
//hash := Vote.Hash()
hash := common.Hash{}
func (pm *ProtocolManager) BroadcastVote(vote *utils.Vote) {
hash := vote.Hash()
peers := pm.peers.PeersWithoutVote(hash)
for _, peer := range peers {
peer.SendVote(vote)
@ -910,9 +909,8 @@ func (pm *ProtocolManager) BroadcastVote(vote utils.Vote) {
// BroadcastTimeout will propagate a Timeout to all peers which are not known to
// already have the given timeout.
func (pm *ProtocolManager) BroadcastTimeout(timeout utils.Timeout) {
//hash := timeout.Hash()
hash := common.Hash{}
func (pm *ProtocolManager) BroadcastTimeout(timeout *utils.Timeout) {
hash := timeout.Hash()
peers := pm.peers.PeersWithoutTimeout(hash)
for _, peer := range peers {
peer.SendTimeout(timeout)
@ -922,9 +920,8 @@ func (pm *ProtocolManager) BroadcastTimeout(timeout utils.Timeout) {
// BroadcastSyncInfo will propagate a SyncInfo to all peers which are not known to
// already have the given SyncInfo.
func (pm *ProtocolManager) BroadcastSyncInfo(syncInfo utils.SyncInfo) {
//hash := syncInfo.Hash()
hash := common.Hash{}
func (pm *ProtocolManager) BroadcastSyncInfo(syncInfo *utils.SyncInfo) {
hash := syncInfo.Hash()
peers := pm.peers.PeersWithoutSyncInfo(hash)
for _, peer := range peers {
peer.SendSyncInfo(syncInfo)

View file

@ -24,6 +24,7 @@ import (
"time"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/utils"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/p2p"
"github.com/XinFinOrg/XDPoSChain/rlp"
@ -92,6 +93,10 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
knownBlocks: mapset.NewSet(),
knownOrderTxs: mapset.NewSet(),
knownLendingTxs: mapset.NewSet(),
knownVote: mapset.NewSet(),
knownTimeout: mapset.NewSet(),
knownSyncInfo: mapset.NewSet(),
}
}
@ -167,7 +172,7 @@ func (p *peer) MarkLendingTransaction(hash common.Hash) {
// MarkVote marks a vote as known for the peer, ensuring that it
// will never be propagated to this particular peer.
func (p *peer) MarkVote(hash interface{}) {
func (p *peer) MarkVote(hash common.Hash) {
// If we reached the memory allowance, drop a previously known transaction hash
for p.knownVote.Cardinality() >= maxKnownVote {
p.knownVote.Pop()
@ -177,7 +182,7 @@ func (p *peer) MarkVote(hash interface{}) {
// MarkTimeout marks a timeout as known for the peer, ensuring that it
// will never be propagated to this particular peer.
func (p *peer) MarkTimeout(hash interface{}) {
func (p *peer) MarkTimeout(hash common.Hash) {
// If we reached the memory allowance, drop a previously known transaction hash
for p.knownTimeout.Cardinality() >= maxKnownTimeout {
p.knownTimeout.Pop()
@ -187,7 +192,7 @@ func (p *peer) MarkTimeout(hash interface{}) {
// MarkSyncInfo marks a syncInfo as known for the peer, ensuring that it
// will never be propagated to this particular peer.
func (p *peer) MarkSyncInfo(hash interface{}) {
func (p *peer) MarkSyncInfo(hash common.Hash) {
// If we reached the memory allowance, drop a previously known transaction hash
for p.knownSyncInfo.Cardinality() >= maxKnownSyncInfo {
p.knownSyncInfo.Pop()
@ -294,8 +299,8 @@ func (p *peer) SendReceiptsRLP(receipts []rlp.RawValue) error {
}
}
func (p *peer) SendVote(vote interface{}) error {
p.knownVote.Add(vote)
func (p *peer) SendVote(vote *utils.Vote) error {
p.knownVote.Add(vote.Hash())
if p.pairRw != nil {
return p2p.Send(p.pairRw, VoteMsg, vote)
} else {
@ -308,8 +313,8 @@ func (p *peer) AsyncSendVote() {
}
*/
func (p *peer) SendTimeout(timeout interface{}) error {
p.knownTimeout.Add(timeout)
func (p *peer) SendTimeout(timeout *utils.Timeout) error {
p.knownTimeout.Add(timeout.Hash())
if p.pairRw != nil {
return p2p.Send(p.pairRw, TimeoutMsg, timeout)
} else {
@ -322,8 +327,8 @@ func (p *peer) AsyncSendTimeout() {
}
*/
func (p *peer) SendSyncInfo(syncInfo interface{}) error {
p.knownSyncInfo.Add(syncInfo)
func (p *peer) SendSyncInfo(syncInfo *utils.SyncInfo) error {
p.knownSyncInfo.Add(syncInfo.Hash())
if p.pairRw != nil {
return p2p.Send(p.pairRw, SyncInfoMsg, syncInfo)
} else {

View file

@ -20,7 +20,7 @@ func TestCountdownTimeoutToSendTimeoutMessage(t *testing.T) {
timeoutMsg := <-engineV2.BroadcastCh
assert.NotNil(t, timeoutMsg)
valid, err := engineV2.VerifyTimeoutMessage(*timeoutMsg.(*utils.Timeout))
valid, err := engineV2.VerifyTimeoutMessage(timeoutMsg.(*utils.Timeout))
// We can only test valid = false for now as the implementation for getCurrentRoundMasterNodes is not complete
assert.False(t, valid)
// This shows we are able to decode the timeout message, which is what this test is all about
@ -117,14 +117,14 @@ func TestVoteMessageHandlerSuccessfullyGeneratedQC(t *testing.T) {
Signature: []byte{1},
}
err := engineV2.VoteHandler(*voteMsg)
err := engineV2.VoteHandler(voteMsg)
assert.Nil(t, err)
assert.Equal(t, utils.Round(1), engineV2.GetCurrentRound())
voteMsg = &utils.Vote{
ProposedBlockInfo: *blockInfo,
Signature: []byte{2},
}
err = engineV2.VoteHandler(*voteMsg)
err = engineV2.VoteHandler(voteMsg)
assert.Nil(t, err)
assert.Equal(t, utils.Round(1), engineV2.GetCurrentRound())
@ -134,7 +134,7 @@ func TestVoteMessageHandlerSuccessfullyGeneratedQC(t *testing.T) {
Signature: []byte{3},
}
err = engineV2.VoteHandler(*voteMsg)
err = engineV2.VoteHandler(voteMsg)
assert.Nil(t, err)
// Check round has now changed from 1 to 2
assert.Equal(t, utils.Round(2), engineV2.GetCurrentRound())
@ -158,13 +158,13 @@ func TestThrowErrorIfVoteMsgRoundNotEqualToCurrentRound(t *testing.T) {
}
// voteRound > currentRound
err := engineV2.VoteHandler(*voteMsg)
err := engineV2.VoteHandler(voteMsg)
assert.NotNil(t, err)
assert.Equal(t, "Vote message round number: 2 does not match currentRound: 3", err.Error())
// Set round to 1
engineV2.SetNewRoundFaker(utils.Round(1), false)
err = engineV2.VoteHandler(*voteMsg)
err = engineV2.VoteHandler(voteMsg)
assert.NotNil(t, err)
// voteRound < currentRound
assert.Equal(t, "Vote message round number: 2 does not match currentRound: 1", err.Error())
@ -189,14 +189,14 @@ func TestProcessVoteMsgThenTimeoutMsg(t *testing.T) {
Signature: []byte{1},
}
err := engineV2.VoteHandler(*voteMsg)
err := engineV2.VoteHandler(voteMsg)
assert.Nil(t, err)
assert.Equal(t, utils.Round(1), engineV2.GetCurrentRound())
voteMsg = &utils.Vote{
ProposedBlockInfo: *blockInfo,
Signature: []byte{2},
}
err = engineV2.VoteHandler(*voteMsg)
err = engineV2.VoteHandler(voteMsg)
assert.Nil(t, err)
assert.Equal(t, utils.Round(1), engineV2.GetCurrentRound())
@ -206,7 +206,7 @@ func TestProcessVoteMsgThenTimeoutMsg(t *testing.T) {
Signature: []byte{3},
}
err = engineV2.VoteHandler(*voteMsg)
err = engineV2.VoteHandler(voteMsg)
assert.Nil(t, err)
// Check round has now changed from 1 to 2
assert.Equal(t, utils.Round(2), engineV2.GetCurrentRound())