Broadcast special Tx through pairRW

This commit is contained in:
parmarrushabh 2018-11-09 12:45:18 +05:30
parent 9fb0674907
commit 9d6344fbb4
11 changed files with 219 additions and 59 deletions

View file

@ -690,7 +690,7 @@ func (bc *BlockChain) procFutureBlocks() {
// Insert one by one as chain insertion needs contiguous ancestry between blocks
for i := range blocks {
bc.InsertChain(blocks[i: i+1])
bc.InsertChain(blocks[i : i+1])
}
}
}
@ -1243,7 +1243,7 @@ func (st *insertStats) report(chain []*types.Block, index int, cache common.Stor
if index == len(chain)-1 || elapsed >= statsReportLimit {
var (
end = chain[index]
txs = countTransactions(chain[st.lastIndex: index+1])
txs = countTransactions(chain[st.lastIndex : index+1])
)
context := []interface{}{
"blocks", st.processed, "txs", txs, "mgas", float64(st.usedGas) / 1000000,

View file

@ -251,19 +251,18 @@ func (l *txList) Overlaps(tx *types.Transaction) bool {
func (l *txList) Add(tx *types.Transaction, priceBump uint64) (bool, *types.Transaction) {
// If there's an older better transaction, abort
old := l.txs.Get(tx.Nonce())
if (tx.To() != nil && tx.To().String() != common.RandomizeSMC) || tx.To() == nil {
if old != nil {
threshold := new(big.Int).Div(new(big.Int).Mul(old.GasPrice(), big.NewInt(100+int64(priceBump))), big.NewInt(100))
// Have to ensure that the new gas price is higher than the old gas
// price as well as checking the percentage threshold to ensure that
// this is accurate for low (Wei-level) gas price replacements
if old.GasPrice().Cmp(tx.GasPrice()) >= 0 || threshold.Cmp(tx.GasPrice()) > 0 {
return false, nil
}
if old != nil && old.IsSpecialTransaction() {
return false, nil
}
if old != nil {
threshold := new(big.Int).Div(new(big.Int).Mul(old.GasPrice(), big.NewInt(100+int64(priceBump))), big.NewInt(100))
// Have to ensure that the new gas price is higher than the old gas
// price as well as checking the percentage threshold to ensure that
// this is accurate for low (Wei-level) gas price replacements
if old.GasPrice().Cmp(tx.GasPrice()) >= 0 || threshold.Cmp(tx.GasPrice()) > 0 {
return false, nil
}
}
// Otherwise overwrite the old transaction with the current one
l.txs.Put(tx)
if cost := tx.Cost(); l.costcap.Cmp(cost) < 0 {

View file

@ -80,6 +80,8 @@ var (
ErrOversizedData = errors.New("oversized data")
ErrZeroGasPrice = errors.New("zero gas price")
ErrDuplicateSpecialTransaction = errors.New("duplicate a specail transaction")
)
var (
@ -186,16 +188,17 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig {
// current state) and future transactions. Transactions move between those
// two states over time as they are received and processed.
type TxPool struct {
config TxPoolConfig
chainconfig *params.ChainConfig
chain blockChain
gasPrice *big.Int
txFeed event.Feed
scope event.SubscriptionScope
chainHeadCh chan ChainHeadEvent
chainHeadSub event.Subscription
signer types.Signer
mu sync.RWMutex
config TxPoolConfig
chainconfig *params.ChainConfig
chain blockChain
gasPrice *big.Int
txFeed event.Feed
specialTxFeed event.Feed
scope event.SubscriptionScope
chainHeadCh chan ChainHeadEvent
chainHeadSub event.Subscription
signer types.Signer
mu sync.RWMutex
currentState *state.StateDB // Current state in the blockchain head
pendingState *state.ManagedState // Pending state tracking virtual nonces
@ -454,6 +457,10 @@ func (pool *TxPool) SubscribeTxPreEvent(ch chan<- TxPreEvent) event.Subscription
return pool.scope.Track(pool.txFeed.Subscribe(ch))
}
func (pool *TxPool) SubscribeSpecialTxPreEvent(ch chan<- TxPreEvent) event.Subscription {
return pool.scope.Track(pool.specialTxFeed.Subscribe(ch))
}
// GasPrice returns the current gas price enforced by the transaction pool.
func (pool *TxPool) GasPrice() *big.Int {
pool.mu.RLock()
@ -553,14 +560,6 @@ func (pool *TxPool) local() map[common.Address]types.Transactions {
return txs
}
func (pool *TxPool) GetSender(tx *types.Transaction) (common.Address, error) {
from, err := types.Sender(pool.signer, tx)
if err != nil {
return common.Address{}, ErrInvalidSender
}
return from, nil
}
// validateTx checks whether a transaction is valid according to the consensus
// rules and adheres to some heuristic limits of the local node (price and size).
func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
@ -584,7 +583,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
}
// Drop non-local transactions under our own minimal accepted gas price
local = local || pool.locals.contains(from) // account may be local even if the transaction arrived from the network
if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 {
if !local && tx.To() != nil && !tx.IsSpecialTransaction() && pool.gasPrice.Cmp(tx.GasPrice()) > 0 {
return ErrUnderpriced
}
// Ensure the transaction adheres to nonce ordering
@ -597,7 +596,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
return ErrInsufficientFunds
}
if tx.To() != nil && tx.To().String() != common.BlockSigners && tx.To().String() != common.RandomizeSMC {
if tx.To() != nil && !tx.IsSpecialTransaction() {
intrGas, err := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead)
if err != nil {
return err
@ -654,8 +653,11 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
pool.removeTx(tx.Hash())
}
}
// If the transaction is replacing an already pending one, do directly
from, _ := types.Sender(pool.signer, tx) // already validated
if tx.IsSpecialTransaction() {
return pool.promoteSpecialTx(from, tx)
}
// If the transaction is replacing an already pending one, do directly
if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
// Nonce already pending, check if required price bump is met
inserted, old := list.Add(tx, pool.config.PriceBump)
@ -771,6 +773,54 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
go pool.txFeed.Send(TxPreEvent{tx})
}
func (pool *TxPool) promoteSpecialTx(addr common.Address, tx *types.Transaction) (bool, error) {
// Try to insert the transaction into the pending queue
if pool.pending[addr] == nil {
pool.pending[addr] = newTxList(true)
}
list := pool.pending[addr]
old := list.txs.Get(tx.Nonce())
if old != nil && old.IsSpecialTransaction() {
return false, ErrDuplicateSpecialTransaction
}
// Otherwise discard any previous transaction and mark this
if old != nil {
delete(pool.all, old.Hash())
pool.priced.Removed()
pendingReplaceCounter.Inc(1)
}
list.txs.Put(tx)
if cost := tx.Cost(); list.costcap.Cmp(cost) < 0 {
list.costcap = cost
}
if gas := tx.Gas(); list.gascap < gas {
list.gascap = gas
}
// Failsafe to work around direct pending inserts (tests)
if pool.all[tx.Hash()] == nil {
pool.all[tx.Hash()] = tx
}
// Set the potentially new pending nonce and notify any subsystems of the new tx
pool.beats[addr] = time.Now()
pool.pendingState.SetNonce(addr, tx.Nonce()+1)
broadcastTxs := types.Transactions{}
for i := tx.Nonce() - 1; i > 0; i-- {
before := list.txs.Get(i)
if before == nil || before.IsSpecialTransaction() {
break
}
broadcastTxs = append(broadcastTxs, before)
}
broadcastTxs = append(broadcastTxs, tx)
go func() {
for _, btx := range broadcastTxs {
pool.specialTxFeed.Send(TxPreEvent{btx})
log.Debug("Pooled new special transaction", "hash", tx.Hash(), "from", addr, "to", tx.To(), "nonce", tx.Nonce())
}
}()
return true, nil
}
// AddLocal enqueues a single transaction into the pool if it is valid, marking
// the sender as a local one in the mean time, ensuring it goes around the local
// pricing constraints.

View file

@ -267,6 +267,14 @@ func (tx *Transaction) RawSignatureValues() (*big.Int, *big.Int, *big.Int) {
return tx.data.V, tx.data.R, tx.data.S
}
func (tx *Transaction) IsSpecialTransaction() bool {
to := ""
if tx.To() != nil {
to = tx.To().String()
}
return to == common.RandomizeSMC || to == common.BlockSigners
}
func (tx *Transaction) String() string {
var from, to string
if tx.data.V != nil {
@ -395,14 +403,32 @@ type TransactionsByPriceAndNonce struct {
//
// Note, the input map is reowned so the caller should not interact any more with
// if after providing it to the constructor.
func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address]Transactions) *TransactionsByPriceAndNonce {
func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address]Transactions) (*TransactionsByPriceAndNonce, Transactions) {
// Initialize a price based heap with the head transactions
heads := make(TxByPrice, 0, len(txs))
heads := TxByPrice{}
specialTxs := Transactions{}
for _, accTxs := range txs {
heads = append(heads, accTxs[0])
// Ensure the sender address is from the signer
acc, _ := Sender(signer, accTxs[0])
txs[acc] = accTxs[1:]
var normalTxs Transactions
lastSpecialTx := -1
for i, tx := range accTxs {
if tx.IsSpecialTransaction() {
lastSpecialTx = i
}
}
if lastSpecialTx >= 0 {
for i := 0; i <= lastSpecialTx; i++ {
specialTxs = append(specialTxs, accTxs[i])
}
normalTxs = accTxs[lastSpecialTx+1:]
} else {
normalTxs = accTxs
}
if len(normalTxs) > 0 {
acc, _ := Sender(signer, normalTxs[0])
heads = append(heads, normalTxs[0])
// Ensure the sender address is from the signer
txs[acc] = normalTxs[1:]
}
}
heap.Init(&heads)
@ -411,7 +437,7 @@ func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address]Transa
txs: txs,
heads: heads,
signer: signer,
}
}, specialTxs
}
// Peek returns the next transaction by price.
@ -474,4 +500,4 @@ func (m Message) Value() *big.Int { return m.amount }
func (m Message) Gas() uint64 { return m.gasLimit }
func (m Message) Nonce() uint64 { return m.nonce }
func (m Message) Data() []byte { return m.data }
func (m Message) CheckNonce() bool { return m.checkNonce }
func (m Message) CheckNonce() bool { return m.checkNonce }

View file

@ -144,8 +144,8 @@ func TestTransactionPriceNonceSort(t *testing.T) {
}
}
// Sort the transactions and cross check the nonce ordering
txset := NewTransactionsByPriceAndNonce(signer, groups)
txset, _ := NewTransactionsByPriceAndNonce(signer, groups)
txs := Transactions{}
for tx := txset.Peek(); tx != nil; tx = txset.Peek() {
txs = append(txs, tx)

View file

@ -83,6 +83,8 @@ type ProtocolManager struct {
eventMux *event.TypeMux
txCh chan core.TxPreEvent
txSub event.Subscription
specialTxCh chan core.TxPreEvent
specialTxSub event.Subscription
minedBlockSub *event.TypeMuxSubscription
// channels for fetcher, syncer, txsyncLoop
@ -208,6 +210,10 @@ func (pm *ProtocolManager) Start(maxPeers int) {
pm.txSub = pm.txpool.SubscribeTxPreEvent(pm.txCh)
go pm.txBroadcastLoop()
pm.specialTxCh = make(chan core.TxPreEvent, txChanSize)
pm.specialTxSub = pm.txpool.SubscribeSpecialTxPreEvent(pm.specialTxCh)
go pm.specialTxBroadcastLoop()
// broadcast mined blocks
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop()
@ -221,6 +227,7 @@ func (pm *ProtocolManager) Stop() {
log.Info("Stopping Ethereum protocol")
pm.txSub.Unsubscribe() // quits txBroadcastLoop
pm.specialTxSub.Unsubscribe() // quits specialTxBroadcastLoop
pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
// Quit the sync loop.
@ -726,6 +733,16 @@ func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction)
log.Trace("Broadcast transaction", "hash", hash, "recipients", len(peers))
}
func (pm *ProtocolManager) BroadcastSpecialTx(hash common.Hash, tx *types.Transaction) {
// Broadcast transaction to a batch of peers not knowing about it
peers := pm.peers.PeersWithoutTx(hash)
//FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
for _, peer := range peers {
peer.SendSpecialTransactions(tx)
}
log.Debug("Broadcast special transaction", "hash", hash, "recipients", len(peers))
}
// Mined broadcast loop
func (self *ProtocolManager) minedBroadcastLoop() {
// automatically stops if unsubscribe
@ -751,6 +768,19 @@ func (self *ProtocolManager) txBroadcastLoop() {
}
}
func (self *ProtocolManager) specialTxBroadcastLoop() {
for {
select {
case event := <-self.specialTxCh:
self.BroadcastSpecialTx(event.Tx.Hash(), event.Tx)
// Err() channel will be closed when unsubscribing.
case <-self.specialTxSub.Err():
return
}
}
}
// NodeInfo represents a short summary of the Ethereum sub-protocol metadata
// known about the host peer.
type NodeInfo struct {

View file

@ -128,6 +128,10 @@ func (p *testTxPool) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscr
return p.txFeed.Subscribe(ch)
}
func (p *testTxPool) SubscribeSpecialTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription {
return p.txFeed.Subscribe(ch)
}
// newTestTransaction create a new dummy transaction.
func newTestTransaction(from *ecdsa.PrivateKey, nonce uint64, datasize int) *types.Transaction {
tx := types.NewTransaction(nonce, common.Address{}, big.NewInt(0), 100000, big.NewInt(0), make([]byte, datasize))

View file

@ -141,6 +141,15 @@ func (p *peer) SendTransactions(txs types.Transactions) error {
return p2p.Send(p.rw, TxMsg, txs)
}
func (p *peer) SendSpecialTransactions(tx *types.Transaction) error {
p.knownTxs.Add(tx.Hash())
if p.pairRw != nil {
return p2p.Send(p.pairRw, TxMsg, types.Transactions{tx})
} else {
return p2p.Send(p.rw, TxMsg, types.Transactions{tx})
}
}
// SendNewBlockHashes announces the availability of a number of blocks through
// a hash notification.
func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error {
@ -159,7 +168,7 @@ func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error
func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error {
p.knownBlocks.Add(block.Hash())
if p.pairRw != nil {
log.Trace("p2p SendNewBlock with pairRw", "p", p, "number", block.NumberU64())
log.Trace("p2p Send New Block with pairRw", "p", p, "number", block.NumberU64())
return p2p.Send(p.pairRw, NewBlockMsg, []interface{}{block, td})
} else {
return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td})

View file

@ -106,6 +106,7 @@ type txPool interface {
// SubscribeTxPreEvent should return an event subscription of
// TxPreEvent and send events to the given channel.
SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription
SubscribeSpecialTxPreEvent(chan<- core.TxPreEvent) event.Subscription
}
// statusData is the network packet for the status message.

View file

@ -1210,8 +1210,8 @@ func (args *SendTxArgs) toTransaction() *types.Transaction {
// submitTransaction is a helper function that submits tx to txPool and logs a message.
func submitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (common.Hash, error) {
if tx.To() != nil && tx.To().String() == common.BlockSigners {
return common.Hash{}, errors.New("Dont allow transaction sent to BlockSigners smart contract via API")
if tx.To() != nil && tx.IsSpecialTransaction() {
return common.Hash{}, errors.New("Dont allow transaction sent to BlockSigners & RandomizeSMC smart contract via API")
}
if err := b.SendTx(ctx, tx); err != nil {
return common.Hash{}, err

View file

@ -20,14 +20,15 @@ import (
"bytes"
"fmt"
"math/big"
"os"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/XDPoS"
"github.com/ethereum/go-ethereum/consensus/misc"
"github.com/ethereum/go-ethereum/consensus/XDPoS"
"github.com/ethereum/go-ethereum/contracts"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
@ -52,7 +53,7 @@ const (
// chainSideChanSize is the size of channel listening to ChainSideEvent.
chainSideChanSize = 10
// timeout waiting for M1
waitPeriod = 10
waitPeriod = 10
)
// Agent can register themself with the worker
@ -269,9 +270,9 @@ func (self *worker) update() {
self.currentMu.Lock()
acc, _ := types.Sender(self.current.signer, ev.Tx)
txs := map[common.Address]types.Transactions{acc: {ev.Tx}}
txset := types.NewTransactionsByPriceAndNonce(self.current.signer, txs)
txset, specialTxs := types.NewTransactionsByPriceAndNonce(self.current.signer, txs)
self.current.commitTransactions(self.mux, txset, self.chain, self.coinbase)
self.current.commitTransactions(self.mux, txset, specialTxs, self.chain, self.coinbase)
self.currentMu.Unlock()
} else {
// If we're mining, but nothing is being processed, wake on new transactions
@ -279,7 +280,6 @@ func (self *worker) update() {
self.commitNewWork()
}
}
// System stopped
case <-self.txSub.Err():
return
@ -551,8 +551,8 @@ func (self *worker) commitNewWork() {
log.Error("Failed to fetch pending transactions", "err", err)
return
}
txs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending)
work.commitTransactions(self.mux, txs, self.chain, self.coinbase)
txs, specialTxs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending)
work.commitTransactions(self.mux, txs, specialTxs, self.chain, self.coinbase)
// compute uncles for the new block.
var (
@ -583,7 +583,7 @@ func (self *worker) commitNewWork() {
}
// We only care about logging if we're actually mining.
if atomic.LoadInt32(&self.mining) == 1 {
log.Info("Commit new mining work", "number", work.Block.Number(), "txs", work.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart)))
log.Info("Commit new mining work", "number", work.Block.Number(), "txs", work.tcount, "special txs", len(specialTxs), "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart)))
self.unconfirmed.Shift(work.Block.NumberU64() - 1)
}
if work.config.XDPoS != nil {
@ -596,11 +596,12 @@ func (self *worker) commitNewWork() {
err := self.chain.UpdateM1()
if err != nil {
if err == core.ErrNotXDPoS {
log.Crit("Error when update M1 ", "err", err)
log.Error("Stopping node", "err", err)
os.Exit(1)
} else {
log.Error("Error when update M1 ", "err", err)
log.Error("Error when update masternodes set. Keep the current masternodes set for the next epoch.", "err", err)
}
}
}
}
}
self.push(work)
@ -621,11 +622,51 @@ func (self *worker) commitUncle(work *Work, uncle *types.Header) error {
return nil
}
func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsByPriceAndNonce, bc *core.BlockChain, coinbase common.Address) {
func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsByPriceAndNonce, specialTxs types.Transactions, bc *core.BlockChain, coinbase common.Address) {
gp := new(core.GasPool).AddGas(env.header.GasLimit)
var coalescedLogs []*types.Log
for _, tx := range specialTxs {
if gp.Gas() < params.TxGas && tx.Gas() > 0 {
log.Trace("Not enough gas for further transactions", "gp", gp)
break
}
// Error may be ignored here. The error has already been checked
// during transaction acceptance is the transaction pool.
//
// We use the eip155 signer regardless of the current hf.
from, _ := types.Sender(env.signer, tx)
// Check whether the tx is replay protected. If we're not in the EIP155 hf
// phase, start ignoring the sender until we do.
if tx.Protected() && !env.config.IsEIP155(env.header.Number) {
log.Debug("Ignoring reply protected special transaction", "hash", tx.Hash(), "eip155", env.config.EIP155Block)
continue
}
// Start executing the transaction
env.state.Prepare(tx.Hash(), common.Hash{}, env.tcount)
err, logs := env.commitTransaction(tx, bc, coinbase, gp)
switch err {
case core.ErrNonceTooLow:
// New head notification data race between the transaction pool and miner, shift
log.Debug("Skipping special transaction with low nonce", "sender", from, "nonce", tx.Nonce(), "to", tx.To())
case core.ErrNonceTooHigh:
// Reorg notification data race between the transaction pool and miner, skip account =
log.Debug("Skipping account with special transaction hight nonce", "sender", from, "nonce", tx.Nonce(), "to", tx.To())
case nil:
// Everything ok, collect the logs and shift in the next transaction from the same account
coalescedLogs = append(coalescedLogs, logs...)
env.tcount++
default:
// Strange error, discard the transaction and get the next in line (note, the
// nonce-too-high clause will prevent us from executing in vain).
log.Debug("Add Special Transaction failed, account skipped", "hash", tx.Hash(), "sender", from, "nonce", tx.Nonce(), "to", tx.To(), "err", err)
}
}
for {
// If we don't have enough gas for any further transactions then we're done
if gp.Gas() < params.TxGas {