diff --git a/consensus/XDPoS/XDPoS.go b/consensus/XDPoS/XDPoS.go index 2a3bc0ffb9..066b52a9bb 100644 --- a/consensus/XDPoS/XDPoS.go +++ b/consensus/XDPoS/XDPoS.go @@ -53,8 +53,8 @@ type XDPoS struct { GetLendingService func() utils.LendingService // The exact consensus engine with different versions - EngineV1 engine_v1.XDPoS_v1 - EngineV2 engine_v2.XDPoS_v2 + EngineV1 *engine_v1.XDPoS_v1 + EngineV2 *engine_v2.XDPoS_v2 } // New creates a XDPoS delegated-proof-of-stake consensus engine with the initial @@ -74,8 +74,8 @@ func New(config *params.XDPoSConfig, db ethdb.Database) *XDPoS { db: db, signingTxsCache: signingTxsCache, - EngineV1: *engine_v1.New(&conf, db), - EngineV2: *engine_v2.New(&conf, db), + EngineV1: engine_v1.New(&conf, db), + EngineV2: engine_v2.New(&conf, db), } } @@ -93,12 +93,14 @@ func NewFaker(db ethdb.Database, chainConfig *params.ChainConfig) *XDPoS { signingTxsCache, _ := lru.New(utils.BlockSignersCacheLimit) fakeEngine = &XDPoS{ - config: conf, - db: db, + config: conf, + db: db, + GetXDCXService: func() utils.TradingService { return nil }, + GetLendingService: func() utils.LendingService { return nil }, signingTxsCache: signingTxsCache, - EngineV1: *engine_v1.NewFaker(db, conf), - EngineV2: *engine_v2.NewFaker(db, conf), + EngineV1: engine_v1.NewFaker(db, conf), + EngineV2: engine_v2.NewFaker(db, conf), } return fakeEngine } @@ -350,3 +352,20 @@ func (x *XDPoS) CacheSigningTxs(hash common.Hash, txs []*types.Transaction) []*t func (x *XDPoS) GetCachedSigningTxs(hash common.Hash) (interface{}, bool) { return x.signingTxsCache.Get(hash) } + +//V2 +func (x *XDPoS) VerifyVote(utils.Vote) error { + return nil +} + +func (x *XDPoS) VerifyTimeout(utils.Timeout) error { + return nil +} + +func (x *XDPoS) VerifySyncInfo(utils.SyncInfo) error { + return nil +} + +func (x *XDPoS) VerifyBlockInfo(utils.BlockInfo) error { + return nil +} diff --git a/consensus/XDPoS/engines/engine_v2/engine.go b/consensus/XDPoS/engines/engine_v2/engine.go index 5d84a19800..15d9ee48f2 100644 --- a/consensus/XDPoS/engines/engine_v2/engine.go +++ b/consensus/XDPoS/engines/engine_v2/engine.go @@ -6,6 +6,7 @@ import ( "github.com/XinFinOrg/XDPoSChain/common" "github.com/XinFinOrg/XDPoSChain/common/countdown" "github.com/XinFinOrg/XDPoSChain/consensus" + "github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/utils" "github.com/XinFinOrg/XDPoSChain/core/types" "github.com/XinFinOrg/XDPoSChain/ethdb" "github.com/XinFinOrg/XDPoSChain/log" @@ -77,7 +78,7 @@ func (consensus *XDPoS_v2) Dispatcher() error { SyncInfo workflow */ // Verify syncInfo and trigger trigger process QC or TC if successful -func (consensus *XDPoS_v2) VerifySyncInfoMessage(header *types.Header) error { +func (consensus *XDPoS_v2) VerifySyncInfoMessage(syncInfo utils.SyncInfo) error { /* 1. Verify items including: - verifyQC @@ -98,7 +99,7 @@ func (consensus *XDPoS_v2) SyncInfoHandler(header *types.Header) error { /* Vote workflow */ -func (consensus *XDPoS_v2) VerifyVoteMessage() error { +func (consensus *XDPoS_v2) VerifyVoteMessage(vote utils.Vote) error { /* 1. Check signature: - Use ecRecover to get the public key @@ -123,7 +124,7 @@ func (consensus *XDPoS_v2) VoteHandler() { Timeout workflow */ // Verify timeout message type from peers in bft.go -func (consensus *XDPoS_v2) VerifyTimeoutMessage() error { +func (consensus *XDPoS_v2) VerifyTimeoutMessage(utils.Timeout) error { /* 1. Check signature: - Use ecRecover to get the public key @@ -166,7 +167,7 @@ func (consensus *XDPoS_v2) generateBlockInfo() error { } // To be used by different message verification. Verify local DB block info against the received block information(i.e hash, blockNum, round) -func (consensus *XDPoS_v2) verifyBlockInfo(header *types.Header) error { +func (consensus *XDPoS_v2) VerifyBlockInfo(blockInfo utils.BlockInfo) error { return nil } diff --git a/consensus/XDPoS/utils/types.go b/consensus/XDPoS/utils/types.go index 2dd2228d36..2e0802e733 100644 --- a/consensus/XDPoS/utils/types.go +++ b/consensus/XDPoS/utils/types.go @@ -61,6 +61,13 @@ type PublicApiSnapshot struct { // Round number type in XDPoS 2.0 type Round uint64 +// Block Info struct in XDPoS 2.0, used for vote message, etc. +type BlockInfo struct { + Hash common.Hash + Round Round + Number *big.Int +} + // Vote message in XDPoS 2.0 type Vote struct { ProposedBlockInfo BlockInfo @@ -79,13 +86,6 @@ type SyncInfo struct { HighestTimeoutCert TimeoutCert } -// Block Info struct in XDPoS 2.0, used for vote message, etc. -type BlockInfo struct { - Hash common.Hash - Round Round - Number *big.Int -} - // Quorum Certificate struct in XDPoS 2.0 type QuorumCert struct { ProposedBlockInfo BlockInfo diff --git a/eth/bfter/bft.go b/eth/bfter/bft.go new file mode 100644 index 0000000000..66324bf481 --- /dev/null +++ b/eth/bfter/bft.go @@ -0,0 +1,156 @@ +package bfter + +import ( + "github.com/XinFinOrg/XDPoSChain/consensus" + "github.com/XinFinOrg/XDPoSChain/consensus/XDPoS" + "github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/utils" + "github.com/XinFinOrg/XDPoSChain/log" + lru "github.com/hashicorp/golang-lru" +) + +const ( + messageLimit = 1024 +) + +//Define Verify Group functions +type VerifySyncInfoFn func(utils.SyncInfo) error +type VerifyVoteFn func(utils.Vote) error +type VerifyTimeoutFn func(utils.Timeout) error + +//Define Boradcast Group functions +type broadcastVoteFn func(utils.Vote) +type broadcastTimeoutFn func(utils.Timeout) +type broadcastSyncInfoFn func(utils.SyncInfo) + +type Bfter struct { + broadcastCh chan interface{} + quit chan struct{} + consensus ConsensusFns + broadcast BroadcastFns + + // Message Cache + knownVotes *lru.ARCCache + knownSyncInfos *lru.ARCCache + knownTimeouts *lru.ARCCache +} + +type ConsensusFns struct { + verifySyncInfo VerifySyncInfoFn + verifyVote VerifyVoteFn + verifyTimeout VerifyTimeoutFn +} + +type BroadcastFns struct { + Vote broadcastVoteFn + Timeout broadcastTimeoutFn + SyncInfo broadcastSyncInfoFn +} + +func New(broadcasts BroadcastFns) *Bfter { + knownVotes, _ := lru.NewARC(messageLimit) + knownSyncInfos, _ := lru.NewARC(messageLimit) + knownTimeouts, _ := lru.NewARC(messageLimit) + return &Bfter{ + quit: make(chan struct{}), + broadcastCh: make(chan interface{}), + broadcast: broadcasts, + knownVotes: knownVotes, + knownSyncInfos: knownSyncInfos, + knownTimeouts: knownTimeouts, + } +} + +func (b *Bfter) SetConsensusFuns(engine consensus.Engine) { + e := engine.(*XDPoS.XDPoS) + b.broadcastCh = e.EngineV2.BroadcastCh + b.consensus = ConsensusFns{ + verifySyncInfo: e.VerifySyncInfo, + verifyVote: e.VerifyVote, + verifyTimeout: e.VerifyTimeout, + } +} + +// TODO: rename +func (b *Bfter) Vote(vote utils.Vote) { + log.Trace("Receive Vote", "vote", vote) + + if b.knownVotes.Contains(vote.Hash()) { + log.Trace("Discarded vote, known vote", "Signature", vote.Signature, "hash", vote.Hash()) + return + } + + err := b.consensus.verifyVote(vote) + if err != nil { + log.Error("Verify BFT Vote", "error", err) + return + } + + b.knownVotes.Add(vote.Hash(), true) + b.broadcastCh <- vote +} + +func (b *Bfter) Timeout(timeout utils.Timeout) { + log.Trace("Receive Timeout", "timeout", timeout) + + if b.knownVotes.Contains(timeout.Hash()) { + log.Trace("Discarded Timeout, known Timeout", "Signature", timeout.Signature, "hash", timeout.Hash(), "round", timeout.Round) + return + } + + err := b.consensus.verifyTimeout(timeout) + if err != nil { + log.Error("Verify BFT Timeout", "error", err) + return + } + + b.knownTimeouts.Add(timeout.Hash(), true) + b.broadcastCh <- timeout +} + +func (b *Bfter) SyncInfo(syncInfo utils.SyncInfo) { + log.Trace("Receive SyncInfo", "syncInfo", syncInfo) + + if b.knownVotes.Contains(syncInfo.Hash()) { + log.Trace("Discarded SyncInfo, known SyncInfo", "hash", syncInfo.Hash()) + return + } + + err := b.consensus.verifySyncInfo(syncInfo) + if err != nil { + log.Error("Verify BFT SyncInfo", "error", err) + return + } + + b.knownSyncInfos.Add(syncInfo.Hash(), true) + b.broadcastCh <- syncInfo +} + +// Start Bft receiver +func (b *Bfter) Start() { + go b.loop() +} + +func (b *Bfter) Stop() { + close(b.quit) +} + +func (b *Bfter) loop() { + + for { + select { + case <-b.quit: + return + case obj := <-b.broadcastCh: + switch v := obj.(type) { + case utils.Vote: + go b.broadcast.Vote(v) + case utils.Timeout: + go b.broadcast.Timeout(v) + case utils.SyncInfo: + go b.broadcast.SyncInfo(v) + default: + log.Error("Unknown message type received, value: %v", v) + } + } + } +} diff --git a/eth/bfter/bft_test.go b/eth/bfter/bft_test.go new file mode 100644 index 0000000000..3efeb6d9e8 --- /dev/null +++ b/eth/bfter/bft_test.go @@ -0,0 +1,118 @@ +package bfter + +import ( + "fmt" + "sync/atomic" + "testing" + "time" + + "github.com/XinFinOrg/XDPoSChain/consensus/XDPoS" + "github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/engines/engine_v2" + "github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/utils" +) + +// make different votes based on Signatures +func makeVotes(n int) []utils.Vote { + var votes []utils.Vote + for i := 0; i < n; i++ { + votes = append(votes, utils.Vote{Signature: []byte{byte(i)}}) + } + return votes +} + +// bfterTester is a test simulator for mocking out bfter worker. +type bfterTester struct { + bfter *Bfter +} + +// newTester creates a new bft fetcher test mocker. +func newTester() *bfterTester { + testConsensus := &XDPoS.XDPoS{EngineV2: &engine_v2.XDPoS_v2{}} + broadcasts := BroadcastFns{} + + tester := &bfterTester{} + tester.bfter = New(broadcasts) + tester.bfter.SetConsensusFuns(testConsensus) + tester.bfter.broadcastCh = make(chan interface{}) + tester.bfter.Start() + + return tester +} + +// Tests that a bfter accepts vote and process verfiy and broadcast +func TestSequentialVotes(t *testing.T) { + tester := newTester() + verifyCounter := uint32(0) + broadcastCounter := uint32(0) + targetVotes := 10 + + tester.bfter.consensus.verifyVote = func(vote utils.Vote) error { + atomic.AddUint32(&verifyCounter, 1) + return nil + } + tester.bfter.broadcast.Vote = func(utils.Vote) { + atomic.AddUint32(&broadcastCounter, 1) + } + + votes := makeVotes(targetVotes) + for _, vote := range votes { + tester.bfter.Vote(vote) + } + + time.Sleep(50 * time.Millisecond) + if int(verifyCounter) != targetVotes || int(broadcastCounter) != targetVotes { + t.Fatalf("count mismatch: have %v on verify and have %v on broadcast, want %v", verifyCounter, broadcastCounter, targetVotes) + } +} + +// Tests that vote already being retrieved will not be duplicated. +func TestDuplicateVotes(t *testing.T) { + tester := newTester() + verifyCounter := uint32(0) + broadcastCounter := uint32(0) + targetVotes := 1 + + tester.bfter.consensus.verifyVote = func(vote utils.Vote) error { + atomic.AddUint32(&verifyCounter, 1) + return nil + } + tester.bfter.broadcast.Vote = func(utils.Vote) { + atomic.AddUint32(&broadcastCounter, 1) + } + + vote := utils.Vote{} + + // send twice + tester.bfter.Vote(vote) + tester.bfter.Vote(vote) + + time.Sleep(50 * time.Millisecond) + if int(verifyCounter) != targetVotes || int(broadcastCounter) != targetVotes { + t.Fatalf("count mismatch: have %v on verify and have %v on broadcast, want %v", verifyCounter, broadcastCounter, targetVotes) + } +} + +// Test that avoid boardcast if there is bad vote +func TestNotBoardcastInvalidVote(t *testing.T) { + tester := newTester() + broadcastCounter := uint32(0) + targetVotes := 0 + + tester.bfter.consensus.verifyVote = func(vote utils.Vote) error { + return fmt.Errorf("This is invalid vote") + } + tester.bfter.broadcast.Vote = func(utils.Vote) { + atomic.AddUint32(&broadcastCounter, 1) + } + + vote := utils.Vote{} + tester.bfter.Vote(vote) + + time.Sleep(50 * time.Millisecond) + if int(broadcastCounter) != targetVotes { + t.Fatalf("count mismatch: have %v on broadcast, want %v", broadcastCounter, targetVotes) + } +} + +// TODO: SyncInfo and Timeout Test, should be same as Vote. +// Once all test on vote covered, then duplicate to others diff --git a/eth/handler.go b/eth/handler.go index 670c44410a..8d0155f71c 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -29,9 +29,11 @@ import ( "github.com/XinFinOrg/XDPoSChain/common" "github.com/XinFinOrg/XDPoSChain/consensus" + "github.com/XinFinOrg/XDPoSChain/consensus/XDPoS/utils" "github.com/XinFinOrg/XDPoSChain/consensus/misc" "github.com/XinFinOrg/XDPoSChain/core" "github.com/XinFinOrg/XDPoSChain/core/types" + "github.com/XinFinOrg/XDPoSChain/eth/bfter" "github.com/XinFinOrg/XDPoSChain/eth/downloader" "github.com/XinFinOrg/XDPoSChain/eth/fetcher" "github.com/XinFinOrg/XDPoSChain/ethdb" @@ -80,6 +82,7 @@ type ProtocolManager struct { downloader *downloader.Downloader fetcher *fetcher.Fetcher peers *peerSet + bfter *bfter.Bfter SubProtocols []p2p.Protocol @@ -218,6 +221,16 @@ func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, ne return manager.blockchain.PrepareBlock(block) } manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, prepare, manager.removePeer) + //Define bft function + broadcasts := bfter.BroadcastFns{ + Vote: manager.BroadcastVote, + Timeout: manager.BroadcastTimeout, + SyncInfo: manager.BroadcastSyncInfo, + } + manager.bfter = bfter.New(broadcasts) + if blockchain.Config().XDPoS != nil { + manager.bfter.SetConsensusFuns(engine) + } return manager, nil } @@ -253,7 +266,6 @@ func (pm *ProtocolManager) Start(maxPeers int) { // broadcast transactions pm.txCh = make(chan core.TxPreEvent, txChanSize) pm.txSub = pm.txpool.SubscribeTxPreEvent(pm.txCh) - pm.orderTxCh = make(chan core.OrderTxPreEvent, txChanSize) if pm.orderpool != nil { pm.orderTxSub = pm.orderpool.SubscribeTxPreEvent(pm.orderTxCh) @@ -808,6 +820,31 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if pm.lendingpool != nil { pm.lendingpool.AddRemotes(txs) } + case msg.Code == VoteMsg: + var vote utils.Vote + if err := msg.Decode(&vote); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Mark the peer as owning the vote and process it + p.MarkVote(vote) + pm.bfter.Vote(vote) + case msg.Code == TimeoutMsg: + var timeout utils.Timeout + if err := msg.Decode(&timeout); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + + // Mark the peer as owning the timeout and process it + p.MarkTimeout(timeout) + pm.bfter.Timeout(timeout) + case msg.Code == SyncInfoMsg: + var syncInfo utils.SyncInfo + if err := msg.Decode(&syncInfo); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Mark the peer as owning the syncInfo and process it + p.MarkSyncInfo(syncInfo) + pm.bfter.SyncInfo(syncInfo) default: return errResp(ErrInvalidMsgCode, "%v", msg.Code) @@ -859,6 +896,42 @@ func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) log.Trace("Broadcast transaction", "hash", hash, "recipients", len(peers)) } +// BroadcastVote will propagate a Vote to all peers which are not known to +// already have the given vote. +func (pm *ProtocolManager) BroadcastVote(vote utils.Vote) { + //hash := Vote.Hash() + hash := common.Hash{} + peers := pm.peers.PeersWithoutVote(hash) + for _, peer := range peers { + peer.SendVote(vote) + } + log.Trace("Propagated Vote", "hash", hash, "recipients", len(peers)) +} + +// BroadcastTimeout will propagate a Timeout to all peers which are not known to +// already have the given timeout. +func (pm *ProtocolManager) BroadcastTimeout(timeout utils.Timeout) { + //hash := timeout.Hash() + hash := common.Hash{} + peers := pm.peers.PeersWithoutTimeout(hash) + for _, peer := range peers { + peer.SendTimeout(timeout) + } + log.Trace("Propagated Timeout", "hash", hash, "recipients", len(peers)) +} + +// BroadcastSyncInfo will propagate a SyncInfo to all peers which are not known to +// already have the given SyncInfo. +func (pm *ProtocolManager) BroadcastSyncInfo(syncInfo utils.SyncInfo) { + //hash := syncInfo.Hash() + hash := common.Hash{} + peers := pm.peers.PeersWithoutSyncInfo(hash) + for _, peer := range peers { + peer.SendSyncInfo(syncInfo) + } + log.Trace("Propagated SyncInfo", "hash", hash, "recipients", len(peers)) +} + // OrderBroadcastTx will propagate a transaction to all peers which are not known to // already have the given transaction. func (pm *ProtocolManager) OrderBroadcastTx(hash common.Hash, tx *types.OrderTransaction) { diff --git a/eth/handler_test.go b/eth/handler_test.go index 9777a65e60..14dbcb3b81 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -17,13 +17,14 @@ package eth import ( - "github.com/XinFinOrg/XDPoSChain/core/rawdb" "math" "math/big" "math/rand" "testing" "time" + "github.com/XinFinOrg/XDPoSChain/core/rawdb" + "github.com/XinFinOrg/XDPoSChain/common" "github.com/XinFinOrg/XDPoSChain/consensus/ethash" "github.com/XinFinOrg/XDPoSChain/core" diff --git a/eth/helper_test.go b/eth/helper_test.go index 8663949423..b3e489bd8b 100644 --- a/eth/helper_test.go +++ b/eth/helper_test.go @@ -22,12 +22,13 @@ package eth import ( "crypto/ecdsa" "crypto/rand" - "github.com/XinFinOrg/XDPoSChain/core/rawdb" "math/big" "sort" "sync" "testing" + "github.com/XinFinOrg/XDPoSChain/core/rawdb" + "github.com/XinFinOrg/XDPoSChain/common" "github.com/XinFinOrg/XDPoSChain/consensus/ethash" "github.com/XinFinOrg/XDPoSChain/core" diff --git a/eth/peer.go b/eth/peer.go index 283a1bfc56..96ceacb25f 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -41,6 +41,9 @@ const ( maxKnownOrderTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS) maxKnownLendingTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS) maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS) + maxKnownVote = 1024 // Maximum transactions hashes to keep in the known list (prevent DOS) + maxKnownTimeout = 1024 // Maximum transactions hashes to keep in the known list (prevent DOS) + maxKnownSyncInfo = 1024 // Maximum transactions hashes to keep in the known list (prevent DOS) handshakeTimeout = 5 * time.Second ) @@ -66,10 +69,15 @@ type peer struct { td *big.Int lock sync.RWMutex - knownTxs mapset.Set // Set of transaction hashes known to be known by this peer - knownBlocks mapset.Set // Set of block hashes known to be known by this peer + knownTxs mapset.Set // Set of transaction hashes known to be known by this peer + knownBlocks mapset.Set // Set of block hashes known to be known by this peer + knownOrderTxs mapset.Set // Set of order transaction hashes known to be known by this peer knownLendingTxs mapset.Set // Set of lending transaction hashes known to be known by this peer + + knownVote mapset.Set // Set of BFT Vote known to be known by this peer + knownTimeout mapset.Set // Set of BFT timeout known to be known by this peer + knownSyncInfo mapset.Set // Set of BFT Sync Info known to be known by this peer` } func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { @@ -157,6 +165,36 @@ func (p *peer) MarkLendingTransaction(hash common.Hash) { p.knownLendingTxs.Add(hash) } +// MarkVote marks a vote as known for the peer, ensuring that it +// will never be propagated to this particular peer. +func (p *peer) MarkVote(hash interface{}) { + // If we reached the memory allowance, drop a previously known transaction hash + for p.knownVote.Cardinality() >= maxKnownVote { + p.knownVote.Pop() + } + p.knownVote.Add(hash) +} + +// MarkTimeout marks a timeout as known for the peer, ensuring that it +// will never be propagated to this particular peer. +func (p *peer) MarkTimeout(hash interface{}) { + // If we reached the memory allowance, drop a previously known transaction hash + for p.knownTimeout.Cardinality() >= maxKnownTimeout { + p.knownTimeout.Pop() + } + p.knownTimeout.Add(hash) +} + +// MarkSyncInfo marks a syncInfo as known for the peer, ensuring that it +// will never be propagated to this particular peer. +func (p *peer) MarkSyncInfo(hash interface{}) { + // If we reached the memory allowance, drop a previously known transaction hash + for p.knownSyncInfo.Cardinality() >= maxKnownSyncInfo { + p.knownSyncInfo.Pop() + } + p.knownSyncInfo.Add(hash) +} + // SendTransactions sends transactions to the peer and includes the hashes // in its transaction hash set for future reference. func (p *peer) SendTransactions(txs types.Transactions) error { @@ -256,6 +294,49 @@ func (p *peer) SendReceiptsRLP(receipts []rlp.RawValue) error { } } +func (p *peer) SendVote(vote interface{}) error { + p.knownVote.Add(vote) + if p.pairRw != nil { + return p2p.Send(p.pairRw, VoteMsg, vote) + } else { + return p2p.Send(p.rw, VoteMsg, vote) + } +} + +/* +func (p *peer) AsyncSendVote() { + +} +*/ +func (p *peer) SendTimeout(timeout interface{}) error { + p.knownTimeout.Add(timeout) + if p.pairRw != nil { + return p2p.Send(p.pairRw, TimeoutMsg, timeout) + } else { + return p2p.Send(p.rw, TimeoutMsg, timeout) + } +} + +/* +func (p *peer) AsyncSendTimeout() { + +} +*/ +func (p *peer) SendSyncInfo(syncInfo interface{}) error { + p.knownSyncInfo.Add(syncInfo) + if p.pairRw != nil { + return p2p.Send(p.pairRw, SyncInfoMsg, syncInfo) + } else { + return p2p.Send(p.rw, SyncInfoMsg, syncInfo) + } +} + +/* +func (p *peer) AsyncSendSyncInfo() { + +} +*/ + // RequestOneHeader is a wrapper around the header query functions to fetch a // single header. It is used solely by the fetcher. func (p *peer) RequestOneHeader(hash common.Hash) error { @@ -486,6 +567,51 @@ func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer { return list } +// PeersWithoutVote retrieves a list of peers that do not have a given block in +// their set of known hashes. +func (ps *peerSet) PeersWithoutVote(hash common.Hash) []*peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + list := make([]*peer, 0, len(ps.peers)) + for _, p := range ps.peers { + if !p.knownVote.Contains(hash) { + list = append(list, p) + } + } + return list +} + +// PeersWithoutTimeout retrieves a list of peers that do not have a given block in +// their set of known hashes. +func (ps *peerSet) PeersWithoutTimeout(hash common.Hash) []*peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + list := make([]*peer, 0, len(ps.peers)) + for _, p := range ps.peers { + if !p.knownTimeout.Contains(hash) { + list = append(list, p) + } + } + return list +} + +// PeersWithoutSyncInfo retrieves a list of peers that do not have a given block in +// their set of known hashes. +func (ps *peerSet) PeersWithoutSyncInfo(hash common.Hash) []*peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + list := make([]*peer, 0, len(ps.peers)) + for _, p := range ps.peers { + if !p.knownSyncInfo.Contains(hash) { + list = append(list, p) + } + } + return list +} + // PeersWithoutTx retrieves a list of peers that do not have a given transaction // in their set of known hashes. func (ps *peerSet) OrderPeersWithoutTx(hash common.Hash) []*peer { diff --git a/eth/protocol.go b/eth/protocol.go index 1060870304..6495408f4f 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -63,6 +63,11 @@ const ( NodeDataMsg = 0x0e GetReceiptsMsg = 0x0f ReceiptsMsg = 0x10 + + // Protocol messages belonging to eth/100 + VoteMsg = 0xe0 + TimeoutMsg = 0xe1 + SyncInfoMsg = 0xe2 ) type errCode int diff --git a/eth/sync.go b/eth/sync.go index 139a3b6f45..b0f2c74fe9 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -134,7 +134,9 @@ func (pm *ProtocolManager) txsyncLoop() { func (pm *ProtocolManager) syncer() { // Start and ensure cleanup of sync mechanisms pm.fetcher.Start() + pm.bfter.Start() defer pm.fetcher.Stop() + defer pm.bfter.Stop() defer pm.downloader.Terminate() // Wait for different events to fire synchronisation operations