refactor RW & pairRW connect for block , transaction

This commit is contained in:
parmarrushabh 2018-11-14 10:40:05 +05:30
parent f2332e0d11
commit 52ae30023a
8 changed files with 80 additions and 92 deletions

View file

@ -188,17 +188,16 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig {
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
type TxPool struct {
config TxPoolConfig
chainconfig *params.ChainConfig
chain blockChain
gasPrice *big.Int
txFeed event.Feed
specialTxFeed event.Feed
scope event.SubscriptionScope
chainHeadCh chan ChainHeadEvent
chainHeadSub event.Subscription
signer types.Signer
mu sync.RWMutex
config TxPoolConfig
chainconfig *params.ChainConfig
chain blockChain
gasPrice *big.Int
txFeed event.Feed
scope event.SubscriptionScope
chainHeadCh chan ChainHeadEvent
chainHeadSub event.Subscription
signer types.Signer
mu sync.RWMutex
currentState *state.StateDB // Current state in the blockchain head
pendingState *state.ManagedState // Pending state tracking virtual nonces
@ -458,12 +457,6 @@ func (pool *TxPool) SubscribeTxPreEvent(ch chan<- TxPreEvent) event.Subscription
return pool.scope.Track(pool.txFeed.Subscribe(ch))
}
// SubscribeSpecialTxPreEvent registers a subscription of TxPreEvent and
// starts sending event to the given channel.
func (pool *TxPool) SubscribeSpecialTxPreEvent(ch chan<- TxPreEvent) event.Subscription {
return pool.scope.Track(pool.specialTxFeed.Subscribe(ch))
}
// GasPrice returns the current gas price enforced by the transaction pool.
func (pool *TxPool) GasPrice() *big.Int {
pool.mu.RLock()
@ -830,8 +823,7 @@ func (pool *TxPool) promoteSpecialTx(addr common.Address, tx *types.Transaction)
broadcastTxs = append(broadcastTxs, tx)
go func() {
for _, btx := range broadcastTxs {
pool.specialTxFeed.Send(TxPreEvent{btx})
log.Trace("Pooled new special transaction", "hash", tx.Hash(), "from", addr, "to", tx.To(), "nonce", tx.Nonce())
pool.txFeed.Send(TxPreEvent{btx})
}
}()
return true, nil

View file

@ -144,7 +144,7 @@ func TestTransactionPriceNonceSort(t *testing.T) {
}
}
// Sort the transactions and cross check the nonce ordering
txset, _ := NewTransactionsByPriceAndNonce(signer, groups,nil)
txset, _ := NewTransactionsByPriceAndNonce(signer, groups, nil)
txs := Transactions{}
for tx := txset.Peek(); tx != nil; tx = txset.Peek() {

View file

@ -704,8 +704,8 @@ func (f *Fetcher) insert(peer string, block *types.Block) {
// If import succeeded, broadcast the block
propAnnounceOutTimer.UpdateSince(block.ReceivedAt)
go f.broadcastBlock(block, true)
go f.broadcastBlock(block, false)
}()
//go f.broadcastBlock(block, false)
}()
}
// forgetHash removes all traces of a block announcement from the fetcher's

View file

