go-ethereum/eth/bft/bft_handler.go
Liam 6c5fe34615 v2 miner function implementation and happy path (#22)
* New struct in consensus/XDPoS/utils/types.go, util functions, and test. (#14)

* define vote, timeout, sync info, qc, tc, extra fields in types.go, add test in types_test.go

* add json tag in types.go, refine encoder decoder of extra fields

* refactor types.go utils.go

* re-write types, comments

* add Hash SigHash for types, and tests

* define Round type

* remove unnecessary logs

* add v2 engine functions placeholder

* typo fix on the consensus v2 function placeholders

* add countdown timer

* make initilised private to countdown

* add v2 specific config struct

* rename some config variables

* Implement BFT Message receiver (#13)

* fix or skip tests due to PR-136 changes

* add bft receiver functions

* add bft receiver functions

* rename tc to TimeoutCert

* implement more functions

* New struct in consensus/XDPoS/utils/types.go, util functions, and test. (#14)

* define vote, timeout, sync info, qc, tc, extra fields in types.go, add test in types_test.go

* add json tag in types.go, refine encoder decoder of extra fields

* refactor types.go utils.go

* re-write types, comments

* add Hash SigHash for types, and tests

* define Round type

* remove unnecessary logs

* add temp functions

* add v2 engine functions placeholder

* typo fix on the consensus v2 function placeholders

* add countdown timer

* make initilised private to countdown

* push verify function

* add test on receiving vote

* revert type change

* add async on broadcast function

* add quit initial

* fix test

Co-authored-by: Jianrong <wjrjerome@gmail.com>
Co-authored-by: wgr523 <wgr523@gmail.com>

* generate and verify timeout message

* Consensus V2 variable, timeout pool (#19)

* fill in XDPoS_v2 variables and processQC/TC

* add timeout pool, refine engine variables

* refactor type functions

* solve a small pointer bug

* create general pool and its test, refine engine

* refine pool, add xdpos v2 config cert threshold

* refine config

* vote and timeout handlers

* fix pool test

* bft miner preparation

* review comment improvement

* update

* relocate tests

* add and remove comment

* fix the syntax error

* update network layer and add handler functions (#23)

* update network layer and add handler functions

* fix test syntax error

* add ProcessQC implementation

* add ProcessQC tests

* add snapshot test

* add wait qc process

* remove testing files

* add route snapshot

* fix merge issue

* add default v2 behaviour (#24)

* add v2 ecrecover functions and refactor test

* fix all the tests

* put minimum lock variable

* debugging prepare and seal v2 blocks

* Trigger proposeBlockHandler after v2 block received and verified in fetcher

* skip snapshot apply related tests

* update test check

* rename bfter to bft handler and ignore normal behaviour

* fix bugs during local 4 node run

* fix test

* fix sync info test

* fix bugs during local 4 node run

* rebase and fix bug

* remove hook validators function"

Co-authored-by: wgr523 <wgr523@gmail.com>
Co-authored-by: Jianrong <wjrjerome@gmail.com>
2021-12-30 11:45:18 +11:00

176 lines
4.7 KiB
Go

package bft
import (
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/utils"
"github.com/XinFinOrg/XDPoSChain/core"
"github.com/XinFinOrg/XDPoSChain/log"
lru "github.com/hashicorp/golang-lru"
)
// messageLimit is the size passed to lru.NewARC for each of the known-message
// caches (votes, sync infos and timeouts) created in New.
const messageLimit = 1024
// Function signatures of the broadcast callbacks carried by BroadcastFns, one
// per BFT message kind.
type (
	broadcastVoteFn     func(*utils.Vote)
	broadcastTimeoutFn  func(*utils.Timeout)
	broadcastSyncInfoFn func(*utils.SyncInfo)
)
// Bfter is the BFT message handler. Incoming votes, timeouts and sync infos
// are verified through the consensus callbacks, recorded in a per-kind cache,
// queued on broadcastCh for the broadcast loop and handed to the consensus
// handlers.
type Bfter struct {
	// Chain access handed to the vote and sync-info handlers.
	blockCahinReader consensus.ChainReader

	// Messages waiting to be passed to the broadcast callbacks by loop.
	broadcastCh chan interface{}

	// Closed by Stop to make loop return.
	quit chan struct{}

	// Verification and handling callbacks, populated by SetConsensusFuns.
	consensus ConsensusFns

	// Broadcast callbacks, supplied to New.
	broadcast BroadcastFns

	// Message caches keyed by message hash, one per message kind.
	knownVotes, knownSyncInfos, knownTimeouts *lru.ARCCache
}
// ConsensusFns bundles the consensus-engine callbacks used by Bfter: for every
// message kind there is one verification function and one handler.
type ConsensusFns struct {
	// Verification callbacks; a non-nil error makes Bfter reject the message.
	verifyVote     func(*utils.Vote) error
	verifyTimeout  func(*utils.Timeout) error
	verifySyncInfo func(*utils.SyncInfo) error

	// Handlers, called by Bfter after the message has been verified, cached
	// and queued for broadcast.
	voteHandler     func(consensus.ChainReader, *utils.Vote) error
	timeoutHandler  func(*utils.Timeout) error
	syncInfoHandler func(consensus.ChainReader, *utils.SyncInfo) error
}
// BroadcastFns holds the callbacks Bfter uses to broadcast BFT messages. The
// broadcast loop picks the callback matching the dynamic type of each message
// it receives and runs it in a new goroutine.
type BroadcastFns struct {
	Vote     broadcastVoteFn     // invoked for each *utils.Vote
	Timeout  broadcastTimeoutFn  // invoked for each *utils.Timeout
	SyncInfo broadcastSyncInfoFn // invoked for each *utils.SyncInfo
}
// New returns a Bfter that broadcasts through the given callbacks and passes
// blockCahinReader to the consensus handlers. Each message kind gets its own
// ARC cache of messageLimit entries. The consensus callbacks are left unset;
// SetConsensusFuns fills them in.
func New(broadcasts BroadcastFns, blockCahinReader *core.BlockChain) *Bfter {
	// The error from lru.NewARC is deliberately ignored: per the golang-lru
	// documentation it is only returned for a non-positive size, and
	// messageLimit is a positive constant.
	newCache := func() *lru.ARCCache {
		cache, _ := lru.NewARC(messageLimit)
		return cache
	}

	b := &Bfter{
		blockCahinReader: blockCahinReader,
		broadcast:        broadcasts,
		broadcastCh:      make(chan interface{}),
		quit:             make(chan struct{}),
	}
	b.knownVotes = newCache()
	b.knownSyncInfos = newCache()
	b.knownTimeouts = newCache()
	return b
}
// SetConsensusFuns connects the Bfter to the consensus engine: it adopts the
// V2 engine's broadcast channel and installs the engine's verify and handler
// methods as the consensus callbacks.
//
// engine must hold a *XDPoS.XDPoS; any other concrete type makes the type
// assertion panic.
func (b *Bfter) SetConsensusFuns(engine consensus.Engine) {
	xdpos := engine.(*XDPoS.XDPoS)

	// Replaces the channel created in New; loop re-reads b.broadcastCh on
	// every iteration.
	b.broadcastCh = xdpos.EngineV2.BroadcastCh

	var fns ConsensusFns
	fns.verifySyncInfo = xdpos.VerifySyncInfo
	fns.verifyVote = xdpos.VerifyVote
	fns.verifyTimeout = xdpos.VerifyTimeout
	fns.voteHandler = xdpos.EngineV2.VoteHandler
	fns.timeoutHandler = xdpos.EngineV2.TimeoutHandler
	fns.syncInfoHandler = xdpos.EngineV2.SyncInfoHandler
	b.consensus = fns
}
// Vote processes an incoming vote. A vote whose hash is already present in
// the known-votes cache is discarded and nil is returned. Otherwise the vote
// is verified, recorded in the cache, queued on the broadcast channel and
// passed to the consensus vote handler. Errors from verification or from the
// handler are logged and returned to the caller.
//
// TODO: rename
func (b *Bfter) Vote(vote *utils.Vote) error {
	info := vote.ProposedBlockInfo
	log.Info("Receive Vote", "voted block hash", info.Hash.Hex(), "number", info.Number, "round", info.Round)

	if known := b.knownVotes.Contains(vote.Hash()); known {
		log.Info("Discarded vote, known vote", "voted block hash", info.Hash.Hex(), "number", info.Number, "round", info.Round)
		return nil
	}

	if verifyErr := b.consensus.verifyVote(vote); verifyErr != nil {
		log.Error("Verify BFT Vote", "error", verifyErr)
		return verifyErr
	}

	// Only verified votes are remembered and forwarded. The send blocks until
	// the channel accepts the message.
	b.knownVotes.Add(vote.Hash(), true)
	b.broadcastCh <- vote

	if handleErr := b.consensus.voteHandler(b.blockCahinReader, vote); handleErr != nil {
		log.Error("handle BFT Vote", "error", handleErr)
		return handleErr
	}
	return nil
}
// Timeout processes an incoming timeout message. A timeout whose hash is
// already present in the known-timeouts cache is discarded and nil is
// returned. Otherwise the message is verified, recorded in the cache, queued
// on the broadcast channel and passed to the consensus timeout handler.
//
// Errors from verification or from the handler are returned to the caller. A
// handler error of type *utils.ErrIncomingMessageRoundNotEqualCurrentRound is
// logged at debug level only; every other error is logged as an error.
func (b *Bfter) Timeout(timeout *utils.Timeout) error {
	log.Trace("Receive Timeout", "timeout", timeout)

	// Duplicates must be looked up in knownTimeouts, the cache that Add writes
	// to below. Consulting the vote cache would never match, so repeated
	// timeouts would be re-verified and re-broadcast every time.
	if b.knownTimeouts.Contains(timeout.Hash()) {
		log.Trace("Discarded Timeout, known Timeout", "Signature", timeout.Signature, "hash", timeout.Hash(), "round", timeout.Round)
		return nil
	}

	if err := b.consensus.verifyTimeout(timeout); err != nil {
		log.Error("Verify BFT Timeout", "error", err)
		return err
	}

	// Only verified timeouts are remembered and forwarded. The send blocks
	// until the channel accepts the message.
	b.knownTimeouts.Add(timeout.Hash(), true)
	b.broadcastCh <- timeout

	if err := b.consensus.timeoutHandler(timeout); err != nil {
		if _, ok := err.(*utils.ErrIncomingMessageRoundNotEqualCurrentRound); ok {
			log.Debug("timeout message round not equal", "error", err)
			return err
		}
		log.Error("handle BFT Timeout", "error", err)
		return err
	}
	return nil
}
// SyncInfo processes an incoming sync-info message. A message whose hash is
// already present in the known-sync-infos cache is discarded and nil is
// returned. Otherwise the message is verified, recorded in the cache, queued
// on the broadcast channel and passed to the consensus sync-info handler.
// Errors from verification or from the handler are logged and returned.
func (b *Bfter) SyncInfo(syncInfo *utils.SyncInfo) error {
	log.Trace("Receive SyncInfo", "syncInfo", syncInfo)

	// Duplicates must be looked up in knownSyncInfos, the cache that Add
	// writes to below. Consulting the vote cache would never match, so
	// repeated sync infos would be re-verified and re-broadcast every time.
	if b.knownSyncInfos.Contains(syncInfo.Hash()) {
		log.Trace("Discarded SyncInfo, known SyncInfo", "hash", syncInfo.Hash())
		return nil
	}

	if err := b.consensus.verifySyncInfo(syncInfo); err != nil {
		log.Error("Verify BFT SyncInfo", "error", err)
		return err
	}

	// Only verified sync infos are remembered and forwarded. The send blocks
	// until the channel accepts the message.
	b.knownSyncInfos.Add(syncInfo.Hash(), true)
	b.broadcastCh <- syncInfo

	if err := b.consensus.syncInfoHandler(b.blockCahinReader, syncInfo); err != nil {
		log.Error("handle BFT SyncInfo", "error", err)
		return err
	}
	return nil
}
// Start launches the broadcast loop in its own goroutine and returns
// immediately. The loop keeps running until Stop closes the quit channel.
func (b *Bfter) Start() {
	go b.loop()
}
// Stop tells the broadcast loop to exit by closing the quit channel. It does
// not wait for the loop goroutine to finish. Stop must be called at most once:
// closing an already-closed channel panics.
func (b *Bfter) Stop() {
	close(b.quit)
}
// loop is the broadcast loop started by Start. It takes messages off the
// broadcast channel and runs the broadcast callback matching the message's
// dynamic type in a new goroutine. Messages of any other type are logged as an
// error and dropped. The loop returns once the quit channel is closed.
func (b *Bfter) loop() {
	for {
		// Wait for either a shutdown signal or the next queued message.
		var msg interface{}
		select {
		case <-b.quit:
			return
		case msg = <-b.broadcastCh:
		}

		// Each broadcast runs in its own goroutine so the loop can go back to
		// receiving without waiting for the callback to return.
		switch m := msg.(type) {
		case *utils.Vote:
			go b.broadcast.Vote(m)
		case *utils.Timeout:
			go b.broadcast.Timeout(m)
		case *utils.SyncInfo:
			go b.broadcast.SyncInfo(m)
		default:
			log.Error("Unknown message type received", "value", m)
		}
	}
}