Merge pull request #563 from gzliudan/fix-issue-377

fix issue #377 and a bug in queue
This commit is contained in:
Daniel Liu 2024-06-27 14:10:51 +08:00 committed by GitHub
commit bf289e89ea
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 9 additions and 8 deletions

View file

@ -1347,7 +1347,7 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
defer tester.terminate()
for i, tt := range tests {
// Register a new peer and ensure it's presence
// Register a new peer and ensure its presence
id := fmt.Sprintf("test %d", i)
if err := tester.newPeer(id, protocol, []common.Hash{tester.genesis.Hash()}, nil, nil, nil); err != nil {
t.Fatalf("test %d: failed to register new peer: %v", i, err)

View file

@ -235,8 +235,7 @@ func (q *queue) ShouldThrottleReceipts() bool {
}
// resultSlots calculates the number of results slots available for requests
// whilst adhering to both the item and the memory limit too of the results
// cache.
// whilst adhering to both the item and the memory limits of the result cache.
func (q *queue) resultSlots(pendPool map[string]*fetchRequest, donePool map[common.Hash]struct{}) int {
// Calculate the maximum length capped by the memory limit
limit := len(q.resultCache)
@ -349,7 +348,7 @@ func (q *queue) Schedule(headers []*types.Header, from uint64) []*types.Header {
}
// Results retrieves and permanently removes a batch of fetch results from
// the cache. the result slice will be empty if the queue has been closed.
// the cache. The result slice will be empty if the queue has been closed.
func (q *queue) Results(block bool) []*fetchResult {
q.lock.Lock()
defer q.lock.Unlock()
@ -511,7 +510,6 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
index := int(header.Number.Int64() - int64(q.resultOffset))
if index >= len(q.resultCache) || index < 0 {
log.Error("index allocation went beyond available resultCache space", "index", index, "len.resultCache", len(q.resultCache), "blockNum", header.Number.Int64(), "resultOffset", q.resultOffset)
common.Report("index allocation went beyond available resultCache space")
return nil, false, errInvalidChain
}
if q.resultCache[index] == nil {
@ -566,26 +564,29 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
// CancelHeaders aborts a fetch request, returning all pending skeleton indexes to the queue.
func (q *queue) CancelHeaders(request *fetchRequest) {
q.lock.Lock()
defer q.lock.Unlock()
q.cancel(request, q.headerTaskQueue, q.headerPendPool)
}
// CancelBodies aborts a body fetch request, returning all pending headers to the
// task queue.
func (q *queue) CancelBodies(request *fetchRequest) {
q.lock.Lock()
defer q.lock.Unlock()
q.cancel(request, q.blockTaskQueue, q.blockPendPool)
}
// CancelReceipts aborts a body fetch request, returning all pending headers to
// the task queue.
func (q *queue) CancelReceipts(request *fetchRequest) {
q.lock.Lock()
defer q.lock.Unlock()
q.cancel(request, q.receiptTaskQueue, q.receiptPendPool)
}
// Cancel aborts a fetch request, returning all pending hashes to the task queue.
func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) {
q.lock.Lock()
defer q.lock.Unlock()
if request.From > 0 {
taskQueue.Push(request.From, -int64(request.From))
}