diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 457df86ded..1de0933842 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -44,7 +44,6 @@ var ( MaxBlockFetch = 128 // Number of blocks to be fetched per retrieval request MaxHeaderFetch = 192 // Number of block headers to be fetched per retrieval request MaxReceiptFetch = 256 // Number of transaction receipts to allow fetching per request - MaxBALFetch = 128 // Number of transaction bals to allow fetching per request maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) maxHeadersProcess = 2048 // Number of header download results to import at once into the chain @@ -53,8 +52,6 @@ var ( reorgProtHeaderDelay = 2 // Number of headers to delay delivering to cover mini reorgs - maxBALSyncGap uint64 = 128 // Maximum sync gap to enable BAL fetching - fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected fsHeaderContCheck = 3 * time.Second // Time interval to check for header continuations during state download fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in snap sync @@ -68,7 +65,6 @@ var ( errInvalidChain = errors.New("retrieved hash chain is invalid") errInvalidBody = errors.New("retrieved block body is invalid") errInvalidReceipt = errors.New("retrieved receipt is invalid") - errInvalidBAL = errors.New("retrieved bal is invalid") errCancelStateFetch = errors.New("state data download canceled (requested)") errCancelContentProcessing = errors.New("content processing canceled (requested)") errCanceled = errors.New("syncing canceled (requested)") @@ -157,7 +153,6 @@ type Downloader struct { // Testing hooks bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch - balFetchHook func([]*types.Header) // Method to call upon starting a bal fetch chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations) // Progress reporting metrics @@ -391,7 +386,7 @@ func (d *Downloader) synchronise(beaconPing chan struct{}) (err error) { d.queue.Reset(blockCacheMaxItems, blockCacheInitialItems) d.peers.Reset() - for _, ch := range []chan bool{d.queue.blockWakeCh, d.queue.receiptWakeCh, d.queue.balWakeCh} { + for _, ch := range []chan bool{d.queue.blockWakeCh, d.queue.receiptWakeCh} { select { case <-ch: default: @@ -586,19 +581,14 @@ func (d *Downloader) syncToHead() (err error) { log.Info("Skip chain segment before cutoff", "origin", origin, "cutoff", d.chainCutoffNumber) } } - // BAL fetching is only enabled for small sync gaps. Actual fetch - // is done only for blocks whose headers contain a BAL hash. - fetchBAL := mode == ethconfig.FullSync && height-origin <= maxBALSyncGap - // Initiate the sync using a concurrent header and content retrieval algorithm - d.queue.Prepare(chainOffset, mode, fetchBAL) + d.queue.Prepare(chainOffset, mode) // In beacon mode, headers are served by the skeleton syncer fetchers := []func() error{ func() error { return d.fetchHeaders(origin + 1) }, // Headers are always retrieved func() error { return d.fetchBodies(chainOffset) }, // Bodies are retrieved during normal and snap sync func() error { return d.fetchReceipts(chainOffset) }, // Receipts are retrieved during snap sync - func() error { return d.fetchBALs(chainOffset) }, func() error { return d.processHeaders(origin + 1) }, } if mode == ethconfig.SnapSync { @@ -710,17 +700,6 @@ func (d *Downloader) fetchReceipts(from uint64) error { return err } -// fetchReceipts iteratively downloads the scheduled bals, taking any -// available peers, reserving a chunk of bals for each, waiting for delivery -// and also periodically checking for timeouts. -func (d *Downloader) fetchBALs(from uint64) error { - log.Debug("Downloading bals", "origin", from) - err := d.concurrentFetch((*balQueue)(d)) - - log.Debug("BAL download terminated", "err", err) - return err -} - // processHeaders takes batches of retrieved headers from an input channel and // keeps processing and scheduling them into the header chain and downloader's // queue until the stream ends or a failure occurs. @@ -740,7 +719,7 @@ func (d *Downloader) processHeaders(origin uint64) error { // Terminate header processing if we synced up if task == nil || len(task.headers) == 0 { // Notify everyone that headers are fully processed - for _, ch := range []chan bool{d.queue.blockWakeCh, d.queue.receiptWakeCh, d.queue.balWakeCh} { + for _, ch := range []chan bool{d.queue.blockWakeCh, d.queue.receiptWakeCh} { select { case ch <- false: case <-d.cancelCh: @@ -819,7 +798,7 @@ func (d *Downloader) processHeaders(origin uint64) error { // Signal the downloader of the availability of new tasks if scheduled { - for _, ch := range []chan bool{d.queue.blockWakeCh, d.queue.receiptWakeCh, d.queue.balWakeCh} { + for _, ch := range []chan bool{d.queue.blockWakeCh, d.queue.receiptWakeCh} { select { case ch <- true: default: @@ -864,11 +843,7 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error { ) blocks := make([]*types.Block, len(results)) for i, result := range results { - block := types.NewBlockWithHeader(result.Header).WithBody(result.body()) - if result.BAL != nil { - block = block.WithAccessList(result.BAL) - } - blocks[i] = block + blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.body()) } // Downloaded blocks are always regarded as trusted after the // transition. Because the downloaded chain is guided by the diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 67b5cb282d..6d5d159631 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -29,7 +29,6 @@ import ( "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" @@ -305,36 +304,6 @@ func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash, gasUsed []u return res.Req, nil } -// RequestBALs constructs a getBlockAccessLists method associated with a -// particular peer in the download tester. The returned function can be used to -// retrieve batches of block access lists from the particularly requested peer. -func (dlp *downloadTesterPeer) RequestBALs(hashes []common.Hash, sink chan *eth.Response) (*eth.Request, error) { - bals := make(eth.BlockAccessListResponse, 0, len(hashes)) - respHashes := make([]common.Hash, 0, len(hashes)) - for _, hash := range hashes { - var entry eth.RawBlockAccessList - data := dlp.chain.GetAccessListRLP(hash) - if len(data) == 0 { - _ = entry.AppendRaw([]byte{0xC0}) - } else if err := rlp.DecodeBytes(data, &entry); err != nil { - return nil, err - } - bals = append(bals, entry) - respHashes = append(respHashes, crypto.Keccak256Hash(entry.Bytes())) - } - res := ð.Response{ - Req: ð.Request{Peer: dlp.id}, - Res: &bals, - Meta: respHashes, - Time: 1, - Done: make(chan error, 1), - } - go func() { - sink <- res - }() - return res.Req, nil -} - // ID retrieves the peer's unique identifier. func (dlp *downloadTesterPeer) ID() string { return dlp.id diff --git a/eth/downloader/fetchers_concurrent_bals.go b/eth/downloader/fetchers_concurrent_bals.go deleted file mode 100644 index 141cbb4ccf..0000000000 --- a/eth/downloader/fetchers_concurrent_bals.go +++ /dev/null @@ -1,106 +0,0 @@ -// Copyright 2021 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package downloader - -import ( - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/eth/protocols/eth" - "github.com/ethereum/go-ethereum/log" -) - -// balQueue implements typedQueue and is a type adapter between the generic -// concurrent fetcher and the downloader. -type balQueue Downloader - -// waker returns a notification channel that gets pinged in case more bal -// fetches have been queued up, so the fetcher might assign it to idle peers. -func (q *balQueue) waker() chan bool { - return q.queue.balWakeCh -} - -// pending returns the number of bal that are currently queued for fetching -// by the concurrent downloader. -func (q *balQueue) pending() int { - return q.queue.PendingBALs() -} - -// capacity is responsible for calculating how many bals a particular peer is -// estimated to be able to retrieve within the allotted round trip time. -func (q *balQueue) capacity(peer *peerConnection, rtt time.Duration) int { - return peer.BALCapacity(rtt) -} - -// updateCapacity is responsible for updating how many bals a particular peer -// is estimated to be able to retrieve in a unit time. -func (q *balQueue) updateCapacity(peer *peerConnection, items int, span time.Duration) { - peer.UpdateBALRate(items, span) -} - -// reserve is responsible for allocating a requested number of pending bals -// from the download queue to the specified peer. -func (q *balQueue) reserve(peer *peerConnection, items int) (*fetchRequest, bool, bool) { - return q.queue.ReserveBALs(peer, items) -} - -// unreserve is responsible for removing the current bal retrieval allocation -// assigned to a specific peer and placing it back into the pool to allow -// reassigning to some other peer. -func (q *balQueue) unreserve(peer string) int { - fails := q.queue.ExpireBALs(peer) - if fails > 2 { - log.Trace("BAL delivery timed out", "peer", peer) - } else { - log.Debug("BAL delivery stalling", "peer", peer) - } - return fails -} - -// request is responsible for converting a generic fetch request into a bal -// one and sending it to the remote peer for fulfillment. -func (q *balQueue) request(peer *peerConnection, req *fetchRequest, resCh chan *eth.Response) (*eth.Request, error) { - peer.log.Trace("Requesting new batch of bals", "count", len(req.Headers), "from", req.Headers[0].Number) - if q.balFetchHook != nil { - q.balFetchHook(req.Headers) - } - var ( - hashes = make([]common.Hash, 0, len(req.Headers)) - ) - for _, header := range req.Headers { - hashes = append(hashes, header.Hash()) - } - return peer.peer.RequestBALs(hashes, resCh) -} - -// deliver is responsible for taking a generic response packet from the concurrent -// fetcher, unpacking the bal data and delivering it to the downloader's queue. -func (q *balQueue) deliver(peer *peerConnection, packet *eth.Response) (int, error) { - bals := *packet.Res.(*eth.BlockAccessListResponse) - hashes := packet.Meta.([]common.Hash) // {bal hashes} - - accepted, err := q.queue.DeliverBALs(peer.id, bals, hashes) - switch { - case err == nil && len(bals) == 0: - peer.log.Trace("Requested bals delivered") - case err == nil: - peer.log.Trace("Delivered new batch of bals", "count", len(bals), "accepted", accepted) - default: - peer.log.Debug("Failed to deliver retrieved bals", "err", err) - } - return accepted, err -} diff --git a/eth/downloader/metrics.go b/eth/downloader/metrics.go index e0212dc1c7..bfe80ddbf1 100644 --- a/eth/downloader/metrics.go +++ b/eth/downloader/metrics.go @@ -37,10 +37,5 @@ var ( receiptDropMeter = metrics.NewRegisteredMeter("eth/downloader/receipts/drop", nil) receiptTimeoutMeter = metrics.NewRegisteredMeter("eth/downloader/receipts/timeout", nil) - balInMeter = metrics.NewRegisteredMeter("eth/downloader/bals/in", nil) - balReqTimer = metrics.NewRegisteredTimer("eth/downloader/bals/req", nil) - balDropMeter = metrics.NewRegisteredMeter("eth/downloader/bals/drop", nil) - balTimeoutMeter = metrics.NewRegisteredMeter("eth/downloader/bals/timeout", nil) - throttleCounter = metrics.NewRegisteredCounter("eth/downloader/throttle", nil) ) diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 520f29befe..d20bda69e9 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -61,7 +61,6 @@ type Peer interface { RequestBodies([]common.Hash, chan *eth.Response) (*eth.Request, error) RequestReceipts([]common.Hash, []uint64, []uint64, chan *eth.Response) (*eth.Request, error) - RequestBALs([]common.Hash, chan *eth.Response) (*eth.Request, error) } // newPeerConnection creates a new downloader peer. @@ -101,12 +100,6 @@ func (p *peerConnection) UpdateReceiptRate(delivered int, elapsed time.Duration) p.rates.Update(eth.ReceiptsMsg, elapsed, delivered) } -// UpdateBALRate updates the peer's estimated bal retrieval throughput -// with the current measurement. -func (p *peerConnection) UpdateBALRate(delivered int, elapsed time.Duration) { - p.rates.Update(eth.ReceiptsMsg, elapsed, delivered) -} - // HeaderCapacity retrieves the peer's header download allowance based on its // previously discovered throughput. func (p *peerConnection) HeaderCapacity(targetRTT time.Duration) int { @@ -137,16 +130,6 @@ func (p *peerConnection) ReceiptCapacity(targetRTT time.Duration) int { return cap } -// BALCapacity retrieves the peers bal download allowance based on its -// previously discovered throughput. -func (p *peerConnection) BALCapacity(targetRTT time.Duration) int { - cap := p.rates.Capacity(eth.BlockAccessListsMsg, targetRTT) - if cap > MaxBALFetch { - cap = MaxBALFetch - } - return cap -} - // MarkLacking appends a new entity to the set of items (blocks, receipts, states) // that a peer is known not to have (i.e. have been requested before). If the // set reaches its maximum allowed capacity, items are randomly dropped off. diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index c02076c59d..dd17b7f1ed 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -29,7 +29,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/prque" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/core/types/bal" "github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/log" @@ -40,7 +39,6 @@ import ( const ( bodyType = uint(0) receiptType = uint(1) - balType = uint(2) ) var ( @@ -73,10 +71,9 @@ type fetchResult struct { Transactions types.Transactions Receipts rlp.RawValue Withdrawals types.Withdrawals - BAL *bal.BlockAccessList } -func newFetchResult(header *types.Header, snapSync bool, fetchBAL bool) *fetchResult { +func newFetchResult(header *types.Header, snapSync bool) *fetchResult { item := &fetchResult{ Header: header, } @@ -92,10 +89,6 @@ func newFetchResult(header *types.Header, snapSync bool, fetchBAL bool) *fetchRe } else { item.pending.Store(item.pending.Load() | (1 << receiptType)) } - } else { - if header.BlockAccessListHash != nil && fetchBAL { - item.pending.Store(item.pending.Load() | (1 << balType)) - } } return item } @@ -128,13 +121,6 @@ func (f *fetchResult) SetReceiptsDone() { } } -// SetBALsDone flags the bals as finished. -func (f *fetchResult) SetBALDone() { - if v := f.pending.Load(); (v & (1 << balType)) != 0 { - f.pending.Add(-4) - } -} - // Done checks if the given type is done already func (f *fetchResult) Done(kind uint) bool { v := f.pending.Load() @@ -157,13 +143,6 @@ type queue struct { receiptPendPool map[string]*fetchRequest // Currently pending receipt retrieval operations receiptWakeCh chan bool // Channel to notify when receipt fetcher of new tasks - balTaskPool map[common.Hash]*types.Header // Pending bal retrieval tasks, mapping hashes to headers - balTaskQueue *prque.Prque[int64, *types.Header] // Priority queue of the headers to fetch the bals for - balPendPool map[string]*fetchRequest // Currently pending bal retrieval operations - balWakeCh chan bool // Channel to notify when bal fetcher of new tasks - - fetchBAL bool // Whether to fetch BALs (only for small sync gaps) - resultCache *resultStore // Downloaded but not yet delivered fetch results resultSize common.StorageSize // Approximate size of a block (exponential moving average) @@ -182,8 +161,6 @@ func newQueue(blockCacheLimit int, thresholdInitialSize int) *queue { blockWakeCh: make(chan bool, 1), receiptTaskQueue: prque.New[int64, *types.Header](nil), receiptWakeCh: make(chan bool, 1), - balTaskQueue: prque.New[int64, *types.Header](nil), - balWakeCh: make(chan bool, 1), active: sync.NewCond(lock), lock: lock, } @@ -208,10 +185,6 @@ func (q *queue) Reset(blockCacheLimit int, thresholdInitialSize int) { q.receiptTaskQueue.Reset() q.receiptPendPool = make(map[string]*fetchRequest) - q.balTaskPool = make(map[common.Hash]*types.Header) - q.balTaskQueue.Reset() - q.balPendPool = make(map[string]*fetchRequest) - q.resultCache = newResultStore(blockCacheLimit) q.resultCache.SetThrottleThreshold(uint64(thresholdInitialSize)) } @@ -241,14 +214,6 @@ func (q *queue) PendingReceipts() int { return q.receiptTaskQueue.Size() } -// PendingBALs retrieves the number of bals pending for retrieval. -func (q *queue) PendingBALs() int { - q.lock.Lock() - defer q.lock.Unlock() - - return q.balTaskQueue.Size() -} - // InFlightBlocks retrieves whether there are block fetch requests currently in // flight. func (q *queue) InFlightBlocks() bool { @@ -267,22 +232,13 @@ func (q *queue) InFlightReceipts() bool { return len(q.receiptPendPool) > 0 } -// InFlightBALs retrieves whether there are bal fetch requests currently -// in flight. -func (q *queue) InFlightBALs() bool { - q.lock.Lock() - defer q.lock.Unlock() - - return len(q.balPendPool) > 0 -} - // Idle returns if the queue is fully idle or has some data still inside. func (q *queue) Idle() bool { q.lock.Lock() defer q.lock.Unlock() - queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.balTaskQueue.Size() - pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.balPendPool) + queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + pending := len(q.blockPendPool) + len(q.receiptPendPool) return (queued + pending) == 0 } @@ -324,14 +280,6 @@ func (q *queue) Schedule(headers []*types.Header, hashes []common.Hash, from uin q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64())) } } - if q.mode == ethconfig.FullSync && q.fetchBAL && header.BlockAccessListHash != nil { - if _, ok := q.balTaskPool[hash]; ok { - log.Warn("Header already scheduled for BAL fetch", "number", header.Number, "hash", hash) - } else { - q.balTaskPool[hash] = header - q.balTaskQueue.Push(header, -int64(header.Number.Uint64())) - } - } inserts++ q.headerHead = hash from++ @@ -380,7 +328,6 @@ func (q *queue) Results(block bool) []*fetchResult { size += common.StorageSize(tx.Size()) } size += common.StorageSize(result.Withdrawals.Size()) - size += common.StorageSize(result.BAL.EncodedSize()) q.resultSize = common.StorageSize(blockCacheSizeWeight)*size + (1-common.StorageSize(blockCacheSizeWeight))*q.resultSize } @@ -390,7 +337,7 @@ func (q *queue) Results(block bool) []*fetchResult { throttleThreshold = q.resultCache.SetThrottleThreshold(throttleThreshold) // With results removed from the cache, wake throttled fetchers - for _, ch := range []chan bool{q.blockWakeCh, q.receiptWakeCh, q.balWakeCh} { + for _, ch := range []chan bool{q.blockWakeCh, q.receiptWakeCh} { select { case ch <- true: default: @@ -418,7 +365,6 @@ func (q *queue) stats() []interface{} { return []interface{}{ "receiptTasks", q.receiptTaskQueue.Size(), "blockTasks", q.blockTaskQueue.Size(), - "balTasks", q.balTaskQueue.Size(), "itemSize", q.resultSize, } } @@ -443,13 +389,6 @@ func (q *queue) ReserveReceipts(p *peerConnection, count int) (*fetchRequest, bo return q.reserveHeaders(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, receiptType) } -func (q *queue) ReserveBALs(p *peerConnection, count int) (*fetchRequest, bool, bool) { - q.lock.Lock() - defer q.lock.Unlock() - - return q.reserveHeaders(p, count, q.balTaskPool, q.balTaskQueue, q.balPendPool, balType) -} - // reserveHeaders reserves a set of data download operations for a given peer, // skipping any previously failed ones. This method is a generic version used // by the individual special reservation functions. @@ -485,7 +424,8 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common // we can ask the resultcache if this header is within the // "prioritized" segment of blocks. If it is not, we need to throttle - stale, throttle, item, err := q.resultCache.AddFetch(header, q.mode == ethconfig.SnapSync, q.fetchBAL) + + stale, throttle, item, err := q.resultCache.AddFetch(header, q.mode == ethconfig.SnapSync) if stale { // Don't put back in the task queue, this item has already been // delivered upstream @@ -565,12 +505,6 @@ func (q *queue) Revoke(peerID string) { } delete(q.receiptPendPool, peerID) } - if request, ok := q.balPendPool[peerID]; ok { - for _, header := range request.Headers { - q.balTaskQueue.Push(header, -int64(header.Number.Uint64())) - } - delete(q.balPendPool, peerID) - } } // ExpireBodies checks for in flight block body requests that exceeded a timeout @@ -593,16 +527,6 @@ func (q *queue) ExpireReceipts(peer string) int { return q.expire(peer, q.receiptPendPool, q.receiptTaskQueue) } -// ExpireBALs checks for in flight bal requests that exceeded a timeout -// allowance, canceling them and returning the responsible peers for penalisation. -func (q *queue) ExpireBALs(peer string) int { - q.lock.Lock() - defer q.lock.Unlock() - - balTimeoutMeter.Mark(1) - return q.expire(peer, q.balPendPool, q.balTaskPool) -} - // expire is the generic check that moves a specific expired task from a pending // pool back into a task pool. The syntax on the passed taskQueue is a bit weird // as we would need a generic expire method to handle both types, but that is not @@ -718,31 +642,6 @@ func (q *queue) DeliverReceipts(id string, receiptList []rlp.RawValue, receiptLi receiptReqTimer, receiptInMeter, receiptDropMeter, len(receiptList), validate, reconstruct) } -func (q *queue) DeliverBALs(id string, rawBALs []eth.RawBlockAccessList, balHashes []common.Hash) (int, error) { - q.lock.Lock() - defer q.lock.Unlock() - - var bals []*bal.BlockAccessList - validate := func(index int, header *types.Header) error { - if balHashes[index] != *header.BlockAccessListHash { - return errInvalidBAL - } - b, err := rawBALs[index].Items() - if err != nil { - return fmt.Errorf("%w: bad bal: %v", errInvalidBAL, err) - } - blockBAL := bal.BlockAccessList(b) - bals = append(bals, &blockBAL) - return nil - } - reconstruct := func(index int, result *fetchResult) { - result.BAL = bals[index] - result.SetBALDone() - } - return q.deliver(id, q.balTaskPool, q.balTaskQueue, q.balPendPool, - balReqTimer, balInMeter, balDropMeter, len(rawBALs), validate, reconstruct) -} - // deliver injects a data retrieval response into the results queue. // // Note, this method expects the queue lock to be already held for writing. The @@ -826,12 +725,11 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, // Prepare configures the result cache to allow accepting and caching inbound // fetch results. -func (q *queue) Prepare(offset uint64, mode SyncMode, fetchBAL bool) { +func (q *queue) Prepare(offset uint64, mode SyncMode) { q.lock.Lock() defer q.lock.Unlock() // Prepare the queue for sync results q.resultCache.Prepare(offset) q.mode = mode - q.fetchBAL = fetchBAL } diff --git a/eth/downloader/queue_test.go b/eth/downloader/queue_test.go index 82d75eeb3e..c7e8a0d1d6 100644 --- a/eth/downloader/queue_test.go +++ b/eth/downloader/queue_test.go @@ -103,7 +103,7 @@ func TestBasics(t *testing.T) { if !q.Idle() { t.Errorf("new queue should be idle") } - q.Prepare(1, SnapSync, false) + q.Prepare(1, SnapSync) if res := q.Results(false); len(res) != 0 { t.Fatal("new queue should have 0 results") } @@ -200,7 +200,7 @@ func TestEmptyBlocks(t *testing.T) { q := newQueue(10, 10) - q.Prepare(1, SnapSync, false) + q.Prepare(1, SnapSync) // Schedule a batch of headers headers := emptyChain.headers() @@ -279,7 +279,7 @@ func XTestDelivery(t *testing.T) { } q := newQueue(10, 10) var wg sync.WaitGroup - q.Prepare(1, SnapSync, false) + q.Prepare(1, SnapSync) wg.Add(1) go func() { // deliver headers diff --git a/eth/downloader/resultstore.go b/eth/downloader/resultstore.go index e27df98b64..36c382fefc 100644 --- a/eth/downloader/resultstore.go +++ b/eth/downloader/resultstore.go @@ -76,7 +76,7 @@ func (r *resultStore) SetThrottleThreshold(threshold uint64) uint64 { // throttled - if true, the store is at capacity, this particular header is not prio now // item - the result to store data into // err - any error that occurred -func (r *resultStore) AddFetch(header *types.Header, snapSync bool, fetchBAL bool) (stale, throttled bool, item *fetchResult, err error) { +func (r *resultStore) AddFetch(header *types.Header, snapSync bool) (stale, throttled bool, item *fetchResult, err error) { r.lock.Lock() defer r.lock.Unlock() @@ -86,7 +86,7 @@ func (r *resultStore) AddFetch(header *types.Header, snapSync bool, fetchBAL boo return stale, throttled, item, err } if item == nil { - item = newFetchResult(header, snapSync, fetchBAL) + item = newFetchResult(header, snapSync) r.items[index] = item } return stale, throttled, item, err diff --git a/eth/downloader/skeleton_test.go b/eth/downloader/skeleton_test.go index efe1b99e9f..b9e7dba6ee 100644 --- a/eth/downloader/skeleton_test.go +++ b/eth/downloader/skeleton_test.go @@ -212,10 +212,6 @@ func (p *skeletonTestPeer) RequestReceipts([]common.Hash, []uint64, []uint64, ch panic("skeleton sync must not request receipts") } -func (p *skeletonTestPeer) RequestBALs([]common.Hash, chan *eth.Response) (*eth.Request, error) { - panic("skeleton sync must not request block access lists") -} - // Tests various sync initializations based on previous leftovers in the database // and announced heads. func TestSkeletonSyncInit(t *testing.T) {