Implement BFT Message receiver (#13)

* fix or skip tests due to PR-136 changes

* add bft receiver functions

* add bft receiver functions

* rename tc to TimeoutCert

* implement more functions

* New struct in consensus/XDPoS/utils/types.go, util functions, and test. (#14)

* define vote, timeout, sync info, qc, tc, extra fields in types.go, add test in types_test.go

* add json tag in types.go, refine encoder decoder of extra fields

* refactor types.go utils.go

* re-write types, comments

* add Hash SigHash for types, and tests

* define Round type

* remove unnecessary logs

* add temp functions

* add v2 engine functions placeholder

* typo fix on the consensus v2 function placeholders

* add countdown timer

* make initilised private to countdown

* push verify function

* add test on receiving vote

* revert type change

* add async on broadcast function

* add quit initial

* fix test

Co-authored-by: Jianrong <wjrjerome@gmail.com>
Co-authored-by: wgr523 <wgr523@gmail.com>
This commit is contained in:
Liam 2021-11-06 15:39:34 +11:00 committed by Jianrong
parent 521b703207
commit a1b77f3ca8
11 changed files with 526 additions and 24 deletions

View file

@ -53,8 +53,8 @@ type XDPoS struct {
GetLendingService func() utils.LendingService
// The exact consensus engine with different versions
EngineV1 engine_v1.XDPoS_v1
EngineV2 engine_v2.XDPoS_v2
EngineV1 *engine_v1.XDPoS_v1
EngineV2 *engine_v2.XDPoS_v2
}
// New creates a XDPoS delegated-proof-of-stake consensus engine with the initial
@ -74,8 +74,8 @@ func New(config *params.XDPoSConfig, db ethdb.Database) *XDPoS {
db: db,
signingTxsCache: signingTxsCache,
EngineV1: *engine_v1.New(&conf, db),
EngineV2: *engine_v2.New(&conf, db),
EngineV1: engine_v1.New(&conf, db),
EngineV2: engine_v2.New(&conf, db),
}
}
@ -93,12 +93,14 @@ func NewFaker(db ethdb.Database, chainConfig *params.ChainConfig) *XDPoS {
signingTxsCache, _ := lru.New(utils.BlockSignersCacheLimit)
fakeEngine = &XDPoS{
config: conf,
db: db,
config: conf,
db: db,
GetXDCXService: func() utils.TradingService { return nil },
GetLendingService: func() utils.LendingService { return nil },
signingTxsCache: signingTxsCache,
EngineV1: *engine_v1.NewFaker(db, conf),
EngineV2: *engine_v2.NewFaker(db, conf),
EngineV1: engine_v1.NewFaker(db, conf),
EngineV2: engine_v2.NewFaker(db, conf),
}
return fakeEngine
}
@ -350,3 +352,20 @@ func (x *XDPoS) CacheSigningTxs(hash common.Hash, txs []*types.Transaction) []*t
func (x *XDPoS) GetCachedSigningTxs(hash common.Hash) (interface{}, bool) {
return x.signingTxsCache.Get(hash)
}
//V2
func (x *XDPoS) VerifyVote(utils.Vote) error {
return nil
}
func (x *XDPoS) VerifyTimeout(utils.Timeout) error {
return nil
}
func (x *XDPoS) VerifySyncInfo(utils.SyncInfo) error {
return nil
}
func (x *XDPoS) VerifyBlockInfo(utils.BlockInfo) error {
return nil
}

View file

@ -6,6 +6,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/countdown"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/utils"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/ethdb"
"github.com/XinFinOrg/XDPoSChain/log"
@ -77,7 +78,7 @@ func (consensus *XDPoS_v2) Dispatcher() error {
SyncInfo workflow
*/
// Verify syncInfo and trigger trigger process QC or TC if successful
func (consensus *XDPoS_v2) VerifySyncInfoMessage(header *types.Header) error {
func (consensus *XDPoS_v2) VerifySyncInfoMessage(syncInfo utils.SyncInfo) error {
/*
1. Verify items including:
- verifyQC
@ -98,7 +99,7 @@ func (consensus *XDPoS_v2) SyncInfoHandler(header *types.Header) error {
/*
Vote workflow
*/
func (consensus *XDPoS_v2) VerifyVoteMessage() error {
func (consensus *XDPoS_v2) VerifyVoteMessage(vote utils.Vote) error {
/*
1. Check signature:
- Use ecRecover to get the public key
@ -123,7 +124,7 @@ func (consensus *XDPoS_v2) VoteHandler() {
Timeout workflow
*/
// Verify timeout message type from peers in bft.go
func (consensus *XDPoS_v2) VerifyTimeoutMessage() error {
func (consensus *XDPoS_v2) VerifyTimeoutMessage(utils.Timeout) error {
/*
1. Check signature:
- Use ecRecover to get the public key
@ -166,7 +167,7 @@ func (consensus *XDPoS_v2) generateBlockInfo() error {
}
// To be used by different message verification. Verify local DB block info against the received block information(i.e hash, blockNum, round)
func (consensus *XDPoS_v2) verifyBlockInfo(header *types.Header) error {
func (consensus *XDPoS_v2) VerifyBlockInfo(blockInfo utils.BlockInfo) error {
return nil
}

View file

@ -61,6 +61,13 @@ type PublicApiSnapshot struct {
// Round number type in XDPoS 2.0
type Round uint64
// Block Info struct in XDPoS 2.0, used for vote message, etc.
type BlockInfo struct {
Hash common.Hash
Round Round
Number *big.Int
}
// Vote message in XDPoS 2.0
type Vote struct {
ProposedBlockInfo BlockInfo
@ -79,13 +86,6 @@ type SyncInfo struct {
HighestTimeoutCert TimeoutCert
}
// Block Info struct in XDPoS 2.0, used for vote message, etc.
type BlockInfo struct {
Hash common.Hash
Round Round
Number *big.Int
}
// Quorum Certificate struct in XDPoS 2.0
type QuorumCert struct {
ProposedBlockInfo BlockInfo

156
eth/bfter/bft.go Normal file
View file

@ -0,0 +1,156 @@
package bfter
import (
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/utils"
"github.com/XinFinOrg/XDPoSChain/log"
lru "github.com/hashicorp/golang-lru"
)
const (
messageLimit = 1024
)
//Define Verify Group functions
type VerifySyncInfoFn func(utils.SyncInfo) error
type VerifyVoteFn func(utils.Vote) error
type VerifyTimeoutFn func(utils.Timeout) error
//Define Boradcast Group functions
type broadcastVoteFn func(utils.Vote)
type broadcastTimeoutFn func(utils.Timeout)
type broadcastSyncInfoFn func(utils.SyncInfo)
type Bfter struct {
broadcastCh chan interface{}
quit chan struct{}
consensus ConsensusFns
broadcast BroadcastFns
// Message Cache
knownVotes *lru.ARCCache
knownSyncInfos *lru.ARCCache
knownTimeouts *lru.ARCCache
}
type ConsensusFns struct {
verifySyncInfo VerifySyncInfoFn
verifyVote VerifyVoteFn
verifyTimeout VerifyTimeoutFn
}
type BroadcastFns struct {
Vote broadcastVoteFn
Timeout broadcastTimeoutFn
SyncInfo broadcastSyncInfoFn
}
func New(broadcasts BroadcastFns) *Bfter {
knownVotes, _ := lru.NewARC(messageLimit)
knownSyncInfos, _ := lru.NewARC(messageLimit)
knownTimeouts, _ := lru.NewARC(messageLimit)
return &Bfter{
quit: make(chan struct{}),
broadcastCh: make(chan interface{}),
broadcast: broadcasts,
knownVotes: knownVotes,
knownSyncInfos: knownSyncInfos,
knownTimeouts: knownTimeouts,
}
}
func (b *Bfter) SetConsensusFuns(engine consensus.Engine) {
e := engine.(*XDPoS.XDPoS)
b.broadcastCh = e.EngineV2.BroadcastCh
b.consensus = ConsensusFns{
verifySyncInfo: e.VerifySyncInfo,
verifyVote: e.VerifyVote,
verifyTimeout: e.VerifyTimeout,
}
}
// TODO: rename
func (b *Bfter) Vote(vote utils.Vote) {
log.Trace("Receive Vote", "vote", vote)
if b.knownVotes.Contains(vote.Hash()) {
log.Trace("Discarded vote, known vote", "Signature", vote.Signature, "hash", vote.Hash())
return
}
err := b.consensus.verifyVote(vote)
if err != nil {
log.Error("Verify BFT Vote", "error", err)
return
}
b.knownVotes.Add(vote.Hash(), true)
b.broadcastCh <- vote
}
func (b *Bfter) Timeout(timeout utils.Timeout) {
log.Trace("Receive Timeout", "timeout", timeout)
if b.knownVotes.Contains(timeout.Hash()) {
log.Trace("Discarded Timeout, known Timeout", "Signature", timeout.Signature, "hash", timeout.Hash(), "round", timeout.Round)
return
}
err := b.consensus.verifyTimeout(timeout)
if err != nil {
log.Error("Verify BFT Timeout", "error", err)
return
}
b.knownTimeouts.Add(timeout.Hash(), true)
b.broadcastCh <- timeout
}
func (b *Bfter) SyncInfo(syncInfo utils.SyncInfo) {
log.Trace("Receive SyncInfo", "syncInfo", syncInfo)
if b.knownVotes.Contains(syncInfo.Hash()) {
log.Trace("Discarded SyncInfo, known SyncInfo", "hash", syncInfo.Hash())
return
}
err := b.consensus.verifySyncInfo(syncInfo)
if err != nil {
log.Error("Verify BFT SyncInfo", "error", err)
return
}
b.knownSyncInfos.Add(syncInfo.Hash(), true)
b.broadcastCh <- syncInfo
}
// Start Bft receiver
func (b *Bfter) Start() {
go b.loop()
}
func (b *Bfter) Stop() {
close(b.quit)
}
func (b *Bfter) loop() {
for {
select {
case <-b.quit:
return
case obj := <-b.broadcastCh:
switch v := obj.(type) {
case utils.Vote:
go b.broadcast.Vote(v)
case utils.Timeout:
go b.broadcast.Timeout(v)
case utils.SyncInfo:
go b.broadcast.SyncInfo(v)
default:
log.Error("Unknown message type received, value: %v", v)
}
}
}
}

118
eth/bfter/bft_test.go Normal file
View file

@ -0,0 +1,118 @@
package bfter
import (
"fmt"
"sync/atomic"
"testing"
"time"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/engines/engine_v2"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/utils"
)
// make different votes based on Signatures
func makeVotes(n int) []utils.Vote {
var votes []utils.Vote
for i := 0; i < n; i++ {
votes = append(votes, utils.Vote{Signature: []byte{byte(i)}})
}
return votes
}
// bfterTester is a test simulator for mocking out bfter worker.
type bfterTester struct {
bfter *Bfter
}
// newTester creates a new bft fetcher test mocker.
func newTester() *bfterTester {
testConsensus := &XDPoS.XDPoS{EngineV2: &engine_v2.XDPoS_v2{}}
broadcasts := BroadcastFns{}
tester := &bfterTester{}
tester.bfter = New(broadcasts)
tester.bfter.SetConsensusFuns(testConsensus)
tester.bfter.broadcastCh = make(chan interface{})
tester.bfter.Start()
return tester
}
// Tests that a bfter accepts vote and process verfiy and broadcast
func TestSequentialVotes(t *testing.T) {
tester := newTester()
verifyCounter := uint32(0)
broadcastCounter := uint32(0)
targetVotes := 10
tester.bfter.consensus.verifyVote = func(vote utils.Vote) error {
atomic.AddUint32(&verifyCounter, 1)
return nil
}
tester.bfter.broadcast.Vote = func(utils.Vote) {
atomic.AddUint32(&broadcastCounter, 1)
}
votes := makeVotes(targetVotes)
for _, vote := range votes {
tester.bfter.Vote(vote)
}
time.Sleep(50 * time.Millisecond)
if int(verifyCounter) != targetVotes || int(broadcastCounter) != targetVotes {
t.Fatalf("count mismatch: have %v on verify and have %v on broadcast, want %v", verifyCounter, broadcastCounter, targetVotes)
}
}
// Tests that vote already being retrieved will not be duplicated.
func TestDuplicateVotes(t *testing.T) {
tester := newTester()
verifyCounter := uint32(0)
broadcastCounter := uint32(0)
targetVotes := 1
tester.bfter.consensus.verifyVote = func(vote utils.Vote) error {
atomic.AddUint32(&verifyCounter, 1)
return nil
}
tester.bfter.broadcast.Vote = func(utils.Vote) {
atomic.AddUint32(&broadcastCounter, 1)
}
vote := utils.Vote{}
// send twice
tester.bfter.Vote(vote)
tester.bfter.Vote(vote)
time.Sleep(50 * time.Millisecond)
if int(verifyCounter) != targetVotes || int(broadcastCounter) != targetVotes {
t.Fatalf("count mismatch: have %v on verify and have %v on broadcast, want %v", verifyCounter, broadcastCounter, targetVotes)
}
}
// Test that avoid boardcast if there is bad vote
func TestNotBoardcastInvalidVote(t *testing.T) {
tester := newTester()
broadcastCounter := uint32(0)
targetVotes := 0
tester.bfter.consensus.verifyVote = func(vote utils.Vote) error {
return fmt.Errorf("This is invalid vote")
}
tester.bfter.broadcast.Vote = func(utils.Vote) {
atomic.AddUint32(&broadcastCounter, 1)
}
vote := utils.Vote{}
tester.bfter.Vote(vote)
time.Sleep(50 * time.Millisecond)
if int(broadcastCounter) != targetVotes {
t.Fatalf("count mismatch: have %v on broadcast, want %v", broadcastCounter, targetVotes)
}
}
// TODO: SyncInfo and Timeout Test, should be same as Vote.
// Once all test on vote covered, then duplicate to others

View file

@ -29,9 +29,11 @@ import (
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/utils"
"github.com/XinFinOrg/XDPoSChain/consensus/misc"
"github.com/XinFinOrg/XDPoSChain/core"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/eth/bfter"
"github.com/XinFinOrg/XDPoSChain/eth/downloader"
"github.com/XinFinOrg/XDPoSChain/eth/fetcher"
"github.com/XinFinOrg/XDPoSChain/ethdb"
@ -80,6 +82,7 @@ type ProtocolManager struct {
downloader *downloader.Downloader
fetcher *fetcher.Fetcher
peers *peerSet
bfter *bfter.Bfter
SubProtocols []p2p.Protocol
@ -218,6 +221,16 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne
return manager.blockchain.PrepareBlock(block)
}
manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, prepare, manager.removePeer)
//Define bft function
broadcasts := bfter.BroadcastFns{
Vote: manager.BroadcastVote,
Timeout: manager.BroadcastTimeout,
SyncInfo: manager.BroadcastSyncInfo,
}
manager.bfter = bfter.New(broadcasts)
if blockchain.Config().XDPoS != nil {
manager.bfter.SetConsensusFuns(engine)
}
return manager, nil
}
@ -253,7 +266,6 @@ func (pm *ProtocolManager) Start(maxPeers int) {
// broadcast transactions
pm.txCh = make(chan core.TxPreEvent, txChanSize)
pm.txSub = pm.txpool.SubscribeTxPreEvent(pm.txCh)
pm.orderTxCh = make(chan core.OrderTxPreEvent, txChanSize)
if pm.orderpool != nil {
pm.orderTxSub = pm.orderpool.SubscribeTxPreEvent(pm.orderTxCh)
@ -808,6 +820,31 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if pm.lendingpool != nil {
pm.lendingpool.AddRemotes(txs)
}
case msg.Code == VoteMsg:
var vote utils.Vote
if err := msg.Decode(&vote); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Mark the peer as owning the vote and process it
p.MarkVote(vote)
pm.bfter.Vote(vote)
case msg.Code == TimeoutMsg:
var timeout utils.Timeout
if err := msg.Decode(&timeout); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Mark the peer as owning the timeout and process it
p.MarkTimeout(timeout)
pm.bfter.Timeout(timeout)
case msg.Code == SyncInfoMsg:
var syncInfo utils.SyncInfo
if err := msg.Decode(&syncInfo); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Mark the peer as owning the syncInfo and process it
p.MarkSyncInfo(syncInfo)
pm.bfter.SyncInfo(syncInfo)
default:
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
@ -859,6 +896,42 @@ func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction)
log.Trace("Broadcast transaction", "hash", hash, "recipients", len(peers))
}
// BroadcastVote will propagate a Vote to all peers which are not known to
// already have the given vote.
func (pm *ProtocolManager) BroadcastVote(vote utils.Vote) {
//hash := Vote.Hash()
hash := common.Hash{}
peers := pm.peers.PeersWithoutVote(hash)
for _, peer := range peers {
peer.SendVote(vote)
}
log.Trace("Propagated Vote", "hash", hash, "recipients", len(peers))
}
// BroadcastTimeout will propagate a Timeout to all peers which are not known to
// already have the given timeout.
func (pm *ProtocolManager) BroadcastTimeout(timeout utils.Timeout) {
//hash := timeout.Hash()
hash := common.Hash{}
peers := pm.peers.PeersWithoutTimeout(hash)
for _, peer := range peers {
peer.SendTimeout(timeout)
}
log.Trace("Propagated Timeout", "hash", hash, "recipients", len(peers))
}
// BroadcastSyncInfo will propagate a SyncInfo to all peers which are not known to
// already have the given SyncInfo.
func (pm *ProtocolManager) BroadcastSyncInfo(syncInfo utils.SyncInfo) {
//hash := syncInfo.Hash()
hash := common.Hash{}
peers := pm.peers.PeersWithoutSyncInfo(hash)
for _, peer := range peers {
peer.SendSyncInfo(syncInfo)
}
log.Trace("Propagated SyncInfo", "hash", hash, "recipients", len(peers))
}
// OrderBroadcastTx will propagate a transaction to all peers which are not known to
// already have the given transaction.
func (pm *ProtocolManager) OrderBroadcastTx(hash common.Hash, tx *types.OrderTransaction) {

View file

@ -17,13 +17,14 @@
package eth
import (
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
"math"
"math/big"
"math/rand"
"testing"
"time"
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/consensus/ethash"
"github.com/XinFinOrg/XDPoSChain/core"

View file

@ -22,12 +22,13 @@ package eth
import (
"crypto/ecdsa"
"crypto/rand"
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
"math/big"
"sort"
"sync"
"testing"
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/consensus/ethash"
"github.com/XinFinOrg/XDPoSChain/core"

View file

@ -41,6 +41,9 @@ const (
maxKnownOrderTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
maxKnownLendingTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
maxKnownVote = 1024 // Maximum transactions hashes to keep in the known list (prevent DOS)
maxKnownTimeout = 1024 // Maximum transactions hashes to keep in the known list (prevent DOS)
maxKnownSyncInfo = 1024 // Maximum transactions hashes to keep in the known list (prevent DOS)
handshakeTimeout = 5 * time.Second
)
@ -66,10 +69,15 @@ type peer struct {
td *big.Int
lock sync.RWMutex
knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
knownBlocks mapset.Set // Set of block hashes known to be known by this peer
knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
knownBlocks mapset.Set // Set of block hashes known to be known by this peer
knownOrderTxs mapset.Set // Set of order transaction hashes known to be known by this peer
knownLendingTxs mapset.Set // Set of lending transaction hashes known to be known by this peer
knownVote mapset.Set // Set of BFT Vote known to be known by this peer
knownTimeout mapset.Set // Set of BFT timeout known to be known by this peer
knownSyncInfo mapset.Set // Set of BFT Sync Info known to be known by this peer`
}
func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
@ -157,6 +165,36 @@ func (p *peer) MarkLendingTransaction(hash common.Hash) {
p.knownLendingTxs.Add(hash)
}
// MarkVote marks a vote as known for the peer, ensuring that it
// will never be propagated to this particular peer.
func (p *peer) MarkVote(hash interface{}) {
// If we reached the memory allowance, drop a previously known transaction hash
for p.knownVote.Cardinality() >= maxKnownVote {
p.knownVote.Pop()
}
p.knownVote.Add(hash)
}
// MarkTimeout marks a timeout as known for the peer, ensuring that it
// will never be propagated to this particular peer.
func (p *peer) MarkTimeout(hash interface{}) {
// If we reached the memory allowance, drop a previously known transaction hash
for p.knownTimeout.Cardinality() >= maxKnownTimeout {
p.knownTimeout.Pop()
}
p.knownTimeout.Add(hash)
}
// MarkSyncInfo marks a syncInfo as known for the peer, ensuring that it
// will never be propagated to this particular peer.
func (p *peer) MarkSyncInfo(hash interface{}) {
// If we reached the memory allowance, drop a previously known transaction hash
for p.knownSyncInfo.Cardinality() >= maxKnownSyncInfo {
p.knownSyncInfo.Pop()
}
p.knownSyncInfo.Add(hash)
}
// SendTransactions sends transactions to the peer and includes the hashes
// in its transaction hash set for future reference.
func (p *peer) SendTransactions(txs types.Transactions) error {
@ -256,6 +294,49 @@ func (p *peer) SendReceiptsRLP(receipts []rlp.RawValue) error {
}
}
func (p *peer) SendVote(vote interface{}) error {
p.knownVote.Add(vote)
if p.pairRw != nil {
return p2p.Send(p.pairRw, VoteMsg, vote)
} else {
return p2p.Send(p.rw, VoteMsg, vote)
}
}
/*
func (p *peer) AsyncSendVote() {
}
*/
func (p *peer) SendTimeout(timeout interface{}) error {
p.knownTimeout.Add(timeout)
if p.pairRw != nil {
return p2p.Send(p.pairRw, TimeoutMsg, timeout)
} else {
return p2p.Send(p.rw, TimeoutMsg, timeout)
}
}
/*
func (p *peer) AsyncSendTimeout() {
}
*/
func (p *peer) SendSyncInfo(syncInfo interface{}) error {
p.knownSyncInfo.Add(syncInfo)
if p.pairRw != nil {
return p2p.Send(p.pairRw, SyncInfoMsg, syncInfo)
} else {
return p2p.Send(p.rw, SyncInfoMsg, syncInfo)
}
}
/*
func (p *peer) AsyncSendSyncInfo() {
}
*/
// RequestOneHeader is a wrapper around the header query functions to fetch a
// single header. It is used solely by the fetcher.
func (p *peer) RequestOneHeader(hash common.Hash) error {
@ -486,6 +567,51 @@ func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer {
return list
}
// PeersWithoutVote retrieves a list of peers that do not have a given block in
// their set of known hashes.
func (ps *peerSet) PeersWithoutVote(hash common.Hash) []*peer {
ps.lock.RLock()
defer ps.lock.RUnlock()
list := make([]*peer, 0, len(ps.peers))
for _, p := range ps.peers {
if !p.knownVote.Contains(hash) {
list = append(list, p)
}
}
return list
}
// PeersWithoutTimeout retrieves a list of peers that do not have a given block in
// their set of known hashes.
func (ps *peerSet) PeersWithoutTimeout(hash common.Hash) []*peer {
ps.lock.RLock()
defer ps.lock.RUnlock()
list := make([]*peer, 0, len(ps.peers))
for _, p := range ps.peers {
if !p.knownTimeout.Contains(hash) {
list = append(list, p)
}
}
return list
}
// PeersWithoutSyncInfo retrieves a list of peers that do not have a given block in
// their set of known hashes.
func (ps *peerSet) PeersWithoutSyncInfo(hash common.Hash) []*peer {
ps.lock.RLock()
defer ps.lock.RUnlock()
list := make([]*peer, 0, len(ps.peers))
for _, p := range ps.peers {
if !p.knownSyncInfo.Contains(hash) {
list = append(list, p)
}
}
return list
}
// PeersWithoutTx retrieves a list of peers that do not have a given transaction
// in their set of known hashes.
func (ps *peerSet) OrderPeersWithoutTx(hash common.Hash) []*peer {

View file

@ -63,6 +63,11 @@ const (
NodeDataMsg = 0x0e
GetReceiptsMsg = 0x0f
ReceiptsMsg = 0x10
// Protocol messages belonging to eth/100
VoteMsg = 0xe0
TimeoutMsg = 0xe1
SyncInfoMsg = 0xe2
)
type errCode int

View file

@ -134,7 +134,9 @@ func (pm *ProtocolManager) txsyncLoop() {
func (pm *ProtocolManager) syncer() {
// Start and ensure cleanup of sync mechanisms
pm.fetcher.Start()
pm.bfter.Start()
defer pm.fetcher.Stop()
defer pm.bfter.Stop()
defer pm.downloader.Terminate()
// Wait for different events to fire synchronisation operations