From 39e9d8f94def2a5e3eaaaf0ebf487614d15c3ca2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 9 Feb 2023 13:03:54 +0200 Subject: [PATCH] common/prque: generic priority queue (#26290) --- XDCx/XDCx.go | 8 ++-- XDCxlending/XDCxlending.go | 8 ++-- common/prque/lazyqueue.go | 78 +++++++++++++++++----------------- common/prque/prque.go | 44 +++++++++---------- common/prque/prque_test.go | 27 ++++++------ common/prque/sstack.go | 65 +++++++++++++--------------- common/prque/sstack_test.go | 30 ++++++------- consensus/XDPoS/utils/types.go | 4 +- core/blockchain.go | 18 ++++---- core/txpool/lending_pool.go | 6 +-- core/txpool/order_pool.go | 6 +-- core/txpool/txpool.go | 6 +-- eth/downloader/queue.go | 48 ++++++++++----------- eth/fetcher/fetcher.go | 11 +++-- go.mod | 1 + go.sum | 2 + trie/sync.go | 4 +- 17 files changed, 178 insertions(+), 188 deletions(-) diff --git a/XDCx/XDCx.go b/XDCx/XDCx.go index fbb7372402..1c09249874 100644 --- a/XDCx/XDCx.go +++ b/XDCx/XDCx.go @@ -52,8 +52,8 @@ type XDCX struct { // Order related db XDCxDAO.XDCXDAO mongodb XDCxDAO.XDCXDAO - Triegc *prque.Prque // Priority queue mapping block numbers to tries to gc - StateCache tradingstate.Database // State database to reuse between imports (contains state cache) *XDCx_state.TradingStateDB + Triegc *prque.Prque[int64, common.Hash] // Priority queue mapping block numbers to tries to gc + StateCache tradingstate.Database // State database to reuse between imports (contains state cache) *XDCx_state.TradingStateDB orderNonce map[common.Address]*big.Int @@ -94,7 +94,7 @@ func NewMongoDBEngine(cfg *Config) *XDCxDAO.MongoDatabase { func New(cfg *Config) *XDCX { XDCX := &XDCX{ orderNonce: make(map[common.Address]*big.Int), - Triegc: prque.New(nil), + Triegc: prque.New[int64, common.Hash](nil), tokenDecimalCache: lru.NewCache[common.Address, *big.Int](defaultCacheLimit), orderCache: lru.NewCache[common.Hash, map[common.Hash]tradingstate.OrderHistoryItem](tradingstate.OrderCacheLimit), } @@ -579,7 +579,7 @@ func (XDCx *XDCX) HasTradingState(block *types.Block, author common.Address) boo return err == nil } -func (XDCx *XDCX) GetTriegc() *prque.Prque { +func (XDCx *XDCX) GetTriegc() *prque.Prque[int64, common.Hash] { return XDCx.Triegc } diff --git a/XDCxlending/XDCxlending.go b/XDCxlending/XDCxlending.go index a1f8e9749f..34a0009554 100644 --- a/XDCxlending/XDCxlending.go +++ b/XDCxlending/XDCxlending.go @@ -36,8 +36,8 @@ var ( ) type Lending struct { - Triegc *prque.Prque // Priority queue mapping block numbers to tries to gc - StateCache lendingstate.Database // State database to reuse between imports (contains state cache) *lendingstate.TradingStateDB + Triegc *prque.Prque[int64, common.Hash] // Priority queue mapping block numbers to tries to gc + StateCache lendingstate.Database // State database to reuse between imports (contains state cache) *lendingstate.TradingStateDB orderNonce map[common.Address]*big.Int @@ -61,7 +61,7 @@ func (l *Lending) Stop() error { func New(XDCx *XDCx.XDCX) *Lending { lending := &Lending{ orderNonce: make(map[common.Address]*big.Int), - Triegc: prque.New(nil), + Triegc: prque.New[int64, common.Hash](nil), lendingItemHistory: lru.NewCache[common.Hash, map[common.Hash]lendingstate.LendingItemHistoryItem](defaultCacheLimit), lendingTradeHistory: lru.NewCache[common.Hash, map[common.Hash]lendingstate.LendingTradeHistoryItem](defaultCacheLimit), } @@ -682,7 +682,7 @@ func (l *Lending) HasLendingState(block *types.Block, author common.Address) boo return err == nil } -func (l *Lending) GetTriegc() *prque.Prque { +func (l *Lending) GetTriegc() *prque.Prque[int64, common.Hash] { return l.Triegc } diff --git a/common/prque/lazyqueue.go b/common/prque/lazyqueue.go index 883d210735..ae3cd798da 100644 --- a/common/prque/lazyqueue.go +++ b/common/prque/lazyqueue.go @@ -21,6 +21,7 @@ import ( "time" "github.com/XinFinOrg/XDPoSChain/common/mclock" + "golang.org/x/exp/constraints" ) // LazyQueue is a priority queue data structure where priorities can change over @@ -32,31 +33,31 @@ import ( // // If the upper estimate is exceeded then Update should be called for that item. // A global Refresh function should also be called periodically. -type LazyQueue struct { +type LazyQueue[P constraints.Ordered, V any] struct { clock mclock.Clock // Items are stored in one of two internal queues ordered by estimated max // priority until the next and the next-after-next refresh. Update and Refresh // always places items in queue[1]. - queue [2]*sstack - popQueue *sstack + queue [2]*sstack[P, V] + popQueue *sstack[P, V] period time.Duration maxUntil mclock.AbsTime indexOffset int - setIndex SetIndexCallback - priority PriorityCallback - maxPriority MaxPriorityCallback + setIndex SetIndexCallback[V] + priority PriorityCallback[P, V] + maxPriority MaxPriorityCallback[P, V] lastRefresh1, lastRefresh2 mclock.AbsTime } type ( - PriorityCallback func(data interface{}) int64 // actual priority callback - MaxPriorityCallback func(data interface{}, until mclock.AbsTime) int64 // estimated maximum priority callback + PriorityCallback[P constraints.Ordered, V any] func(data V) P // actual priority callback + MaxPriorityCallback[P constraints.Ordered, V any] func(data V, until mclock.AbsTime) P // estimated maximum priority callback ) // NewLazyQueue creates a new lazy queue -func NewLazyQueue(setIndex SetIndexCallback, priority PriorityCallback, maxPriority MaxPriorityCallback, clock mclock.Clock, refreshPeriod time.Duration) *LazyQueue { - q := &LazyQueue{ - popQueue: newSstack(nil, false), +func NewLazyQueue[P constraints.Ordered, V any](setIndex SetIndexCallback[V], priority PriorityCallback[P, V], maxPriority MaxPriorityCallback[P, V], clock mclock.Clock, refreshPeriod time.Duration) *LazyQueue[P, V] { + q := &LazyQueue[P, V]{ + popQueue: newSstack[P, V](nil), setIndex: setIndex, priority: priority, maxPriority: maxPriority, @@ -71,13 +72,13 @@ func NewLazyQueue(setIndex SetIndexCallback, priority PriorityCallback, maxPrior } // Reset clears the contents of the queue -func (q *LazyQueue) Reset() { - q.queue[0] = newSstack(q.setIndex0, false) - q.queue[1] = newSstack(q.setIndex1, false) +func (q *LazyQueue[P, V]) Reset() { + q.queue[0] = newSstack[P, V](q.setIndex0) + q.queue[1] = newSstack[P, V](q.setIndex1) } // Refresh performs queue re-evaluation if necessary -func (q *LazyQueue) Refresh() { +func (q *LazyQueue[P, V]) Refresh() { now := q.clock.Now() for time.Duration(now-q.lastRefresh2) >= q.period*2 { q.refresh(now) @@ -87,10 +88,10 @@ func (q *LazyQueue) Refresh() { } // refresh re-evaluates items in the older queue and swaps the two queues -func (q *LazyQueue) refresh(now mclock.AbsTime) { +func (q *LazyQueue[P, V]) refresh(now mclock.AbsTime) { q.maxUntil = now.Add(q.period) for q.queue[0].Len() != 0 { - q.Push(heap.Pop(q.queue[0]).(*item).value) + q.Push(heap.Pop(q.queue[0]).(*item[P, V]).value) } q.queue[0], q.queue[1] = q.queue[1], q.queue[0] q.indexOffset = 1 - q.indexOffset @@ -98,22 +99,22 @@ func (q *LazyQueue) refresh(now mclock.AbsTime) { } // Push adds an item to the queue -func (q *LazyQueue) Push(data interface{}) { - heap.Push(q.queue[1], &item{data, q.maxPriority(data, q.maxUntil)}) +func (q *LazyQueue[P, V]) Push(data V) { + heap.Push(q.queue[1], &item[P, V]{data, q.maxPriority(data, q.maxUntil)}) } // Update updates the upper priority estimate for the item with the given queue index -func (q *LazyQueue) Update(index int) { +func (q *LazyQueue[P, V]) Update(index int) { q.Push(q.Remove(index)) } // Pop removes and returns the item with the greatest actual priority -func (q *LazyQueue) Pop() (interface{}, int64) { +func (q *LazyQueue[P, V]) Pop() (V, P) { var ( - resData interface{} - resPri int64 + resData V + resPri P ) - q.MultiPop(func(data interface{}, priority int64) bool { + q.MultiPop(func(data V, priority P) bool { resData = data resPri = priority return false @@ -123,7 +124,7 @@ func (q *LazyQueue) Pop() (interface{}, int64) { // peekIndex returns the index of the internal queue where the item with the // highest estimated priority is or -1 if both are empty -func (q *LazyQueue) peekIndex() int { +func (q *LazyQueue[P, V]) peekIndex() int { if q.queue[0].Len() != 0 { if q.queue[1].Len() != 0 && q.queue[1].blocks[0][0].priority > q.queue[0].blocks[0][0].priority { return 1 @@ -139,17 +140,17 @@ func (q *LazyQueue) peekIndex() int { // MultiPop pops multiple items from the queue and is more efficient than calling // Pop multiple times. Popped items are passed to the callback. MultiPop returns // when the callback returns false or there are no more items to pop. -func (q *LazyQueue) MultiPop(callback func(data interface{}, priority int64) bool) { +func (q *LazyQueue[P, V]) MultiPop(callback func(data V, priority P) bool) { nextIndex := q.peekIndex() for nextIndex != -1 { - data := heap.Pop(q.queue[nextIndex]).(*item).value - heap.Push(q.popQueue, &item{data, q.priority(data)}) + data := heap.Pop(q.queue[nextIndex]).(*item[P, V]).value + heap.Push(q.popQueue, &item[P, V]{data, q.priority(data)}) nextIndex = q.peekIndex() for q.popQueue.Len() != 0 && (nextIndex == -1 || q.queue[nextIndex].blocks[0][0].priority < q.popQueue.blocks[0][0].priority) { - i := heap.Pop(q.popQueue).(*item) + i := heap.Pop(q.popQueue).(*item[P, V]) if !callback(i.value, i.priority) { for q.popQueue.Len() != 0 { - q.Push(heap.Pop(q.popQueue).(*item).value) + q.Push(heap.Pop(q.popQueue).(*item[P, V]).value) } return } @@ -159,31 +160,28 @@ func (q *LazyQueue) MultiPop(callback func(data interface{}, priority int64) boo } // PopItem pops the item from the queue only, dropping the associated priority value. -func (q *LazyQueue) PopItem() interface{} { +func (q *LazyQueue[P, V]) PopItem() V { i, _ := q.Pop() return i } // Remove removes the item with the given index. -func (q *LazyQueue) Remove(index int) interface{} { - if index < 0 { - return nil - } - return heap.Remove(q.queue[index&1^q.indexOffset], index>>1).(*item).value +func (q *LazyQueue[P, V]) Remove(index int) V { + return heap.Remove(q.queue[index&1^q.indexOffset], index>>1).(*item[P, V]).value } // Empty checks whether the priority queue is empty. -func (q *LazyQueue) Empty() bool { +func (q *LazyQueue[P, V]) Empty() bool { return q.queue[0].Len() == 0 && q.queue[1].Len() == 0 } // Size returns the number of items in the priority queue. -func (q *LazyQueue) Size() int { +func (q *LazyQueue[P, V]) Size() int { return q.queue[0].Len() + q.queue[1].Len() } // setIndex0 translates internal queue item index to the virtual index space of LazyQueue -func (q *LazyQueue) setIndex0(data interface{}, index int) { +func (q *LazyQueue[P, V]) setIndex0(data V, index int) { if index == -1 { q.setIndex(data, -1) } else { @@ -192,6 +190,6 @@ func (q *LazyQueue) setIndex0(data interface{}, index int) { } // setIndex1 translates internal queue item index to the virtual index space of LazyQueue -func (q *LazyQueue) setIndex1(data interface{}, index int) { +func (q *LazyQueue[P, V]) setIndex1(data V, index int) { q.setIndex(data, index+index+1) } diff --git a/common/prque/prque.go b/common/prque/prque.go index fb02e3418c..0e8c9f897f 100755 --- a/common/prque/prque.go +++ b/common/prque/prque.go @@ -19,65 +19,59 @@ package prque import ( "container/heap" + + "golang.org/x/exp/constraints" ) // Priority queue data structure. -type Prque struct { - cont *sstack +type Prque[P constraints.Ordered, V any] struct { + cont *sstack[P, V] } // New creates a new priority queue. -func New(setIndex SetIndexCallback) *Prque { - return &Prque{newSstack(setIndex, false)} -} - -// NewWrapAround creates a new priority queue with wrap-around priority handling. -func NewWrapAround(setIndex SetIndexCallback) *Prque { - return &Prque{newSstack(setIndex, true)} +func New[P constraints.Ordered, V any](setIndex SetIndexCallback[V]) *Prque[P, V] { + return &Prque[P, V]{newSstack[P, V](setIndex)} } // Pushes a value with a given priority into the queue, expanding if necessary. -func (p *Prque) Push(data interface{}, priority int64) { - heap.Push(p.cont, &item{data, priority}) +func (p *Prque[P, V]) Push(data V, priority P) { + heap.Push(p.cont, &item[P, V]{data, priority}) } // Peek returns the value with the greatest priority but does not pop it off. -func (p *Prque) Peek() (interface{}, int64) { +func (p *Prque[P, V]) Peek() (V, P) { item := p.cont.blocks[0][0] return item.value, item.priority } // Pops the value with the greatest priority off the stack and returns it. // Currently no shrinking is done. -func (p *Prque) Pop() (interface{}, int64) { - item := heap.Pop(p.cont).(*item) +func (p *Prque[P, V]) Pop() (V, P) { + item := heap.Pop(p.cont).(*item[P, V]) return item.value, item.priority } // Pops only the item from the queue, dropping the associated priority value. -func (p *Prque) PopItem() interface{} { - return heap.Pop(p.cont).(*item).value +func (p *Prque[P, V]) PopItem() V { + return heap.Pop(p.cont).(*item[P, V]).value } // Remove removes the element with the given index. -func (p *Prque) Remove(i int) interface{} { - if i < 0 { - return nil - } - return heap.Remove(p.cont, i) +func (p *Prque[P, V]) Remove(i int) V { + return heap.Remove(p.cont, i).(*item[P, V]).value } // Checks whether the priority queue is empty. -func (p *Prque) Empty() bool { +func (p *Prque[P, V]) Empty() bool { return p.cont.Len() == 0 } // Returns the number of element in the priority queue. -func (p *Prque) Size() int { +func (p *Prque[P, V]) Size() int { return p.cont.Len() } // Clears the contents of the priority queue. -func (p *Prque) Reset() { - *p = *New(p.cont.setIndex) +func (p *Prque[P, V]) Reset() { + *p = *New[P, V](p.cont.setIndex) } diff --git a/common/prque/prque_test.go b/common/prque/prque_test.go index 1cffcebad4..c4910f205a 100644 --- a/common/prque/prque_test.go +++ b/common/prque/prque_test.go @@ -21,22 +21,24 @@ func TestPrque(t *testing.T) { for i := 0; i < size; i++ { data[i] = rand.Int() } - queue := New(nil) + queue := New[int, int](nil) + for rep := 0; rep < 2; rep++ { // Fill a priority queue with the above data for i := 0; i < size; i++ { - queue.Push(data[i], int64(prio[i])) + queue.Push(data[i], prio[i]) if queue.Size() != i+1 { t.Errorf("queue size mismatch: have %v, want %v.", queue.Size(), i+1) } } // Create a map the values to the priorities for easier verification - dict := make(map[int64]int) + dict := make(map[int]int) for i := 0; i < size; i++ { - dict[int64(prio[i])] = data[i] + dict[prio[i]] = data[i] } + // Pop out the elements in priority order and verify them - prevPrio := int64(size + 1) + prevPrio := size + 1 for !queue.Empty() { val, prio := queue.Pop() if prio > prevPrio { @@ -59,22 +61,23 @@ func TestReset(t *testing.T) { for i := 0; i < size; i++ { data[i] = rand.Int() } - queue := New(nil) + queue := New[int, int](nil) + for rep := 0; rep < 2; rep++ { // Fill a priority queue with the above data for i := 0; i < size; i++ { - queue.Push(data[i], int64(prio[i])) + queue.Push(data[i], prio[i]) if queue.Size() != i+1 { t.Errorf("queue size mismatch: have %v, want %v.", queue.Size(), i+1) } } // Create a map the values to the priorities for easier verification - dict := make(map[int64]int) + dict := make(map[int]int) for i := 0; i < size; i++ { - dict[int64(prio[i])] = data[i] + dict[prio[i]] = data[i] } // Pop out half the elements in priority order and verify them - prevPrio := int64(size + 1) + prevPrio := size + 1 for i := 0; i < size/2; i++ { val, prio := queue.Pop() if prio > prevPrio { @@ -104,7 +107,7 @@ func BenchmarkPush(b *testing.B) { } // Execute the benchmark b.ResetTimer() - queue := New(nil) + queue := New[int64, int](nil) for i := 0; i < len(data); i++ { queue.Push(data[i], prio[i]) } @@ -118,7 +121,7 @@ func BenchmarkPop(b *testing.B) { data[i] = rand.Int() prio[i] = rand.Int63() } - queue := New(nil) + queue := New[int64, int](nil) for i := 0; i < len(data); i++ { queue.Push(data[i], prio[i]) } diff --git a/common/prque/sstack.go b/common/prque/sstack.go index b06a95413d..5dcd1d9dd0 100755 --- a/common/prque/sstack.go +++ b/common/prque/sstack.go @@ -10,53 +10,50 @@ package prque +import "golang.org/x/exp/constraints" + // The size of a block of data const blockSize = 4096 // A prioritized item in the sorted stack. -// -// Note: priorities can "wrap around" the int64 range, a comes before b if (a.priority - b.priority) > 0. -// The difference between the lowest and highest priorities in the queue at any point should be less than 2^63. -type item struct { - value interface{} - priority int64 +type item[P constraints.Ordered, V any] struct { + value V + priority P } // SetIndexCallback is called when the element is moved to a new index. // Providing SetIndexCallback is optional, it is needed only if the application needs // to delete elements other than the top one. -type SetIndexCallback func(data interface{}, index int) +type SetIndexCallback[V any] func(data V, index int) // Internal sortable stack data structure. Implements the Push and Pop ops for // the stack (heap) functionality and the Len, Less and Swap methods for the // sortability requirements of the heaps. -type sstack struct { - setIndex SetIndexCallback - size int - capacity int - offset int - wrapAround bool +type sstack[P constraints.Ordered, V any] struct { + setIndex SetIndexCallback[V] + size int + capacity int + offset int - blocks [][]*item - active []*item + blocks [][]*item[P, V] + active []*item[P, V] } // Creates a new, empty stack. -func newSstack(setIndex SetIndexCallback, wrapAround bool) *sstack { - result := new(sstack) +func newSstack[P constraints.Ordered, V any](setIndex SetIndexCallback[V]) *sstack[P, V] { + result := new(sstack[P, V]) result.setIndex = setIndex - result.active = make([]*item, blockSize) - result.blocks = [][]*item{result.active} + result.active = make([]*item[P, V], blockSize) + result.blocks = [][]*item[P, V]{result.active} result.capacity = blockSize - result.wrapAround = wrapAround return result } // Pushes a value onto the stack, expanding it if necessary. Required by // heap.Interface. -func (s *sstack) Push(data interface{}) { +func (s *sstack[P, V]) Push(data any) { if s.size == s.capacity { - s.active = make([]*item, blockSize) + s.active = make([]*item[P, V], blockSize) s.blocks = append(s.blocks, s.active) s.capacity += blockSize s.offset = 0 @@ -65,16 +62,16 @@ func (s *sstack) Push(data interface{}) { s.offset = 0 } if s.setIndex != nil { - s.setIndex(data.(*item).value, s.size) + s.setIndex(data.(*item[P, V]).value, s.size) } - s.active[s.offset] = data.(*item) + s.active[s.offset] = data.(*item[P, V]) s.offset++ s.size++ } // Pops a value off the stack and returns it. Currently no shrinking is done. // Required by heap.Interface. -func (s *sstack) Pop() (res interface{}) { +func (s *sstack[P, V]) Pop() (res any) { s.size-- s.offset-- if s.offset < 0 { @@ -83,28 +80,24 @@ func (s *sstack) Pop() (res interface{}) { } res, s.active[s.offset] = s.active[s.offset], nil if s.setIndex != nil { - s.setIndex(res.(*item).value, -1) + s.setIndex(res.(*item[P, V]).value, -1) } return } // Returns the length of the stack. Required by sort.Interface. -func (s *sstack) Len() int { +func (s *sstack[P, V]) Len() int { return s.size } // Compares the priority of two elements of the stack (higher is first). // Required by sort.Interface. -func (s *sstack) Less(i, j int) bool { - a, b := s.blocks[i/blockSize][i%blockSize].priority, s.blocks[j/blockSize][j%blockSize].priority - if s.wrapAround { - return a-b > 0 - } - return a > b +func (s *sstack[P, V]) Less(i, j int) bool { + return s.blocks[i/blockSize][i%blockSize].priority > s.blocks[j/blockSize][j%blockSize].priority } // Swaps two elements in the stack. Required by sort.Interface. -func (s *sstack) Swap(i, j int) { +func (s *sstack[P, V]) Swap(i, j int) { ib, io, jb, jo := i/blockSize, i%blockSize, j/blockSize, j%blockSize a, b := s.blocks[jb][jo], s.blocks[ib][io] if s.setIndex != nil { @@ -115,6 +108,6 @@ func (s *sstack) Swap(i, j int) { } // Resets the stack, effectively clearing its contents. -func (s *sstack) Reset() { - *s = *newSstack(s.setIndex, false) +func (s *sstack[P, V]) Reset() { + *s = *newSstack[P, V](s.setIndex) } diff --git a/common/prque/sstack_test.go b/common/prque/sstack_test.go index bc6298979c..edc99955e8 100644 --- a/common/prque/sstack_test.go +++ b/common/prque/sstack_test.go @@ -17,23 +17,23 @@ import ( func TestSstack(t *testing.T) { // Create some initial data size := 16 * blockSize - data := make([]*item, size) + data := make([]*item[int64, int], size) for i := 0; i < size; i++ { - data[i] = &item{rand.Int(), rand.Int63()} + data[i] = &item[int64, int]{rand.Int(), rand.Int63()} } - stack := newSstack(nil, false) + stack := newSstack[int64, int](nil) for rep := 0; rep < 2; rep++ { // Push all the data into the stack, pop out every second - secs := []*item{} + secs := []*item[int64, int]{} for i := 0; i < size; i++ { stack.Push(data[i]) if i%2 == 0 { - secs = append(secs, stack.Pop().(*item)) + secs = append(secs, stack.Pop().(*item[int64, int])) } } - rest := []*item{} + rest := []*item[int64, int]{} for stack.Len() > 0 { - rest = append(rest, stack.Pop().(*item)) + rest = append(rest, stack.Pop().(*item[int64, int])) } // Make sure the contents of the resulting slices are ok for i := 0; i < size; i++ { @@ -50,12 +50,12 @@ func TestSstack(t *testing.T) { func TestSstackSort(t *testing.T) { // Create some initial data size := 16 * blockSize - data := make([]*item, size) + data := make([]*item[int64, int], size) for i := 0; i < size; i++ { - data[i] = &item{rand.Int(), int64(i)} + data[i] = &item[int64, int]{rand.Int(), int64(i)} } // Push all the data into the stack - stack := newSstack(nil, false) + stack := newSstack[int64, int](nil) for _, val := range data { stack.Push(val) } @@ -72,18 +72,18 @@ func TestSstackSort(t *testing.T) { func TestSstackReset(t *testing.T) { // Create some initial data size := 16 * blockSize - data := make([]*item, size) + data := make([]*item[int64, int], size) for i := 0; i < size; i++ { - data[i] = &item{rand.Int(), rand.Int63()} + data[i] = &item[int64, int]{rand.Int(), rand.Int63()} } - stack := newSstack(nil, false) + stack := newSstack[int64, int](nil) for rep := 0; rep < 2; rep++ { // Push all the data into the stack, pop out every second - secs := []*item{} + secs := []*item[int64, int]{} for i := 0; i < size; i++ { stack.Push(data[i]) if i%2 == 0 { - secs = append(secs, stack.Pop().(*item)) + secs = append(secs, stack.Pop().(*item[int64, int])) } } // Reset and verify both pulled and stack contents diff --git a/consensus/XDPoS/utils/types.go b/consensus/XDPoS/utils/types.go index fc4aaf0463..55265516f7 100644 --- a/consensus/XDPoS/utils/types.go +++ b/consensus/XDPoS/utils/types.go @@ -26,7 +26,7 @@ type TradingService interface { GetEmptyTradingState() (*tradingstate.TradingStateDB, error) HasTradingState(block *types.Block, author common.Address) bool GetStateCache() tradingstate.Database - GetTriegc() *prque.Prque + GetTriegc() *prque.Prque[int64, common.Hash] ApplyOrder(header *types.Header, coinbase common.Address, chain consensus.ChainContext, statedb *state.StateDB, XDCXstatedb *tradingstate.TradingStateDB, orderBook common.Hash, order *tradingstate.OrderItem) ([]map[string]string, []*tradingstate.OrderItem, error) UpdateMediumPriceBeforeEpoch(epochNumber uint64, tradingStateDB *tradingstate.TradingStateDB, statedb *state.StateDB) error IsSDKNode() bool @@ -40,7 +40,7 @@ type LendingService interface { GetLendingState(block *types.Block, author common.Address) (*lendingstate.LendingStateDB, error) HasLendingState(block *types.Block, author common.Address) bool GetStateCache() lendingstate.Database - GetTriegc() *prque.Prque + GetTriegc() *prque.Prque[int64, common.Hash] ApplyOrder(header *types.Header, coinbase common.Address, chain consensus.ChainContext, statedb *state.StateDB, lendingStateDB *lendingstate.LendingStateDB, tradingStateDb *tradingstate.TradingStateDB, lendingOrderBook common.Hash, order *lendingstate.LendingItem) ([]*lendingstate.LendingTrade, []*lendingstate.LendingItem, error) GetCollateralPrices(header *types.Header, chain consensus.ChainContext, statedb *state.StateDB, tradingStateDb *tradingstate.TradingStateDB, collateralToken common.Address, lendingToken common.Address) (*big.Int, *big.Int, error) GetMediumTradePriceBeforeEpoch(chain consensus.ChainContext, statedb *state.StateDB, tradingStateDb *tradingstate.TradingStateDB, baseToken common.Address, quoteToken common.Address) (*big.Int, error) diff --git a/core/blockchain.go b/core/blockchain.go index 3c74fc6f8e..b6d328cc64 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -137,8 +137,8 @@ type BlockChain struct { db ethdb.Database // Low level persistent database to store final content in XDCxDb ethdb.XDCxDatabase - triegc *prque.Prque // Priority queue mapping block numbers to tries to gc - gcproc time.Duration // Accumulates canonical block processing for trie dumping + triegc *prque.Prque[int64, common.Hash] // Priority queue mapping block numbers to tries to gc + gcproc time.Duration // Accumulates canonical block processing for trie dumping hc *HeaderChain rmLogsFeed event.Feed @@ -209,7 +209,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par chainConfig: chainConfig, cacheConfig: cacheConfig, db: db, - triegc: prque.New(nil), + triegc: prque.New[int64, common.Hash](nil), stateCache: state.NewDatabase(db), quit: make(chan struct{}), bodyCache: lru.NewCache[common.Hash, *types.Body](bodyCacheLimit), @@ -975,17 +975,17 @@ func (bc *BlockChain) saveData() { } } for !bc.triegc.Empty() { - triedb.Dereference(bc.triegc.PopItem().(common.Hash)) + triedb.Dereference(bc.triegc.PopItem()) } if tradingTriedb != nil && lendingTriedb != nil { if tradingService.GetTriegc() != nil { for !tradingService.GetTriegc().Empty() { - tradingTriedb.Dereference(tradingService.GetTriegc().PopItem().(common.Hash)) + tradingTriedb.Dereference(tradingService.GetTriegc().PopItem()) } } if lendingService.GetTriegc() != nil { for !lendingService.GetTriegc().Empty() { - lendingTriedb.Dereference(lendingService.GetTriegc().PopItem().(common.Hash)) + lendingTriedb.Dereference(lendingService.GetTriegc().PopItem()) } } } @@ -1328,7 +1328,7 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types. bc.triegc.Push(root, number) break } - triedb.Dereference(root.(common.Hash)) + triedb.Dereference(root) } if tradingService != nil { for !tradingService.GetTriegc().Empty() { @@ -1337,7 +1337,7 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types. tradingService.GetTriegc().Push(tradingRoot, number) break } - tradingTrieDb.Dereference(tradingRoot.(common.Hash)) + tradingTrieDb.Dereference(tradingRoot) } } if lendingService != nil { @@ -1347,7 +1347,7 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types. lendingService.GetTriegc().Push(lendingRoot, number) break } - lendingTrieDb.Dereference(lendingRoot.(common.Hash)) + lendingTrieDb.Dereference(lendingRoot) } } } diff --git a/core/txpool/lending_pool.go b/core/txpool/lending_pool.go index 4a199384cf..e73d71fc31 100644 --- a/core/txpool/lending_pool.go +++ b/core/txpool/lending_pool.go @@ -997,7 +997,7 @@ func (pool *LendingPool) promoteExecutables(accounts []common.Address) { if pending > pool.config.GlobalSlots { pendingBeforeCap := pending // Assemble a spam order to penalize large transactors first - spammers := prque.New(nil) + spammers := prque.New[int64, common.Address](nil) for addr, list := range pool.pending { // Only evict transactions from high rollers if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots { @@ -1009,12 +1009,12 @@ func (pool *LendingPool) promoteExecutables(accounts []common.Address) { for pending > pool.config.GlobalSlots && !spammers.Empty() { // Retrieve the next offender if not local address offender, _ := spammers.Pop() - offenders = append(offenders, offender.(common.Address)) + offenders = append(offenders, offender) // Equalize balances until all the same or below threshold if len(offenders) > 1 { // Calculate the equalization threshold for all current offenders - threshold := pool.pending[offender.(common.Address)].Len() + threshold := pool.pending[offender].Len() // Iteratively reduce all offenders until below limit or threshold reached for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold { diff --git a/core/txpool/order_pool.go b/core/txpool/order_pool.go index 4857d99388..247fdf6cef 100644 --- a/core/txpool/order_pool.go +++ b/core/txpool/order_pool.go @@ -914,7 +914,7 @@ func (pool *OrderPool) promoteExecutables(accounts []common.Address) { if pending > pool.config.GlobalSlots { pendingBeforeCap := pending // Assemble a spam order to penalize large transactors first - spammers := prque.New(nil) + spammers := prque.New[int64, common.Address](nil) for addr, list := range pool.pending { // Only evict transactions from high rollers if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots { @@ -926,12 +926,12 @@ func (pool *OrderPool) promoteExecutables(accounts []common.Address) { for pending > pool.config.GlobalSlots && !spammers.Empty() { // Retrieve the next offender if not local address offender, _ := spammers.Pop() - offenders = append(offenders, offender.(common.Address)) + offenders = append(offenders, offender) // Equalize balances until all the same or below threshold if len(offenders) > 1 { // Calculate the equalization threshold for all current offenders - threshold := pool.pending[offender.(common.Address)].Len() + threshold := pool.pending[offender].Len() // Iteratively reduce all offenders until below limit or threshold reached for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold { diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index fc57d9355a..fd5f238169 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -1535,7 +1535,7 @@ func (pool *TxPool) truncatePending() { pendingBeforeCap := pending // Assemble a spam order to penalize large transactors first - spammers := prque.New(nil) + spammers := prque.New[int64, common.Address](nil) for addr, list := range pool.pending { // Only evict transactions from high rollers if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots { @@ -1547,12 +1547,12 @@ func (pool *TxPool) truncatePending() { for pending > pool.config.GlobalSlots && !spammers.Empty() { // Retrieve the next offender if not local address offender, _ := spammers.Pop() - offenders = append(offenders, offender.(common.Address)) + offenders = append(offenders, offender) // Equalize balances until all the same or below threshold if len(offenders) > 1 { // Calculate the equalization threshold for all current offenders - threshold := pool.pending[offender.(common.Address)].Len() + threshold := pool.pending[offender].Len() // Iteratively reduce all offenders until below limit or threshold reached for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold { diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index b21460c2ef..115a780f92 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -70,7 +70,7 @@ type queue struct { // Headers are "special", they download in batches, supported by a skeleton chain headerHead common.Hash // [eth/62] Hash of the last queued header to verify order headerTaskPool map[uint64]*types.Header // [eth/62] Pending header retrieval tasks, mapping starting indexes to skeleton headers - headerTaskQueue *prque.Prque // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for + headerTaskQueue *prque.Prque[int64, uint64] // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for headerPeerMiss map[string]map[uint64]struct{} // [eth/62] Set of per-peer header batches known to be unavailable headerPendPool map[string]*fetchRequest // [eth/62] Currently pending header retrieval operations headerResults []*types.Header // [eth/62] Result cache accumulating the completed headers @@ -79,15 +79,15 @@ type queue struct { headerContCh chan bool // [eth/62] Channel to notify when header download finishes // All data retrievals below are based on an already assembles header chain - blockTaskPool map[common.Hash]*types.Header // [eth/62] Pending block (body) retrieval tasks, mapping hashes to headers - blockTaskQueue *prque.Prque // [eth/62] Priority queue of the headers to fetch the blocks (bodies) for - blockPendPool map[string]*fetchRequest // [eth/62] Currently pending block (body) retrieval operations - blockDonePool map[common.Hash]struct{} // [eth/62] Set of the completed block (body) fetches + blockTaskPool map[common.Hash]*types.Header // [eth/62] Pending block (body) retrieval tasks, mapping hashes to headers + blockTaskQueue *prque.Prque[int64, *types.Header] // [eth/62] Priority queue of the headers to fetch the blocks (bodies) for + blockPendPool map[string]*fetchRequest // [eth/62] Currently pending block (body) retrieval operations + blockDonePool map[common.Hash]struct{} // [eth/62] Set of the completed block (body) fetches - receiptTaskPool map[common.Hash]*types.Header // [eth/63] Pending receipt retrieval tasks, mapping hashes to headers - receiptTaskQueue *prque.Prque // [eth/63] Priority queue of the headers to fetch the receipts for - receiptPendPool map[string]*fetchRequest // [eth/63] Currently pending receipt retrieval operations - receiptDonePool map[common.Hash]struct{} // [eth/63] Set of the completed receipt fetches + receiptTaskPool map[common.Hash]*types.Header // [eth/63] Pending receipt retrieval tasks, mapping hashes to headers + receiptTaskQueue *prque.Prque[int64, *types.Header] // [eth/63] Priority queue of the headers to fetch the receipts for + receiptPendPool map[string]*fetchRequest // [eth/63] Currently pending receipt retrieval operations + receiptDonePool map[common.Hash]struct{} // [eth/63] Set of the completed receipt fetches resultCache []*fetchResult // Downloaded but not yet delivered fetch results resultOffset uint64 // Offset of the first cached fetch result in the block chain @@ -105,11 +105,11 @@ func newQueue() *queue { headerPendPool: make(map[string]*fetchRequest), headerContCh: make(chan bool), blockTaskPool: make(map[common.Hash]*types.Header), - blockTaskQueue: prque.New(nil), + blockTaskQueue: prque.New[int64, *types.Header](nil), blockPendPool: make(map[string]*fetchRequest), blockDonePool: make(map[common.Hash]struct{}), receiptTaskPool: make(map[common.Hash]*types.Header), - receiptTaskQueue: prque.New(nil), + receiptTaskQueue: prque.New[int64, *types.Header](nil), receiptPendPool: make(map[string]*fetchRequest), receiptDonePool: make(map[common.Hash]struct{}), resultCache: make([]*fetchResult, blockCacheItems), @@ -277,7 +277,7 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) { } // Shedule all the header retrieval tasks for the skeleton assembly q.headerTaskPool = make(map[uint64]*types.Header) - q.headerTaskQueue = prque.New(nil) + q.headerTaskQueue = prque.New[int64, uint64](nil) q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch) q.headerProced = 0 @@ -427,12 +427,12 @@ func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest { for send == 0 && !q.headerTaskQueue.Empty() { from, _ := q.headerTaskQueue.Pop() if q.headerPeerMiss[p.id] != nil { - if _, ok := q.headerPeerMiss[p.id][from.(uint64)]; ok { - skip = append(skip, from.(uint64)) + if _, ok := q.headerPeerMiss[p.id][from]; ok { + skip = append(skip, from) continue } } - send = from.(uint64) + send = from } // Merge all the skipped batches back for _, from := range skip { @@ -484,7 +484,7 @@ func (q *queue) ReserveReceipts(p *peerConnection, count int) (*fetchRequest, bo // Note, this method expects the queue lock to be already held for writing. The // reason the lock is not obtained in here is because the parameters already need // to access the queue, so they already need a lock anyway. -func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, +func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque[int64, *types.Header], pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, isNoop func(*types.Header) bool) (*fetchRequest, bool, error) { // Short circuit if the pool has been depleted, or if the peer's already // downloading something (sanity check not to corrupt state) @@ -503,7 +503,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common progress := false for proc := 0; proc < space && len(send) < count && !taskQueue.Empty(); proc++ { - header := taskQueue.PopItem().(*types.Header) + header := taskQueue.PopItem() hash := header.Hash() // If we're the first to request this task, initialise the result container @@ -586,12 +586,12 @@ func (q *queue) CancelReceipts(request *fetchRequest) { } // Cancel aborts a fetch request, returning all pending hashes to the task queue. -func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) { +func (q *queue) cancel(request *fetchRequest, taskQueue interface{}, pendPool map[string]*fetchRequest) { if request.From > 0 { - taskQueue.Push(request.From, -int64(request.From)) + taskQueue.(*prque.Prque[int64, uint64]).Push(request.From, -int64(request.From)) } for _, header := range request.Headers { - taskQueue.Push(header, -int64(header.Number.Uint64())) + taskQueue.(*prque.Prque[int64, *types.Header]).Push(header, -int64(header.Number.Uint64())) } delete(pendPool, request.Peer.id) } @@ -650,7 +650,7 @@ func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int { // Note, this method expects the queue lock to be already held. The // reason the lock is not obtained in here is because the parameters already need // to access the queue, so they already need a lock anyway. -func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter *metrics.Meter) map[string]int { +func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue interface{}, timeoutMeter *metrics.Meter) map[string]int { // Iterate over the expired requests and return each to the queue expiries := make(map[string]int) for id, request := range pendPool { @@ -660,10 +660,10 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, // Return any non satisfied requests to the pool if request.From > 0 { - taskQueue.Push(request.From, -int64(request.From)) + taskQueue.(*prque.Prque[int64, uint64]).Push(request.From, -int64(request.From)) } for _, header := range request.Headers { - taskQueue.Push(header, -int64(header.Number.Uint64())) + taskQueue.(*prque.Prque[int64, *types.Header]).Push(header, -int64(header.Number.Uint64())) } // Add the peer to the expiry report along the the number of failed requests expiries[id] = len(request.Headers) @@ -804,7 +804,7 @@ func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int, // Note, this method expects the queue lock to be already held for writing. The // reason the lock is not obtained in here is because the parameters already need // to access the queue, so they already need a lock anyway. -func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque, +func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque[int64, *types.Header], pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, reqTimer *metrics.Timer, results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) (int, error) { diff --git a/eth/fetcher/fetcher.go b/eth/fetcher/fetcher.go index 259781a6bb..f8cf67f23c 100644 --- a/eth/fetcher/fetcher.go +++ b/eth/fetcher/fetcher.go @@ -131,11 +131,10 @@ type Fetcher struct { completing map[common.Hash]*announce // Blocks with headers, currently body-completing // Block cache - queue *prque.Prque // Queue containing the import operations (block number sorted) - queues map[string]int // Per peer block counts to prevent memory exhaustion - queued map[common.Hash]*inject // Set of already queued blocks (to dedup imports) + queue *prque.Prque[int64, *inject] // Queue containing the import operations (block number sorted) + queues map[string]int // Per peer block counts to prevent memory exhaustion + queued map[common.Hash]*inject // Set of already queued blocks (to dedup imports) knowns *lru.Cache[common.Hash, struct{}] - // Callbacks getBlock blockRetrievalFn // Retrieves a block from the local chain verifyHeader headerVerifierFn // Checks if a block's headers have a valid proof of work @@ -170,7 +169,7 @@ func New(getBlock blockRetrievalFn, verifyHeader headerVerifierFn, handlePropose fetching: make(map[common.Hash]*announce), fetched: make(map[common.Hash][]*announce), completing: make(map[common.Hash]*announce), - queue: prque.New(nil), + queue: prque.New[int64, *inject](nil), queues: make(map[string]int), queued: make(map[common.Hash]*inject), knowns: lru.NewCache[common.Hash, struct{}](blockLimit), @@ -304,7 +303,7 @@ func (f *Fetcher) loop() { // Import any queued blocks that could potentially fit height := f.chainHeight() for !f.queue.Empty() { - op := f.queue.PopItem().(*inject) + op := f.queue.PopItem() if f.queueChangeHook != nil { f.queueChangeHook(op.block.Hash(), false) } diff --git a/go.mod b/go.mod index 9adb083eca..83100986b0 100644 --- a/go.mod +++ b/go.mod @@ -53,6 +53,7 @@ require ( github.com/mattn/go-isatty v0.0.17 github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible github.com/urfave/cli/v2 v2.27.5 + golang.org/x/exp v0.0.0-20221126150942-6ab00d035af9 gopkg.in/natefinch/lumberjack.v2 v2.2.1 ) diff --git a/go.sum b/go.sum index e59acf3916..7b0965c69d 100644 --- a/go.sum +++ b/go.sum @@ -214,6 +214,8 @@ golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.29.0 h1:L5SG1JTTXupVV3n6sUqMTeWbjAyfPwoda2DLX8J8FrQ= golang.org/x/crypto v0.29.0/go.mod h1:+F4F4N5hv6v38hfeYwTdx20oUvLLc+QfrE9Ax9HtgRg= +golang.org/x/exp v0.0.0-20221126150942-6ab00d035af9 h1:yZNXmy+j/JpX19vZkVktWqAo7Gny4PBWYYK3zskGpx4= +golang.org/x/exp v0.0.0-20221126150942-6ab00d035af9/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= diff --git a/trie/sync.go b/trie/sync.go index 645789f6ea..021a836f3a 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -73,7 +73,7 @@ type Sync struct { database ethdb.KeyValueReader // Persistent database to check for existing entries membatch *syncMemBatch // Memory buffer to avoid frequent database writes requests map[common.Hash]*request // Pending requests pertaining to a key hash - queue *prque.Prque // Priority queue with the pending requests + queue *prque.Prque[int64, any] // Priority queue with the pending requests bloom *SyncBloom // Bloom filter for fast Node existence checks } @@ -83,7 +83,7 @@ func NewSync(root common.Hash, database ethdb.KeyValueReader, callback LeafCallb database: database, membatch: newSyncMemBatch(), requests: make(map[common.Hash]*request), - queue: prque.New(nil), + queue: prque.New[int64, any](nil), // Ugh, can contain both string and hash, whyyy bloom: bloom, } ts.AddSubTrie(root, 0, common.Hash{}, callback)