Merge pull request #528 from gzliudan/tx-pool

upgrade tx pool
This commit is contained in:
Daniel Liu 2024-05-10 21:05:26 +08:00 committed by GitHub
commit 24fa7e3996
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 1316 additions and 989 deletions

View file

@ -9,14 +9,13 @@ import (
"github.com/XinFinOrg/XDPoSChain/XDCx/tradingstate"
"github.com/XinFinOrg/XDPoSChain/XDCxDAO"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/p2p"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/p2p"
"github.com/XinFinOrg/XDPoSChain/rpc"
lru "github.com/hashicorp/golang-lru"
"golang.org/x/sync/syncmap"
@ -105,7 +104,7 @@ func New(cfg *Config) *XDCX {
}
XDCX := &XDCX{
orderNonce: make(map[common.Address]*big.Int),
Triegc: prque.New(),
Triegc: prque.New(nil),
tokenDecimalCache: tokenDecimalCache,
orderCache: orderCache,
}

View file

@ -12,14 +12,13 @@ import (
"github.com/XinFinOrg/XDPoSChain/XDCx/tradingstate"
"github.com/XinFinOrg/XDPoSChain/XDCxDAO"
"github.com/XinFinOrg/XDPoSChain/XDCxlending/lendingstate"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/p2p"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/p2p"
"github.com/XinFinOrg/XDPoSChain/rpc"
lru "github.com/hashicorp/golang-lru"
)
@ -67,7 +66,7 @@ func New(XDCx *XDCx.XDCX) *Lending {
lendingTradeCache, _ := lru.New(defaultCacheLimit)
lending := &Lending{
orderNonce: make(map[common.Address]*big.Int),
Triegc: prque.New(),
Triegc: prque.New(nil),
lendingItemHistory: itemCache,
lendingTradeHistory: lendingTradeCache,
}

View file

@ -7,11 +7,11 @@ import (
"github.com/XinFinOrg/XDPoSChain/XDCx/tradingstate"
"github.com/XinFinOrg/XDPoSChain/XDCxlending/lendingstate"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/consensus/clique"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/types"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
type Masternode struct {

View file

@ -86,7 +86,7 @@ func CreateTransactionSign(chainConfig *params.ChainConfig, pool *core.TxPool, m
}
// Create and send tx to smart contract for sign validate block.
nonce := pool.State().GetNonce(account.Address)
nonce := pool.Nonce(account.Address)
tx := CreateTxSign(block.Number(), block.Hash(), nonce, common.HexToAddress(common.BlockSigners))
txSigned, err := wallet.SignTx(account, tx, chainConfig.ChainId)
if err != nil {

View file

@ -85,7 +85,7 @@ func genValueTx(nbytes int) func(int, *BlockGen) {
return func(i int, gen *BlockGen) {
toaddr := common.Address{}
data := make([]byte, nbytes)
gas, _ := IntrinsicGas(data, false, false)
gas, _ := IntrinsicGas(data, false, false, false)
tx, _ := types.SignTx(types.NewTransaction(gen.TxNonce(benchRootAddr), toaddr, big.NewInt(1), gas, nil, data), types.HomesteadSigner{}, benchRootKey)
gen.AddTx(tx)
}

View file

@ -28,13 +28,12 @@ import (
"sync/atomic"
"time"
"github.com/XinFinOrg/XDPoSChain/XDCxlending/lendingstate"
"github.com/XinFinOrg/XDPoSChain/XDCx/tradingstate"
"github.com/XinFinOrg/XDPoSChain/XDCxlending/lendingstate"
"github.com/XinFinOrg/XDPoSChain/accounts/abi/bind"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/mclock"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/common/sort"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS"
@ -53,7 +52,6 @@ import (
"github.com/XinFinOrg/XDPoSChain/rlp"
"github.com/XinFinOrg/XDPoSChain/trie"
lru "github.com/hashicorp/golang-lru"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
var (
@ -201,7 +199,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
chainConfig: chainConfig,
cacheConfig: cacheConfig,
db: db,
triegc: prque.New(),
triegc: prque.New(nil),
stateCache: state.NewDatabase(db),
quit: make(chan struct{}),
bodyCache: bodyCache,
@ -1268,18 +1266,18 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
} else {
// Full but not archive node, do proper garbage collection
triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive
bc.triegc.Push(root, -float32(block.NumberU64()))
bc.triegc.Push(root, -int64(block.NumberU64()))
if tradingTrieDb != nil {
tradingTrieDb.Reference(tradingRoot, common.Hash{})
}
if tradingService != nil {
tradingService.GetTriegc().Push(tradingRoot, -float32(block.NumberU64()))
tradingService.GetTriegc().Push(tradingRoot, -int64(block.NumberU64()))
}
if lendingTrieDb != nil {
lendingTrieDb.Reference(lendingRoot, common.Hash{})
}
if lendingService != nil {
lendingService.GetTriegc().Push(lendingRoot, -float32(block.NumberU64()))
lendingService.GetTriegc().Push(lendingRoot, -int64(block.NumberU64()))
}
if current := block.NumberU64(); current > triesInMemory {
// Find the next state trie we need to commit
@ -1450,6 +1448,10 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
// only reason this method exists as a separate one is to make locking cleaner
// with deferred statements.
func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) {
// Sanity check that we have something meaningful to import
if len(chain) == 0 {
return 0, nil, nil, nil
}
engine, _ := bc.Engine().(*XDPoS.XDPoS)
// Do a sanity check that the provided chain is actually ordered and linked
@ -1491,6 +1493,9 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
abort, results := bc.engine.VerifyHeaders(bc, headers, seals)
defer close(abort)
// Start a parallel signature recovery (signer will fluke on fork transition, minimal perf loss)
senderCacher.recoverFromBlocks(types.MakeSigner(bc.chainConfig, chain[0].Number()), chain)
// Iterate over the blocks and insert when the verifier permits
for i, block := range chain {
// If the chain is terminating, stop processing blocks

View file

@ -24,18 +24,16 @@ import (
"sync"
"time"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS"
"github.com/XinFinOrg/XDPoSChain/XDCxlending/lendingstate"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/event"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/params"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
var (
@ -671,7 +669,7 @@ func (pool *LendingPool) add(tx *types.LendingTransaction, local bool) (bool, er
// If the transaction fails basic validation, discard it
if err := pool.validateTx(tx, local); err != nil {
log.Debug("Discarding invalid lending transaction", "hash", hash, "userAddress", tx.UserAddress, "status", tx.Status, "err", err)
invalidTxCounter.Inc(1)
invalidTxMeter.Mark(1)
return false, err
}
from, _ := types.LendingSender(pool.signer, tx) // already validated
@ -685,12 +683,12 @@ func (pool *LendingPool) add(tx *types.LendingTransaction, local bool) (bool, er
if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
inserted, old := list.Add(tx)
if !inserted {
pendingDiscardCounter.Inc(1)
pendingDiscardMeter.Mark(1)
return false, ErrPendingNonceTooLow
}
if old != nil {
delete(pool.all, old.Hash())
pendingReplaceCounter.Inc(1)
pendingReplaceMeter.Mark(1)
}
pool.all[tx.Hash()] = tx
pool.journalTx(from, tx)
@ -726,13 +724,13 @@ func (pool *LendingPool) enqueueTx(hash common.Hash, tx *types.LendingTransactio
inserted, old := pool.queue[from].Add(tx)
if !inserted {
// An older transaction was better, discard this
queuedDiscardCounter.Inc(1)
pendingDiscardMeter.Mark(1)
return false, ErrPendingNonceTooLow
}
// Discard any previous transaction and mark this
if old != nil {
delete(pool.all, old.Hash())
queuedReplaceCounter.Inc(1)
queuedReplaceMeter.Mark(1)
}
pool.all[hash] = tx
return old != nil, nil
@ -764,13 +762,13 @@ func (pool *LendingPool) promoteTx(addr common.Address, hash common.Hash, tx *ty
if !inserted {
// An older transaction was better, discard this
delete(pool.all, hash)
pendingDiscardCounter.Inc(1)
pendingDiscardMeter.Mark(1)
return
}
// Otherwise discard any previous transaction and mark this
if old != nil {
delete(pool.all, old.Hash())
pendingReplaceCounter.Inc(1)
pendingReplaceMeter.Mark(1)
}
// Failsafe to work around direct pending inserts (tests)
if pool.all[hash] == nil {
@ -981,7 +979,7 @@ func (pool *LendingPool) promoteExecutables(accounts []common.Address) {
hash := tx.Hash()
delete(pool.all, hash)
queuedRateLimitCounter.Inc(1)
queuedRateLimitMeter.Mark(1)
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
}
}
@ -998,11 +996,11 @@ func (pool *LendingPool) promoteExecutables(accounts []common.Address) {
if pending > pool.config.GlobalSlots {
pendingBeforeCap := pending
// Assemble a spam order to penalize large transactors first
spammers := prque.New()
spammers := prque.New(nil)
for addr, list := range pool.pending {
// Only evict transactions from high rollers
if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
spammers.Push(addr, float32(list.Len()))
spammers.Push(addr, int64(list.Len()))
}
}
// Gradually drop transactions from offenders
@ -1057,7 +1055,7 @@ func (pool *LendingPool) promoteExecutables(accounts []common.Address) {
}
}
}
pendingRateLimitCounter.Inc(int64(pendingBeforeCap - pending))
pendingRateLimitMeter.Mark(int64(pendingBeforeCap - pending))
}
// If we've queued more transactions than the hard limit, drop oldest ones
queued := uint64(0)
@ -1066,7 +1064,7 @@ func (pool *LendingPool) promoteExecutables(accounts []common.Address) {
}
if queued > pool.config.GlobalQueue {
// Sort all accounts with queued transactions by heartbeat
addresses := make(addresssByHeartbeat, 0, len(pool.queue))
addresses := make(addressesByHeartbeat, 0, len(pool.queue))
for addr := range pool.queue {
if !pool.locals.contains(addr) { // don't drop locals
addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
@ -1087,7 +1085,7 @@ func (pool *LendingPool) promoteExecutables(accounts []common.Address) {
pool.removeTx(tx.Hash())
}
drop -= size
queuedRateLimitCounter.Inc(int64(size))
queuedRateLimitMeter.Mark(int64(size))
continue
}
// Otherwise drop only last few transactions
@ -1095,7 +1093,7 @@ func (pool *LendingPool) promoteExecutables(accounts []common.Address) {
for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
pool.removeTx(txs[i].Hash())
drop--
queuedRateLimitCounter.Inc(1)
queuedRateLimitMeter.Mark(1)
}
}
}

View file

@ -25,16 +25,15 @@ import (
"time"
"github.com/XinFinOrg/XDPoSChain/XDCx/tradingstate"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/event"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/params"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
var (
@ -579,7 +578,7 @@ func (pool *OrderPool) add(tx *types.OrderTransaction, local bool) (bool, error)
// If the transaction fails basic validation, discard it
if err := pool.validateTx(tx, local); err != nil {
log.Debug("Discarding invalid order transaction", "hash", hash, "userAddress", tx.UserAddress().Hex(), "status", tx.Status, "err", err)
invalidTxCounter.Inc(1)
invalidTxMeter.Mark(1)
return false, err
}
from, _ := types.OrderSender(pool.signer, tx) // already validated
@ -593,12 +592,12 @@ func (pool *OrderPool) add(tx *types.OrderTransaction, local bool) (bool, error)
if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
inserted, old := list.Add(tx)
if !inserted {
pendingDiscardCounter.Inc(1)
pendingDiscardMeter.Mark(1)
return false, ErrPendingNonceTooLow
}
if old != nil {
delete(pool.all, old.Hash())
pendingReplaceCounter.Inc(1)
pendingReplaceMeter.Mark(1)
}
pool.all[tx.Hash()] = tx
pool.journalTx(from, tx)
@ -636,13 +635,13 @@ func (pool *OrderPool) enqueueTx(hash common.Hash, tx *types.OrderTransaction) (
inserted, old := pool.queue[from].Add(tx)
if !inserted {
// An older transaction was better, discard this
queuedDiscardCounter.Inc(1)
queuedDiscardMeter.Mark(1)
return false, ErrPendingNonceTooLow
}
// Discard any previous transaction and mark this
if old != nil {
delete(pool.all, old.Hash())
queuedReplaceCounter.Inc(1)
queuedReplaceMeter.Mark(1)
}
pool.all[hash] = tx
return old != nil, nil
@ -675,13 +674,13 @@ func (pool *OrderPool) promoteTx(addr common.Address, hash common.Hash, tx *type
if !inserted {
// An older transaction was better, discard this
delete(pool.all, hash)
pendingDiscardCounter.Inc(1)
pendingDiscardMeter.Mark(1)
return
}
// Otherwise discard any previous transaction and mark this
if old != nil {
delete(pool.all, old.Hash())
pendingReplaceCounter.Inc(1)
pendingReplaceMeter.Mark(1)
}
// Failsafe to work around direct pending inserts (tests)
if pool.all[hash] == nil {
@ -896,7 +895,7 @@ func (pool *OrderPool) promoteExecutables(accounts []common.Address) {
hash := tx.Hash()
delete(pool.all, hash)
queuedRateLimitCounter.Inc(1)
queuedRateLimitMeter.Mark(1)
log.Debug("Removed cap-exceeding queued transaction", "addr", tx.UserAddress().Hex(), "nonce", tx.Nonce(), "ohash", tx.OrderHash().Hex(), "status", tx.Status(), "orderid", tx.OrderID())
}
}
@ -914,11 +913,11 @@ func (pool *OrderPool) promoteExecutables(accounts []common.Address) {
if pending > pool.config.GlobalSlots {
pendingBeforeCap := pending
// Assemble a spam order to penalize large transactors first
spammers := prque.New()
spammers := prque.New(nil)
for addr, list := range pool.pending {
// Only evict transactions from high rollers
if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
spammers.Push(addr, float32(list.Len()))
spammers.Push(addr, int64(list.Len()))
}
}
// Gradually drop transactions from offenders
@ -973,7 +972,7 @@ func (pool *OrderPool) promoteExecutables(accounts []common.Address) {
}
}
}
pendingRateLimitCounter.Inc(int64(pendingBeforeCap - pending))
pendingRateLimitMeter.Mark(int64(pendingBeforeCap - pending))
}
// If we've queued more transactions than the hard limit, drop oldest ones
queued := uint64(0)
@ -982,7 +981,7 @@ func (pool *OrderPool) promoteExecutables(accounts []common.Address) {
}
if queued > pool.config.GlobalQueue {
// Sort all accounts with queued transactions by heartbeat
addresses := make(addresssByHeartbeat, 0, len(pool.queue))
addresses := make(addressesByHeartbeat, 0, len(pool.queue))
for addr := range pool.queue {
if !pool.locals.contains(addr) { // don't drop locals
addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
@ -1003,7 +1002,7 @@ func (pool *OrderPool) promoteExecutables(accounts []common.Address) {
pool.removeTx(tx.Hash())
}
drop -= size
queuedRateLimitCounter.Inc(int64(size))
queuedRateLimitMeter.Mark(int64(size))
continue
}
// Otherwise drop only last few transactions
@ -1011,7 +1010,7 @@ func (pool *OrderPool) promoteExecutables(accounts []common.Address) {
for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
pool.removeTx(txs[i].Hash())
drop--
queuedRateLimitCounter.Inc(1)
queuedRateLimitMeter.Mark(1)
}
}
}

View file

@ -1,143 +0,0 @@
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package state
import (
"sync"
"github.com/XinFinOrg/XDPoSChain/common"
)
type account struct {
stateObject *stateObject
nstart uint64
nonces []bool
}
type ManagedState struct {
*StateDB
mu sync.RWMutex
accounts map[common.Address]*account
}
// ManagedState returns a new managed state with the statedb as it's backing layer
func ManageState(statedb *StateDB) *ManagedState {
return &ManagedState{
StateDB: statedb.Copy(),
accounts: make(map[common.Address]*account),
}
}
// SetState sets the backing layer of the managed state
func (ms *ManagedState) SetState(statedb *StateDB) {
ms.mu.Lock()
defer ms.mu.Unlock()
ms.StateDB = statedb
}
// RemoveNonce removed the nonce from the managed state and all future pending nonces
func (ms *ManagedState) RemoveNonce(addr common.Address, n uint64) {
if ms.hasAccount(addr) {
ms.mu.Lock()
defer ms.mu.Unlock()
account := ms.getAccount(addr)
if n-account.nstart <= uint64(len(account.nonces)) {
reslice := make([]bool, n-account.nstart)
copy(reslice, account.nonces[:n-account.nstart])
account.nonces = reslice
}
}
}
// NewNonce returns the new canonical nonce for the managed account
func (ms *ManagedState) NewNonce(addr common.Address) uint64 {
ms.mu.Lock()
defer ms.mu.Unlock()
account := ms.getAccount(addr)
for i, nonce := range account.nonces {
if !nonce {
return account.nstart + uint64(i)
}
}
account.nonces = append(account.nonces, true)
return uint64(len(account.nonces)-1) + account.nstart
}
// GetNonce returns the canonical nonce for the managed or unmanaged account.
//
// Because GetNonce mutates the DB, we must take a write lock.
func (ms *ManagedState) GetNonce(addr common.Address) uint64 {
ms.mu.Lock()
defer ms.mu.Unlock()
if ms.hasAccount(addr) {
account := ms.getAccount(addr)
return uint64(len(account.nonces)) + account.nstart
} else {
return ms.StateDB.GetNonce(addr)
}
}
// SetNonce sets the new canonical nonce for the managed state
func (ms *ManagedState) SetNonce(addr common.Address, nonce uint64) {
ms.mu.Lock()
defer ms.mu.Unlock()
so := ms.GetOrNewStateObject(addr)
so.SetNonce(nonce)
ms.accounts[addr] = newAccount(so)
}
// HasAccount returns whether the given address is managed or not
func (ms *ManagedState) HasAccount(addr common.Address) bool {
ms.mu.RLock()
defer ms.mu.RUnlock()
return ms.hasAccount(addr)
}
func (ms *ManagedState) hasAccount(addr common.Address) bool {
_, ok := ms.accounts[addr]
return ok
}
// populate the managed state
func (ms *ManagedState) getAccount(addr common.Address) *account {
if account, ok := ms.accounts[addr]; !ok {
so := ms.GetOrNewStateObject(addr)
ms.accounts[addr] = newAccount(so)
} else {
// Always make sure the state account nonce isn't actually higher
// than the tracked one.
so := ms.StateDB.getStateObject(addr)
if so != nil && uint64(len(account.nonces))+account.nstart < so.Nonce() {
ms.accounts[addr] = newAccount(so)
}
}
return ms.accounts[addr]
}
func newAccount(so *stateObject) *account {
return &account{so, so.Nonce(), nil}
}

View file

@ -1,124 +0,0 @@
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package state
import (
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
"testing"
"github.com/XinFinOrg/XDPoSChain/common"
)
var addr = common.BytesToAddress([]byte("test"))
func create() (*ManagedState, *account) {
db := rawdb.NewMemoryDatabase()
statedb, _ := New(common.Hash{}, NewDatabase(db))
ms := ManageState(statedb)
ms.StateDB.SetNonce(addr, 100)
ms.accounts[addr] = newAccount(ms.StateDB.getStateObject(addr))
return ms, ms.accounts[addr]
}
func TestNewNonce(t *testing.T) {
ms, _ := create()
nonce := ms.NewNonce(addr)
if nonce != 100 {
t.Error("expected nonce 100. got", nonce)
}
nonce = ms.NewNonce(addr)
if nonce != 101 {
t.Error("expected nonce 101. got", nonce)
}
}
func TestRemove(t *testing.T) {
ms, account := create()
nn := make([]bool, 10)
for i := range nn {
nn[i] = true
}
account.nonces = append(account.nonces, nn...)
i := uint64(5)
ms.RemoveNonce(addr, account.nstart+i)
if len(account.nonces) != 5 {
t.Error("expected", i, "'th index to be false")
}
}
func TestReuse(t *testing.T) {
ms, account := create()
nn := make([]bool, 10)
for i := range nn {
nn[i] = true
}
account.nonces = append(account.nonces, nn...)
i := uint64(5)
ms.RemoveNonce(addr, account.nstart+i)
nonce := ms.NewNonce(addr)
if nonce != 105 {
t.Error("expected nonce to be 105. got", nonce)
}
}
func TestRemoteNonceChange(t *testing.T) {
ms, account := create()
nn := make([]bool, 10)
for i := range nn {
nn[i] = true
}
account.nonces = append(account.nonces, nn...)
ms.NewNonce(addr)
ms.StateDB.stateObjects[addr].data.Nonce = 200
nonce := ms.NewNonce(addr)
if nonce != 200 {
t.Error("expected nonce after remote update to be", 200, "got", nonce)
}
ms.NewNonce(addr)
ms.NewNonce(addr)
ms.NewNonce(addr)
ms.StateDB.stateObjects[addr].data.Nonce = 200
nonce = ms.NewNonce(addr)
if nonce != 204 {
t.Error("expected nonce after remote update to be", 204, "got", nonce)
}
}
func TestSetNonce(t *testing.T) {
ms, _ := create()
var addr common.Address
ms.SetNonce(addr, 10)
if ms.GetNonce(addr) != 10 {
t.Error("Expected nonce of 10, got", ms.GetNonce(addr))
}
addr[0] = 1
ms.StateDB.SetNonce(addr, 1)
if ms.GetNonce(addr) != 1 {
t.Error("Expected nonce of 1, got", ms.GetNonce(addr))
}
}

View file

@ -42,8 +42,10 @@ The state transitioning model does all all the necessary work to work out a vali
3) Create a new state object if the recipient is \0*32
4) Value transfer
== If contract creation ==
4a) Attempt to run transaction data
4b) If valid, use result as code for the new state object
4a) Attempt to run transaction data
4b) If valid, use result as code for the new state object
== end ==
5) Run Script section
6) Derive new state root
@ -77,10 +79,10 @@ type Message interface {
}
// IntrinsicGas computes the 'intrinsic gas' for a message with the given data.
func IntrinsicGas(data []byte, contractCreation, homestead bool) (uint64, error) {
func IntrinsicGas(data []byte, contractCreation, isEIP155 bool, isEIP2028 bool) (uint64, error) {
// Set the starting gas for the raw transaction
var gas uint64
if contractCreation && homestead {
if contractCreation && isEIP155 {
gas = params.TxGasContractCreation
} else {
gas = params.TxGas
@ -95,10 +97,14 @@ func IntrinsicGas(data []byte, contractCreation, homestead bool) (uint64, error)
}
}
// Make sure we don't exceed uint64 for all data combinations
if (math.MaxUint64-gas)/params.TxDataNonZeroGas < nz {
nonZeroGas := params.TxDataNonZeroGasFrontier
if isEIP2028 {
nonZeroGas = params.TxDataNonZeroGasEIP2028
}
if (math.MaxUint64-gas)/nonZeroGas < nz {
return 0, vm.ErrOutOfGas
}
gas += nz * params.TxDataNonZeroGas
gas += nz * nonZeroGas
z := uint64(len(data)) - nz
if (math.MaxUint64-gas)/params.TxDataZeroGas < z {
@ -223,10 +229,11 @@ func (st *StateTransition) TransitionDb(owner common.Address) (ret []byte, usedG
sender := st.from() // err checked in preCheck
homestead := st.evm.ChainConfig().IsHomestead(st.evm.BlockNumber)
istanbul := st.evm.ChainConfig().IsIstanbul(st.evm.BlockNumber)
contractCreation := msg.To() == nil
// Pay intrinsic gas
gas, err := IntrinsicGas(st.data, contractCreation, homestead)
gas, err := IntrinsicGas(st.data, contractCreation, homestead, istanbul)
if err != nil {
return nil, 0, false, err, nil
}

105
core/tx_cacher.go Normal file
View file

@ -0,0 +1,105 @@
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"runtime"
"github.com/XinFinOrg/XDPoSChain/core/types"
)
// senderCacher is a concurrent tranaction sender recoverer anc cacher.
var senderCacher = newTxSenderCacher(runtime.NumCPU())
// txSenderCacherRequest is a request for recovering transaction senders with a
// specific signature scheme and caching it into the transactions themselves.
//
// The inc field defines the number of transactions to skip after each recovery,
// which is used to feed the same underlying input array to different threads but
// ensure they process the early transactions fast.
type txSenderCacherRequest struct {
signer types.Signer
txs []*types.Transaction
inc int
}
// txSenderCacher is a helper structure to concurrently ecrecover transaction
// senders from digital signatures on background threads.
type txSenderCacher struct {
threads int
tasks chan *txSenderCacherRequest
}
// newTxSenderCacher creates a new transaction sender background cacher and starts
// as many procesing goroutines as allowed by the GOMAXPROCS on construction.
func newTxSenderCacher(threads int) *txSenderCacher {
cacher := &txSenderCacher{
tasks: make(chan *txSenderCacherRequest, threads),
threads: threads,
}
for i := 0; i < threads; i++ {
go cacher.cache()
}
return cacher
}
// cache is an infinite loop, caching transaction senders from various forms of
// data structures.
func (cacher *txSenderCacher) cache() {
for task := range cacher.tasks {
for i := 0; i < len(task.txs); i += task.inc {
types.Sender(task.signer, task.txs[i])
}
}
}
// recover recovers the senders from a batch of transactions and caches them
// back into the same data structures. There is no validation being done, nor
// any reaction to invalid signatures. That is up to calling code later.
func (cacher *txSenderCacher) recover(signer types.Signer, txs []*types.Transaction) {
// If there's nothing to recover, abort
if len(txs) == 0 {
return
}
// Ensure we have meaningful task sizes and schedule the recoveries
tasks := cacher.threads
if len(txs) < tasks*4 {
tasks = (len(txs) + 3) / 4
}
for i := 0; i < tasks; i++ {
cacher.tasks <- &txSenderCacherRequest{
signer: signer,
txs: txs[i:],
inc: tasks,
}
}
}
// recoverFromBlocks recovers the senders from a batch of blocks and caches them
// back into the same data structures. There is no validation being done, nor
// any reaction to invalid signatures. That is up to calling code later.
func (cacher *txSenderCacher) recoverFromBlocks(signer types.Signer, blocks []*types.Block) {
count := 0
for _, block := range blocks {
count += len(block.Transactions())
}
txs := make([]*types.Transaction, 0, count)
for _, block := range blocks {
txs = append(txs, block.Transactions()...)
}
cacher.recover(signer, txs)
}

View file

@ -378,9 +378,20 @@ func (l *txList) Flatten() types.Transactions {
// price-sorted transactions to discard when the pool fills up.
type priceHeap []*types.Transaction
func (h priceHeap) Len() int { return len(h) }
func (h priceHeap) Less(i, j int) bool { return h[i].GasPrice().Cmp(h[j].GasPrice()) < 0 }
func (h priceHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h priceHeap) Len() int { return len(h) }
func (h priceHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h priceHeap) Less(i, j int) bool {
// Sort primarily by price, returning the cheaper one
switch h[i].GasPrice().Cmp(h[j].GasPrice()) {
case -1:
return true
case 1:
return false
}
// If the prices match, stabilize via nonces (high nonce is worse)
return h[i].Nonce() > h[j].Nonce()
}
func (h *priceHeap) Push(x interface{}) {
*h = append(*h, x.(*types.Transaction))
@ -397,13 +408,13 @@ func (h *priceHeap) Pop() interface{} {
// txPricedList is a price-sorted heap to allow operating on transactions pool
// contents in a price-incrementing way.
type txPricedList struct {
all *map[common.Hash]*types.Transaction // Pointer to the map of all transactions
items *priceHeap // Heap of prices of all the stored transactions
stales int // Number of stale price points to (re-heap trigger)
all *txLookup // Pointer to the map of all transactions
items *priceHeap // Heap of prices of all the stored transactions
stales int // Number of stale price points to (re-heap trigger)
}
// newTxPricedList creates a new price-sorted transaction heap.
func newTxPricedList(all *map[common.Hash]*types.Transaction) *txPricedList {
func newTxPricedList(all *txLookup) *txPricedList {
return &txPricedList{
all: all,
items: new(priceHeap),
@ -418,19 +429,20 @@ func (l *txPricedList) Put(tx *types.Transaction) {
// Removed notifies the prices transaction list that an old transaction dropped
// from the pool. The list will just keep a counter of stale objects and update
// the heap if a large enough ratio of transactions go stale.
func (l *txPricedList) Removed() {
func (l *txPricedList) Removed(count int) {
// Bump the stale counter, but exit if still too low (< 25%)
l.stales++
l.stales += count
if l.stales <= len(*l.items)/4 {
return
}
// Seems we've reached a critical number of stale transactions, reheap
reheap := make(priceHeap, 0, len(*l.all))
reheap := make(priceHeap, 0, l.all.Count())
l.stales, l.items = 0, &reheap
for _, tx := range *l.all {
l.all.Range(func(hash common.Hash, tx *types.Transaction) bool {
*l.items = append(*l.items, tx)
}
return true
})
heap.Init(l.items)
}
@ -443,7 +455,7 @@ func (l *txPricedList) Cap(threshold *big.Int, local *accountSet) types.Transact
for len(*l.items) > 0 {
// Discard stale transactions if found during cleanup
tx := heap.Pop(l.items).(*types.Transaction)
if _, ok := (*l.all)[tx.Hash()]; !ok {
if l.all.Get(tx.Hash()) == nil {
l.stales--
continue
}
@ -475,7 +487,7 @@ func (l *txPricedList) Underpriced(tx *types.Transaction, local *accountSet) boo
// Discard stale price points if found at the heap start
for len(*l.items) > 0 {
head := []*types.Transaction(*l.items)[0]
if _, ok := (*l.all)[head.Hash()]; !ok {
if l.all.Get(head.Hash()) == nil {
l.stales--
heap.Pop(l.items)
continue
@ -500,7 +512,7 @@ func (l *txPricedList) Discard(count int, local *accountSet) types.Transactions
for len(*l.items) > 0 && count > 0 {
// Discard stale transactions if found during cleanup
tx := heap.Pop(l.items).(*types.Transaction)
if _, ok := (*l.all)[tx.Hash()]; !ok {
if l.all.Get(tx.Hash()) == nil {
l.stales--
continue
}

79
core/tx_noncer.go Normal file
View file

@ -0,0 +1,79 @@
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package core
import (
"sync"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/core/state"
)
// txNoncer is a tiny virtual state database to manage the executable nonces of
// accounts in the pool, falling back to reading from a real state database if
// an account is unknown.
type txNoncer struct {
fallback *state.StateDB
nonces map[common.Address]uint64
lock sync.Mutex
}
// newTxNoncer creates a new virtual state database to track the pool nonces.
func newTxNoncer(statedb *state.StateDB) *txNoncer {
return &txNoncer{
fallback: statedb.Copy(),
nonces: make(map[common.Address]uint64),
}
}
// get returns the current nonce of an account, falling back to a real state
// database if the account is unknown.
func (txn *txNoncer) get(addr common.Address) uint64 {
// We use mutex for get operation is the underlying
// state will mutate db even for read access.
txn.lock.Lock()
defer txn.lock.Unlock()
if _, ok := txn.nonces[addr]; !ok {
txn.nonces[addr] = txn.fallback.GetNonce(addr)
}
return txn.nonces[addr]
}
// set inserts a new virtual nonce into the virtual state database to be returned
// whenever the pool requests it instead of reaching into the real state database.
func (txn *txNoncer) set(addr common.Address, nonce uint64) {
txn.lock.Lock()
defer txn.lock.Unlock()
txn.nonces[addr] = nonce
}
// setIfLower updates a new virtual nonce into the virtual state database if the
// the new one is lower.
func (txn *txNoncer) setIfLower(addr common.Address, nonce uint64) {
txn.lock.Lock()
defer txn.lock.Unlock()
if _, ok := txn.nonces[addr]; !ok {
txn.nonces[addr] = txn.fallback.GetNonce(addr)
}
if txn.nonces[addr] <= nonce {
return
}
txn.nonces[addr] = nonce
}

File diff suppressed because it is too large Load diff

View file

@ -112,7 +112,7 @@ func validateTxPoolInternals(pool *TxPool) error {
// Ensure the total transaction set is consistent with pending + queued
pending, queued := pool.stats()
if total := len(pool.all); total != pending+queued {
if total := pool.all.Count(); total != pending+queued {
return fmt.Errorf("total transaction count %d != %d pending + %d queued", total, pending, queued)
}
if priced := pool.priced.items.Len() - pool.priced.stales; priced != pending+queued {
@ -127,7 +127,7 @@ func validateTxPoolInternals(pool *TxPool) error {
last = nonce
}
}
if nonce := pool.pendingState.GetNonce(addr); nonce != last+1 {
if nonce := pool.Nonce(addr); nonce != last+1 {
return fmt.Errorf("pending nonce mismatch: have %v, want %v", nonce, last+1)
}
}
@ -215,33 +215,27 @@ func TestStateChangeDuringTransactionPoolReset(t *testing.T) {
pool := NewTxPool(testTxPoolConfig, params.TestChainConfig, blockchain)
defer pool.Stop()
nonce := pool.State().GetNonce(address)
nonce := pool.Nonce(address)
if nonce != 0 {
t.Fatalf("Invalid nonce, want 0, got %d", nonce)
}
pool.AddRemotes(types.Transactions{tx0, tx1})
pool.AddRemotesSync([]*types.Transaction{tx0, tx1})
nonce = pool.State().GetNonce(address)
nonce = pool.Nonce(address)
if nonce != 2 {
t.Fatalf("Invalid nonce, want 2, got %d", nonce)
}
// trigger state change in the background
trigger = true
<-pool.requestReset(nil, nil)
pool.lockedReset(nil, nil)
pendingTx, err := pool.Pending()
_, err := pool.Pending()
if err != nil {
t.Fatalf("Could not fetch pending transactions: %v", err)
}
for addr, txs := range pendingTx {
t.Logf("%0x: %d\n", addr, len(txs))
}
nonce = pool.State().GetNonce(address)
nonce = pool.Nonce(address)
if nonce != 2 {
t.Fatalf("Invalid nonce, want 2, got %d", nonce)
}
@ -293,10 +287,10 @@ func TestTransactionQueue(t *testing.T) {
tx := transaction(0, 100, key)
from, _ := deriveSender(tx)
pool.currentState.AddBalance(from, big.NewInt(1000))
pool.lockedReset(nil, nil)
pool.enqueueTx(tx.Hash(), tx)
<-pool.requestReset(nil, nil)
pool.promoteExecutables([]common.Address{from})
pool.enqueueTx(tx.Hash(), tx)
<-pool.requestPromoteExecutables(newAccountSet(pool.signer, from))
if len(pool.pending) != 1 {
t.Error("expected valid txs to be 1 is", len(pool.pending))
}
@ -305,7 +299,8 @@ func TestTransactionQueue(t *testing.T) {
from, _ = deriveSender(tx)
pool.currentState.SetNonce(from, 2)
pool.enqueueTx(tx.Hash(), tx)
pool.promoteExecutables([]common.Address{from})
<-pool.requestPromoteExecutables(newAccountSet(pool.signer, from))
if _, ok := pool.pending[from].txs.items[tx.Nonce()]; ok {
t.Error("expected transaction to be in tx pool")
}
@ -313,25 +308,28 @@ func TestTransactionQueue(t *testing.T) {
if len(pool.queue) > 0 {
t.Error("expected transaction queue to be empty. is", len(pool.queue))
}
}
pool, key = setupTxPool()
func TestTransactionQueue2(t *testing.T) {
t.Parallel()
pool, key := setupTxPool()
defer pool.Stop()
tx1 := transaction(0, 100, key)
tx2 := transaction(10, 100, key)
tx3 := transaction(11, 100, key)
from, _ = deriveSender(tx1)
from, _ := deriveSender(tx1)
pool.currentState.AddBalance(from, big.NewInt(1000))
pool.lockedReset(nil, nil)
pool.reset(nil, nil)
pool.enqueueTx(tx1.Hash(), tx1)
pool.enqueueTx(tx2.Hash(), tx2)
pool.enqueueTx(tx3.Hash(), tx3)
pool.promoteExecutables([]common.Address{from})
if len(pool.pending) != 1 {
t.Error("expected tx pool to be 1, got", len(pool.pending))
t.Error("expected pending length to be 1, got", len(pool.pending))
}
if pool.queue[from].Len() != 2 {
t.Error("expected len(queue) == 2, got", pool.queue[from].Len())
@ -365,7 +363,7 @@ func TestTransactionChainFork(t *testing.T) {
statedb.AddBalance(addr, big.NewInt(100000000000000))
pool.chain = &testBlockChain{statedb, 1000000, new(event.Feed)}
pool.lockedReset(nil, nil)
<-pool.requestReset(nil, nil)
}
resetState()
@ -373,7 +371,7 @@ func TestTransactionChainFork(t *testing.T) {
if _, err := pool.add(tx, false); err != nil {
t.Error("didn't expect error", err)
}
pool.removeTx(tx.Hash())
pool.removeTx(tx.Hash(), true)
// reset the pool's internal state
resetState()
@ -395,7 +393,7 @@ func TestTransactionDoubleNonce(t *testing.T) {
statedb.AddBalance(addr, big.NewInt(100000000000000))
pool.chain = &testBlockChain{statedb, 1000000, new(event.Feed)}
pool.lockedReset(nil, nil)
<-pool.requestReset(nil, nil)
}
resetState()
@ -411,16 +409,17 @@ func TestTransactionDoubleNonce(t *testing.T) {
if replace, err := pool.add(tx2, false); err != nil || !replace {
t.Errorf("second transaction insert failed (%v) or not reported replacement (%v)", err, replace)
}
pool.promoteExecutables([]common.Address{addr})
<-pool.requestPromoteExecutables(newAccountSet(signer, addr))
if pool.pending[addr].Len() != 1 {
t.Error("expected 1 pending transactions, got", pool.pending[addr].Len())
}
if tx := pool.pending[addr].txs.items[0]; tx.Hash() != tx2.Hash() {
t.Errorf("transaction mismatch: have %x, want %x", tx.Hash(), tx2.Hash())
}
// Add the third transaction and ensure it's not saved (smaller price)
pool.add(tx3, false)
pool.promoteExecutables([]common.Address{addr})
<-pool.requestPromoteExecutables(newAccountSet(signer, addr))
if pool.pending[addr].Len() != 1 {
t.Error("expected 1 pending transactions, got", pool.pending[addr].Len())
}
@ -428,8 +427,8 @@ func TestTransactionDoubleNonce(t *testing.T) {
t.Errorf("transaction mismatch: have %x, want %x", tx.Hash(), tx2.Hash())
}
// Ensure the total transaction count is correct
if len(pool.all) != 1 {
t.Error("expected 1 total transactions, got", len(pool.all))
if pool.all.Count() != 1 {
t.Error("expected 1 total transactions, got", pool.all.Count())
}
}
@ -451,8 +450,8 @@ func TestTransactionMissingNonce(t *testing.T) {
if pool.queue[addr].Len() != 1 {
t.Error("expected 1 queued transaction, got", pool.queue[addr].Len())
}
if len(pool.all) != 1 {
t.Error("expected 1 total transactions, got", len(pool.all))
if pool.all.Count() != 1 {
t.Error("expected 1 total transactions, got", pool.all.Count())
}
}
@ -466,7 +465,7 @@ func TestTransactionNonceRecovery(t *testing.T) {
addr := crypto.PubkeyToAddress(key.PublicKey)
pool.currentState.SetNonce(addr, n)
pool.currentState.AddBalance(addr, big.NewInt(100000000000000))
pool.lockedReset(nil, nil)
<-pool.requestReset(nil, nil)
tx := transaction(n, 100000, key)
if err := pool.AddRemote(tx); err != nil {
@ -474,8 +473,8 @@ func TestTransactionNonceRecovery(t *testing.T) {
}
// simulate some weird re-order of transactions and missing nonce(s)
pool.currentState.SetNonce(addr, n-1)
pool.lockedReset(nil, nil)
if fn := pool.pendingState.GetNonce(addr); fn != n-1 {
<-pool.requestReset(nil, nil)
if fn := pool.Nonce(addr); fn != n-1 {
t.Errorf("expected nonce to be %d, got %d", n-1, fn)
}
}
@ -515,22 +514,22 @@ func TestTransactionDropping(t *testing.T) {
if pool.queue[account].Len() != 3 {
t.Errorf("queued transaction mismatch: have %d, want %d", pool.queue[account].Len(), 3)
}
if len(pool.all) != 6 {
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 6)
if pool.all.Count() != 6 {
t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 6)
}
pool.lockedReset(nil, nil)
<-pool.requestReset(nil, nil)
if pool.pending[account].Len() != 3 {
t.Errorf("pending transaction mismatch: have %d, want %d", pool.pending[account].Len(), 3)
}
if pool.queue[account].Len() != 3 {
t.Errorf("queued transaction mismatch: have %d, want %d", pool.queue[account].Len(), 3)
}
if len(pool.all) != 6 {
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 6)
if pool.all.Count() != 6 {
t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 6)
}
// Reduce the balance of the account, and check that invalidated transactions are dropped
pool.currentState.AddBalance(account, big.NewInt(-650))
pool.lockedReset(nil, nil)
<-pool.requestReset(nil, nil)
if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok {
t.Errorf("funded pending transaction missing: %v", tx0)
@ -550,12 +549,12 @@ func TestTransactionDropping(t *testing.T) {
if _, ok := pool.queue[account].txs.items[tx12.Nonce()]; ok {
t.Errorf("out-of-fund queued transaction present: %v", tx11)
}
if len(pool.all) != 4 {
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 4)
if pool.all.Count() != 4 {
t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 4)
}
// Reduce the block gas limit, check that invalidated transactions are dropped
pool.chain.(*testBlockChain).gasLimit = 100
pool.lockedReset(nil, nil)
<-pool.requestReset(nil, nil)
if _, ok := pool.pending[account].txs.items[tx0.Nonce()]; !ok {
t.Errorf("funded pending transaction missing: %v", tx0)
@ -569,8 +568,8 @@ func TestTransactionDropping(t *testing.T) {
if _, ok := pool.queue[account].txs.items[tx11.Nonce()]; ok {
t.Errorf("over-gased queued transaction present: %v", tx11)
}
if len(pool.all) != 2 {
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), 2)
if pool.all.Count() != 2 {
t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), 2)
}
}
@ -612,7 +611,7 @@ func TestTransactionPostponing(t *testing.T) {
txs = append(txs, tx)
}
}
for i, err := range pool.AddRemotes(txs) {
for i, err := range pool.AddRemotesSync(txs) {
if err != nil {
t.Fatalf("tx %d: failed to add transactions: %v", i, err)
}
@ -624,24 +623,24 @@ func TestTransactionPostponing(t *testing.T) {
if len(pool.queue) != 0 {
t.Errorf("queued accounts mismatch: have %d, want %d", len(pool.queue), 0)
}
if len(pool.all) != len(txs) {
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), len(txs))
if pool.all.Count() != len(txs) {
t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), len(txs))
}
pool.lockedReset(nil, nil)
<-pool.requestReset(nil, nil)
if pending := pool.pending[accs[0]].Len() + pool.pending[accs[1]].Len(); pending != len(txs) {
t.Errorf("pending transaction mismatch: have %d, want %d", pending, len(txs))
}
if len(pool.queue) != 0 {
t.Errorf("queued accounts mismatch: have %d, want %d", len(pool.queue), 0)
}
if len(pool.all) != len(txs) {
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), len(txs))
if pool.all.Count() != len(txs) {
t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), len(txs))
}
// Reduce the balance of the account, and check that transactions are reorganised
for _, addr := range accs {
pool.currentState.AddBalance(addr, big.NewInt(-1))
}
pool.lockedReset(nil, nil)
<-pool.requestReset(nil, nil)
// The first account's first transaction remains valid, check that subsequent
// ones are either filtered out, or queued up for later.
@ -684,8 +683,8 @@ func TestTransactionPostponing(t *testing.T) {
}
}
}
if len(pool.all) != len(txs)/2 {
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), len(txs)/2)
if pool.all.Count() != len(txs)/2 {
t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), len(txs)/2)
}
}
@ -708,12 +707,10 @@ func TestTransactionGapFilling(t *testing.T) {
defer sub.Unsubscribe()
// Create a pending and a queued transaction with a nonce-gap in between
if err := pool.AddRemote(transaction(0, 100000, key)); err != nil {
t.Fatalf("failed to add pending transaction: %v", err)
}
if err := pool.AddRemote(transaction(2, 100000, key)); err != nil {
t.Fatalf("failed to add queued transaction: %v", err)
}
pool.AddRemotesSync([]*types.Transaction{
transaction(0, 100000, key),
transaction(2, 100000, key),
})
pending, queued := pool.Stats()
if pending != 1 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 1)
@ -728,7 +725,7 @@ func TestTransactionGapFilling(t *testing.T) {
t.Fatalf("pool internal state corrupted: %v", err)
}
// Fill the nonce gap and ensure all transactions become pending
if err := pool.AddRemote(transaction(1, 100000, key)); err != nil {
if err := pool.AddRemoteSync(transaction(1, 100000, key)); err != nil {
t.Fatalf("failed to add gapped transaction: %v", err)
}
pending, queued = pool.Stats()
@ -760,7 +757,7 @@ func TestTransactionQueueAccountLimiting(t *testing.T) {
testTxPoolConfig.AccountQueue = 10
// Keep queuing up transactions and make sure all above a limit are dropped
for i := uint64(1); i <= testTxPoolConfig.AccountQueue; i++ {
if err := pool.AddRemote(transaction(i, 100000, key)); err != nil {
if err := pool.AddRemoteSync(transaction(i, 100000, key)); err != nil {
t.Fatalf("tx %d: failed to add transaction: %v", i, err)
}
if len(pool.pending) != 0 {
@ -776,8 +773,8 @@ func TestTransactionQueueAccountLimiting(t *testing.T) {
}
}
}
if len(pool.all) != int(testTxPoolConfig.AccountQueue) {
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), testTxPoolConfig.AccountQueue)
if pool.all.Count() != int(testTxPoolConfig.AccountQueue) {
t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), testTxPoolConfig.AccountQueue)
}
}
@ -829,7 +826,7 @@ func testTransactionQueueGlobalLimiting(t *testing.T, nolocals bool) {
nonces[addr]++
}
// Import the batch and verify that limits have been enforced
pool.AddRemotes(txs)
pool.AddRemotesSync(txs)
queued := 0
for addr, list := range pool.queue {
@ -966,7 +963,7 @@ func TestTransactionPendingLimiting(t *testing.T) {
// Keep queuing up transactions and make sure all above a limit are dropped
for i := uint64(0); i < testTxPoolConfig.AccountQueue; i++ {
if err := pool.AddRemote(transaction(i, 100000, key)); err != nil {
if err := pool.AddRemoteSync(transaction(i, 100000, key)); err != nil {
t.Fatalf("tx %d: failed to add transaction: %v", i, err)
}
if pool.pending[account].Len() != int(i)+1 {
@ -976,8 +973,8 @@ func TestTransactionPendingLimiting(t *testing.T) {
t.Errorf("tx %d: queue size mismatch: have %d, want %d", i, pool.queue[account].Len(), 0)
}
}
if len(pool.all) != int(testTxPoolConfig.AccountQueue) {
t.Errorf("total transaction mismatch: have %d, want %d", len(pool.all), testTxPoolConfig.AccountQueue+5)
if pool.all.Count() != int(testTxPoolConfig.AccountQueue) {
t.Errorf("total transaction mismatch: have %d, want %d", pool.all.Count(), testTxPoolConfig.AccountQueue+5)
}
if err := validateEvents(events, int(testTxPoolConfig.AccountQueue)); err != nil {
t.Fatalf("event firing failed: %v", err)
@ -987,59 +984,6 @@ func TestTransactionPendingLimiting(t *testing.T) {
}
}
// Tests that the transaction limits are enforced the same way irrelevant whether
// the transactions are added one by one or in batches.
func TestTransactionQueueLimitingEquivalency(t *testing.T) { testTransactionLimitingEquivalency(t, 1) }
func TestTransactionPendingLimitingEquivalency(t *testing.T) {
testTransactionLimitingEquivalency(t, 0)
}
func testTransactionLimitingEquivalency(t *testing.T, origin uint64) {
t.Parallel()
// Add a batch of transactions to a pool one by one
pool1, key1 := setupTxPool()
defer pool1.Stop()
account1, _ := deriveSender(transaction(0, 0, key1))
pool1.currentState.AddBalance(account1, big.NewInt(1000000))
testTxPoolConfig.AccountQueue = 10
for i := uint64(0); i < testTxPoolConfig.AccountQueue; i++ {
if err := pool1.AddRemote(transaction(origin+i, 100000, key1)); err != nil {
t.Fatalf("tx %d: failed to add transaction: %v", i, err)
}
}
// Add a batch of transactions to a pool in one big batch
pool2, key2 := setupTxPool()
defer pool2.Stop()
account2, _ := deriveSender(transaction(0, 0, key2))
pool2.currentState.AddBalance(account2, big.NewInt(1000000))
txs := []*types.Transaction{}
for i := uint64(0); i < testTxPoolConfig.AccountQueue; i++ {
txs = append(txs, transaction(origin+i, 100000, key2))
}
pool2.AddRemotes(txs)
// Ensure the batch optimization honors the same pool mechanics
if len(pool1.pending) != len(pool2.pending) {
t.Errorf("pending transaction count mismatch: one-by-one algo: %d, batch algo: %d", len(pool1.pending), len(pool2.pending))
}
if len(pool1.queue) != len(pool2.queue) {
t.Errorf("queued transaction count mismatch: one-by-one algo: %d, batch algo: %d", len(pool1.queue), len(pool2.queue))
}
if len(pool1.all) != len(pool2.all) {
t.Errorf("total transaction count mismatch: one-by-one algo %d, batch algo %d", len(pool1.all), len(pool2.all))
}
if err := validateTxPoolInternals(pool1); err != nil {
t.Errorf("pool 1 internal state corrupted: %v", err)
}
if err := validateTxPoolInternals(pool2); err != nil {
t.Errorf("pool 2 internal state corrupted: %v", err)
}
}
// Tests that if the transaction count belonging to multiple accounts go above
// some hard threshold, the higher transactions are dropped to prevent DOS
// attacks.
@ -1075,7 +1019,7 @@ func TestTransactionPendingGlobalLimiting(t *testing.T) {
}
}
// Import the batch and verify that limits have been enforced
pool.AddRemotes(txs)
pool.AddRemotesSync(txs)
pending := 0
for _, list := range pool.pending {
@ -1134,9 +1078,8 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) {
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)}
config := testTxPoolConfig
config.AccountSlots = 10
config.GlobalSlots = 0
config.AccountSlots = 5
config.GlobalSlots = 1
pool := NewTxPool(config, params.TestChainConfig, blockchain)
defer pool.Stop()
@ -1158,7 +1101,7 @@ func TestTransactionPendingMinimumAllowance(t *testing.T) {
}
}
// Import the batch and verify that limits have been enforced
pool.AddRemotes(txs)
pool.AddRemotesSync(txs)
for addr, list := range pool.pending {
if list.Len() != int(config.AccountSlots) {
@ -1215,7 +1158,7 @@ func TestTransactionPoolRepricing(t *testing.T) {
ltx := pricedTransaction(0, 100000, big.NewInt(1), keys[3])
// Import the batch and that both pending and queued transactions match up
pool.AddRemotes(txs)
pool.AddRemotesSync(txs)
pool.AddLocal(ltx)
pending, queued := pool.Stats()
@ -1381,7 +1324,7 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
defer sub.Unsubscribe()
// Create a number of test accounts and fund them
keys := make([]*ecdsa.PrivateKey, 3)
keys := make([]*ecdsa.PrivateKey, 4)
for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey()
pool.currentState.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
@ -1418,13 +1361,13 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
t.Fatalf("adding underpriced pending transaction error mismatch: have %v, want %v", err, ErrUnderpriced)
}
// Ensure that adding high priced transactions drops cheap ones, but not own
if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(3), keys[1])); err != nil {
if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(3), keys[1])); err != nil { // +K1:0 => -K1:1 => Pend K0:0, K0:1, K1:0, K2:0; Que -
t.Fatalf("failed to add well priced transaction: %v", err)
}
if err := pool.AddRemote(pricedTransaction(2, 100000, big.NewInt(4), keys[1])); err != nil {
if err := pool.AddRemote(pricedTransaction(2, 100000, big.NewInt(4), keys[1])); err != nil { // +K1:2 => -K0:0 => Pend K1:0, K2:0; Que K0:1 K1:2
t.Fatalf("failed to add well priced transaction: %v", err)
}
if err := pool.AddRemote(pricedTransaction(3, 100000, big.NewInt(5), keys[1])); err != nil {
if err := pool.AddRemote(pricedTransaction(3, 100000, big.NewInt(5), keys[1])); err != nil { // +K1:3 => -K0:1 => Pend K1:0, K2:0; Que K1:2 K1:3
t.Fatalf("failed to add well priced transaction: %v", err)
}
pending, queued = pool.Stats()
@ -1434,25 +1377,29 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
if queued != 2 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
}
if err := validateEvents(events, 2); err != nil {
t.Fatalf("additional event firing failed: %v", err)
}
if err := validateTxPoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}
// Ensure that adding local transactions can push out even higher priced ones
tx := pricedTransaction(1, 100000, big.NewInt(1), keys[2])
if err := pool.AddLocal(tx); err != nil {
t.Fatalf("failed to add underpriced local transaction: %v", err)
}
pending, queued = pool.Stats()
if pending != 2 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
}
if queued != 2 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
}
if err := validateEvents(events, 1); err != nil {
t.Fatalf("additional event firing failed: %v", err)
}
if err := validateTxPoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}
// Ensure that adding local transactions can push out even higher priced ones
ltx = pricedTransaction(1, 100000, big.NewInt(1), keys[2])
if err := pool.AddLocal(ltx); err != nil {
t.Fatalf("failed to append underpriced local transaction: %v", err)
}
ltx = pricedTransaction(0, 100000, big.NewInt(1), keys[3])
if err := pool.AddLocal(ltx); err != nil {
t.Fatalf("failed to add new underpriced local transaction: %v", err)
}
pending, queued = pool.Stats()
if pending != 3 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3)
}
if queued != 1 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 1)
}
if err := validateEvents(events, 2); err != nil {
t.Fatalf("local event firing failed: %v", err)
}
if err := validateTxPoolInternals(pool); err != nil {
@ -1460,6 +1407,75 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
}
}
// Tests that more expensive transactions push out cheap ones from the pool, but
// without producing instability by creating gaps that start jumping transactions
// back and forth between queued/pending.
func TestTransactionPoolStableUnderpricing(t *testing.T) {
t.Parallel()
// Create the pool to test the pricing enforcement with
db := rawdb.NewMemoryDatabase()
statedb, _ := state.New(common.Hash{}, state.NewDatabase(db))
blockchain := &testBlockChain{statedb, 1000000, new(event.Feed)}
config := testTxPoolConfig
config.GlobalSlots = common.LimitThresholdNonceInQueue
config.GlobalQueue = 0
config.AccountSlots = config.GlobalSlots - 1
pool := NewTxPool(config, params.TestChainConfig, blockchain)
defer pool.Stop()
// Keep track of transaction events to ensure all executables get announced
events := make(chan NewTxsEvent, 32)
sub := pool.txFeed.Subscribe(events)
defer sub.Unsubscribe()
// Create a number of test accounts and fund them
keys := make([]*ecdsa.PrivateKey, 2)
for i := 0; i < len(keys); i++ {
keys[i], _ = crypto.GenerateKey()
pool.currentState.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000))
}
// Fill up the entire queue with the same transaction price points
txs := types.Transactions{}
for i := uint64(0); i < config.GlobalSlots; i++ {
txs = append(txs, pricedTransaction(i, 100000, big.NewInt(1), keys[0]))
}
pool.AddRemotesSync(txs)
pending, queued := pool.Stats()
if pending != int(config.GlobalSlots) {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, config.GlobalSlots)
}
if queued != 0 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
}
if err := validateEvents(events, int(config.GlobalSlots)); err != nil {
t.Fatalf("original event firing failed: %v", err)
}
if err := validateTxPoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}
// Ensure that adding high priced transactions drops a cheap, but doesn't produce a gap
if err := pool.AddRemoteSync(pricedTransaction(0, 100000, big.NewInt(3), keys[1])); err != nil {
t.Fatalf("failed to add well priced transaction: %v", err)
}
pending, queued = pool.Stats()
if pending != int(config.GlobalSlots) {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, config.GlobalSlots)
}
if queued != 0 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
}
if err := validateEvents(events, 1); err != nil {
t.Fatalf("additional event firing failed: %v", err)
}
if err := validateTxPoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}
}
// Tests that the pool rejects replacement transactions that don't meet the minimum
// price bump required.
func TestTransactionReplacement(t *testing.T) {
@ -1486,7 +1502,7 @@ func TestTransactionReplacement(t *testing.T) {
price := int64(100)
threshold := (price * (100 + int64(testTxPoolConfig.PriceBump))) / 100
if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(1), key)); err != nil {
if err := pool.AddRemoteSync(pricedTransaction(0, 100000, big.NewInt(1), key)); err != nil {
t.Fatalf("failed to add original cheap pending transaction: %v", err)
}
if err := pool.AddRemote(pricedTransaction(0, 100001, big.NewInt(1), key)); err != ErrReplaceUnderpriced {
@ -1499,7 +1515,7 @@ func TestTransactionReplacement(t *testing.T) {
t.Fatalf("cheap replacement event firing failed: %v", err)
}
if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(price), key)); err != nil {
if err := pool.AddRemoteSync(pricedTransaction(0, 100000, big.NewInt(price), key)); err != nil {
t.Fatalf("failed to add original proper pending transaction: %v", err)
}
if err := pool.AddRemote(pricedTransaction(0, 100001, big.NewInt(threshold-1), key)); err != ErrReplaceUnderpriced {
@ -1511,6 +1527,7 @@ func TestTransactionReplacement(t *testing.T) {
if err := validateEvents(events, 2); err != nil {
t.Fatalf("proper replacement event firing failed: %v", err)
}
// Add queued transactions, ensuring the minimum price bump is enforced for replacement (for ultra low prices too)
if err := pool.AddRemote(pricedTransaction(2, 100000, big.NewInt(1), key)); err != nil {
t.Fatalf("failed to add original cheap queued transaction: %v", err)
@ -1589,7 +1606,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
if err := pool.AddLocal(pricedTransaction(2, 100000, big.NewInt(1), local)); err != nil {
t.Fatalf("failed to add local transaction: %v", err)
}
if err := pool.AddRemote(pricedTransaction(0, 100000, big.NewInt(1), remote)); err != nil {
if err := pool.AddRemoteSync(pricedTransaction(0, 100000, big.NewInt(1), remote)); err != nil {
t.Fatalf("failed to add remote transaction: %v", err)
}
pending, queued := pool.Stats()
@ -1627,7 +1644,7 @@ func testTransactionJournaling(t *testing.T, nolocals bool) {
}
// Bump the nonce temporarily and ensure the newly invalidated transaction is removed
statedb.SetNonce(crypto.PubkeyToAddress(local.PublicKey), 2)
pool.lockedReset(nil, nil)
<-pool.requestReset(nil, nil)
time.Sleep(2 * config.Rejournal)
pool.Stop()
@ -1682,7 +1699,7 @@ func TestTransactionStatusCheck(t *testing.T) {
txs = append(txs, pricedTransaction(2, 100000, big.NewInt(1), keys[2])) // Queued only
// Import the transaction and ensure they are correctly added
pool.AddRemotes(txs)
pool.AddRemotesSync(txs)
pending, queued := pool.Stats()
if pending != 2 {
@ -1761,26 +1778,6 @@ func benchmarkFuturePromotion(b *testing.B, size int) {
}
}
// Benchmarks the speed of iterative transaction insertion.
func BenchmarkPoolInsert(b *testing.B) {
// Generate a batch of transactions to enqueue into the pool
pool, key := setupTxPool()
defer pool.Stop()
account, _ := deriveSender(transaction(0, 0, key))
pool.currentState.AddBalance(account, big.NewInt(1000000))
txs := make(types.Transactions, b.N)
for i := 0; i < b.N; i++ {
txs[i] = transaction(uint64(i), 100000, key)
}
// Benchmark importing the transactions into the queue
b.ResetTimer()
for _, tx := range txs {
pool.AddRemote(tx)
}
}
// Benchmarks the speed of batched transaction insertion.
func BenchmarkPoolBatchInsert100(b *testing.B) { benchmarkPoolBatchInsert(b, 100) }
func BenchmarkPoolBatchInsert1000(b *testing.B) { benchmarkPoolBatchInsert(b, 1000) }

View file

@ -304,7 +304,7 @@ func (b *EthApiBackend) GetPoolTransaction(hash common.Hash) *types.Transaction
}
func (b *EthApiBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) {
return b.eth.txPool.State().GetNonce(addr), nil
return b.eth.txPool.Nonce(addr), nil
}
func (b *EthApiBackend) Stats() (pending int, queued int) {

View file

@ -26,10 +26,10 @@ import (
"time"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/metrics"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
var (
@ -105,11 +105,11 @@ func newQueue() *queue {
headerPendPool: make(map[string]*fetchRequest),
headerContCh: make(chan bool),
blockTaskPool: make(map[common.Hash]*types.Header),
blockTaskQueue: prque.New(),
blockTaskQueue: prque.New(nil),
blockPendPool: make(map[string]*fetchRequest),
blockDonePool: make(map[common.Hash]struct{}),
receiptTaskPool: make(map[common.Hash]*types.Header),
receiptTaskQueue: prque.New(),
receiptTaskQueue: prque.New(nil),
receiptPendPool: make(map[string]*fetchRequest),
receiptDonePool: make(map[common.Hash]struct{}),
resultCache: make([]*fetchResult, blockCacheItems),
@ -278,7 +278,7 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
}
// Shedule all the header retrieval tasks for the skeleton assembly
q.headerTaskPool = make(map[uint64]*types.Header)
q.headerTaskQueue = prque.New()
q.headerTaskQueue = prque.New(nil)
q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains
q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch)
q.headerProced = 0
@ -289,7 +289,7 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
index := from + uint64(i*MaxHeaderFetch)
q.headerTaskPool[index] = header
q.headerTaskQueue.Push(index, -float32(index))
q.headerTaskQueue.Push(index, -int64(index))
}
}
@ -335,11 +335,11 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
}
// Queue the header for content retrieval
q.blockTaskPool[hash] = header
q.blockTaskQueue.Push(header, -float32(header.Number.Uint64()))
q.blockTaskQueue.Push(header, -int64(header.Number.Uint64()))
if q.mode == FastSync {
q.receiptTaskPool[hash] = header
q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64()))
q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64()))
}
inserts = append(inserts, header)
q.headerHead = hash
@ -437,7 +437,7 @@ func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest {
}
// Merge all the skipped batches back
for _, from := range skip {
q.headerTaskQueue.Push(from, -float32(from))
q.headerTaskQueue.Push(from, -int64(from))
}
// Assemble and return the block download request
if send == 0 {
@ -544,7 +544,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
}
// Merge all the skipped headers back
for _, header := range skip {
taskQueue.Push(header, -float32(header.Number.Uint64()))
taskQueue.Push(header, -int64(header.Number.Uint64()))
}
if progress {
// Wake WaitResults, resultCache was modified
@ -587,10 +587,10 @@ func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool m
defer q.lock.Unlock()
if request.From > 0 {
taskQueue.Push(request.From, -float32(request.From))
taskQueue.Push(request.From, -int64(request.From))
}
for _, header := range request.Headers {
taskQueue.Push(header, -float32(header.Number.Uint64()))
taskQueue.Push(header, -int64(header.Number.Uint64()))
}
delete(pendPool, request.Peer.id)
}
@ -604,13 +604,13 @@ func (q *queue) Revoke(peerId string) {
if request, ok := q.blockPendPool[peerId]; ok {
for _, header := range request.Headers {
q.blockTaskQueue.Push(header, -float32(header.Number.Uint64()))
q.blockTaskQueue.Push(header, -int64(header.Number.Uint64()))
}
delete(q.blockPendPool, peerId)
}
if request, ok := q.receiptPendPool[peerId]; ok {
for _, header := range request.Headers {
q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64()))
q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64()))
}
delete(q.receiptPendPool, peerId)
}
@ -659,10 +659,10 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest,
// Return any non satisfied requests to the pool
if request.From > 0 {
taskQueue.Push(request.From, -float32(request.From))
taskQueue.Push(request.From, -int64(request.From))
}
for _, header := range request.Headers {
taskQueue.Push(header, -float32(header.Number.Uint64()))
taskQueue.Push(header, -int64(header.Number.Uint64()))
}
// Add the peer to the expiry report along the the number of failed requests
expiries[id] = len(request.Headers)
@ -733,7 +733,7 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh
}
miss[request.From] = struct{}{}
q.headerTaskQueue.Push(request.From, -float32(request.From))
q.headerTaskQueue.Push(request.From, -int64(request.From))
return 0, errors.New("delivery not accepted")
}
// Clean up a successful fetch and try to deliver any sub-results
@ -856,7 +856,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
// Return all failed or missing fetches to the queue
for _, header := range request.Headers {
if header != nil {
taskQueue.Push(header, -float32(header.Number.Uint64()))
taskQueue.Push(header, -int64(header.Number.Uint64()))
}
}
// Wake up WaitResults

View file

@ -25,10 +25,10 @@ import (
lru "github.com/hashicorp/golang-lru"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/log"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
const (
@ -171,7 +171,7 @@ func New(getBlock blockRetrievalFn, verifyHeader headerVerifierFn, handlePropose
fetching: make(map[common.Hash]*announce),
fetched: make(map[common.Hash][]*announce),
completing: make(map[common.Hash]*announce),
queue: prque.New(),
queue: prque.New(nil),
queues: make(map[string]int),
queued: make(map[common.Hash]*inject),
knowns: knownBlocks,
@ -312,7 +312,7 @@ func (f *Fetcher) loop() {
// If too high up the chain or phase, continue later
number := op.block.NumberU64()
if number > height+1 {
f.queue.Push(op, -float32(op.block.NumberU64()))
f.queue.Push(op, -int64(op.block.NumberU64()))
if f.queueChangeHook != nil {
f.queueChangeHook(op.block.Hash(), true)
}
@ -642,7 +642,7 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) {
f.queues[peer] = count
f.queued[hash] = op
f.knowns.Add(hash, true)
f.queue.Push(op, -float32(block.NumberU64()))
f.queue.Push(op, -int64(block.NumberU64()))
if f.queueChangeHook != nil {
f.queueChangeHook(op.block.Hash(), true)
}

1
go.mod
View file

@ -44,7 +44,6 @@ require (
golang.org/x/sys v0.14.0
golang.org/x/tools v0.14.0
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c
gopkg.in/karalabe/cookiejar.v2 v2.0.0-20150724131613-8dcd6a7f4951
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce
gopkg.in/olebedev/go-duktape.v3 v3.0.0-20190213234257-ec84240a7772
gopkg.in/urfave/cli.v1 v1.20.0

2
go.sum
View file

@ -371,8 +371,6 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/karalabe/cookiejar.v2 v2.0.0-20150724131613-8dcd6a7f4951 h1:DMTcQRFbEH62YPRWwOI647s2e5mHda3oBPMHfrLs2bw=
gopkg.in/karalabe/cookiejar.v2 v2.0.0-20150724131613-8dcd6a7f4951/go.mod h1:owOxCRGGeAx1uugABik6K9oeNu1cgxP/R9ItzLDxNWA=
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce h1:+JknDZhAj8YMt7GC73Ei8pv4MzjDUNPHgQWJdtMAaDU=
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce/go.mod h1:5AcXVHNjg+BDxry382+8OKon8SEWiKktQR07RKPsv1c=
gopkg.in/olebedev/go-duktape.v3 v3.0.0-20190213234257-ec84240a7772 h1:hhsSf/5z74Ck/DJYc+R8zpq8KGm7uJvpdLRQED/IedA=

View file

@ -91,6 +91,7 @@ type BlockChain interface {
type txPool interface {
AddRemotes(txs []*types.Transaction) []error
AddRemotesSync(txs []*types.Transaction) []error
Status(hashes []common.Hash) []core.TxStatus
}

View file

@ -19,6 +19,7 @@ package light
import (
"context"
"fmt"
"math/big"
"sync"
"time"
@ -66,7 +67,7 @@ type TxPool struct {
mined map[common.Hash][]*types.Transaction // mined transactions by block hash
clearIdx uint64 // earliest block nr that can contain mined tx info
homestead bool
istanbul bool // Fork indicator whether we are in the istanbul stage.
}
// TxRelayBackend provides an interface to the mechanism that forwards transacions
@ -310,7 +311,10 @@ func (pool *TxPool) setNewHead(head *types.Header) {
txc, _ := pool.reorgOnNewHead(ctx, head)
m, r := txc.getLists()
pool.relay.NewHead(pool.head, m, r)
pool.homestead = pool.config.IsHomestead(head.Number)
// Update fork indicator by next pending block number
next := new(big.Int).Add(head.Number, big.NewInt(1))
pool.istanbul = pool.config.IsIstanbul(next)
pool.signer = types.MakeSigner(pool.config, head.Number)
}
@ -403,7 +407,7 @@ func (pool *TxPool) validateTx(ctx context.Context, tx *types.Transaction) error
}
// Should supply enough intrinsic gas
gas, err := core.IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead)
gas, err := core.IntrinsicGas(tx.Data(), tx.To() == nil, true, pool.istanbul)
if err != nil {
return err
}

View file

@ -6,6 +6,8 @@ import "sync/atomic"
type Gauge interface {
Snapshot() Gauge
Update(int64)
Dec(int64)
Inc(int64)
Value() int64
}
@ -65,6 +67,16 @@ func (GaugeSnapshot) Update(int64) {
panic("Update called on a GaugeSnapshot")
}
// Dec panics.
func (GaugeSnapshot) Dec(int64) {
panic("Dec called on a GaugeSnapshot")
}
// Inc panics.
func (GaugeSnapshot) Inc(int64) {
panic("Inc called on a GaugeSnapshot")
}
// Value returns the value at the time the snapshot was taken.
func (g GaugeSnapshot) Value() int64 { return int64(g) }
@ -77,6 +89,12 @@ func (NilGauge) Snapshot() Gauge { return NilGauge{} }
// Update is a no-op.
func (NilGauge) Update(v int64) {}
// Dec is a no-op.
func (NilGauge) Dec(i int64) {}
// Inc is a no-op.
func (NilGauge) Inc(i int64) {}
// Value is a no-op.
func (NilGauge) Value() int64 { return 0 }
@ -101,6 +119,16 @@ func (g *StandardGauge) Value() int64 {
return atomic.LoadInt64(&g.value)
}
// Dec decrements the gauge's current value by the given amount.
func (g *StandardGauge) Dec(i int64) {
atomic.AddInt64(&g.value, -i)
}
// Inc increments the gauge's current value by the given amount.
func (g *StandardGauge) Inc(i int64) {
atomic.AddInt64(&g.value, i)
}
// FunctionalGauge returns value from given function
type FunctionalGauge struct {
value func() int64
@ -118,3 +146,13 @@ func (g FunctionalGauge) Snapshot() Gauge { return GaugeSnapshot(g.Value()) }
func (FunctionalGauge) Update(int64) {
panic("Update called on a FunctionalGauge")
}
// Dec panics.
func (FunctionalGauge) Dec(int64) {
panic("Dec called on a FunctionalGauge")
}
// Inc panics.
func (FunctionalGauge) Inc(int64) {
panic("Inc called on a FunctionalGauge")
}

View file

@ -23,10 +23,10 @@ var (
)
const (
GasLimitBoundDivisor uint64 = 1024 // The bound divisor of the gas limit, used in update calculations.
MinGasLimit uint64 = 5000 // Minimum the gas limit may ever be.
GasLimitBoundDivisor uint64 = 1024 // The bound divisor of the gas limit, used in update calculations.
MinGasLimit uint64 = 5000 // Minimum the gas limit may ever be.
MaxGasLimit uint64 = 0x7fffffffffffffff // Maximum the gas limit (2^63-1).
GenesisGasLimit uint64 = 4712388 // Gas limit of the Genesis block.
GenesisGasLimit uint64 = 4712388 // Gas limit of the Genesis block.
XDCGenesisGasLimit uint64 = 84000000
MaximumExtraDataSize uint64 = 32 // Maximum size extra data may be after Genesis.
@ -50,17 +50,23 @@ const (
JumpdestGas uint64 = 1 // Refunded gas, once per SSTORE operation if the zeroness changes to zero.
EpochDuration uint64 = 30000 // Duration between proof-of-work epochs.
CallGas uint64 = 40 // Once per CALL operation & message call transaction.
CreateDataGas uint64 = 200 //
CallCreateDepth uint64 = 1024 // Maximum depth of call/create stack.
ExpGas uint64 = 10 // Once per EXP instruction
LogGas uint64 = 375 // Per LOG* operation.
CopyGas uint64 = 3 //
StackLimit uint64 = 1024 // Maximum size of VM stack allowed.
TierStepGas uint64 = 0 // Once per operation, for a selection of them.
LogTopicGas uint64 = 375 // Multiplied by the * of the LOG*, per LOG transaction. e.g. LOG0 incurs 0 * c_txLogTopicGas, LOG4 incurs 4 * c_txLogTopicGas.
CreateGas uint64 = 32000 // Once per CREATE operation & contract-creation transaction.
CreateDataGas uint64 = 200 //
CallCreateDepth uint64 = 1024 // Maximum depth of call/create stack.
ExpGas uint64 = 10 // Once per EXP instruction
LogGas uint64 = 375 // Per LOG* operation.
CopyGas uint64 = 3 //
StackLimit uint64 = 1024 // Maximum size of VM stack allowed.
TierStepGas uint64 = 0 // Once per operation, for a selection of them.
LogTopicGas uint64 = 375 // Multiplied by the * of the LOG*, per LOG transaction. e.g. LOG0 incurs 0 * c_txLogTopicGas, LOG4 incurs 4 * c_txLogTopicGas.
CreateGas uint64 = 32000 // Once per CREATE operation & contract-creation transaction.
Create2Gas uint64 = 32000 // Once per CREATE2 operation
SelfdestructRefundGas uint64 = 24000 // Refunded following a selfdestruct operation.
MemoryGas uint64 = 3 // Times the address of the (highest referenced byte in memory + 1). NOTE: referencing happens on read, write and in instructions such as RETURN and CALL.
TxDataNonZeroGasFrontier uint64 = 68 // Per byte of data attached to a transaction that is not equal to zero. NOTE: Not payable on data of calls between transactions.
TxDataNonZeroGasEIP2028 uint64 = 16 // Per byte of non zero data attached to a transaction after EIP 2028 (part in Istanbul)
SuicideRefundGas uint64 = 24000 // Refunded following a suicide operation.
MemoryGas uint64 = 3 // Times the address of the (highest referenced byte in memory + 1). NOTE: referencing happens on read, write and in instructions such as RETURN and CALL.
TxDataNonZeroGas uint64 = 68 // Per byte of data attached to a transaction that is not equal to zero. NOTE: Not payable on data of calls between transactions.
MaxCodeSize = 24576 // Maximum bytecode to permit for a contract
@ -104,11 +110,6 @@ const (
SstoreResetGasEIP2200 uint64 = 5000 // Once per SSTORE operation from clean non-zero to something else
SstoreClearsScheduleRefundEIP2200 uint64 = 15000 // Once per SSTORE operation for clearing an originally existing storage slot
Create2Gas uint64 = 32000 // Once per CREATE2 operation
SelfdestructRefundGas uint64 = 24000 // Refunded following a selfdestruct operation.
TxDataNonZeroGasFrontier uint64 = 68 // Per byte of data attached to a transaction that is not equal to zero. NOTE: Not payable on data of calls between transactions.
TxDataNonZeroGasEIP2028 uint64 = 16 // Per byte of non zero data attached to a transaction after EIP 2028 (part in Istanbul)
// These have been changed during the course of the chain
CallGasFrontier uint64 = 40 // Once per CALL operation & message call transaction.
CallGasEIP150 uint64 = 700 // Static portion of gas for CALL-derivates after EIP 150 (Tangerine)