@ -82,8 +82,6 @@ type ProtocolManager struct {
eventMux *event.TypeMux
txCh chan core.TxPreEvent
txSub event.Subscription
specialTxCh chan core.TxPreEvent
specialTxSub event.Subscription
minedBlockSub *event.TypeMuxSubscription
// channels for fetcher, syncer, txsyncLoop
@ -209,11 +207,6 @@ func (pm *ProtocolManager) Start(maxPeers int) {
pm.txSub = pm.txpool.SubscribeTxPreEvent(pm.txCh)
go pm.txBroadcastLoop()
// broadcast special transactions
pm.specialTxCh = make(chan core.TxPreEvent, txChanSize)
pm.specialTxSub = pm.txpool.SubscribeSpecialTxPreEvent(pm.specialTxCh)
go pm.specialTxBroadcastLoop()
// broadcast mined blocks
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop()
@ -227,7 +220,6 @@ func (pm *ProtocolManager) Stop() {
log.Info("Stopping Ethereum protocol")
pm.txSub.Unsubscribe() // quits txBroadcastLoop
pm.specialTxSub.Unsubscribe() // quits specialTxBroadcastLoop
pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
// Quit the sync loop.
@ -735,24 +727,14 @@ func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction)
log.Trace("Broadcast transaction", "hash", hash, "recipients", len(peers))
}
func (pm *ProtocolManager) BroadcastSpecialTx(hash common.Hash, tx *types.Transaction) {
// Broadcast transaction to a batch of peers not knowing about it
peers := pm.peers.PeersWithoutTx(hash)
//FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
for _, peer := range peers {
peer.SendSpecialTransactions(tx)
}
log.Trace("Broadcast special transaction", "hash", hash, "recipients", len(peers))
}
// Mined broadcast loop
func (self *ProtocolManager) minedBroadcastLoop() {
// automatically stops if unsubscribe
for obj := range self.minedBlockSub.Chan() {
switch ev := obj.Data.(type) {
case core.NewMinedBlockEvent:
self.BroadcastBlock(ev.Block, true) // First propagate block to peers
self.BroadcastBlock(ev.Block, false) // Only then announce to the rest
self.BroadcastBlock(ev.Block, true) // First propagate block to peers
//self.BroadcastBlock(ev.Block, false) // Only then announce to the rest
}
}
}
@ -770,19 +752,6 @@ func (self *ProtocolManager) txBroadcastLoop() {
}
}
func (self *ProtocolManager) specialTxBroadcastLoop() {
for {
select {
case event := <-self.specialTxCh:
self.BroadcastSpecialTx(event.Tx.Hash(), event.Tx)
// Err() channel will be closed when unsubscribing.
case <-self.specialTxSub.Err():
return
}
}
}
// NodeInfo represents a short summary of the Ethereum sub-protocol metadata
// known about the host peer.
type NodeInfo struct {

View file

@ -128,10 +128,6 @@ func (p *testTxPool) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscr
return p.txFeed.Subscribe(ch)
}
func (p *testTxPool) SubscribeSpecialTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription {
return p.txFeed.Subscribe(ch)
}
// newTestTransaction create a new dummy transaction.
func newTestTransaction(from *ecdsa.PrivateKey, nonce uint64, datasize int) *types.Transaction {
tx := types.NewTransaction(nonce, common.Address{}, big.NewInt(0), 100000, big.NewInt(0), make([]byte, datasize))

View file

@ -25,7 +25,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
"gopkg.in/fatih/set.v0"
@ -141,15 +140,6 @@ func (p *peer) SendTransactions(txs types.Transactions) error {
return p2p.Send(p.rw, TxMsg, txs)
}
func (p *peer) SendSpecialTransactions(tx *types.Transaction) error {
p.knownTxs.Add(tx.Hash())
if p.pairRw != nil {
return p2p.Send(p.pairRw, TxMsg, types.Transactions{tx})
} else {
return p2p.Send(p.rw, TxMsg, types.Transactions{tx})
}
}
// SendNewBlockHashes announces the availability of a number of blocks through
// a hash notification.
func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error {
@ -168,81 +158,123 @@ func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error
func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error {
p.knownBlocks.Add(block.Hash())
if p.pairRw != nil {
log.Trace("p2p send new block to the pairRw connection", "p", p, "number", block.NumberU64())
return p2p.Send(p.pairRw, NewBlockMsg, []interface{}{block, td})
} else {
return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td})
}
}
// SendBlockHeaders sends a batch of block headers to the remote peer.
func (p *peer) SendBlockHeaders(headers []*types.Header) error {
return p2p.Send(p.rw, BlockHeadersMsg, headers)
if p.pairRw != nil {
return p2p.Send(p.pairRw, BlockHeadersMsg, headers)
} else {
return p2p.Send(p.rw, BlockHeadersMsg, headers)
}
}
// SendBlockBodies sends a batch of block contents to the remote peer.
func (p *peer) SendBlockBodies(bodies []*blockBody) error {
return p2p.Send(p.rw, BlockBodiesMsg, blockBodiesData(bodies))
if p.pairRw != nil {
return p2p.Send(p.pairRw, BlockBodiesMsg, blockBodiesData(bodies))
} else {
return p2p.Send(p.rw, BlockBodiesMsg, blockBodiesData(bodies))
}
}
// SendBlockBodiesRLP sends a batch of block contents to the remote peer from
// an already RLP encoded format.
func (p *peer) SendBlockBodiesRLP(bodies []rlp.RawValue) error {
return p2p.Send(p.rw, BlockBodiesMsg, bodies)
if p.pairRw != nil {
return p2p.Send(p.pairRw, BlockBodiesMsg, bodies)
} else {
return p2p.Send(p.rw, BlockBodiesMsg, bodies)
}
}
// SendNodeDataRLP sends a batch of arbitrary internal data, corresponding to the
// hashes requested.
func (p *peer) SendNodeData(data [][]byte) error {
return p2p.Send(p.rw, NodeDataMsg, data)
if p.pairRw != nil {
return p2p.Send(p.pairRw, NodeDataMsg, data)
} else {
return p2p.Send(p.rw, NodeDataMsg, data)
}
}
// SendReceiptsRLP sends a batch of transaction receipts, corresponding to the
// ones requested from an already RLP encoded format.
func (p *peer) SendReceiptsRLP(receipts []rlp.RawValue) error {
return p2p.Send(p.rw, ReceiptsMsg, receipts)
if p.pairRw != nil {
return p2p.Send(p.pairRw, ReceiptsMsg, receipts)
} else {
return p2p.Send(p.rw, ReceiptsMsg, receipts)
}
}
// RequestOneHeader is a wrapper around the header query functions to fetch a
// single header. It is used solely by the fetcher.
func (p *peer) RequestOneHeader(hash common.Hash) error {
p.Log().Debug("Fetching single header", "hash", hash)
return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false})
if p.pairRw != nil {
return p2p.Send(p.pairRw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false})
} else {
return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false})
}
}
// RequestHeadersByHash fetches a batch of blocks' headers corresponding to the
// specified header query, based on the hash of an origin block.
func (p *peer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error {
p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse)
return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
if p.pairRw != nil {
return p2p.Send(p.pairRw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
} else {
return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
}
}
// RequestHeadersByNumber fetches a batch of blocks' headers corresponding to the
// specified header query, based on the number of an origin block.
func (p *peer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse)
return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
if p.pairRw != nil {
return p2p.Send(p.pairRw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
} else {
return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
}
}
// RequestBodies fetches a batch of blocks' bodies corresponding to the hashes
// specified.
func (p *peer) RequestBodies(hashes []common.Hash) error {
p.Log().Debug("Fetching batch of block bodies", "count", len(hashes))
return p2p.Send(p.rw, GetBlockBodiesMsg, hashes)
if p.pairRw != nil {
return p2p.Send(p.pairRw, GetBlockBodiesMsg, hashes)
} else {
return p2p.Send(p.rw, GetBlockBodiesMsg, hashes)
}
}
// RequestNodeData fetches a batch of arbitrary data from a node's known state
// data, corresponding to the specified hashes.
func (p *peer) RequestNodeData(hashes []common.Hash) error {
p.Log().Debug("Fetching batch of state data", "count", len(hashes))
return p2p.Send(p.rw, GetNodeDataMsg, hashes)
if p.pairRw != nil {
return p2p.Send(p.pairRw, GetNodeDataMsg, hashes)
} else {
return p2p.Send(p.rw, GetNodeDataMsg, hashes)
}
}
// RequestReceipts fetches a batch of transaction receipts from a remote node.
func (p *peer) RequestReceipts(hashes []common.Hash) error {
p.Log().Debug("Fetching batch of receipts", "count", len(hashes))
return p2p.Send(p.rw, GetReceiptsMsg, hashes)
if p.pairRw != nil {
return p2p.Send(p.pairRw, GetReceiptsMsg, hashes)
} else {
return p2p.Send(p.rw, GetReceiptsMsg, hashes)
}
}
// Handshake executes the eth protocol handshake, negotiating version number,

View file

@ -106,7 +106,6 @@ type txPool interface {
// SubscribeTxPreEvent should return an event subscription of
// TxPreEvent and send events to the given channel.
SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription
SubscribeSpecialTxPreEvent(chan<- core.TxPreEvent) event.Subscription
}
// statusData is the network packet for the status message.

View file

@ -206,13 +206,13 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
atomic.StoreUint32(&pm.fastSync, 0)
}
atomic.StoreUint32(&pm.acceptTxs, 1) // Mark initial sync done
if head := pm.blockchain.CurrentBlock(); head.NumberU64() > 0 {
// We've completed a sync cycle, notify all peers of new state. This path is
// essential in star-topology networks where a gateway node needs to notify
// all its out-of-date peers of the availability of a new block. This failure
// scenario will most often crop up in private and hackathon networks with
// degenerate connectivity, but it should be healthy for the mainnet too to
// more reliably update peers or the local TD state.
go pm.BroadcastBlock(head, false)
}
//if head := pm.blockchain.CurrentBlock(); head.NumberU64() > 0 {
// // We've completed a sync cycle, notify all peers of new state. This path is
// // essential in star-topology networks where a gateway node needs to notify
// // all its out-of-date peers of the availability of a new block. This failure
// // scenario will most often crop up in private and hackathon networks with
// // degenerate connectivity, but it should be healthy for the mainnet too to
// // more reliably update peers or the local TD state.
// go pm.BroadcastBlock(head, false)
//}
}