eth/downloader: revert BAL downloading

This commit is contained in:
Felix Lange 2026-05-18 22:46:18 +02:00
parent 4cd7092ccd
commit 39c78e6910
9 changed files with 17 additions and 307 deletions

View file

@ -44,7 +44,6 @@ var (
MaxBlockFetch = 128 // Number of blocks to be fetched per retrieval request
MaxHeaderFetch = 192 // Number of block headers to be fetched per retrieval request
MaxReceiptFetch = 256 // Number of transaction receipts to allow fetching per request
MaxBALFetch = 128 // Number of transaction bals to allow fetching per request
maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
maxHeadersProcess = 2048 // Number of header download results to import at once into the chain
@ -53,8 +52,6 @@ var (
reorgProtHeaderDelay = 2 // Number of headers to delay delivering to cover mini reorgs
maxBALSyncGap uint64 = 128 // Maximum sync gap to enable BAL fetching
fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected
fsHeaderContCheck = 3 * time.Second // Time interval to check for header continuations during state download
fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in snap sync
@ -68,7 +65,6 @@ var (
errInvalidChain = errors.New("retrieved hash chain is invalid")
errInvalidBody = errors.New("retrieved block body is invalid")
errInvalidReceipt = errors.New("retrieved receipt is invalid")
errInvalidBAL = errors.New("retrieved bal is invalid")
errCancelStateFetch = errors.New("state data download canceled (requested)")
errCancelContentProcessing = errors.New("content processing canceled (requested)")
errCanceled = errors.New("syncing canceled (requested)")
@ -157,7 +153,6 @@ type Downloader struct {
// Testing hooks
bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
balFetchHook func([]*types.Header) // Method to call upon starting a bal fetch
chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
// Progress reporting metrics
@ -391,7 +386,7 @@ func (d *Downloader) synchronise(beaconPing chan struct{}) (err error) {
d.queue.Reset(blockCacheMaxItems, blockCacheInitialItems)
d.peers.Reset()
for _, ch := range []chan bool{d.queue.blockWakeCh, d.queue.receiptWakeCh, d.queue.balWakeCh} {
for _, ch := range []chan bool{d.queue.blockWakeCh, d.queue.receiptWakeCh} {
select {
case <-ch:
default:
@ -586,19 +581,14 @@ func (d *Downloader) syncToHead() (err error) {
log.Info("Skip chain segment before cutoff", "origin", origin, "cutoff", d.chainCutoffNumber)
}
}
// BAL fetching is only enabled for small sync gaps. Actual fetch
// is done only for blocks whose headers contain a BAL hash.
fetchBAL := mode == ethconfig.FullSync && height-origin <= maxBALSyncGap
// Initiate the sync using a concurrent header and content retrieval algorithm
d.queue.Prepare(chainOffset, mode, fetchBAL)
d.queue.Prepare(chainOffset, mode)
// In beacon mode, headers are served by the skeleton syncer
fetchers := []func() error{
func() error { return d.fetchHeaders(origin + 1) }, // Headers are always retrieved
func() error { return d.fetchBodies(chainOffset) }, // Bodies are retrieved during normal and snap sync
func() error { return d.fetchReceipts(chainOffset) }, // Receipts are retrieved during snap sync
func() error { return d.fetchBALs(chainOffset) },
func() error { return d.processHeaders(origin + 1) },
}
if mode == ethconfig.SnapSync {
@ -710,17 +700,6 @@ func (d *Downloader) fetchReceipts(from uint64) error {
return err
}
// fetchReceipts iteratively downloads the scheduled bals, taking any
// available peers, reserving a chunk of bals for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchBALs(from uint64) error {
log.Debug("Downloading bals", "origin", from)
err := d.concurrentFetch((*balQueue)(d))
log.Debug("BAL download terminated", "err", err)
return err
}
// processHeaders takes batches of retrieved headers from an input channel and
// keeps processing and scheduling them into the header chain and downloader's
// queue until the stream ends or a failure occurs.
@ -740,7 +719,7 @@ func (d *Downloader) processHeaders(origin uint64) error {
// Terminate header processing if we synced up
if task == nil || len(task.headers) == 0 {
// Notify everyone that headers are fully processed
for _, ch := range []chan bool{d.queue.blockWakeCh, d.queue.receiptWakeCh, d.queue.balWakeCh} {
for _, ch := range []chan bool{d.queue.blockWakeCh, d.queue.receiptWakeCh} {
select {
case ch <- false:
case <-d.cancelCh:
@ -819,7 +798,7 @@ func (d *Downloader) processHeaders(origin uint64) error {
// Signal the downloader of the availability of new tasks
if scheduled {
for _, ch := range []chan bool{d.queue.blockWakeCh, d.queue.receiptWakeCh, d.queue.balWakeCh} {
for _, ch := range []chan bool{d.queue.blockWakeCh, d.queue.receiptWakeCh} {
select {
case ch <- true:
default:
@ -864,11 +843,7 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
)
blocks := make([]*types.Block, len(results))
for i, result := range results {
block := types.NewBlockWithHeader(result.Header).WithBody(result.body())
if result.BAL != nil {
block = block.WithAccessList(result.BAL)
}
blocks[i] = block
blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.body())
}
// Downloaded blocks are always regarded as trusted after the
// transition. Because the downloaded chain is guided by the

View file

@ -29,7 +29,6 @@ import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/eth/protocols/snap"
@ -305,36 +304,6 @@ func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash, gasUsed []u
return res.Req, nil
}
// RequestBALs constructs a getBlockAccessLists method associated with a
// particular peer in the download tester. The returned function can be used to
// retrieve batches of block access lists from the particularly requested peer.
func (dlp *downloadTesterPeer) RequestBALs(hashes []common.Hash, sink chan *eth.Response) (*eth.Request, error) {
bals := make(eth.BlockAccessListResponse, 0, len(hashes))
respHashes := make([]common.Hash, 0, len(hashes))
for _, hash := range hashes {
var entry eth.RawBlockAccessList
data := dlp.chain.GetAccessListRLP(hash)
if len(data) == 0 {
_ = entry.AppendRaw([]byte{0xC0})
} else if err := rlp.DecodeBytes(data, &entry); err != nil {
return nil, err
}
bals = append(bals, entry)
respHashes = append(respHashes, crypto.Keccak256Hash(entry.Bytes()))
}
res := &eth.Response{
Req: &eth.Request{Peer: dlp.id},
Res: &bals,
Meta: respHashes,
Time: 1,
Done: make(chan error, 1),
}
go func() {
sink <- res
}()
return res.Req, nil
}
// ID retrieves the peer's unique identifier.
func (dlp *downloadTesterPeer) ID() string {
return dlp.id

View file

@ -1,106 +0,0 @@
// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package downloader
import (
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/log"
)
// balQueue implements typedQueue and is a type adapter between the generic
// concurrent fetcher and the downloader.
type balQueue Downloader
// waker returns a notification channel that gets pinged in case more bal
// fetches have been queued up, so the fetcher might assign it to idle peers.
func (q *balQueue) waker() chan bool {
return q.queue.balWakeCh
}
// pending returns the number of bal that are currently queued for fetching
// by the concurrent downloader.
func (q *balQueue) pending() int {
return q.queue.PendingBALs()
}
// capacity is responsible for calculating how many bals a particular peer is
// estimated to be able to retrieve within the allotted round trip time.
func (q *balQueue) capacity(peer *peerConnection, rtt time.Duration) int {
return peer.BALCapacity(rtt)
}
// updateCapacity is responsible for updating how many bals a particular peer
// is estimated to be able to retrieve in a unit time.
func (q *balQueue) updateCapacity(peer *peerConnection, items int, span time.Duration) {
peer.UpdateBALRate(items, span)
}
// reserve is responsible for allocating a requested number of pending bals
// from the download queue to the specified peer.
func (q *balQueue) reserve(peer *peerConnection, items int) (*fetchRequest, bool, bool) {
return q.queue.ReserveBALs(peer, items)
}
// unreserve is responsible for removing the current bal retrieval allocation
// assigned to a specific peer and placing it back into the pool to allow
// reassigning to some other peer.
func (q *balQueue) unreserve(peer string) int {
fails := q.queue.ExpireBALs(peer)
if fails > 2 {
log.Trace("BAL delivery timed out", "peer", peer)
} else {
log.Debug("BAL delivery stalling", "peer", peer)
}
return fails
}
// request is responsible for converting a generic fetch request into a bal
// one and sending it to the remote peer for fulfillment.
func (q *balQueue) request(peer *peerConnection, req *fetchRequest, resCh chan *eth.Response) (*eth.Request, error) {
peer.log.Trace("Requesting new batch of bals", "count", len(req.Headers), "from", req.Headers[0].Number)
if q.balFetchHook != nil {
q.balFetchHook(req.Headers)
}
var (
hashes = make([]common.Hash, 0, len(req.Headers))
)
for _, header := range req.Headers {
hashes = append(hashes, header.Hash())
}
return peer.peer.RequestBALs(hashes, resCh)
}
// deliver is responsible for taking a generic response packet from the concurrent
// fetcher, unpacking the bal data and delivering it to the downloader's queue.
func (q *balQueue) deliver(peer *peerConnection, packet *eth.Response) (int, error) {
bals := *packet.Res.(*eth.BlockAccessListResponse)
hashes := packet.Meta.([]common.Hash) // {bal hashes}
accepted, err := q.queue.DeliverBALs(peer.id, bals, hashes)
switch {
case err == nil && len(bals) == 0:
peer.log.Trace("Requested bals delivered")
case err == nil:
peer.log.Trace("Delivered new batch of bals", "count", len(bals), "accepted", accepted)
default:
peer.log.Debug("Failed to deliver retrieved bals", "err", err)
}
return accepted, err
}

View file

@ -37,10 +37,5 @@ var (
receiptDropMeter = metrics.NewRegisteredMeter("eth/downloader/receipts/drop", nil)
receiptTimeoutMeter = metrics.NewRegisteredMeter("eth/downloader/receipts/timeout", nil)
balInMeter = metrics.NewRegisteredMeter("eth/downloader/bals/in", nil)
balReqTimer = metrics.NewRegisteredTimer("eth/downloader/bals/req", nil)
balDropMeter = metrics.NewRegisteredMeter("eth/downloader/bals/drop", nil)
balTimeoutMeter = metrics.NewRegisteredMeter("eth/downloader/bals/timeout", nil)
throttleCounter = metrics.NewRegisteredCounter("eth/downloader/throttle", nil)
)

View file

@ -61,7 +61,6 @@ type Peer interface {
RequestBodies([]common.Hash, chan *eth.Response) (*eth.Request, error)
RequestReceipts([]common.Hash, []uint64, []uint64, chan *eth.Response) (*eth.Request, error)
RequestBALs([]common.Hash, chan *eth.Response) (*eth.Request, error)
}
// newPeerConnection creates a new downloader peer.
@ -101,12 +100,6 @@ func (p *peerConnection) UpdateReceiptRate(delivered int, elapsed time.Duration)
p.rates.Update(eth.ReceiptsMsg, elapsed, delivered)
}
// UpdateBALRate updates the peer's estimated bal retrieval throughput
// with the current measurement.
func (p *peerConnection) UpdateBALRate(delivered int, elapsed time.Duration) {
p.rates.Update(eth.ReceiptsMsg, elapsed, delivered)
}
// HeaderCapacity retrieves the peer's header download allowance based on its
// previously discovered throughput.
func (p *peerConnection) HeaderCapacity(targetRTT time.Duration) int {
@ -137,16 +130,6 @@ func (p *peerConnection) ReceiptCapacity(targetRTT time.Duration) int {
return cap
}
// BALCapacity retrieves the peers bal download allowance based on its
// previously discovered throughput.
func (p *peerConnection) BALCapacity(targetRTT time.Duration) int {
cap := p.rates.Capacity(eth.BlockAccessListsMsg, targetRTT)
if cap > MaxBALFetch {
cap = MaxBALFetch
}
return cap
}
// MarkLacking appends a new entity to the set of items (blocks, receipts, states)
// that a peer is known not to have (i.e. have been requested before). If the
// set reaches its maximum allowed capacity, items are randomly dropped off.

View file

@ -29,7 +29,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/prque"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/types/bal"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/log"
@ -40,7 +39,6 @@ import (
const (
bodyType = uint(0)
receiptType = uint(1)
balType = uint(2)
)
var (
@ -73,10 +71,9 @@ type fetchResult struct {
Transactions types.Transactions
Receipts rlp.RawValue
Withdrawals types.Withdrawals
BAL *bal.BlockAccessList
}
func newFetchResult(header *types.Header, snapSync bool, fetchBAL bool) *fetchResult {
func newFetchResult(header *types.Header, snapSync bool) *fetchResult {
item := &fetchResult{
Header: header,
}
@ -92,10 +89,6 @@ func newFetchResult(header *types.Header, snapSync bool, fetchBAL bool) *fetchRe
} else {
item.pending.Store(item.pending.Load() | (1 << receiptType))
}
} else {
if header.BlockAccessListHash != nil && fetchBAL {
item.pending.Store(item.pending.Load() | (1 << balType))
}
}
return item
}
@ -128,13 +121,6 @@ func (f *fetchResult) SetReceiptsDone() {
}
}
// SetBALsDone flags the bals as finished.
func (f *fetchResult) SetBALDone() {
if v := f.pending.Load(); (v & (1 << balType)) != 0 {
f.pending.Add(-4)
}
}
// Done checks if the given type is done already
func (f *fetchResult) Done(kind uint) bool {
v := f.pending.Load()
@ -157,13 +143,6 @@ type queue struct {
receiptPendPool map[string]*fetchRequest // Currently pending receipt retrieval operations
receiptWakeCh chan bool // Channel to notify when receipt fetcher of new tasks
balTaskPool map[common.Hash]*types.Header // Pending bal retrieval tasks, mapping hashes to headers
balTaskQueue *prque.Prque[int64, *types.Header] // Priority queue of the headers to fetch the bals for
balPendPool map[string]*fetchRequest // Currently pending bal retrieval operations
balWakeCh chan bool // Channel to notify when bal fetcher of new tasks
fetchBAL bool // Whether to fetch BALs (only for small sync gaps)
resultCache *resultStore // Downloaded but not yet delivered fetch results
resultSize common.StorageSize // Approximate size of a block (exponential moving average)
@ -182,8 +161,6 @@ func newQueue(blockCacheLimit int, thresholdInitialSize int) *queue {
blockWakeCh: make(chan bool, 1),
receiptTaskQueue: prque.New[int64, *types.Header](nil),
receiptWakeCh: make(chan bool, 1),
balTaskQueue: prque.New[int64, *types.Header](nil),
balWakeCh: make(chan bool, 1),
active: sync.NewCond(lock),
lock: lock,
}
@ -208,10 +185,6 @@ func (q *queue) Reset(blockCacheLimit int, thresholdInitialSize int) {
q.receiptTaskQueue.Reset()
q.receiptPendPool = make(map[string]*fetchRequest)
q.balTaskPool = make(map[common.Hash]*types.Header)
q.balTaskQueue.Reset()
q.balPendPool = make(map[string]*fetchRequest)
q.resultCache = newResultStore(blockCacheLimit)
q.resultCache.SetThrottleThreshold(uint64(thresholdInitialSize))
}
@ -241,14 +214,6 @@ func (q *queue) PendingReceipts() int {
return q.receiptTaskQueue.Size()
}
// PendingBALs retrieves the number of bals pending for retrieval.
func (q *queue) PendingBALs() int {
q.lock.Lock()
defer q.lock.Unlock()
return q.balTaskQueue.Size()
}
// InFlightBlocks retrieves whether there are block fetch requests currently in
// flight.
func (q *queue) InFlightBlocks() bool {
@ -267,22 +232,13 @@ func (q *queue) InFlightReceipts() bool {
return len(q.receiptPendPool) > 0
}
// InFlightBALs retrieves whether there are bal fetch requests currently
// in flight.
func (q *queue) InFlightBALs() bool {
q.lock.Lock()
defer q.lock.Unlock()
return len(q.balPendPool) > 0
}
// Idle returns if the queue is fully idle or has some data still inside.
func (q *queue) Idle() bool {
q.lock.Lock()
defer q.lock.Unlock()
queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.balTaskQueue.Size()
pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.balPendPool)
queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size()
pending := len(q.blockPendPool) + len(q.receiptPendPool)
return (queued + pending) == 0
}
@ -324,14 +280,6 @@ func (q *queue) Schedule(headers []*types.Header, hashes []common.Hash, from uin
q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64()))
}
}
if q.mode == ethconfig.FullSync && q.fetchBAL && header.BlockAccessListHash != nil {
if _, ok := q.balTaskPool[hash]; ok {
log.Warn("Header already scheduled for BAL fetch", "number", header.Number, "hash", hash)
} else {
q.balTaskPool[hash] = header
q.balTaskQueue.Push(header, -int64(header.Number.Uint64()))
}
}
inserts++
q.headerHead = hash
from++
@ -380,7 +328,6 @@ func (q *queue) Results(block bool) []*fetchResult {
size += common.StorageSize(tx.Size())
}
size += common.StorageSize(result.Withdrawals.Size())
size += common.StorageSize(result.BAL.EncodedSize())
q.resultSize = common.StorageSize(blockCacheSizeWeight)*size +
(1-common.StorageSize(blockCacheSizeWeight))*q.resultSize
}
@ -390,7 +337,7 @@ func (q *queue) Results(block bool) []*fetchResult {
throttleThreshold = q.resultCache.SetThrottleThreshold(throttleThreshold)
// With results removed from the cache, wake throttled fetchers
for _, ch := range []chan bool{q.blockWakeCh, q.receiptWakeCh, q.balWakeCh} {
for _, ch := range []chan bool{q.blockWakeCh, q.receiptWakeCh} {
select {
case ch <- true:
default:
@ -418,7 +365,6 @@ func (q *queue) stats() []interface{} {
return []interface{}{
"receiptTasks", q.receiptTaskQueue.Size(),
"blockTasks", q.blockTaskQueue.Size(),
"balTasks", q.balTaskQueue.Size(),
"itemSize", q.resultSize,
}
}
@ -443,13 +389,6 @@ func (q *queue) ReserveReceipts(p *peerConnection, count int) (*fetchRequest, bo
return q.reserveHeaders(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, receiptType)
}
func (q *queue) ReserveBALs(p *peerConnection, count int) (*fetchRequest, bool, bool) {
q.lock.Lock()
defer q.lock.Unlock()
return q.reserveHeaders(p, count, q.balTaskPool, q.balTaskQueue, q.balPendPool, balType)
}
// reserveHeaders reserves a set of data download operations for a given peer,
// skipping any previously failed ones. This method is a generic version used
// by the individual special reservation functions.
@ -485,7 +424,8 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
// we can ask the resultcache if this header is within the
// "prioritized" segment of blocks. If it is not, we need to throttle
stale, throttle, item, err := q.resultCache.AddFetch(header, q.mode == ethconfig.SnapSync, q.fetchBAL)
stale, throttle, item, err := q.resultCache.AddFetch(header, q.mode == ethconfig.SnapSync)
if stale {
// Don't put back in the task queue, this item has already been
// delivered upstream
@ -565,12 +505,6 @@ func (q *queue) Revoke(peerID string) {
}
delete(q.receiptPendPool, peerID)
}
if request, ok := q.balPendPool[peerID]; ok {
for _, header := range request.Headers {
q.balTaskQueue.Push(header, -int64(header.Number.Uint64()))
}
delete(q.balPendPool, peerID)
}
}
// ExpireBodies checks for in flight block body requests that exceeded a timeout
@ -593,16 +527,6 @@ func (q *queue) ExpireReceipts(peer string) int {
return q.expire(peer, q.receiptPendPool, q.receiptTaskQueue)
}
// ExpireBALs checks for in flight bal requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalisation.
func (q *queue) ExpireBALs(peer string) int {
q.lock.Lock()
defer q.lock.Unlock()
balTimeoutMeter.Mark(1)
return q.expire(peer, q.balPendPool, q.balTaskPool)
}
// expire is the generic check that moves a specific expired task from a pending
// pool back into a task pool. The syntax on the passed taskQueue is a bit weird
// as we would need a generic expire method to handle both types, but that is not
@ -718,31 +642,6 @@ func (q *queue) DeliverReceipts(id string, receiptList []rlp.RawValue, receiptLi
receiptReqTimer, receiptInMeter, receiptDropMeter, len(receiptList), validate, reconstruct)
}
func (q *queue) DeliverBALs(id string, rawBALs []eth.RawBlockAccessList, balHashes []common.Hash) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
var bals []*bal.BlockAccessList
validate := func(index int, header *types.Header) error {
if balHashes[index] != *header.BlockAccessListHash {
return errInvalidBAL
}
b, err := rawBALs[index].Items()
if err != nil {
return fmt.Errorf("%w: bad bal: %v", errInvalidBAL, err)
}
blockBAL := bal.BlockAccessList(b)
bals = append(bals, &blockBAL)
return nil
}
reconstruct := func(index int, result *fetchResult) {
result.BAL = bals[index]
result.SetBALDone()
}
return q.deliver(id, q.balTaskPool, q.balTaskQueue, q.balPendPool,
balReqTimer, balInMeter, balDropMeter, len(rawBALs), validate, reconstruct)
}
// deliver injects a data retrieval response into the results queue.
//
// Note, this method expects the queue lock to be already held for writing. The
@ -826,12 +725,11 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
// Prepare configures the result cache to allow accepting and caching inbound
// fetch results.
func (q *queue) Prepare(offset uint64, mode SyncMode, fetchBAL bool) {
func (q *queue) Prepare(offset uint64, mode SyncMode) {
q.lock.Lock()
defer q.lock.Unlock()
// Prepare the queue for sync results
q.resultCache.Prepare(offset)
q.mode = mode
q.fetchBAL = fetchBAL
}

View file

@ -103,7 +103,7 @@ func TestBasics(t *testing.T) {
if !q.Idle() {
t.Errorf("new queue should be idle")
}
q.Prepare(1, SnapSync, false)
q.Prepare(1, SnapSync)
if res := q.Results(false); len(res) != 0 {
t.Fatal("new queue should have 0 results")
}
@ -200,7 +200,7 @@ func TestEmptyBlocks(t *testing.T) {
q := newQueue(10, 10)
q.Prepare(1, SnapSync, false)
q.Prepare(1, SnapSync)
// Schedule a batch of headers
headers := emptyChain.headers()
@ -279,7 +279,7 @@ func XTestDelivery(t *testing.T) {
}
q := newQueue(10, 10)
var wg sync.WaitGroup
q.Prepare(1, SnapSync, false)
q.Prepare(1, SnapSync)
wg.Add(1)
go func() {
// deliver headers

View file

@ -76,7 +76,7 @@ func (r *resultStore) SetThrottleThreshold(threshold uint64) uint64 {
// throttled - if true, the store is at capacity, this particular header is not prio now
// item - the result to store data into
// err - any error that occurred
func (r *resultStore) AddFetch(header *types.Header, snapSync bool, fetchBAL bool) (stale, throttled bool, item *fetchResult, err error) {
func (r *resultStore) AddFetch(header *types.Header, snapSync bool) (stale, throttled bool, item *fetchResult, err error) {
r.lock.Lock()
defer r.lock.Unlock()
@ -86,7 +86,7 @@ func (r *resultStore) AddFetch(header *types.Header, snapSync bool, fetchBAL boo
return stale, throttled, item, err
}
if item == nil {
item = newFetchResult(header, snapSync, fetchBAL)
item = newFetchResult(header, snapSync)
r.items[index] = item
}
return stale, throttled, item, err

View file

@ -212,10 +212,6 @@ func (p *skeletonTestPeer) RequestReceipts([]common.Hash, []uint64, []uint64, ch
panic("skeleton sync must not request receipts")
}
func (p *skeletonTestPeer) RequestBALs([]common.Hash, chan *eth.Response) (*eth.Request, error) {
panic("skeleton sync must not request block access lists")
}
// Tests various sync initializations based on previous leftovers in the database
// and announced heads.
func TestSkeletonSyncInit(t *testing.T) {