From 9d6344fbb44e0f2c1f6952d66e78593429fb169d Mon Sep 17 00:00:00 2001 From: parmarrushabh Date: Fri, 9 Nov 2018 12:45:18 +0530 Subject: [PATCH] Broadcast special Tx through pairRW --- core/blockchain.go | 4 +- core/tx_list.go | 21 ++++---- core/tx_pool.go | 92 ++++++++++++++++++++++++++-------- core/types/transaction.go | 42 +++++++++++++--- core/types/transaction_test.go | 4 +- eth/handler.go | 30 +++++++++++ eth/helper_test.go | 4 ++ eth/peer.go | 11 +++- eth/protocol.go | 1 + internal/ethapi/api.go | 4 +- miner/worker.go | 65 +++++++++++++++++++----- 11 files changed, 219 insertions(+), 59 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 1495f78001..5b7396dc4a 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -690,7 +690,7 @@ func (bc *BlockChain) procFutureBlocks() { // Insert one by one as chain insertion needs contiguous ancestry between blocks for i := range blocks { - bc.InsertChain(blocks[i: i+1]) + bc.InsertChain(blocks[i : i+1]) } } } @@ -1243,7 +1243,7 @@ func (st *insertStats) report(chain []*types.Block, index int, cache common.Stor if index == len(chain)-1 || elapsed >= statsReportLimit { var ( end = chain[index] - txs = countTransactions(chain[st.lastIndex: index+1]) + txs = countTransactions(chain[st.lastIndex : index+1]) ) context := []interface{}{ "blocks", st.processed, "txs", txs, "mgas", float64(st.usedGas) / 1000000, diff --git a/core/tx_list.go b/core/tx_list.go index bbcc113d91..ba8a1f4418 100644 --- a/core/tx_list.go +++ b/core/tx_list.go @@ -251,19 +251,18 @@ func (l *txList) Overlaps(tx *types.Transaction) bool { func (l *txList) Add(tx *types.Transaction, priceBump uint64) (bool, *types.Transaction) { // If there's an older better transaction, abort old := l.txs.Get(tx.Nonce()) - - if (tx.To() != nil && tx.To().String() != common.RandomizeSMC) || tx.To() == nil { - if old != nil { - threshold := new(big.Int).Div(new(big.Int).Mul(old.GasPrice(), big.NewInt(100+int64(priceBump))), big.NewInt(100)) - // Have to ensure that the new gas price is higher than the old gas - // price as well as checking the percentage threshold to ensure that - // this is accurate for low (Wei-level) gas price replacements - if old.GasPrice().Cmp(tx.GasPrice()) >= 0 || threshold.Cmp(tx.GasPrice()) > 0 { - return false, nil - } + if old != nil && old.IsSpecialTransaction() { + return false, nil + } + if old != nil { + threshold := new(big.Int).Div(new(big.Int).Mul(old.GasPrice(), big.NewInt(100+int64(priceBump))), big.NewInt(100)) + // Have to ensure that the new gas price is higher than the old gas + // price as well as checking the percentage threshold to ensure that + // this is accurate for low (Wei-level) gas price replacements + if old.GasPrice().Cmp(tx.GasPrice()) >= 0 || threshold.Cmp(tx.GasPrice()) > 0 { + return false, nil } } - // Otherwise overwrite the old transaction with the current one l.txs.Put(tx) if cost := tx.Cost(); l.costcap.Cmp(cost) < 0 { diff --git a/core/tx_pool.go b/core/tx_pool.go index d781437f59..d0b632f2e8 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -80,6 +80,8 @@ var ( ErrOversizedData = errors.New("oversized data") ErrZeroGasPrice = errors.New("zero gas price") + + ErrDuplicateSpecialTransaction = errors.New("duplicate a specail transaction") ) var ( @@ -186,16 +188,17 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig { // current state) and future transactions. Transactions move between those // two states over time as they are received and processed. type TxPool struct { - config TxPoolConfig - chainconfig *params.ChainConfig - chain blockChain - gasPrice *big.Int - txFeed event.Feed - scope event.SubscriptionScope - chainHeadCh chan ChainHeadEvent - chainHeadSub event.Subscription - signer types.Signer - mu sync.RWMutex + config TxPoolConfig + chainconfig *params.ChainConfig + chain blockChain + gasPrice *big.Int + txFeed event.Feed + specialTxFeed event.Feed + scope event.SubscriptionScope + chainHeadCh chan ChainHeadEvent + chainHeadSub event.Subscription + signer types.Signer + mu sync.RWMutex currentState *state.StateDB // Current state in the blockchain head pendingState *state.ManagedState // Pending state tracking virtual nonces @@ -454,6 +457,10 @@ func (pool *TxPool) SubscribeTxPreEvent(ch chan<- TxPreEvent) event.Subscription return pool.scope.Track(pool.txFeed.Subscribe(ch)) } +func (pool *TxPool) SubscribeSpecialTxPreEvent(ch chan<- TxPreEvent) event.Subscription { + return pool.scope.Track(pool.specialTxFeed.Subscribe(ch)) +} + // GasPrice returns the current gas price enforced by the transaction pool. func (pool *TxPool) GasPrice() *big.Int { pool.mu.RLock() @@ -553,14 +560,6 @@ func (pool *TxPool) local() map[common.Address]types.Transactions { return txs } -func (pool *TxPool) GetSender(tx *types.Transaction) (common.Address, error) { - from, err := types.Sender(pool.signer, tx) - if err != nil { - return common.Address{}, ErrInvalidSender - } - return from, nil -} - // validateTx checks whether a transaction is valid according to the consensus // rules and adheres to some heuristic limits of the local node (price and size). func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { @@ -584,7 +583,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { } // Drop non-local transactions under our own minimal accepted gas price local = local || pool.locals.contains(from) // account may be local even if the transaction arrived from the network - if !local && pool.gasPrice.Cmp(tx.GasPrice()) > 0 { + if !local && tx.To() != nil && !tx.IsSpecialTransaction() && pool.gasPrice.Cmp(tx.GasPrice()) > 0 { return ErrUnderpriced } // Ensure the transaction adheres to nonce ordering @@ -597,7 +596,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { return ErrInsufficientFunds } - if tx.To() != nil && tx.To().String() != common.BlockSigners && tx.To().String() != common.RandomizeSMC { + if tx.To() != nil && !tx.IsSpecialTransaction() { intrGas, err := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead) if err != nil { return err @@ -654,8 +653,11 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { pool.removeTx(tx.Hash()) } } - // If the transaction is replacing an already pending one, do directly from, _ := types.Sender(pool.signer, tx) // already validated + if tx.IsSpecialTransaction() { + return pool.promoteSpecialTx(from, tx) + } + // If the transaction is replacing an already pending one, do directly if list := pool.pending[from]; list != nil && list.Overlaps(tx) { // Nonce already pending, check if required price bump is met inserted, old := list.Add(tx, pool.config.PriceBump) @@ -771,6 +773,54 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T go pool.txFeed.Send(TxPreEvent{tx}) } +func (pool *TxPool) promoteSpecialTx(addr common.Address, tx *types.Transaction) (bool, error) { + // Try to insert the transaction into the pending queue + if pool.pending[addr] == nil { + pool.pending[addr] = newTxList(true) + } + list := pool.pending[addr] + old := list.txs.Get(tx.Nonce()) + if old != nil && old.IsSpecialTransaction() { + return false, ErrDuplicateSpecialTransaction + } + // Otherwise discard any previous transaction and mark this + if old != nil { + delete(pool.all, old.Hash()) + pool.priced.Removed() + pendingReplaceCounter.Inc(1) + } + list.txs.Put(tx) + if cost := tx.Cost(); list.costcap.Cmp(cost) < 0 { + list.costcap = cost + } + if gas := tx.Gas(); list.gascap < gas { + list.gascap = gas + } + // Failsafe to work around direct pending inserts (tests) + if pool.all[tx.Hash()] == nil { + pool.all[tx.Hash()] = tx + } + // Set the potentially new pending nonce and notify any subsystems of the new tx + pool.beats[addr] = time.Now() + pool.pendingState.SetNonce(addr, tx.Nonce()+1) + broadcastTxs := types.Transactions{} + for i := tx.Nonce() - 1; i > 0; i-- { + before := list.txs.Get(i) + if before == nil || before.IsSpecialTransaction() { + break + } + broadcastTxs = append(broadcastTxs, before) + } + broadcastTxs = append(broadcastTxs, tx) + go func() { + for _, btx := range broadcastTxs { + pool.specialTxFeed.Send(TxPreEvent{btx}) + log.Debug("Pooled new special transaction", "hash", tx.Hash(), "from", addr, "to", tx.To(), "nonce", tx.Nonce()) + } + }() + return true, nil +} + // AddLocal enqueues a single transaction into the pool if it is valid, marking // the sender as a local one in the mean time, ensuring it goes around the local // pricing constraints. diff --git a/core/types/transaction.go b/core/types/transaction.go index 11b06fc561..c3bea1ef14 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -267,6 +267,14 @@ func (tx *Transaction) RawSignatureValues() (*big.Int, *big.Int, *big.Int) { return tx.data.V, tx.data.R, tx.data.S } +func (tx *Transaction) IsSpecialTransaction() bool { + to := "" + if tx.To() != nil { + to = tx.To().String() + } + return to == common.RandomizeSMC || to == common.BlockSigners +} + func (tx *Transaction) String() string { var from, to string if tx.data.V != nil { @@ -395,14 +403,32 @@ type TransactionsByPriceAndNonce struct { // // Note, the input map is reowned so the caller should not interact any more with // if after providing it to the constructor. -func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address]Transactions) *TransactionsByPriceAndNonce { +func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address]Transactions) (*TransactionsByPriceAndNonce, Transactions) { // Initialize a price based heap with the head transactions - heads := make(TxByPrice, 0, len(txs)) + heads := TxByPrice{} + specialTxs := Transactions{} for _, accTxs := range txs { - heads = append(heads, accTxs[0]) - // Ensure the sender address is from the signer - acc, _ := Sender(signer, accTxs[0]) - txs[acc] = accTxs[1:] + var normalTxs Transactions + lastSpecialTx := -1 + for i, tx := range accTxs { + if tx.IsSpecialTransaction() { + lastSpecialTx = i + } + } + if lastSpecialTx >= 0 { + for i := 0; i <= lastSpecialTx; i++ { + specialTxs = append(specialTxs, accTxs[i]) + } + normalTxs = accTxs[lastSpecialTx+1:] + } else { + normalTxs = accTxs + } + if len(normalTxs) > 0 { + acc, _ := Sender(signer, normalTxs[0]) + heads = append(heads, normalTxs[0]) + // Ensure the sender address is from the signer + txs[acc] = normalTxs[1:] + } } heap.Init(&heads) @@ -411,7 +437,7 @@ func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address]Transa txs: txs, heads: heads, signer: signer, - } + }, specialTxs } // Peek returns the next transaction by price. @@ -474,4 +500,4 @@ func (m Message) Value() *big.Int { return m.amount } func (m Message) Gas() uint64 { return m.gasLimit } func (m Message) Nonce() uint64 { return m.nonce } func (m Message) Data() []byte { return m.data } -func (m Message) CheckNonce() bool { return m.checkNonce } +func (m Message) CheckNonce() bool { return m.checkNonce } \ No newline at end of file diff --git a/core/types/transaction_test.go b/core/types/transaction_test.go index d1861b14cd..05325c8b69 100644 --- a/core/types/transaction_test.go +++ b/core/types/transaction_test.go @@ -144,8 +144,8 @@ func TestTransactionPriceNonceSort(t *testing.T) { } } // Sort the transactions and cross check the nonce ordering - txset := NewTransactionsByPriceAndNonce(signer, groups) - + txset, _ := NewTransactionsByPriceAndNonce(signer, groups) + txs := Transactions{} for tx := txset.Peek(); tx != nil; tx = txset.Peek() { txs = append(txs, tx) diff --git a/eth/handler.go b/eth/handler.go index 46fd8db368..5ce8934c23 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -83,6 +83,8 @@ type ProtocolManager struct { eventMux *event.TypeMux txCh chan core.TxPreEvent txSub event.Subscription + specialTxCh chan core.TxPreEvent + specialTxSub event.Subscription minedBlockSub *event.TypeMuxSubscription // channels for fetcher, syncer, txsyncLoop @@ -208,6 +210,10 @@ func (pm *ProtocolManager) Start(maxPeers int) { pm.txSub = pm.txpool.SubscribeTxPreEvent(pm.txCh) go pm.txBroadcastLoop() + pm.specialTxCh = make(chan core.TxPreEvent, txChanSize) + pm.specialTxSub = pm.txpool.SubscribeSpecialTxPreEvent(pm.specialTxCh) + go pm.specialTxBroadcastLoop() + // broadcast mined blocks pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) go pm.minedBroadcastLoop() @@ -221,6 +227,7 @@ func (pm *ProtocolManager) Stop() { log.Info("Stopping Ethereum protocol") pm.txSub.Unsubscribe() // quits txBroadcastLoop + pm.specialTxSub.Unsubscribe() // quits specialTxBroadcastLoop pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop // Quit the sync loop. @@ -726,6 +733,16 @@ func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) log.Trace("Broadcast transaction", "hash", hash, "recipients", len(peers)) } +func (pm *ProtocolManager) BroadcastSpecialTx(hash common.Hash, tx *types.Transaction) { + // Broadcast transaction to a batch of peers not knowing about it + peers := pm.peers.PeersWithoutTx(hash) + //FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))] + for _, peer := range peers { + peer.SendSpecialTransactions(tx) + } + log.Debug("Broadcast special transaction", "hash", hash, "recipients", len(peers)) +} + // Mined broadcast loop func (self *ProtocolManager) minedBroadcastLoop() { // automatically stops if unsubscribe @@ -751,6 +768,19 @@ func (self *ProtocolManager) txBroadcastLoop() { } } +func (self *ProtocolManager) specialTxBroadcastLoop() { + for { + select { + case event := <-self.specialTxCh: + self.BroadcastSpecialTx(event.Tx.Hash(), event.Tx) + + // Err() channel will be closed when unsubscribing. + case <-self.specialTxSub.Err(): + return + } + } +} + // NodeInfo represents a short summary of the Ethereum sub-protocol metadata // known about the host peer. type NodeInfo struct { diff --git a/eth/helper_test.go b/eth/helper_test.go index 2b05cea801..ca686df808 100644 --- a/eth/helper_test.go +++ b/eth/helper_test.go @@ -128,6 +128,10 @@ func (p *testTxPool) SubscribeTxPreEvent(ch chan<- core.TxPreEvent) event.Subscr return p.txFeed.Subscribe(ch) } +func (p *testTxPool) SubscribeSpecialTxPreEvent(ch chan<- core.TxPreEvent) event.Subscription { + return p.txFeed.Subscribe(ch) +} + // newTestTransaction create a new dummy transaction. func newTestTransaction(from *ecdsa.PrivateKey, nonce uint64, datasize int) *types.Transaction { tx := types.NewTransaction(nonce, common.Address{}, big.NewInt(0), 100000, big.NewInt(0), make([]byte, datasize)) diff --git a/eth/peer.go b/eth/peer.go index 8fda1b13cd..37acadd68a 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -141,6 +141,15 @@ func (p *peer) SendTransactions(txs types.Transactions) error { return p2p.Send(p.rw, TxMsg, txs) } +func (p *peer) SendSpecialTransactions(tx *types.Transaction) error { + p.knownTxs.Add(tx.Hash()) + if p.pairRw != nil { + return p2p.Send(p.pairRw, TxMsg, types.Transactions{tx}) + } else { + return p2p.Send(p.rw, TxMsg, types.Transactions{tx}) + } +} + // SendNewBlockHashes announces the availability of a number of blocks through // a hash notification. func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error { @@ -159,7 +168,7 @@ func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error { p.knownBlocks.Add(block.Hash()) if p.pairRw != nil { - log.Trace("p2p SendNewBlock with pairRw", "p", p, "number", block.NumberU64()) + log.Trace("p2p Send New Block with pairRw", "p", p, "number", block.NumberU64()) return p2p.Send(p.pairRw, NewBlockMsg, []interface{}{block, td}) } else { return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td}) diff --git a/eth/protocol.go b/eth/protocol.go index cd7db57f23..f44a01f020 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -106,6 +106,7 @@ type txPool interface { // SubscribeTxPreEvent should return an event subscription of // TxPreEvent and send events to the given channel. SubscribeTxPreEvent(chan<- core.TxPreEvent) event.Subscription + SubscribeSpecialTxPreEvent(chan<- core.TxPreEvent) event.Subscription } // statusData is the network packet for the status message. diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 5100877a23..a6429d5b6c 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -1210,8 +1210,8 @@ func (args *SendTxArgs) toTransaction() *types.Transaction { // submitTransaction is a helper function that submits tx to txPool and logs a message. func submitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (common.Hash, error) { - if tx.To() != nil && tx.To().String() == common.BlockSigners { - return common.Hash{}, errors.New("Dont allow transaction sent to BlockSigners smart contract via API") + if tx.To() != nil && tx.IsSpecialTransaction() { + return common.Hash{}, errors.New("Dont allow transaction sent to BlockSigners & RandomizeSMC smart contract via API") } if err := b.SendTx(ctx, tx); err != nil { return common.Hash{}, err diff --git a/miner/worker.go b/miner/worker.go index 51415a4a04..80015939e3 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -20,14 +20,15 @@ import ( "bytes" "fmt" "math/big" + "os" "sync" "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus" - "github.com/ethereum/go-ethereum/consensus/XDPoS" "github.com/ethereum/go-ethereum/consensus/misc" + "github.com/ethereum/go-ethereum/consensus/XDPoS" "github.com/ethereum/go-ethereum/contracts" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/state" @@ -52,7 +53,7 @@ const ( // chainSideChanSize is the size of channel listening to ChainSideEvent. chainSideChanSize = 10 // timeout waiting for M1 - waitPeriod = 10 + waitPeriod = 10 ) // Agent can register themself with the worker @@ -269,9 +270,9 @@ func (self *worker) update() { self.currentMu.Lock() acc, _ := types.Sender(self.current.signer, ev.Tx) txs := map[common.Address]types.Transactions{acc: {ev.Tx}} - txset := types.NewTransactionsByPriceAndNonce(self.current.signer, txs) + txset, specialTxs := types.NewTransactionsByPriceAndNonce(self.current.signer, txs) - self.current.commitTransactions(self.mux, txset, self.chain, self.coinbase) + self.current.commitTransactions(self.mux, txset, specialTxs, self.chain, self.coinbase) self.currentMu.Unlock() } else { // If we're mining, but nothing is being processed, wake on new transactions @@ -279,7 +280,6 @@ func (self *worker) update() { self.commitNewWork() } } - // System stopped case <-self.txSub.Err(): return @@ -551,8 +551,8 @@ func (self *worker) commitNewWork() { log.Error("Failed to fetch pending transactions", "err", err) return } - txs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending) - work.commitTransactions(self.mux, txs, self.chain, self.coinbase) + txs, specialTxs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending) + work.commitTransactions(self.mux, txs, specialTxs, self.chain, self.coinbase) // compute uncles for the new block. var ( @@ -583,7 +583,7 @@ func (self *worker) commitNewWork() { } // We only care about logging if we're actually mining. if atomic.LoadInt32(&self.mining) == 1 { - log.Info("Commit new mining work", "number", work.Block.Number(), "txs", work.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart))) + log.Info("Commit new mining work", "number", work.Block.Number(), "txs", work.tcount, "special txs", len(specialTxs), "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart))) self.unconfirmed.Shift(work.Block.NumberU64() - 1) } if work.config.XDPoS != nil { @@ -596,11 +596,12 @@ func (self *worker) commitNewWork() { err := self.chain.UpdateM1() if err != nil { if err == core.ErrNotXDPoS { - log.Crit("Error when update M1 ", "err", err) + log.Error("Stopping node", "err", err) + os.Exit(1) } else { - log.Error("Error when update M1 ", "err", err) + log.Error("Error when update masternodes set. Keep the current masternodes set for the next epoch.", "err", err) } - } + } } } self.push(work) @@ -621,11 +622,51 @@ func (self *worker) commitUncle(work *Work, uncle *types.Header) error { return nil } -func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsByPriceAndNonce, bc *core.BlockChain, coinbase common.Address) { +func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsByPriceAndNonce, specialTxs types.Transactions, bc *core.BlockChain, coinbase common.Address) { gp := new(core.GasPool).AddGas(env.header.GasLimit) var coalescedLogs []*types.Log + for _, tx := range specialTxs { + if gp.Gas() < params.TxGas && tx.Gas() > 0 { + log.Trace("Not enough gas for further transactions", "gp", gp) + break + } + // Error may be ignored here. The error has already been checked + // during transaction acceptance is the transaction pool. + // + // We use the eip155 signer regardless of the current hf. + from, _ := types.Sender(env.signer, tx) + // Check whether the tx is replay protected. If we're not in the EIP155 hf + // phase, start ignoring the sender until we do. + if tx.Protected() && !env.config.IsEIP155(env.header.Number) { + log.Debug("Ignoring reply protected special transaction", "hash", tx.Hash(), "eip155", env.config.EIP155Block) + continue + } + // Start executing the transaction + env.state.Prepare(tx.Hash(), common.Hash{}, env.tcount) + err, logs := env.commitTransaction(tx, bc, coinbase, gp) + switch err { + case core.ErrNonceTooLow: + // New head notification data race between the transaction pool and miner, shift + log.Debug("Skipping special transaction with low nonce", "sender", from, "nonce", tx.Nonce(), "to", tx.To()) + + case core.ErrNonceTooHigh: + // Reorg notification data race between the transaction pool and miner, skip account = + log.Debug("Skipping account with special transaction hight nonce", "sender", from, "nonce", tx.Nonce(), "to", tx.To()) + + case nil: + // Everything ok, collect the logs and shift in the next transaction from the same account + coalescedLogs = append(coalescedLogs, logs...) + env.tcount++ + + default: + // Strange error, discard the transaction and get the next in line (note, the + // nonce-too-high clause will prevent us from executing in vain). + log.Debug("Add Special Transaction failed, account skipped", "hash", tx.Hash(), "sender", from, "nonce", tx.Nonce(), "to", tx.To(), "err", err) + } + } + for { // If we don't have enough gas for any further transactions then we're done if gp.Gas() < params.TxGas {