fix error transaction underpriced when add sign tx to pool(full)

This commit is contained in:
parmarrushabh 2018-11-12 13:08:55 +05:30
parent 72bc872939
commit f586154a8e
6 changed files with 50 additions and 28 deletions

View file

@ -215,7 +215,8 @@ type TxPool struct {
wg sync.WaitGroup // for shutdown sync
homestead bool
homestead bool
IsMasterNode func(address common.Address) bool
}
// NewTxPool creates a new transaction pool to gather, sort and filter inbound
@ -593,13 +594,18 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
}
// Drop non-local transactions under our own minimal accepted gas price
local = local || pool.locals.contains(from) // account may be local even if the transaction arrived from the network
if !local && tx.To() != nil && !tx.IsSpecialTransaction() && pool.gasPrice.Cmp(tx.GasPrice()) > 0 {
return ErrUnderpriced
if !local && tx.To() != nil && pool.gasPrice.Cmp(tx.GasPrice()) > 0 {
if !tx.IsSpecialTransaction() || (pool.IsMasterNode != nil && !pool.IsMasterNode(from)) {
return ErrUnderpriced
}
}
// Ensure the transaction adheres to nonce ordering
if pool.currentState.GetNonce(from) > tx.Nonce() {
return ErrNonceTooLow
}
if pool.pendingState.GetNonce(from)+common.LimitThresholdNonceInQueue < tx.Nonce() {
return ErrNonceTooHigh
}
// Transactor should have enough funds to cover the costs
// cost == V + GP * GL
if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 {
@ -647,6 +653,10 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
invalidTxCounter.Inc(1)
return false, err
}
from, _ := types.Sender(pool.signer, tx) // already validated
if tx.IsSpecialTransaction() && pool.IsMasterNode != nil && pool.IsMasterNode(from) {
return pool.promoteSpecialTx(from, tx)
}
// If the transaction pool is full, discard underpriced transactions
if uint64(len(pool.all)) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
// If the new transaction is underpriced, don't accept it
@ -663,10 +673,6 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
pool.removeTx(tx.Hash())
}
}
from, _ := types.Sender(pool.signer, tx) // already validated
if tx.IsSpecialTransaction() {
return pool.promoteSpecialTx(from, tx)
}
// If the transaction is replacing an already pending one, do directly
if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
// Nonce already pending, check if required price bump is met
@ -1246,4 +1252,4 @@ func (as *accountSet) containsTx(tx *types.Transaction) bool {
// add inserts a new address into the set to track.
func (as *accountSet) add(addr common.Address) {
as.accounts[addr] = struct{}{}
}
}

View file

