From e2eb22dcac3dea0d4e5266c637e7a9ed995852b3 Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Tue, 17 Dec 2024 11:37:21 +0800 Subject: [PATCH] common/prque: make Prque wrap-around priority handling optional (#22495) --- common/prque/lazyqueue.go | 6 +++--- common/prque/prque.go | 11 ++++++++--- common/prque/sstack.go | 20 +++++++++++++------- common/prque/sstack_test.go | 6 +++--- 4 files changed, 27 insertions(+), 16 deletions(-) diff --git a/common/prque/lazyqueue.go b/common/prque/lazyqueue.go index 1a9c1e0007..85d7bf6eee 100644 --- a/common/prque/lazyqueue.go +++ b/common/prque/lazyqueue.go @@ -56,7 +56,7 @@ type ( // NewLazyQueue creates a new lazy queue func NewLazyQueue(setIndex SetIndexCallback, priority PriorityCallback, maxPriority MaxPriorityCallback, clock mclock.Clock, refreshPeriod time.Duration) *LazyQueue { q := &LazyQueue{ - popQueue: newSstack(nil), + popQueue: newSstack(nil, false), setIndex: setIndex, priority: priority, maxPriority: maxPriority, @@ -72,8 +72,8 @@ func NewLazyQueue(setIndex SetIndexCallback, priority PriorityCallback, maxPrior // Reset clears the contents of the queue func (q *LazyQueue) Reset() { - q.queue[0] = newSstack(q.setIndex0) - q.queue[1] = newSstack(q.setIndex1) + q.queue[0] = newSstack(q.setIndex0, false) + q.queue[1] = newSstack(q.setIndex1, false) } // Refresh performs queue re-evaluation if necessary diff --git a/common/prque/prque.go b/common/prque/prque.go index 3cc5a1adaf..fb02e3418c 100755 --- a/common/prque/prque.go +++ b/common/prque/prque.go @@ -28,7 +28,12 @@ type Prque struct { // New creates a new priority queue. func New(setIndex SetIndexCallback) *Prque { - return &Prque{newSstack(setIndex)} + return &Prque{newSstack(setIndex, false)} +} + +// NewWrapAround creates a new priority queue with wrap-around priority handling. +func NewWrapAround(setIndex SetIndexCallback) *Prque { + return &Prque{newSstack(setIndex, true)} } // Pushes a value with a given priority into the queue, expanding if necessary. @@ -36,13 +41,13 @@ func (p *Prque) Push(data interface{}, priority int64) { heap.Push(p.cont, &item{data, priority}) } -// Peek returns the value with the greates priority but does not pop it off. +// Peek returns the value with the greatest priority but does not pop it off. func (p *Prque) Peek() (interface{}, int64) { item := p.cont.blocks[0][0] return item.value, item.priority } -// Pops the value with the greates priority off the stack and returns it. +// Pops the value with the greatest priority off the stack and returns it. // Currently no shrinking is done. func (p *Prque) Pop() (interface{}, int64) { item := heap.Pop(p.cont).(*item) diff --git a/common/prque/sstack.go b/common/prque/sstack.go index 8518af54ff..b06a95413d 100755 --- a/common/prque/sstack.go +++ b/common/prque/sstack.go @@ -31,22 +31,24 @@ type SetIndexCallback func(data interface{}, index int) // the stack (heap) functionality and the Len, Less and Swap methods for the // sortability requirements of the heaps. type sstack struct { - setIndex SetIndexCallback - size int - capacity int - offset int + setIndex SetIndexCallback + size int + capacity int + offset int + wrapAround bool blocks [][]*item active []*item } // Creates a new, empty stack. -func newSstack(setIndex SetIndexCallback) *sstack { +func newSstack(setIndex SetIndexCallback, wrapAround bool) *sstack { result := new(sstack) result.setIndex = setIndex result.active = make([]*item, blockSize) result.blocks = [][]*item{result.active} result.capacity = blockSize + result.wrapAround = wrapAround return result } @@ -94,7 +96,11 @@ func (s *sstack) Len() int { // Compares the priority of two elements of the stack (higher is first). // Required by sort.Interface. func (s *sstack) Less(i, j int) bool { - return (s.blocks[i/blockSize][i%blockSize].priority - s.blocks[j/blockSize][j%blockSize].priority) > 0 + a, b := s.blocks[i/blockSize][i%blockSize].priority, s.blocks[j/blockSize][j%blockSize].priority + if s.wrapAround { + return a-b > 0 + } + return a > b } // Swaps two elements in the stack. Required by sort.Interface. @@ -110,5 +116,5 @@ func (s *sstack) Swap(i, j int) { // Resets the stack, effectively clearing its contents. func (s *sstack) Reset() { - *s = *newSstack(s.setIndex) + *s = *newSstack(s.setIndex, false) } diff --git a/common/prque/sstack_test.go b/common/prque/sstack_test.go index 2ff093579d..bc6298979c 100644 --- a/common/prque/sstack_test.go +++ b/common/prque/sstack_test.go @@ -21,7 +21,7 @@ func TestSstack(t *testing.T) { for i := 0; i < size; i++ { data[i] = &item{rand.Int(), rand.Int63()} } - stack := newSstack(nil) + stack := newSstack(nil, false) for rep := 0; rep < 2; rep++ { // Push all the data into the stack, pop out every second secs := []*item{} @@ -55,7 +55,7 @@ func TestSstackSort(t *testing.T) { data[i] = &item{rand.Int(), int64(i)} } // Push all the data into the stack - stack := newSstack(nil) + stack := newSstack(nil, false) for _, val := range data { stack.Push(val) } @@ -76,7 +76,7 @@ func TestSstackReset(t *testing.T) { for i := 0; i < size; i++ { data[i] = &item{rand.Int(), rand.Int63()} } - stack := newSstack(nil) + stack := newSstack(nil, false) for rep := 0; rep < 2; rep++ { // Push all the data into the stack, pop out every second secs := []*item{}