diff --git a/core/tx_pool.go b/core/tx_pool.go index fc78d5e8b7..aee9e51975 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -188,17 +188,16 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig { // current state) and future transactions. Transactions move between those // two states over time as they are received and processed. type TxPool struct { - config TxPoolConfig - chainconfig *params.ChainConfig - chain blockChain - gasPrice *big.Int - txFeed event.Feed - specialTxFeed event.Feed - scope event.SubscriptionScope - chainHeadCh chan ChainHeadEvent - chainHeadSub event.Subscription - signer types.Signer - mu sync.RWMutex + config TxPoolConfig + chainconfig *params.ChainConfig + chain blockChain + gasPrice *big.Int + txFeed event.Feed + scope event.SubscriptionScope + chainHeadCh chan ChainHeadEvent + chainHeadSub event.Subscription + signer types.Signer + mu sync.RWMutex currentState *state.StateDB // Current state in the blockchain head pendingState *state.ManagedState // Pending state tracking virtual nonces @@ -458,12 +457,6 @@ func (pool *TxPool) SubscribeTxPreEvent(ch chan<- TxPreEvent) event.Subscription return pool.scope.Track(pool.txFeed.Subscribe(ch)) } -// SubscribeSpecialTxPreEvent registers a subscription of TxPreEvent and -// starts sending event to the given channel. -func (pool *TxPool) SubscribeSpecialTxPreEvent(ch chan<- TxPreEvent) event.Subscription { - return pool.scope.Track(pool.specialTxFeed.Subscribe(ch)) -} - // GasPrice returns the current gas price enforced by the transaction pool. func (pool *TxPool) GasPrice() *big.Int { pool.mu.RLock() @@ -830,8 +823,7 @@ func (pool *TxPool) promoteSpecialTx(addr common.Address, tx *types.Transaction) broadcastTxs = append(broadcastTxs, tx) go func() { for _, btx := range broadcastTxs { - pool.specialTxFeed.Send(TxPreEvent{btx}) - log.Trace("Pooled new special transaction", "hash", tx.Hash(), "from", addr, "to", tx.To(), "nonce", tx.Nonce()) + pool.txFeed.Send(TxPreEvent{btx}) } }() return true, nil diff --git a/core/types/transaction_test.go b/core/types/transaction_test.go index 485f6e733a..dbe808b23d 100644 --- a/core/types/transaction_test.go +++ b/core/types/transaction_test.go @@ -144,7 +144,7 @@ func TestTransactionPriceNonceSort(t *testing.T) { } } // Sort the transactions and cross check the nonce ordering - txset, _ := NewTransactionsByPriceAndNonce(signer, groups,nil) + txset, _ := NewTransactionsByPriceAndNonce(signer, groups, nil) txs := Transactions{} for tx := txset.Peek(); tx != nil; tx = txset.Peek() { diff --git a/eth/fetcher/fetcher.go b/eth/fetcher/fetcher.go index 9b846ef76a..6055f8f23b 100644 --- a/eth/fetcher/fetcher.go +++ b/eth/fetcher/fetcher.go @@ -704,8 +704,8 @@ func (f *Fetcher) insert(peer string, block *types.Block) { // If import succeeded, broadcast the block propAnnounceOutTimer.UpdateSince(block.ReceivedAt) go f.broadcastBlock(block, true) - go f.broadcastBlock(block, false) - }() + //go f.broadcastBlock(block, false) + }() } // forgetHash removes all traces of a block announcement from the fetcher's diff --git a/eth/handler.go b/eth/handler.go index 2cae0a0e6c..8e23de5de0 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -82,8 +82,6 @@ type ProtocolManager struct { eventMux *event.TypeMux txCh chan core.TxPreEvent txSub event.Subscription - specialTxCh chan core.TxPreEvent - specialTxSub event.Subscription minedBlockSub *event.TypeMuxSubscription // channels for fetcher, syncer, txsyncLoop @@ -209,11 +207,6 @@ func (pm *ProtocolManager) Start(maxPeers int) { pm.txSub = pm.txpool.SubscribeTxPreEvent(pm.txCh) go pm.txBroadcastLoop() - // broadcast special transactions - pm.specialTxCh = make(chan core.TxPreEvent, txChanSize) - pm.specialTxSub = pm.txpool.SubscribeSpecialTxPreEvent(pm.specialTxCh) - go pm.specialTxBroadcastLoop() - // broadcast mined blocks pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) go pm.minedBroadcastLoop() @@ -227,7 +220,6 @@ func (pm *ProtocolManager) Stop() { log.Info("Stopping Ethereum protocol") pm.txSub.Unsubscribe() // quits txBroadcastLoop - pm.specialTxSub.Unsubscribe() // quits specialTxBroadcastLoop pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop // Quit the sync loop. @@ -735,24 +727,14 @@ func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) log.Trace("Broadcast transaction", "hash", hash, "recipients", len(peers)) } -func (pm *ProtocolManager) BroadcastSpecialTx(hash common.Hash, tx *types.Transaction) { - // Broadcast transaction to a batch of peers not knowing about it - peers := pm.peers.PeersWithoutTx(hash) - //FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))] - for _, peer := range peers { - peer.SendSpecialTransactions(tx) - } - log.Trace("Broadcast special transaction", "hash", hash, "recipients", len(peers)) -} - // Mined broadcast loop func (self *ProtocolManager) minedBroadcastLoop() { // automatically stops if unsubscribe for obj := range self.minedBlockSub.Chan() { switch ev := obj.Data.(type) { case core.NewMinedBlockEvent: - self.BroadcastBlock(ev.Block, true) // First propagate block to peers - self.BroadcastBlock(ev.Block, false) // Only then announce to the rest + self.BroadcastBlock(ev.Block, true) // First propagate block to peers + //self.BroadcastBlock(ev.Block, false) // Only then announce to the rest } } } @@ -770,19 +752,6 @@ func (self *ProtocolManager) txBroadcastLoop() { } } -func (self *ProtocolManager) specialTxBroadcastLoop() { - for { - select { - case event := <-self.specialTxCh: - self.BroadcastSpecialTx(event.Tx.Hash(), event.Tx) - - // Err() channel will be closed when unsubscribing. - case <-self.specialTxSub.Err(): - return - } - } -} - // NodeInfo represents a short summary of the Ethereum sub-protocol metadata // known about the host peer. type NodeInfo struct { diff --git a/eth/helper_test.go b/eth/helper_test.go index ca686df808..2b05cea801 100644 --- a/eth/helper_test.go +++ b/eth/helper_test.go @@ -128,10 +128,6 @@ func (p *testTxPool) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscr return p.txFeed.Subscribe(ch) } -func (p *testTxPool) SubscribeSpecialTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { - return p.txFeed.Subscribe(ch) -} - // newTestTransaction create a new dummy transaction. func newTestTransaction(from *ecdsa.PrivateKey, nonce uint64, datasize int) *types.Transaction { tx := types.NewTransaction(nonce, common.Address{}, big.NewInt(0), 100000, big.NewInt(0), make([]byte, datasize)) diff --git a/eth/peer.go b/eth/peer.go index 02d7a9a9af..782641881c 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -25,7 +25,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rlp" "gopkg.in/fatih/set.v0" @@ -141,15 +140,6 @@ func (p *peer) SendTransactions(txs types.Transactions) error { return p2p.Send(p.rw, TxMsg, txs) } -func (p *peer) SendSpecialTransactions(tx *types.Transaction) error { - p.knownTxs.Add(tx.Hash()) - if p.pairRw != nil { - return p2p.Send(p.pairRw, TxMsg, types.Transactions{tx}) - } else { - return p2p.Send(p.rw, TxMsg, types.Transactions{tx}) - } -} - // SendNewBlockHashes announces the availability of a number of blocks through // a hash notification. func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error { @@ -168,81 +158,123 @@ func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error { p.knownBlocks.Add(block.Hash()) if p.pairRw != nil { - log.Trace("p2p send new block to the pairRw connection", "p", p, "number", block.NumberU64()) return p2p.Send(p.pairRw, NewBlockMsg, []interface{}{block, td}) } else { return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td}) } - } // SendBlockHeaders sends a batch of block headers to the remote peer. func (p *peer) SendBlockHeaders(headers []*types.Header) error { - return p2p.Send(p.rw, BlockHeadersMsg, headers) + if p.pairRw != nil { + return p2p.Send(p.pairRw, BlockHeadersMsg, headers) + } else { + return p2p.Send(p.rw, BlockHeadersMsg, headers) + } } // SendBlockBodies sends a batch of block contents to the remote peer. func (p *peer) SendBlockBodies(bodies []*blockBody) error { - return p2p.Send(p.rw, BlockBodiesMsg, blockBodiesData(bodies)) + if p.pairRw != nil { + return p2p.Send(p.pairRw, BlockBodiesMsg, blockBodiesData(bodies)) + } else { + return p2p.Send(p.rw, BlockBodiesMsg, blockBodiesData(bodies)) + } } // SendBlockBodiesRLP sends a batch of block contents to the remote peer from // an already RLP encoded format. func (p *peer) SendBlockBodiesRLP(bodies []rlp.RawValue) error { - return p2p.Send(p.rw, BlockBodiesMsg, bodies) + if p.pairRw != nil { + return p2p.Send(p.pairRw, BlockBodiesMsg, bodies) + } else { + return p2p.Send(p.rw, BlockBodiesMsg, bodies) + } } // SendNodeDataRLP sends a batch of arbitrary internal data, corresponding to the // hashes requested. func (p *peer) SendNodeData(data [][]byte) error { - return p2p.Send(p.rw, NodeDataMsg, data) + if p.pairRw != nil { + return p2p.Send(p.pairRw, NodeDataMsg, data) + } else { + return p2p.Send(p.rw, NodeDataMsg, data) + } } // SendReceiptsRLP sends a batch of transaction receipts, corresponding to the // ones requested from an already RLP encoded format. func (p *peer) SendReceiptsRLP(receipts []rlp.RawValue) error { - return p2p.Send(p.rw, ReceiptsMsg, receipts) + if p.pairRw != nil { + return p2p.Send(p.pairRw, ReceiptsMsg, receipts) + } else { + return p2p.Send(p.rw, ReceiptsMsg, receipts) + } } // RequestOneHeader is a wrapper around the header query functions to fetch a // single header. It is used solely by the fetcher. func (p *peer) RequestOneHeader(hash common.Hash) error { p.Log().Debug("Fetching single header", "hash", hash) - return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false}) + if p.pairRw != nil { + return p2p.Send(p.pairRw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false}) + } else { + return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false}) + } } // RequestHeadersByHash fetches a batch of blocks' headers corresponding to the // specified header query, based on the hash of an origin block. func (p *peer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error { p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse) - return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse}) + if p.pairRw != nil { + return p2p.Send(p.pairRw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse}) + } else { + return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse}) + } } // RequestHeadersByNumber fetches a batch of blocks' headers corresponding to the // specified header query, based on the number of an origin block. func (p *peer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error { p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse) - return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse}) + if p.pairRw != nil { + return p2p.Send(p.pairRw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse}) + } else { + return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse}) + } } // RequestBodies fetches a batch of blocks' bodies corresponding to the hashes // specified. func (p *peer) RequestBodies(hashes []common.Hash) error { p.Log().Debug("Fetching batch of block bodies", "count", len(hashes)) - return p2p.Send(p.rw, GetBlockBodiesMsg, hashes) + if p.pairRw != nil { + return p2p.Send(p.pairRw, GetBlockBodiesMsg, hashes) + } else { + return p2p.Send(p.rw, GetBlockBodiesMsg, hashes) + } } // RequestNodeData fetches a batch of arbitrary data from a node's known state // data, corresponding to the specified hashes. func (p *peer) RequestNodeData(hashes []common.Hash) error { p.Log().Debug("Fetching batch of state data", "count", len(hashes)) - return p2p.Send(p.rw, GetNodeDataMsg, hashes) + if p.pairRw != nil { + return p2p.Send(p.pairRw, GetNodeDataMsg, hashes) + } else { + return p2p.Send(p.rw, GetNodeDataMsg, hashes) + } } // RequestReceipts fetches a batch of transaction receipts from a remote node. func (p *peer) RequestReceipts(hashes []common.Hash) error { p.Log().Debug("Fetching batch of receipts", "count", len(hashes)) - return p2p.Send(p.rw, GetReceiptsMsg, hashes) + if p.pairRw != nil { + return p2p.Send(p.pairRw, GetReceiptsMsg, hashes) + } else { + return p2p.Send(p.rw, GetReceiptsMsg, hashes) + } } // Handshake executes the eth protocol handshake, negotiating version number, diff --git a/eth/protocol.go b/eth/protocol.go index f44a01f020..cd7db57f23 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -106,7 +106,6 @@ type txPool interface { // SubscribeTxPreEvent should return an event subscription of // TxPreEvent and send events to the given channel. SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription - SubscribeSpecialTxPreEvent(chan<- core.TxPreEvent) event.Subscription } // statusData is the network packet for the status message. diff --git a/eth/sync.go b/eth/sync.go index 79c594f726..882c94878f 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -206,13 +206,13 @@ func (pm *ProtocolManager) synchronise(peer *peer) { atomic.StoreUint32(&pm.fastSync, 0) } atomic.StoreUint32(&pm.acceptTxs, 1) // Mark initial sync done - if head := pm.blockchain.CurrentBlock(); head.NumberU64() > 0 { - // We've completed a sync cycle, notify all peers of new state. This path is - // essential in star-topology networks where a gateway node needs to notify - // all its out-of-date peers of the availability of a new block. This failure - // scenario will most often crop up in private and hackathon networks with - // degenerate connectivity, but it should be healthy for the mainnet too to - // more reliably update peers or the local TD state. - go pm.BroadcastBlock(head, false) - } + //if head := pm.blockchain.CurrentBlock(); head.NumberU64() > 0 { + // // We've completed a sync cycle, notify all peers of new state. This path is + // // essential in star-topology networks where a gateway node needs to notify + // // all its out-of-date peers of the availability of a new block. This failure + // // scenario will most often crop up in private and hackathon networks with + // // degenerate connectivity, but it should be healthy for the mainnet too to + // // more reliably update peers or the local TD state. + // go pm.BroadcastBlock(head, false) + //} }