@ -932,14 +932,14 @@ func TestTransactionPendingLimiting(t *testing.T) {
account, _ := deriveSender(transaction(0, 0, key))
pool.currentState.AddBalance(account, big.NewInt(1000000))
testTxPoolConfig.AccountQueue = 10
// Keep track of transaction events to ensure all executables get announced
events := make(chan TxPreEvent, testTxPoolConfig.AccountQueue)
events := make(chan TxPreEvent, testTxPoolConfig.AccountQueue+5)
sub := pool.txFeed.Subscribe(events)
defer sub.Unsubscribe()
// Keep queuing up transactions and make sure all above a limit are dropped
for i := uint64(0); i < testTxPoolConfig.AccountQueue; i++ {
for i := uint64(0); i < testTxPoolConfig.AccountQueue+5; i++ {
if err := pool.AddRemote(transaction(i, 100000, key)); err != nil {
t.Fatalf("tx %d: failed to add transaction: %v", i, err)
}
@ -950,10 +950,10 @@ func TestTransactionPendingLimiting(t *testing.T) {
t.Errorf("tx %d: queue size mismatch: have %d, want %d", i, pool.queue[account].Len(), 0)
}
}
if len(pool.all) != int(testTxPoolConfig.AccountQueue) {
if len(pool.all) != int(testTxPoolConfig.AccountQueue+5) {
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), testTxPoolConfig.AccountQueue+5)
}
if err := validateEvents(events, int(testTxPoolConfig.AccountQueue)); err != nil {
if err := validateEvents(events, int(testTxPoolConfig.AccountQueue+5)); err != nil {
t.Fatalf("event firing failed: %v", err)
}
if err := validateTxPoolInternals(pool); err != nil {
@ -1106,9 +1106,8 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) {
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)}
config := testTxPoolConfig
config.AccountSlots = 10
config.GlobalSlots = 0
config.AccountSlots = 5
pool := NewTxPool(config, params.TestChainConfig, blockchain)
defer pool.Stop()

View file

@ -404,16 +404,21 @@ type TransactionsByPriceAndNonce struct {
// if after providing it to the constructor.
// It also classifies special txs and normal txs
func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address]Transactions) (*TransactionsByPriceAndNonce, Transactions) {
func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address]Transactions, signers map[common.Address]struct{}) (*TransactionsByPriceAndNonce, Transactions) {
// Initialize a price based heap with the head transactions
heads := TxByPrice{}
specialTxs := Transactions{}
for _, accTxs := range txs {
from, _ := Sender(signer, accTxs[0])
var normalTxs Transactions
lastSpecialTx := -1
for i, tx := range accTxs {
if tx.IsSpecialTransaction() {
lastSpecialTx = i
if len(signers) > 0 {
if _, ok := signers[from]; ok {
for i, tx := range accTxs {
if tx.IsSpecialTransaction() {
lastSpecialTx = i
}
}
}
}
if lastSpecialTx >= 0 {
@ -425,10 +430,9 @@ func NewTransactionsByPriceAndNonce(signer Signer, txs map[common.Address]Transa
normalTxs = accTxs
}
if len(normalTxs) > 0 {
acc, _ := Sender(signer, normalTxs[0])
heads = append(heads, normalTxs[0])
// Ensure the sender address is from the signer
txs[acc] = normalTxs[1:]
txs[from] = normalTxs[1:]
}
}
heap.Init(&heads)
@ -501,4 +505,4 @@ func (m Message) Value() *big.Int { return m.amount }
func (m Message) Gas() uint64 { return m.gasLimit }
func (m Message) Nonce() uint64 { return m.nonce }
func (m Message) Data() []byte { return m.data }
func (m Message) CheckNonce() bool { return m.checkNonce
func (m Message) CheckNonce() bool { return m.checkNonce }

View file

@ -144,8 +144,8 @@ func TestTransactionPriceNonceSort(t *testing.T) {
}
}
// Sort the transactions and cross check the nonce ordering
txset, _ := NewTransactionsByPriceAndNonce(signer, groups)
txset, _ := NewTransactionsByPriceAndNonce(signer, groups,nil)
txs := Transactions{}
for tx := txset.Peek(); tx != nil; tx = txset.Peek() {
txs = append(txs, tx)
@ -232,4 +232,4 @@ func TestTransactionJSON(t *testing.T) {
t.Errorf("invalid chain id, want %d, got %d", tx.ChainId(), parsedTx.ChainId())
}
}
}
}

View file

@ -354,6 +354,18 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
}
}
return nil
}
eth.txPool.IsMasterNode = func(address common.Address) bool {
currentHeader := eth.blockchain.CurrentHeader()
snap, err := c.GetSnapshot(eth.blockchain, currentHeader)
if err != nil {
log.Error("Can't get snap shot with current header ", "number", currentHeader.Number, "hash", currentHeader.Hash().Hex())
return false
}
if _, ok := snap.Signers[address]; ok {
return true
}
return false
}
}

View file

@ -270,7 +270,7 @@ func (self *worker) update() {
self.currentMu.Lock()
acc, _ := types.Sender(self.current.signer, ev.Tx)
txs := map[common.Address]types.Transactions{acc: {ev.Tx}}
txset, specialTxs := types.NewTransactionsByPriceAndNonce(self.current.signer, txs)
txset, specialTxs := types.NewTransactionsByPriceAndNonce(self.current.signer, txs, nil)
self.current.commitTransactions(self.mux, txset, specialTxs, self.chain, self.coinbase)
self.currentMu.Unlock()
@ -463,7 +463,7 @@ func (self *worker) commitNewWork() {
tstart := time.Now()
parent := self.chain.CurrentBlock()
var signers map[common.Address]struct{}
// Only try to commit new work if we are mining
if atomic.LoadInt32(&self.mining) == 1 {
// check if we are right after parent's coinbase in the list
@ -477,6 +477,7 @@ func (self *worker) commitNewWork() {
log.Error("Failed when trying to commit new work", "err", err)
return
}
signers = snap.Signers
preIndex, curIndex, ok, err := XDPoS.YourTurn(masternodes, snap, parent.Header(), self.coinbase)
if err != nil {
log.Error("Failed when trying to commit new work", "err", err)
@ -569,7 +570,7 @@ func (self *worker) commitNewWork() {
log.Error("Failed to fetch pending transactions", "err", err)
return
}
txs, specialTxs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending)
txs, specialTxs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending, signers)
work.commitTransactions(self.mux, txs, specialTxs, self.chain, self.coinbase)
// compute uncles for the new block.