common/prque: generic priority queue (#26290)

This commit is contained in:
Péter Szilágyi 2023-02-09 13:03:54 +02:00 committed by Daniel Liu
parent 5059114992
commit 39e9d8f94d
17 changed files with 178 additions and 188 deletions

View file

@ -52,8 +52,8 @@ type XDCX struct {
// Order related
db XDCxDAO.XDCXDAO
mongodb XDCxDAO.XDCXDAO
Triegc *prque.Prque // Priority queue mapping block numbers to tries to gc
StateCache tradingstate.Database // State database to reuse between imports (contains state cache) *XDCx_state.TradingStateDB
Triegc *prque.Prque[int64, common.Hash] // Priority queue mapping block numbers to tries to gc
StateCache tradingstate.Database // State database to reuse between imports (contains state cache) *XDCx_state.TradingStateDB
orderNonce map[common.Address]*big.Int
@ -94,7 +94,7 @@ func NewMongoDBEngine(cfg *Config) *XDCxDAO.MongoDatabase {
func New(cfg *Config) *XDCX {
XDCX := &XDCX{
orderNonce: make(map[common.Address]*big.Int),
Triegc: prque.New(nil),
Triegc: prque.New[int64, common.Hash](nil),
tokenDecimalCache: lru.NewCache[common.Address, *big.Int](defaultCacheLimit),
orderCache: lru.NewCache[common.Hash, map[common.Hash]tradingstate.OrderHistoryItem](tradingstate.OrderCacheLimit),
}
@ -579,7 +579,7 @@ func (XDCx *XDCX) HasTradingState(block *types.Block, author common.Address) boo
return err == nil
}
func (XDCx *XDCX) GetTriegc() *prque.Prque {
func (XDCx *XDCX) GetTriegc() *prque.Prque[int64, common.Hash] {
return XDCx.Triegc
}

View file

@ -36,8 +36,8 @@ var (
)
type Lending struct {
Triegc *prque.Prque // Priority queue mapping block numbers to tries to gc
StateCache lendingstate.Database // State database to reuse between imports (contains state cache) *lendingstate.TradingStateDB
Triegc *prque.Prque[int64, common.Hash] // Priority queue mapping block numbers to tries to gc
StateCache lendingstate.Database // State database to reuse between imports (contains state cache) *lendingstate.TradingStateDB
orderNonce map[common.Address]*big.Int
@ -61,7 +61,7 @@ func (l *Lending) Stop() error {
func New(XDCx *XDCx.XDCX) *Lending {
lending := &Lending{
orderNonce: make(map[common.Address]*big.Int),
Triegc: prque.New(nil),
Triegc: prque.New[int64, common.Hash](nil),
lendingItemHistory: lru.NewCache[common.Hash, map[common.Hash]lendingstate.LendingItemHistoryItem](defaultCacheLimit),
lendingTradeHistory: lru.NewCache[common.Hash, map[common.Hash]lendingstate.LendingTradeHistoryItem](defaultCacheLimit),
}
@ -682,7 +682,7 @@ func (l *Lending) HasLendingState(block *types.Block, author common.Address) boo
return err == nil
}
func (l *Lending) GetTriegc() *prque.Prque {
func (l *Lending) GetTriegc() *prque.Prque[int64, common.Hash] {
return l.Triegc
}

View file

@ -21,6 +21,7 @@ import (
"time"
"github.com/XinFinOrg/XDPoSChain/common/mclock"
"golang.org/x/exp/constraints"
)
// LazyQueue is a priority queue data structure where priorities can change over
@ -32,31 +33,31 @@ import (
//
// If the upper estimate is exceeded then Update should be called for that item.
// A global Refresh function should also be called periodically.
type LazyQueue struct {
type LazyQueue[P constraints.Ordered, V any] struct {
clock mclock.Clock
// Items are stored in one of two internal queues ordered by estimated max
// priority until the next and the next-after-next refresh. Update and Refresh
// always places items in queue[1].
queue [2]*sstack
popQueue *sstack
queue [2]*sstack[P, V]
popQueue *sstack[P, V]
period time.Duration
maxUntil mclock.AbsTime
indexOffset int
setIndex SetIndexCallback
priority PriorityCallback
maxPriority MaxPriorityCallback
setIndex SetIndexCallback[V]
priority PriorityCallback[P, V]
maxPriority MaxPriorityCallback[P, V]
lastRefresh1, lastRefresh2 mclock.AbsTime
}
type (
PriorityCallback func(data interface{}) int64 // actual priority callback
MaxPriorityCallback func(data interface{}, until mclock.AbsTime) int64 // estimated maximum priority callback
PriorityCallback[P constraints.Ordered, V any] func(data V) P // actual priority callback
MaxPriorityCallback[P constraints.Ordered, V any] func(data V, until mclock.AbsTime) P // estimated maximum priority callback
)
// NewLazyQueue creates a new lazy queue
func NewLazyQueue(setIndex SetIndexCallback, priority PriorityCallback, maxPriority MaxPriorityCallback, clock mclock.Clock, refreshPeriod time.Duration) *LazyQueue {
q := &LazyQueue{
popQueue: newSstack(nil, false),
func NewLazyQueue[P constraints.Ordered, V any](setIndex SetIndexCallback[V], priority PriorityCallback[P, V], maxPriority MaxPriorityCallback[P, V], clock mclock.Clock, refreshPeriod time.Duration) *LazyQueue[P, V] {
q := &LazyQueue[P, V]{
popQueue: newSstack[P, V](nil),
setIndex: setIndex,
priority: priority,
maxPriority: maxPriority,
@ -71,13 +72,13 @@ func NewLazyQueue(setIndex SetIndexCallback, priority PriorityCallback, maxPrior
}
// Reset clears the contents of the queue
func (q *LazyQueue) Reset() {
q.queue[0] = newSstack(q.setIndex0, false)
q.queue[1] = newSstack(q.setIndex1, false)
func (q *LazyQueue[P, V]) Reset() {
q.queue[0] = newSstack[P, V](q.setIndex0)
q.queue[1] = newSstack[P, V](q.setIndex1)
}
// Refresh performs queue re-evaluation if necessary
func (q *LazyQueue) Refresh() {
func (q *LazyQueue[P, V]) Refresh() {
now := q.clock.Now()
for time.Duration(now-q.lastRefresh2) >= q.period*2 {
q.refresh(now)
@ -87,10 +88,10 @@ func (q *LazyQueue) Refresh() {
}
// refresh re-evaluates items in the older queue and swaps the two queues
func (q *LazyQueue) refresh(now mclock.AbsTime) {
func (q *LazyQueue[P, V]) refresh(now mclock.AbsTime) {
q.maxUntil = now.Add(q.period)
for q.queue[0].Len() != 0 {
q.Push(heap.Pop(q.queue[0]).(*item).value)
q.Push(heap.Pop(q.queue[0]).(*item[P, V]).value)
}
q.queue[0], q.queue[1] = q.queue[1], q.queue[0]
q.indexOffset = 1 - q.indexOffset
@ -98,22 +99,22 @@ func (q *LazyQueue) refresh(now mclock.AbsTime) {
}
// Push adds an item to the queue
func (q *LazyQueue) Push(data interface{}) {
heap.Push(q.queue[1], &item{data, q.maxPriority(data, q.maxUntil)})
func (q *LazyQueue[P, V]) Push(data V) {
heap.Push(q.queue[1], &item[P, V]{data, q.maxPriority(data, q.maxUntil)})
}
// Update updates the upper priority estimate for the item with the given queue index
func (q *LazyQueue) Update(index int) {
func (q *LazyQueue[P, V]) Update(index int) {
q.Push(q.Remove(index))
}
// Pop removes and returns the item with the greatest actual priority
func (q *LazyQueue) Pop() (interface{}, int64) {
func (q *LazyQueue[P, V]) Pop() (V, P) {
var (
resData interface{}
resPri int64
resData V
resPri P
)
q.MultiPop(func(data interface{}, priority int64) bool {
q.MultiPop(func(data V, priority P) bool {
resData = data
resPri = priority
return false
@ -123,7 +124,7 @@ func (q *LazyQueue) Pop() (interface{}, int64) {
// peekIndex returns the index of the internal queue where the item with the
// highest estimated priority is or -1 if both are empty
func (q *LazyQueue) peekIndex() int {
func (q *LazyQueue[P, V]) peekIndex() int {
if q.queue[0].Len() != 0 {
if q.queue[1].Len() != 0 && q.queue[1].blocks[0][0].priority > q.queue[0].blocks[0][0].priority {
return 1
@ -139,17 +140,17 @@ func (q *LazyQueue) peekIndex() int {
// MultiPop pops multiple items from the queue and is more efficient than calling
// Pop multiple times. Popped items are passed to the callback. MultiPop returns
// when the callback returns false or there are no more items to pop.
func (q *LazyQueue) MultiPop(callback func(data interface{}, priority int64) bool) {
func (q *LazyQueue[P, V]) MultiPop(callback func(data V, priority P) bool) {
nextIndex := q.peekIndex()
for nextIndex != -1 {
data := heap.Pop(q.queue[nextIndex]).(*item).value
heap.Push(q.popQueue, &item{data, q.priority(data)})
data := heap.Pop(q.queue[nextIndex]).(*item[P, V]).value
heap.Push(q.popQueue, &item[P, V]{data, q.priority(data)})
nextIndex = q.peekIndex()
for q.popQueue.Len() != 0 && (nextIndex == -1 || q.queue[nextIndex].blocks[0][0].priority < q.popQueue.blocks[0][0].priority) {
i := heap.Pop(q.popQueue).(*item)
i := heap.Pop(q.popQueue).(*item[P, V])
if !callback(i.value, i.priority) {
for q.popQueue.Len() != 0 {
q.Push(heap.Pop(q.popQueue).(*item).value)
q.Push(heap.Pop(q.popQueue).(*item[P, V]).value)
}
return
}
@ -159,31 +160,28 @@ func (q *LazyQueue) MultiPop(callback func(data interface{}, priority int64) boo
}
// PopItem pops the item from the queue only, dropping the associated priority value.
func (q *LazyQueue) PopItem() interface{} {
func (q *LazyQueue[P, V]) PopItem() V {
i, _ := q.Pop()
return i
}
// Remove removes the item with the given index.
func (q *LazyQueue) Remove(index int) interface{} {
if index < 0 {
return nil
}
return heap.Remove(q.queue[index&1^q.indexOffset], index>>1).(*item).value
func (q *LazyQueue[P, V]) Remove(index int) V {
return heap.Remove(q.queue[index&1^q.indexOffset], index>>1).(*item[P, V]).value
}
// Empty checks whether the priority queue is empty.
func (q *LazyQueue) Empty() bool {
func (q *LazyQueue[P, V]) Empty() bool {
return q.queue[0].Len() == 0 && q.queue[1].Len() == 0
}
// Size returns the number of items in the priority queue.
func (q *LazyQueue) Size() int {
func (q *LazyQueue[P, V]) Size() int {
return q.queue[0].Len() + q.queue[1].Len()
}
// setIndex0 translates internal queue item index to the virtual index space of LazyQueue
func (q *LazyQueue) setIndex0(data interface{}, index int) {
func (q *LazyQueue[P, V]) setIndex0(data V, index int) {
if index == -1 {
q.setIndex(data, -1)
} else {
@ -192,6 +190,6 @@ func (q *LazyQueue) setIndex0(data interface{}, index int) {
}
// setIndex1 translates internal queue item index to the virtual index space of LazyQueue
func (q *LazyQueue) setIndex1(data interface{}, index int) {
func (q *LazyQueue[P, V]) setIndex1(data V, index int) {
q.setIndex(data, index+index+1)
}

View file

@ -19,65 +19,59 @@ package prque
import (
"container/heap"
"golang.org/x/exp/constraints"
)
// Priority queue data structure.
type Prque struct {
cont *sstack
type Prque[P constraints.Ordered, V any] struct {
cont *sstack[P, V]
}
// New creates a new priority queue.
func New(setIndex SetIndexCallback) *Prque {
return &Prque{newSstack(setIndex, false)}
}
// NewWrapAround creates a new priority queue with wrap-around priority handling.
func NewWrapAround(setIndex SetIndexCallback) *Prque {
return &Prque{newSstack(setIndex, true)}
func New[P constraints.Ordered, V any](setIndex SetIndexCallback[V]) *Prque[P, V] {
return &Prque[P, V]{newSstack[P, V](setIndex)}
}
// Pushes a value with a given priority into the queue, expanding if necessary.
func (p *Prque) Push(data interface{}, priority int64) {
heap.Push(p.cont, &item{data, priority})
func (p *Prque[P, V]) Push(data V, priority P) {
heap.Push(p.cont, &item[P, V]{data, priority})
}
// Peek returns the value with the greatest priority but does not pop it off.
func (p *Prque) Peek() (interface{}, int64) {
func (p *Prque[P, V]) Peek() (V, P) {
item := p.cont.blocks[0][0]
return item.value, item.priority
}
// Pops the value with the greatest priority off the stack and returns it.
// Currently no shrinking is done.
func (p *Prque) Pop() (interface{}, int64) {
item := heap.Pop(p.cont).(*item)
func (p *Prque[P, V]) Pop() (V, P) {
item := heap.Pop(p.cont).(*item[P, V])
return item.value, item.priority
}
// Pops only the item from the queue, dropping the associated priority value.
func (p *Prque) PopItem() interface{} {
return heap.Pop(p.cont).(*item).value
func (p *Prque[P, V]) PopItem() V {
return heap.Pop(p.cont).(*item[P, V]).value
}
// Remove removes the element with the given index.
func (p *Prque) Remove(i int) interface{} {
if i < 0 {
return nil
}
return heap.Remove(p.cont, i)
func (p *Prque[P, V]) Remove(i int) V {
return heap.Remove(p.cont, i).(*item[P, V]).value
}
// Checks whether the priority queue is empty.
func (p *Prque) Empty() bool {
func (p *Prque[P, V]) Empty() bool {
return p.cont.Len() == 0
}
// Returns the number of element in the priority queue.
func (p *Prque) Size() int {
func (p *Prque[P, V]) Size() int {
return p.cont.Len()
}
// Clears the contents of the priority queue.
func (p *Prque) Reset() {
*p = *New(p.cont.setIndex)
func (p *Prque[P, V]) Reset() {
*p = *New[P, V](p.cont.setIndex)
}

View file

@ -21,22 +21,24 @@ func TestPrque(t *testing.T) {
for i := 0; i < size; i++ {
data[i] = rand.Int()
}
queue := New(nil)
queue := New[int, int](nil)
for rep := 0; rep < 2; rep++ {
// Fill a priority queue with the above data
for i := 0; i < size; i++ {
queue.Push(data[i], int64(prio[i]))
queue.Push(data[i], prio[i])
if queue.Size() != i+1 {
t.Errorf("queue size mismatch: have %v, want %v.", queue.Size(), i+1)
}
}
// Create a map the values to the priorities for easier verification
dict := make(map[int64]int)
dict := make(map[int]int)
for i := 0; i < size; i++ {
dict[int64(prio[i])] = data[i]
dict[prio[i]] = data[i]
}
// Pop out the elements in priority order and verify them
prevPrio := int64(size + 1)
prevPrio := size + 1
for !queue.Empty() {
val, prio := queue.Pop()
if prio > prevPrio {
@ -59,22 +61,23 @@ func TestReset(t *testing.T) {
for i := 0; i < size; i++ {
data[i] = rand.Int()
}
queue := New(nil)
queue := New[int, int](nil)
for rep := 0; rep < 2; rep++ {
// Fill a priority queue with the above data
for i := 0; i < size; i++ {
queue.Push(data[i], int64(prio[i]))
queue.Push(data[i], prio[i])
if queue.Size() != i+1 {
t.Errorf("queue size mismatch: have %v, want %v.", queue.Size(), i+1)
}
}
// Create a map the values to the priorities for easier verification
dict := make(map[int64]int)
dict := make(map[int]int)
for i := 0; i < size; i++ {
dict[int64(prio[i])] = data[i]
dict[prio[i]] = data[i]
}
// Pop out half the elements in priority order and verify them
prevPrio := int64(size + 1)
prevPrio := size + 1
for i := 0; i < size/2; i++ {
val, prio := queue.Pop()
if prio > prevPrio {
@ -104,7 +107,7 @@ func BenchmarkPush(b *testing.B) {
}
// Execute the benchmark
b.ResetTimer()
queue := New(nil)
queue := New[int64, int](nil)
for i := 0; i < len(data); i++ {
queue.Push(data[i], prio[i])
}
@ -118,7 +121,7 @@ func BenchmarkPop(b *testing.B) {
data[i] = rand.Int()
prio[i] = rand.Int63()
}
queue := New(nil)
queue := New[int64, int](nil)
for i := 0; i < len(data); i++ {
queue.Push(data[i], prio[i])
}

View file

@ -10,53 +10,50 @@
package prque
import "golang.org/x/exp/constraints"
// The size of a block of data
const blockSize = 4096
// A prioritized item in the sorted stack.
//
// Note: priorities can "wrap around" the int64 range, a comes before b if (a.priority - b.priority) > 0.
// The difference between the lowest and highest priorities in the queue at any point should be less than 2^63.
type item struct {
value interface{}
priority int64
type item[P constraints.Ordered, V any] struct {
value V
priority P
}
// SetIndexCallback is called when the element is moved to a new index.
// Providing SetIndexCallback is optional, it is needed only if the application needs
// to delete elements other than the top one.
type SetIndexCallback func(data interface{}, index int)
type SetIndexCallback[V any] func(data V, index int)
// Internal sortable stack data structure. Implements the Push and Pop ops for
// the stack (heap) functionality and the Len, Less and Swap methods for the
// sortability requirements of the heaps.
type sstack struct {
setIndex SetIndexCallback
size int
capacity int
offset int
wrapAround bool
type sstack[P constraints.Ordered, V any] struct {
setIndex SetIndexCallback[V]
size int
capacity int
offset int
blocks [][]*item
active []*item
blocks [][]*item[P, V]
active []*item[P, V]
}
// Creates a new, empty stack.
func newSstack(setIndex SetIndexCallback, wrapAround bool) *sstack {
result := new(sstack)
func newSstack[P constraints.Ordered, V any](setIndex SetIndexCallback[V]) *sstack[P, V] {
result := new(sstack[P, V])
result.setIndex = setIndex
result.active = make([]*item, blockSize)
result.blocks = [][]*item{result.active}
result.active = make([]*item[P, V], blockSize)
result.blocks = [][]*item[P, V]{result.active}
result.capacity = blockSize
result.wrapAround = wrapAround
return result
}
// Pushes a value onto the stack, expanding it if necessary. Required by
// heap.Interface.
func (s *sstack) Push(data interface{}) {
func (s *sstack[P, V]) Push(data any) {
if s.size == s.capacity {
s.active = make([]*item, blockSize)
s.active = make([]*item[P, V], blockSize)
s.blocks = append(s.blocks, s.active)
s.capacity += blockSize
s.offset = 0
@ -65,16 +62,16 @@ func (s *sstack) Push(data interface{}) {
s.offset = 0
}
if s.setIndex != nil {
s.setIndex(data.(*item).value, s.size)
s.setIndex(data.(*item[P, V]).value, s.size)
}
s.active[s.offset] = data.(*item)
s.active[s.offset] = data.(*item[P, V])
s.offset++
s.size++
}
// Pops a value off the stack and returns it. Currently no shrinking is done.
// Required by heap.Interface.
func (s *sstack) Pop() (res interface{}) {
func (s *sstack[P, V]) Pop() (res any) {
s.size--
s.offset--
if s.offset < 0 {
@ -83,28 +80,24 @@ func (s *sstack) Pop() (res interface{}) {
}
res, s.active[s.offset] = s.active[s.offset], nil
if s.setIndex != nil {
s.setIndex(res.(*item).value, -1)
s.setIndex(res.(*item[P, V]).value, -1)
}
return
}
// Returns the length of the stack. Required by sort.Interface.
func (s *sstack) Len() int {
func (s *sstack[P, V]) Len() int {
return s.size
}
// Compares the priority of two elements of the stack (higher is first).
// Required by sort.Interface.
func (s *sstack) Less(i, j int) bool {
a, b := s.blocks[i/blockSize][i%blockSize].priority, s.blocks[j/blockSize][j%blockSize].priority
if s.wrapAround {
return a-b > 0
}
return a > b
func (s *sstack[P, V]) Less(i, j int) bool {
return s.blocks[i/blockSize][i%blockSize].priority > s.blocks[j/blockSize][j%blockSize].priority
}
// Swaps two elements in the stack. Required by sort.Interface.
func (s *sstack) Swap(i, j int) {
func (s *sstack[P, V]) Swap(i, j int) {
ib, io, jb, jo := i/blockSize, i%blockSize, j/blockSize, j%blockSize
a, b := s.blocks[jb][jo], s.blocks[ib][io]
if s.setIndex != nil {
@ -115,6 +108,6 @@ func (s *sstack) Swap(i, j int) {
}
// Resets the stack, effectively clearing its contents.
func (s *sstack) Reset() {
*s = *newSstack(s.setIndex, false)
func (s *sstack[P, V]) Reset() {
*s = *newSstack[P, V](s.setIndex)
}

View file

@ -17,23 +17,23 @@ import (
func TestSstack(t *testing.T) {
// Create some initial data
size := 16 * blockSize
data := make([]*item, size)
data := make([]*item[int64, int], size)
for i := 0; i < size; i++ {
data[i] = &item{rand.Int(), rand.Int63()}
data[i] = &item[int64, int]{rand.Int(), rand.Int63()}
}
stack := newSstack(nil, false)
stack := newSstack[int64, int](nil)
for rep := 0; rep < 2; rep++ {
// Push all the data into the stack, pop out every second
secs := []*item{}
secs := []*item[int64, int]{}
for i := 0; i < size; i++ {
stack.Push(data[i])
if i%2 == 0 {
secs = append(secs, stack.Pop().(*item))
secs = append(secs, stack.Pop().(*item[int64, int]))
}
}
rest := []*item{}
rest := []*item[int64, int]{}
for stack.Len() > 0 {
rest = append(rest, stack.Pop().(*item))
rest = append(rest, stack.Pop().(*item[int64, int]))
}
// Make sure the contents of the resulting slices are ok
for i := 0; i < size; i++ {
@ -50,12 +50,12 @@ func TestSstack(t *testing.T) {
func TestSstackSort(t *testing.T) {
// Create some initial data
size := 16 * blockSize
data := make([]*item, size)
data := make([]*item[int64, int], size)
for i := 0; i < size; i++ {
data[i] = &item{rand.Int(), int64(i)}
data[i] = &item[int64, int]{rand.Int(), int64(i)}
}
// Push all the data into the stack
stack := newSstack(nil, false)
stack := newSstack[int64, int](nil)
for _, val := range data {
stack.Push(val)
}
@ -72,18 +72,18 @@ func TestSstackSort(t *testing.T) {
func TestSstackReset(t *testing.T) {
// Create some initial data
size := 16 * blockSize
data := make([]*item, size)
data := make([]*item[int64, int], size)
for i := 0; i < size; i++ {
data[i] = &item{rand.Int(), rand.Int63()}
data[i] = &item[int64, int]{rand.Int(), rand.Int63()}
}
stack := newSstack(nil, false)
stack := newSstack[int64, int](nil)
for rep := 0; rep < 2; rep++ {
// Push all the data into the stack, pop out every second
secs := []*item{}
secs := []*item[int64, int]{}
for i := 0; i < size; i++ {
stack.Push(data[i])
if i%2 == 0 {
secs = append(secs, stack.Pop().(*item))
secs = append(secs, stack.Pop().(*item[int64, int]))
}
}
// Reset and verify both pulled and stack contents

View file

@ -26,7 +26,7 @@ type TradingService interface {
GetEmptyTradingState() (*tradingstate.TradingStateDB, error)
HasTradingState(block *types.Block, author common.Address) bool
GetStateCache() tradingstate.Database
GetTriegc() *prque.Prque
GetTriegc() *prque.Prque[int64, common.Hash]
ApplyOrder(header *types.Header, coinbase common.Address, chain consensus.ChainContext, statedb *state.StateDB, XDCXstatedb *tradingstate.TradingStateDB, orderBook common.Hash, order *tradingstate.OrderItem) ([]map[string]string, []*tradingstate.OrderItem, error)
UpdateMediumPriceBeforeEpoch(epochNumber uint64, tradingStateDB *tradingstate.TradingStateDB, statedb *state.StateDB) error
IsSDKNode() bool
@ -40,7 +40,7 @@ type LendingService interface {
GetLendingState(block *types.Block, author common.Address) (*lendingstate.LendingStateDB, error)
HasLendingState(block *types.Block, author common.Address) bool
GetStateCache() lendingstate.Database
GetTriegc() *prque.Prque
GetTriegc() *prque.Prque[int64, common.Hash]
ApplyOrder(header *types.Header, coinbase common.Address, chain consensus.ChainContext, statedb *state.StateDB, lendingStateDB *lendingstate.LendingStateDB, tradingStateDb *tradingstate.TradingStateDB, lendingOrderBook common.Hash, order *lendingstate.LendingItem) ([]*lendingstate.LendingTrade, []*lendingstate.LendingItem, error)
GetCollateralPrices(header *types.Header, chain consensus.ChainContext, statedb *state.StateDB, tradingStateDb *tradingstate.TradingStateDB, collateralToken common.Address, lendingToken common.Address) (*big.Int, *big.Int, error)
GetMediumTradePriceBeforeEpoch(chain consensus.ChainContext, statedb *state.StateDB, tradingStateDb *tradingstate.TradingStateDB, baseToken common.Address, quoteToken common.Address) (*big.Int, error)

View file

@ -137,8 +137,8 @@ type BlockChain struct {
db ethdb.Database // Low level persistent database to store final content in
XDCxDb ethdb.XDCxDatabase
triegc *prque.Prque // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping
triegc *prque.Prque[int64, common.Hash] // Priority queue mapping block numbers to tries to gc
gcproc time.Duration // Accumulates canonical block processing for trie dumping
hc *HeaderChain
rmLogsFeed event.Feed
@ -209,7 +209,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
chainConfig: chainConfig,
cacheConfig: cacheConfig,
db: db,
triegc: prque.New(nil),
triegc: prque.New[int64, common.Hash](nil),
stateCache: state.NewDatabase(db),
quit: make(chan struct{}),
bodyCache: lru.NewCache[common.Hash, *types.Body](bodyCacheLimit),
@ -975,17 +975,17 @@ func (bc *BlockChain) saveData() {
}
}
for !bc.triegc.Empty() {
triedb.Dereference(bc.triegc.PopItem().(common.Hash))
triedb.Dereference(bc.triegc.PopItem())
}
if tradingTriedb != nil && lendingTriedb != nil {
if tradingService.GetTriegc() != nil {
for !tradingService.GetTriegc().Empty() {
tradingTriedb.Dereference(tradingService.GetTriegc().PopItem().(common.Hash))
tradingTriedb.Dereference(tradingService.GetTriegc().PopItem())
}
}
if lendingService.GetTriegc() != nil {
for !lendingService.GetTriegc().Empty() {
lendingTriedb.Dereference(lendingService.GetTriegc().PopItem().(common.Hash))
lendingTriedb.Dereference(lendingService.GetTriegc().PopItem())
}
}
}
@ -1328,7 +1328,7 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
bc.triegc.Push(root, number)
break
}
triedb.Dereference(root.(common.Hash))
triedb.Dereference(root)
}
if tradingService != nil {
for !tradingService.GetTriegc().Empty() {
@ -1337,7 +1337,7 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
tradingService.GetTriegc().Push(tradingRoot, number)
break
}
tradingTrieDb.Dereference(tradingRoot.(common.Hash))
tradingTrieDb.Dereference(tradingRoot)
}
}
if lendingService != nil {
@ -1347,7 +1347,7 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
lendingService.GetTriegc().Push(lendingRoot, number)
break
}
lendingTrieDb.Dereference(lendingRoot.(common.Hash))
lendingTrieDb.Dereference(lendingRoot)
}
}
}

View file

@ -997,7 +997,7 @@ func (pool *LendingPool) promoteExecutables(accounts []common.Address) {
if pending > pool.config.GlobalSlots {
pendingBeforeCap := pending
// Assemble a spam order to penalize large transactors first
spammers := prque.New(nil)
spammers := prque.New[int64, common.Address](nil)
for addr, list := range pool.pending {
// Only evict transactions from high rollers
if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
@ -1009,12 +1009,12 @@ func (pool *LendingPool) promoteExecutables(accounts []common.Address) {
for pending > pool.config.GlobalSlots && !spammers.Empty() {
// Retrieve the next offender if not local address
offender, _ := spammers.Pop()
offenders = append(offenders, offender.(common.Address))
offenders = append(offenders, offender)
// Equalize balances until all the same or below threshold
if len(offenders) > 1 {
// Calculate the equalization threshold for all current offenders
threshold := pool.pending[offender.(common.Address)].Len()
threshold := pool.pending[offender].Len()
// Iteratively reduce all offenders until below limit or threshold reached
for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold {

View file

@ -914,7 +914,7 @@ func (pool *OrderPool) promoteExecutables(accounts []common.Address) {
if pending > pool.config.GlobalSlots {
pendingBeforeCap := pending
// Assemble a spam order to penalize large transactors first
spammers := prque.New(nil)
spammers := prque.New[int64, common.Address](nil)
for addr, list := range pool.pending {
// Only evict transactions from high rollers
if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
@ -926,12 +926,12 @@ func (pool *OrderPool) promoteExecutables(accounts []common.Address) {
for pending > pool.config.GlobalSlots && !spammers.Empty() {
// Retrieve the next offender if not local address
offender, _ := spammers.Pop()
offenders = append(offenders, offender.(common.Address))
offenders = append(offenders, offender)
// Equalize balances until all the same or below threshold
if len(offenders) > 1 {
// Calculate the equalization threshold for all current offenders
threshold := pool.pending[offender.(common.Address)].Len()
threshold := pool.pending[offender].Len()
// Iteratively reduce all offenders until below limit or threshold reached
for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold {

View file

@ -1535,7 +1535,7 @@ func (pool *TxPool) truncatePending() {
pendingBeforeCap := pending
// Assemble a spam order to penalize large transactors first
spammers := prque.New(nil)
spammers := prque.New[int64, common.Address](nil)
for addr, list := range pool.pending {
// Only evict transactions from high rollers
if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
@ -1547,12 +1547,12 @@ func (pool *TxPool) truncatePending() {
for pending > pool.config.GlobalSlots && !spammers.Empty() {
// Retrieve the next offender if not local address
offender, _ := spammers.Pop()
offenders = append(offenders, offender.(common.Address))
offenders = append(offenders, offender)
// Equalize balances until all the same or below threshold
if len(offenders) > 1 {
// Calculate the equalization threshold for all current offenders
threshold := pool.pending[offender.(common.Address)].Len()
threshold := pool.pending[offender].Len()
// Iteratively reduce all offenders until below limit or threshold reached
for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold {

View file

@ -70,7 +70,7 @@ type queue struct {
// Headers are "special", they download in batches, supported by a skeleton chain
headerHead common.Hash // [eth/62] Hash of the last queued header to verify order
headerTaskPool map[uint64]*types.Header // [eth/62] Pending header retrieval tasks, mapping starting indexes to skeleton headers
headerTaskQueue *prque.Prque // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for
headerTaskQueue *prque.Prque[int64, uint64] // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for
headerPeerMiss map[string]map[uint64]struct{} // [eth/62] Set of per-peer header batches known to be unavailable
headerPendPool map[string]*fetchRequest // [eth/62] Currently pending header retrieval operations
headerResults []*types.Header // [eth/62] Result cache accumulating the completed headers
@ -79,15 +79,15 @@ type queue struct {
headerContCh chan bool // [eth/62] Channel to notify when header download finishes
// All data retrievals below are based on an already assembles header chain
blockTaskPool map[common.Hash]*types.Header // [eth/62] Pending block (body) retrieval tasks, mapping hashes to headers
blockTaskQueue *prque.Prque // [eth/62] Priority queue of the headers to fetch the blocks (bodies) for
blockPendPool map[string]*fetchRequest // [eth/62] Currently pending block (body) retrieval operations
blockDonePool map[common.Hash]struct{} // [eth/62] Set of the completed block (body) fetches
blockTaskPool map[common.Hash]*types.Header // [eth/62] Pending block (body) retrieval tasks, mapping hashes to headers
blockTaskQueue *prque.Prque[int64, *types.Header] // [eth/62] Priority queue of the headers to fetch the blocks (bodies) for
blockPendPool map[string]*fetchRequest // [eth/62] Currently pending block (body) retrieval operations
blockDonePool map[common.Hash]struct{} // [eth/62] Set of the completed block (body) fetches
receiptTaskPool map[common.Hash]*types.Header // [eth/63] Pending receipt retrieval tasks, mapping hashes to headers
receiptTaskQueue *prque.Prque // [eth/63] Priority queue of the headers to fetch the receipts for
receiptPendPool map[string]*fetchRequest // [eth/63] Currently pending receipt retrieval operations
receiptDonePool map[common.Hash]struct{} // [eth/63] Set of the completed receipt fetches
receiptTaskPool map[common.Hash]*types.Header // [eth/63] Pending receipt retrieval tasks, mapping hashes to headers
receiptTaskQueue *prque.Prque[int64, *types.Header] // [eth/63] Priority queue of the headers to fetch the receipts for
receiptPendPool map[string]*fetchRequest // [eth/63] Currently pending receipt retrieval operations
receiptDonePool map[common.Hash]struct{} // [eth/63] Set of the completed receipt fetches
resultCache []*fetchResult // Downloaded but not yet delivered fetch results
resultOffset uint64 // Offset of the first cached fetch result in the block chain
@ -105,11 +105,11 @@ func newQueue() *queue {
headerPendPool: make(map[string]*fetchRequest),
headerContCh: make(chan bool),
blockTaskPool: make(map[common.Hash]*types.Header),
blockTaskQueue: prque.New(nil),
blockTaskQueue: prque.New[int64, *types.Header](nil),
blockPendPool: make(map[string]*fetchRequest),
blockDonePool: make(map[common.Hash]struct{}),
receiptTaskPool: make(map[common.Hash]*types.Header),
receiptTaskQueue: prque.New(nil),
receiptTaskQueue: prque.New[int64, *types.Header](nil),
receiptPendPool: make(map[string]*fetchRequest),
receiptDonePool: make(map[common.Hash]struct{}),
resultCache: make([]*fetchResult, blockCacheItems),
@ -277,7 +277,7 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
}
// Shedule all the header retrieval tasks for the skeleton assembly
q.headerTaskPool = make(map[uint64]*types.Header)
q.headerTaskQueue = prque.New(nil)
q.headerTaskQueue = prque.New[int64, uint64](nil)
q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains
q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch)
q.headerProced = 0
@ -427,12 +427,12 @@ func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest {
for send == 0 && !q.headerTaskQueue.Empty() {
from, _ := q.headerTaskQueue.Pop()
if q.headerPeerMiss[p.id] != nil {
if _, ok := q.headerPeerMiss[p.id][from.(uint64)]; ok {
skip = append(skip, from.(uint64))
if _, ok := q.headerPeerMiss[p.id][from]; ok {
skip = append(skip, from)
continue
}
}
send = from.(uint64)
send = from
}
// Merge all the skipped batches back
for _, from := range skip {
@ -484,7 +484,7 @@ func (q *queue) ReserveReceipts(p *peerConnection, count int) (*fetchRequest, bo
// Note, this method expects the queue lock to be already held for writing. The
// reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque[int64, *types.Header],
pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, isNoop func(*types.Header) bool) (*fetchRequest, bool, error) {
// Short circuit if the pool has been depleted, or if the peer's already
// downloading something (sanity check not to corrupt state)
@ -503,7 +503,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
progress := false
for proc := 0; proc < space && len(send) < count && !taskQueue.Empty(); proc++ {
header := taskQueue.PopItem().(*types.Header)
header := taskQueue.PopItem()
hash := header.Hash()
// If we're the first to request this task, initialise the result container
@ -586,12 +586,12 @@ func (q *queue) CancelReceipts(request *fetchRequest) {
}
// Cancel aborts a fetch request, returning all pending hashes to the task queue.
func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) {
func (q *queue) cancel(request *fetchRequest, taskQueue interface{}, pendPool map[string]*fetchRequest) {
if request.From > 0 {
taskQueue.Push(request.From, -int64(request.From))
taskQueue.(*prque.Prque[int64, uint64]).Push(request.From, -int64(request.From))
}
for _, header := range request.Headers {
taskQueue.Push(header, -int64(header.Number.Uint64()))
taskQueue.(*prque.Prque[int64, *types.Header]).Push(header, -int64(header.Number.Uint64()))
}
delete(pendPool, request.Peer.id)
}
@ -650,7 +650,7 @@ func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int {
// Note, this method expects the queue lock to be already held. The
// reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter *metrics.Meter) map[string]int {
func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue interface{}, timeoutMeter *metrics.Meter) map[string]int {
// Iterate over the expired requests and return each to the queue
expiries := make(map[string]int)
for id, request := range pendPool {
@ -660,10 +660,10 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest,
// Return any non satisfied requests to the pool
if request.From > 0 {
taskQueue.Push(request.From, -int64(request.From))
taskQueue.(*prque.Prque[int64, uint64]).Push(request.From, -int64(request.From))
}
for _, header := range request.Headers {
taskQueue.Push(header, -int64(header.Number.Uint64()))
taskQueue.(*prque.Prque[int64, *types.Header]).Push(header, -int64(header.Number.Uint64()))
}
// Add the peer to the expiry report along the the number of failed requests
expiries[id] = len(request.Headers)
@ -804,7 +804,7 @@ func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int,
// Note, this method expects the queue lock to be already held for writing. The
// reason the lock is not obtained in here is because the parameters already need
// to access the queue, so they already need a lock anyway.
func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque[int64, *types.Header],
pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}, reqTimer *metrics.Timer,
results int, reconstruct func(header *types.Header, index int, result *fetchResult) error) (int, error) {

View file

@ -131,11 +131,10 @@ type Fetcher struct {
completing map[common.Hash]*announce // Blocks with headers, currently body-completing
// Block cache
queue *prque.Prque // Queue containing the import operations (block number sorted)
queues map[string]int // Per peer block counts to prevent memory exhaustion
queued map[common.Hash]*inject // Set of already queued blocks (to dedup imports)
queue *prque.Prque[int64, *inject] // Queue containing the import operations (block number sorted)
queues map[string]int // Per peer block counts to prevent memory exhaustion
queued map[common.Hash]*inject // Set of already queued blocks (to dedup imports)
knowns *lru.Cache[common.Hash, struct{}]
// Callbacks
getBlock blockRetrievalFn // Retrieves a block from the local chain
verifyHeader headerVerifierFn // Checks if a block's headers have a valid proof of work
@ -170,7 +169,7 @@ func New(getBlock blockRetrievalFn, verifyHeader headerVerifierFn, handlePropose
fetching: make(map[common.Hash]*announce),
fetched: make(map[common.Hash][]*announce),
completing: make(map[common.Hash]*announce),
queue: prque.New(nil),
queue: prque.New[int64, *inject](nil),
queues: make(map[string]int),
queued: make(map[common.Hash]*inject),
knowns: lru.NewCache[common.Hash, struct{}](blockLimit),
@ -304,7 +303,7 @@ func (f *Fetcher) loop() {
// Import any queued blocks that could potentially fit
height := f.chainHeight()
for !f.queue.Empty() {
op := f.queue.PopItem().(*inject)
op := f.queue.PopItem()
if f.queueChangeHook != nil {
f.queueChangeHook(op.block.Hash(), false)
}

1
go.mod
View file

@ -53,6 +53,7 @@ require (
github.com/mattn/go-isatty v0.0.17
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible
github.com/urfave/cli/v2 v2.27.5
golang.org/x/exp v0.0.0-20221126150942-6ab00d035af9
gopkg.in/natefinch/lumberjack.v2 v2.2.1
)

2
go.sum
View file

@ -214,6 +214,8 @@ golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.29.0 h1:L5SG1JTTXupVV3n6sUqMTeWbjAyfPwoda2DLX8J8FrQ=
golang.org/x/crypto v0.29.0/go.mod h1:+F4F4N5hv6v38hfeYwTdx20oUvLLc+QfrE9Ax9HtgRg=
golang.org/x/exp v0.0.0-20221126150942-6ab00d035af9 h1:yZNXmy+j/JpX19vZkVktWqAo7Gny4PBWYYK3zskGpx4=
golang.org/x/exp v0.0.0-20221126150942-6ab00d035af9/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA=
golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=

View file

@ -73,7 +73,7 @@ type Sync struct {
database ethdb.KeyValueReader // Persistent database to check for existing entries
membatch *syncMemBatch // Memory buffer to avoid frequent database writes
requests map[common.Hash]*request // Pending requests pertaining to a key hash
queue *prque.Prque // Priority queue with the pending requests
queue *prque.Prque[int64, any] // Priority queue with the pending requests
bloom *SyncBloom // Bloom filter for fast Node existence checks
}
@ -83,7 +83,7 @@ func NewSync(root common.Hash, database ethdb.KeyValueReader, callback LeafCallb
database: database,
membatch: newSyncMemBatch(),
requests: make(map[common.Hash]*request),
queue: prque.New(nil),
queue: prque.New[int64, any](nil), // Ugh, can contain both string and hash, whyyy
bloom: bloom,
}
ts.AddSubTrie(root, 0, common.Hash{}, callback)