core, eth, trie: use common/prque (#17508)

This commit is contained in:
Daniel Liu 2024-05-09 11:46:19 +08:00
parent 6c657ef6af
commit ec50ca36d9
11 changed files with 51 additions and 62 deletions

View file

@ -9,14 +9,13 @@ import (
"github.com/XinFinOrg/XDPoSChain/XDCx/tradingstate"
"github.com/XinFinOrg/XDPoSChain/XDCxDAO"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/p2p"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/p2p"
"github.com/XinFinOrg/XDPoSChain/rpc"
lru "github.com/hashicorp/golang-lru"
"golang.org/x/sync/syncmap"
@ -105,7 +104,7 @@ func New(cfg *Config) *XDCX {
}
XDCX := &XDCX{
orderNonce: make(map[common.Address]*big.Int),
Triegc: prque.New(),
Triegc: prque.New(nil),
tokenDecimalCache: tokenDecimalCache,
orderCache: orderCache,
}

View file

@ -12,14 +12,13 @@ import (
"github.com/XinFinOrg/XDPoSChain/XDCx/tradingstate"
"github.com/XinFinOrg/XDPoSChain/XDCxDAO"
"github.com/XinFinOrg/XDPoSChain/XDCxlending/lendingstate"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/p2p"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/p2p"
"github.com/XinFinOrg/XDPoSChain/rpc"
lru "github.com/hashicorp/golang-lru"
)
@ -67,7 +66,7 @@ func New(XDCx *XDCx.XDCX) *Lending {
lendingTradeCache, _ := lru.New(defaultCacheLimit)
lending := &Lending{
orderNonce: make(map[common.Address]*big.Int),
Triegc: prque.New(),
Triegc: prque.New(nil),
lendingItemHistory: itemCache,
lendingTradeHistory: lendingTradeCache,
}

View file

@ -7,11 +7,11 @@ import (
"github.com/XinFinOrg/XDPoSChain/XDCx/tradingstate"
"github.com/XinFinOrg/XDPoSChain/XDCxlending/lendingstate"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/consensus/clique"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/types"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
type Masternode struct {

View file

@ -28,13 +28,12 @@ import (
"sync/atomic"
"time"
"github.com/XinFinOrg/XDPoSChain/XDCxlending/lendingstate"
"github.com/XinFinOrg/XDPoSChain/XDCx/tradingstate"
"github.com/XinFinOrg/XDPoSChain/XDCxlending/lendingstate"
"github.com/XinFinOrg/XDPoSChain/accounts/abi/bind"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/mclock"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/common/sort"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS"
@ -53,7 +52,6 @@ import (
"github.com/XinFinOrg/XDPoSChain/rlp"
"github.com/XinFinOrg/XDPoSChain/trie"
lru "github.com/hashicorp/golang-lru"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
var (
@ -201,7 +199,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
chainConfig: chainConfig,
cacheConfig: cacheConfig,
db: db,
triegc: prque.New(),
triegc: prque.New(nil),
stateCache: state.NewDatabase(db),
quit: make(chan struct{}),
bodyCache: bodyCache,
@ -1268,18 +1266,18 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
} else {
// Full but not archive node, do proper garbage collection
triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive
bc.triegc.Push(root, -float32(block.NumberU64()))
bc.triegc.Push(root, -int64(block.NumberU64()))
if tradingTrieDb != nil {
tradingTrieDb.Reference(tradingRoot, common.Hash{})
}
if tradingService != nil {
tradingService.GetTriegc().Push(tradingRoot, -float32(block.NumberU64()))
tradingService.GetTriegc().Push(tradingRoot, -int64(block.NumberU64()))
}
if lendingTrieDb != nil {
lendingTrieDb.Reference(lendingRoot, common.Hash{})
}
if lendingService != nil {
lendingService.GetTriegc().Push(lendingRoot, -float32(block.NumberU64()))
lendingService.GetTriegc().Push(lendingRoot, -int64(block.NumberU64()))
}
if current := block.NumberU64(); current > triesInMemory {
// Find the next state trie we need to commit

View file

@ -24,18 +24,16 @@ import (
"sync"
"time"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS"
"github.com/XinFinOrg/XDPoSChain/XDCxlending/lendingstate"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/event"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/params"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
var (
@ -998,11 +996,11 @@ func (pool *LendingPool) promoteExecutables(accounts []common.Address) {
if pending > pool.config.GlobalSlots {
pendingBeforeCap := pending
// Assemble a spam order to penalize large transactors first
spammers := prque.New()
spammers := prque.New(nil)
for addr, list := range pool.pending {
// Only evict transactions from high rollers
if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
spammers.Push(addr, float32(list.Len()))
spammers.Push(addr, int64(list.Len()))
}
}
// Gradually drop transactions from offenders

View file

@ -25,16 +25,15 @@ import (
"time"
"github.com/XinFinOrg/XDPoSChain/XDCx/tradingstate"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/consensus/XDPoS"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/event"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/params"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
var (
@ -914,11 +913,11 @@ func (pool *OrderPool) promoteExecutables(accounts []common.Address) {
if pending > pool.config.GlobalSlots {
pendingBeforeCap := pending
// Assemble a spam order to penalize large transactors first
spammers := prque.New()
spammers := prque.New(nil)
for addr, list := range pool.pending {
// Only evict transactions from high rollers
if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
spammers.Push(addr, float32(list.Len()))
spammers.Push(addr, int64(list.Len()))
}
}
// Gradually drop transactions from offenders

View file

@ -25,16 +25,15 @@ import (
"sync"
"time"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/event"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/metrics"
"github.com/XinFinOrg/XDPoSChain/params"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
const (
@ -1135,11 +1134,11 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) {
if pending > pool.config.GlobalSlots {
pendingBeforeCap := pending
// Assemble a spam order to penalize large transactors first
spammers := prque.New()
spammers := prque.New(nil)
for addr, list := range pool.pending {
// Only evict transactions from high rollers
if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
spammers.Push(addr, float32(list.Len()))
spammers.Push(addr, int64(list.Len()))
}
}
// Gradually drop transactions from offenders

View file

@ -26,10 +26,10 @@ import (
"time"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/metrics"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
var (
@ -105,11 +105,11 @@ func newQueue() *queue {
headerPendPool: make(map[string]*fetchRequest),
headerContCh: make(chan bool),
blockTaskPool: make(map[common.Hash]*types.Header),
blockTaskQueue: prque.New(),
blockTaskQueue: prque.New(nil),
blockPendPool: make(map[string]*fetchRequest),
blockDonePool: make(map[common.Hash]struct{}),
receiptTaskPool: make(map[common.Hash]*types.Header),
receiptTaskQueue: prque.New(),
receiptTaskQueue: prque.New(nil),
receiptPendPool: make(map[string]*fetchRequest),
receiptDonePool: make(map[common.Hash]struct{}),
resultCache: make([]*fetchResult, blockCacheItems),
@ -278,7 +278,7 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
}
// Shedule all the header retrieval tasks for the skeleton assembly
q.headerTaskPool = make(map[uint64]*types.Header)
q.headerTaskQueue = prque.New()
q.headerTaskQueue = prque.New(nil)
q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains
q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch)
q.headerProced = 0
@ -289,7 +289,7 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
index := from + uint64(i*MaxHeaderFetch)
q.headerTaskPool[index] = header
q.headerTaskQueue.Push(index, -float32(index))
q.headerTaskQueue.Push(index, -int64(index))
}
}
@ -335,11 +335,11 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
}
// Queue the header for content retrieval
q.blockTaskPool[hash] = header
q.blockTaskQueue.Push(header, -float32(header.Number.Uint64()))
q.blockTaskQueue.Push(header, -int64(header.Number.Uint64()))
if q.mode == FastSync {
q.receiptTaskPool[hash] = header
q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64()))
q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64()))
}
inserts = append(inserts, header)
q.headerHead = hash
@ -437,7 +437,7 @@ func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest {
}
// Merge all the skipped batches back
for _, from := range skip {
q.headerTaskQueue.Push(from, -float32(from))
q.headerTaskQueue.Push(from, -int64(from))
}
// Assemble and return the block download request
if send == 0 {
@ -544,7 +544,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
}
// Merge all the skipped headers back
for _, header := range skip {
taskQueue.Push(header, -float32(header.Number.Uint64()))
taskQueue.Push(header, -int64(header.Number.Uint64()))
}
if progress {
// Wake WaitResults, resultCache was modified
@ -587,10 +587,10 @@ func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool m
defer q.lock.Unlock()
if request.From > 0 {
taskQueue.Push(request.From, -float32(request.From))
taskQueue.Push(request.From, -int64(request.From))
}
for _, header := range request.Headers {
taskQueue.Push(header, -float32(header.Number.Uint64()))
taskQueue.Push(header, -int64(header.Number.Uint64()))
}
delete(pendPool, request.Peer.id)
}
@ -604,13 +604,13 @@ func (q *queue) Revoke(peerId string) {
if request, ok := q.blockPendPool[peerId]; ok {
for _, header := range request.Headers {
q.blockTaskQueue.Push(header, -float32(header.Number.Uint64()))
q.blockTaskQueue.Push(header, -int64(header.Number.Uint64()))
}
delete(q.blockPendPool, peerId)
}
if request, ok := q.receiptPendPool[peerId]; ok {
for _, header := range request.Headers {
q.receiptTaskQueue.Push(header, -float32(header.Number.Uint64()))
q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64()))
}
delete(q.receiptPendPool, peerId)
}
@ -659,10 +659,10 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest,
// Return any non satisfied requests to the pool
if request.From > 0 {
taskQueue.Push(request.From, -float32(request.From))
taskQueue.Push(request.From, -int64(request.From))
}
for _, header := range request.Headers {
taskQueue.Push(header, -float32(header.Number.Uint64()))
taskQueue.Push(header, -int64(header.Number.Uint64()))
}
// Add the peer to the expiry report along the the number of failed requests
expiries[id] = len(request.Headers)
@ -733,7 +733,7 @@ func (q *queue) DeliverHeaders(id string, headers []*types.Header, headerProcCh
}
miss[request.From] = struct{}{}
q.headerTaskQueue.Push(request.From, -float32(request.From))
q.headerTaskQueue.Push(request.From, -int64(request.From))
return 0, errors.New("delivery not accepted")
}
// Clean up a successful fetch and try to deliver any sub-results
@ -856,7 +856,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
// Return all failed or missing fetches to the queue
for _, header := range request.Headers {
if header != nil {
taskQueue.Push(header, -float32(header.Number.Uint64()))
taskQueue.Push(header, -int64(header.Number.Uint64()))
}
}
// Wake up WaitResults

View file

@ -25,10 +25,10 @@ import (
lru "github.com/hashicorp/golang-lru"
"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/common/prque"
"github.com/XinFinOrg/XDPoSChain/consensus"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/log"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)
const (
@ -171,7 +171,7 @@ func New(getBlock blockRetrievalFn, verifyHeader headerVerifierFn, handlePropose
fetching: make(map[common.Hash]*announce),
fetched: make(map[common.Hash][]*announce),
completing: make(map[common.Hash]*announce),
queue: prque.New(),
queue: prque.New(nil),
queues: make(map[string]int),
queued: make(map[common.Hash]*inject),
knowns: knownBlocks,
@ -312,7 +312,7 @@ func (f *Fetcher) loop() {
// If too high up the chain or phase, continue later
number := op.block.NumberU64()
if number > height+1 {
f.queue.Push(op, -float32(op.block.NumberU64()))
f.queue.Push(op, -int64(op.block.NumberU64()))
if f.queueChangeHook != nil {
f.queueChangeHook(op.block.Hash(), true)
}
@ -642,7 +642,7 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) {
f.queues[peer] = count
f.queued[hash] = op
f.knowns.Add(hash, true)
f.queue.Push(op, -float32(block.NumberU64()))
f.queue.Push(op, -int64(block.NumberU64()))
if f.queueChangeHook != nil {
f.queueChangeHook(op.block.Hash(), true)
}

1
go.mod
View file

@ -44,7 +44,6 @@ require (
golang.org/x/sys v0.14.0
golang.org/x/tools v0.14.0
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c
gopkg.in/karalabe/cookiejar.v2 v2.0.0-20150724131613-8dcd6a7f4951
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce
gopkg.in/olebedev/go-duktape.v3 v3.0.0-20190213234257-ec84240a7772
gopkg.in/urfave/cli.v1 v1.20.0

2
go.sum
View file

@ -371,8 +371,6 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/karalabe/cookiejar.v2 v2.0.0-20150724131613-8dcd6a7f4951 h1:DMTcQRFbEH62YPRWwOI647s2e5mHda3oBPMHfrLs2bw=
gopkg.in/karalabe/cookiejar.v2 v2.0.0-20150724131613-8dcd6a7f4951/go.mod h1:owOxCRGGeAx1uugABik6K9oeNu1cgxP/R9ItzLDxNWA=
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce h1:+JknDZhAj8YMt7GC73Ei8pv4MzjDUNPHgQWJdtMAaDU=
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce/go.mod h1:5AcXVHNjg+BDxry382+8OKon8SEWiKktQR07RKPsv1c=
gopkg.in/olebedev/go-duktape.v3 v3.0.0-20190213234257-ec84240a7772 h1:hhsSf/5z74Ck/DJYc+R8zpq8KGm7uJvpdLRQED/IedA=