diff --git a/core/blockchain_reader.go b/core/blockchain_reader.go index 18afa9ce9d..a540bbc11d 100644 --- a/core/blockchain_reader.go +++ b/core/blockchain_reader.go @@ -296,6 +296,7 @@ func (bc *BlockChain) GetReceiptsRLP(hash common.Hash) rlp.RawValue { return rawdb.ReadReceiptsRLP(bc.db, hash, number) } +// GetAccessListRLP retrieves the block access list of a block in RLP encoding. func (bc *BlockChain) GetAccessListRLP(hash common.Hash) rlp.RawValue { number, ok := rawdb.ReadHeaderNumber(bc.db, hash) if !ok { diff --git a/core/types/bal/bal_encoding.go b/core/types/bal/bal_encoding.go index 03f97f3809..412d6185c9 100644 --- a/core/types/bal/bal_encoding.go +++ b/core/types/bal/bal_encoding.go @@ -104,6 +104,20 @@ func (e *BlockAccessList) Hash() common.Hash { return crypto.Keccak256Hash(enc.Bytes()) } +// EncodedSize returns the size of the RLP-encoded block access list. It is +// used by the downloader to estimate cache footprint of fetched results. +// Returns 0 for a nil receiver to keep size accounting code branch-free. +func (e *BlockAccessList) EncodedSize() int { + if e == nil { + return 0 + } + var enc bytes.Buffer + if err := e.EncodeRLP(&enc); err != nil { + return 0 + } + return enc.Len() +} + // encodingBalanceChange is the encoding format of BalanceChange. type encodingBalanceChange struct { TxIdx uint32 diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 1de0933842..457df86ded 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -44,6 +44,7 @@ var ( MaxBlockFetch = 128 // Number of blocks to be fetched per retrieval request MaxHeaderFetch = 192 // Number of block headers to be fetched per retrieval request MaxReceiptFetch = 256 // Number of transaction receipts to allow fetching per request + MaxBALFetch = 128 // Number of transaction bals to allow fetching per request maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) maxHeadersProcess = 2048 // Number of header download results to import at once into the chain @@ -52,6 +53,8 @@ var ( reorgProtHeaderDelay = 2 // Number of headers to delay delivering to cover mini reorgs + maxBALSyncGap uint64 = 128 // Maximum sync gap to enable BAL fetching + fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected fsHeaderContCheck = 3 * time.Second // Time interval to check for header continuations during state download fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in snap sync @@ -65,6 +68,7 @@ var ( errInvalidChain = errors.New("retrieved hash chain is invalid") errInvalidBody = errors.New("retrieved block body is invalid") errInvalidReceipt = errors.New("retrieved receipt is invalid") + errInvalidBAL = errors.New("retrieved bal is invalid") errCancelStateFetch = errors.New("state data download canceled (requested)") errCancelContentProcessing = errors.New("content processing canceled (requested)") errCanceled = errors.New("syncing canceled (requested)") @@ -153,6 +157,7 @@ type Downloader struct { // Testing hooks bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch + balFetchHook func([]*types.Header) // Method to call upon starting a bal fetch chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations) // Progress reporting metrics @@ -386,7 +391,7 @@ func (d *Downloader) synchronise(beaconPing chan struct{}) (err error) { d.queue.Reset(blockCacheMaxItems, blockCacheInitialItems) d.peers.Reset() - for _, ch := range []chan bool{d.queue.blockWakeCh, d.queue.receiptWakeCh} { + for _, ch := range []chan bool{d.queue.blockWakeCh, d.queue.receiptWakeCh, d.queue.balWakeCh} { select { case <-ch: default: @@ -581,14 +586,19 @@ func (d *Downloader) syncToHead() (err error) { log.Info("Skip chain segment before cutoff", "origin", origin, "cutoff", d.chainCutoffNumber) } } + // BAL fetching is only enabled for small sync gaps. Actual fetch + // is done only for blocks whose headers contain a BAL hash. + fetchBAL := mode == ethconfig.FullSync && height-origin <= maxBALSyncGap + // Initiate the sync using a concurrent header and content retrieval algorithm - d.queue.Prepare(chainOffset, mode) + d.queue.Prepare(chainOffset, mode, fetchBAL) // In beacon mode, headers are served by the skeleton syncer fetchers := []func() error{ func() error { return d.fetchHeaders(origin + 1) }, // Headers are always retrieved func() error { return d.fetchBodies(chainOffset) }, // Bodies are retrieved during normal and snap sync func() error { return d.fetchReceipts(chainOffset) }, // Receipts are retrieved during snap sync + func() error { return d.fetchBALs(chainOffset) }, func() error { return d.processHeaders(origin + 1) }, } if mode == ethconfig.SnapSync { @@ -700,6 +710,17 @@ func (d *Downloader) fetchReceipts(from uint64) error { return err } +// fetchReceipts iteratively downloads the scheduled bals, taking any +// available peers, reserving a chunk of bals for each, waiting for delivery +// and also periodically checking for timeouts. +func (d *Downloader) fetchBALs(from uint64) error { + log.Debug("Downloading bals", "origin", from) + err := d.concurrentFetch((*balQueue)(d)) + + log.Debug("BAL download terminated", "err", err) + return err +} + // processHeaders takes batches of retrieved headers from an input channel and // keeps processing and scheduling them into the header chain and downloader's // queue until the stream ends or a failure occurs. @@ -719,7 +740,7 @@ func (d *Downloader) processHeaders(origin uint64) error { // Terminate header processing if we synced up if task == nil || len(task.headers) == 0 { // Notify everyone that headers are fully processed - for _, ch := range []chan bool{d.queue.blockWakeCh, d.queue.receiptWakeCh} { + for _, ch := range []chan bool{d.queue.blockWakeCh, d.queue.receiptWakeCh, d.queue.balWakeCh} { select { case ch <- false: case <-d.cancelCh: @@ -798,7 +819,7 @@ func (d *Downloader) processHeaders(origin uint64) error { // Signal the downloader of the availability of new tasks if scheduled { - for _, ch := range []chan bool{d.queue.blockWakeCh, d.queue.receiptWakeCh} { + for _, ch := range []chan bool{d.queue.blockWakeCh, d.queue.receiptWakeCh, d.queue.balWakeCh} { select { case ch <- true: default: @@ -843,7 +864,11 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error { ) blocks := make([]*types.Block, len(results)) for i, result := range results { - blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.body()) + block := types.NewBlockWithHeader(result.Header).WithBody(result.body()) + if result.BAL != nil { + block = block.WithAccessList(result.BAL) + } + blocks[i] = block } // Downloaded blocks are always regarded as trusted after the // transition. Because the downloaded chain is guided by the diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 6d5d159631..67b5cb282d 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -29,6 +29,7 @@ import ( "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" @@ -304,6 +305,36 @@ func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash, gasUsed []u return res.Req, nil } +// RequestBALs constructs a getBlockAccessLists method associated with a +// particular peer in the download tester. The returned function can be used to +// retrieve batches of block access lists from the particularly requested peer. +func (dlp *downloadTesterPeer) RequestBALs(hashes []common.Hash, sink chan *eth.Response) (*eth.Request, error) { + bals := make(eth.BlockAccessListResponse, 0, len(hashes)) + respHashes := make([]common.Hash, 0, len(hashes)) + for _, hash := range hashes { + var entry eth.RawBlockAccessList + data := dlp.chain.GetAccessListRLP(hash) + if len(data) == 0 { + _ = entry.AppendRaw([]byte{0xC0}) + } else if err := rlp.DecodeBytes(data, &entry); err != nil { + return nil, err + } + bals = append(bals, entry) + respHashes = append(respHashes, crypto.Keccak256Hash(entry.Bytes())) + } + res := ð.Response{ + Req: ð.Request{Peer: dlp.id}, + Res: &bals, + Meta: respHashes, + Time: 1, + Done: make(chan error, 1), + } + go func() { + sink <- res + }() + return res.Req, nil +} + // ID retrieves the peer's unique identifier. func (dlp *downloadTesterPeer) ID() string { return dlp.id diff --git a/eth/downloader/fetchers_concurrent_bals.go b/eth/downloader/fetchers_concurrent_bals.go new file mode 100644 index 0000000000..141cbb4ccf --- /dev/null +++ b/eth/downloader/fetchers_concurrent_bals.go @@ -0,0 +1,106 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package downloader + +import ( + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/eth/protocols/eth" + "github.com/ethereum/go-ethereum/log" +) + +// balQueue implements typedQueue and is a type adapter between the generic +// concurrent fetcher and the downloader. +type balQueue Downloader + +// waker returns a notification channel that gets pinged in case more bal +// fetches have been queued up, so the fetcher might assign it to idle peers. +func (q *balQueue) waker() chan bool { + return q.queue.balWakeCh +} + +// pending returns the number of bal that are currently queued for fetching +// by the concurrent downloader. +func (q *balQueue) pending() int { + return q.queue.PendingBALs() +} + +// capacity is responsible for calculating how many bals a particular peer is +// estimated to be able to retrieve within the allotted round trip time. +func (q *balQueue) capacity(peer *peerConnection, rtt time.Duration) int { + return peer.BALCapacity(rtt) +} + +// updateCapacity is responsible for updating how many bals a particular peer +// is estimated to be able to retrieve in a unit time. +func (q *balQueue) updateCapacity(peer *peerConnection, items int, span time.Duration) { + peer.UpdateBALRate(items, span) +} + +// reserve is responsible for allocating a requested number of pending bals +// from the download queue to the specified peer. +func (q *balQueue) reserve(peer *peerConnection, items int) (*fetchRequest, bool, bool) { + return q.queue.ReserveBALs(peer, items) +} + +// unreserve is responsible for removing the current bal retrieval allocation +// assigned to a specific peer and placing it back into the pool to allow +// reassigning to some other peer. +func (q *balQueue) unreserve(peer string) int { + fails := q.queue.ExpireBALs(peer) + if fails > 2 { + log.Trace("BAL delivery timed out", "peer", peer) + } else { + log.Debug("BAL delivery stalling", "peer", peer) + } + return fails +} + +// request is responsible for converting a generic fetch request into a bal +// one and sending it to the remote peer for fulfillment. +func (q *balQueue) request(peer *peerConnection, req *fetchRequest, resCh chan *eth.Response) (*eth.Request, error) { + peer.log.Trace("Requesting new batch of bals", "count", len(req.Headers), "from", req.Headers[0].Number) + if q.balFetchHook != nil { + q.balFetchHook(req.Headers) + } + var ( + hashes = make([]common.Hash, 0, len(req.Headers)) + ) + for _, header := range req.Headers { + hashes = append(hashes, header.Hash()) + } + return peer.peer.RequestBALs(hashes, resCh) +} + +// deliver is responsible for taking a generic response packet from the concurrent +// fetcher, unpacking the bal data and delivering it to the downloader's queue. +func (q *balQueue) deliver(peer *peerConnection, packet *eth.Response) (int, error) { + bals := *packet.Res.(*eth.BlockAccessListResponse) + hashes := packet.Meta.([]common.Hash) // {bal hashes} + + accepted, err := q.queue.DeliverBALs(peer.id, bals, hashes) + switch { + case err == nil && len(bals) == 0: + peer.log.Trace("Requested bals delivered") + case err == nil: + peer.log.Trace("Delivered new batch of bals", "count", len(bals), "accepted", accepted) + default: + peer.log.Debug("Failed to deliver retrieved bals", "err", err) + } + return accepted, err +} diff --git a/eth/downloader/metrics.go b/eth/downloader/metrics.go index bfe80ddbf1..e0212dc1c7 100644 --- a/eth/downloader/metrics.go +++ b/eth/downloader/metrics.go @@ -37,5 +37,10 @@ var ( receiptDropMeter = metrics.NewRegisteredMeter("eth/downloader/receipts/drop", nil) receiptTimeoutMeter = metrics.NewRegisteredMeter("eth/downloader/receipts/timeout", nil) + balInMeter = metrics.NewRegisteredMeter("eth/downloader/bals/in", nil) + balReqTimer = metrics.NewRegisteredTimer("eth/downloader/bals/req", nil) + balDropMeter = metrics.NewRegisteredMeter("eth/downloader/bals/drop", nil) + balTimeoutMeter = metrics.NewRegisteredMeter("eth/downloader/bals/timeout", nil) + throttleCounter = metrics.NewRegisteredCounter("eth/downloader/throttle", nil) ) diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index d20bda69e9..520f29befe 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -61,6 +61,7 @@ type Peer interface { RequestBodies([]common.Hash, chan *eth.Response) (*eth.Request, error) RequestReceipts([]common.Hash, []uint64, []uint64, chan *eth.Response) (*eth.Request, error) + RequestBALs([]common.Hash, chan *eth.Response) (*eth.Request, error) } // newPeerConnection creates a new downloader peer. @@ -100,6 +101,12 @@ func (p *peerConnection) UpdateReceiptRate(delivered int, elapsed time.Duration) p.rates.Update(eth.ReceiptsMsg, elapsed, delivered) } +// UpdateBALRate updates the peer's estimated bal retrieval throughput +// with the current measurement. +func (p *peerConnection) UpdateBALRate(delivered int, elapsed time.Duration) { + p.rates.Update(eth.ReceiptsMsg, elapsed, delivered) +} + // HeaderCapacity retrieves the peer's header download allowance based on its // previously discovered throughput. func (p *peerConnection) HeaderCapacity(targetRTT time.Duration) int { @@ -130,6 +137,16 @@ func (p *peerConnection) ReceiptCapacity(targetRTT time.Duration) int { return cap } +// BALCapacity retrieves the peers bal download allowance based on its +// previously discovered throughput. +func (p *peerConnection) BALCapacity(targetRTT time.Duration) int { + cap := p.rates.Capacity(eth.BlockAccessListsMsg, targetRTT) + if cap > MaxBALFetch { + cap = MaxBALFetch + } + return cap +} + // MarkLacking appends a new entity to the set of items (blocks, receipts, states) // that a peer is known not to have (i.e. have been requested before). If the // set reaches its maximum allowed capacity, items are randomly dropped off. diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index dd17b7f1ed..c02076c59d 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -29,6 +29,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/prque" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/types/bal" "github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/log" @@ -39,6 +40,7 @@ import ( const ( bodyType = uint(0) receiptType = uint(1) + balType = uint(2) ) var ( @@ -71,9 +73,10 @@ type fetchResult struct { Transactions types.Transactions Receipts rlp.RawValue Withdrawals types.Withdrawals + BAL *bal.BlockAccessList } -func newFetchResult(header *types.Header, snapSync bool) *fetchResult { +func newFetchResult(header *types.Header, snapSync bool, fetchBAL bool) *fetchResult { item := &fetchResult{ Header: header, } @@ -89,6 +92,10 @@ func newFetchResult(header *types.Header, snapSync bool) *fetchResult { } else { item.pending.Store(item.pending.Load() | (1 << receiptType)) } + } else { + if header.BlockAccessListHash != nil && fetchBAL { + item.pending.Store(item.pending.Load() | (1 << balType)) + } } return item } @@ -121,6 +128,13 @@ func (f *fetchResult) SetReceiptsDone() { } } +// SetBALsDone flags the bals as finished. +func (f *fetchResult) SetBALDone() { + if v := f.pending.Load(); (v & (1 << balType)) != 0 { + f.pending.Add(-4) + } +} + // Done checks if the given type is done already func (f *fetchResult) Done(kind uint) bool { v := f.pending.Load() @@ -143,6 +157,13 @@ type queue struct { receiptPendPool map[string]*fetchRequest // Currently pending receipt retrieval operations receiptWakeCh chan bool // Channel to notify when receipt fetcher of new tasks + balTaskPool map[common.Hash]*types.Header // Pending bal retrieval tasks, mapping hashes to headers + balTaskQueue *prque.Prque[int64, *types.Header] // Priority queue of the headers to fetch the bals for + balPendPool map[string]*fetchRequest // Currently pending bal retrieval operations + balWakeCh chan bool // Channel to notify when bal fetcher of new tasks + + fetchBAL bool // Whether to fetch BALs (only for small sync gaps) + resultCache *resultStore // Downloaded but not yet delivered fetch results resultSize common.StorageSize // Approximate size of a block (exponential moving average) @@ -161,6 +182,8 @@ func newQueue(blockCacheLimit int, thresholdInitialSize int) *queue { blockWakeCh: make(chan bool, 1), receiptTaskQueue: prque.New[int64, *types.Header](nil), receiptWakeCh: make(chan bool, 1), + balTaskQueue: prque.New[int64, *types.Header](nil), + balWakeCh: make(chan bool, 1), active: sync.NewCond(lock), lock: lock, } @@ -185,6 +208,10 @@ func (q *queue) Reset(blockCacheLimit int, thresholdInitialSize int) { q.receiptTaskQueue.Reset() q.receiptPendPool = make(map[string]*fetchRequest) + q.balTaskPool = make(map[common.Hash]*types.Header) + q.balTaskQueue.Reset() + q.balPendPool = make(map[string]*fetchRequest) + q.resultCache = newResultStore(blockCacheLimit) q.resultCache.SetThrottleThreshold(uint64(thresholdInitialSize)) } @@ -214,6 +241,14 @@ func (q *queue) PendingReceipts() int { return q.receiptTaskQueue.Size() } +// PendingBALs retrieves the number of bals pending for retrieval. +func (q *queue) PendingBALs() int { + q.lock.Lock() + defer q.lock.Unlock() + + return q.balTaskQueue.Size() +} + // InFlightBlocks retrieves whether there are block fetch requests currently in // flight. func (q *queue) InFlightBlocks() bool { @@ -232,13 +267,22 @@ func (q *queue) InFlightReceipts() bool { return len(q.receiptPendPool) > 0 } +// InFlightBALs retrieves whether there are bal fetch requests currently +// in flight. +func (q *queue) InFlightBALs() bool { + q.lock.Lock() + defer q.lock.Unlock() + + return len(q.balPendPool) > 0 +} + // Idle returns if the queue is fully idle or has some data still inside. func (q *queue) Idle() bool { q.lock.Lock() defer q.lock.Unlock() - queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() - pending := len(q.blockPendPool) + len(q.receiptPendPool) + queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.balTaskQueue.Size() + pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.balPendPool) return (queued + pending) == 0 } @@ -280,6 +324,14 @@ func (q *queue) Schedule(headers []*types.Header, hashes []common.Hash, from uin q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64())) } } + if q.mode == ethconfig.FullSync && q.fetchBAL && header.BlockAccessListHash != nil { + if _, ok := q.balTaskPool[hash]; ok { + log.Warn("Header already scheduled for BAL fetch", "number", header.Number, "hash", hash) + } else { + q.balTaskPool[hash] = header + q.balTaskQueue.Push(header, -int64(header.Number.Uint64())) + } + } inserts++ q.headerHead = hash from++ @@ -328,6 +380,7 @@ func (q *queue) Results(block bool) []*fetchResult { size += common.StorageSize(tx.Size()) } size += common.StorageSize(result.Withdrawals.Size()) + size += common.StorageSize(result.BAL.EncodedSize()) q.resultSize = common.StorageSize(blockCacheSizeWeight)*size + (1-common.StorageSize(blockCacheSizeWeight))*q.resultSize } @@ -337,7 +390,7 @@ func (q *queue) Results(block bool) []*fetchResult { throttleThreshold = q.resultCache.SetThrottleThreshold(throttleThreshold) // With results removed from the cache, wake throttled fetchers - for _, ch := range []chan bool{q.blockWakeCh, q.receiptWakeCh} { + for _, ch := range []chan bool{q.blockWakeCh, q.receiptWakeCh, q.balWakeCh} { select { case ch <- true: default: @@ -365,6 +418,7 @@ func (q *queue) stats() []interface{} { return []interface{}{ "receiptTasks", q.receiptTaskQueue.Size(), "blockTasks", q.blockTaskQueue.Size(), + "balTasks", q.balTaskQueue.Size(), "itemSize", q.resultSize, } } @@ -389,6 +443,13 @@ func (q *queue) ReserveReceipts(p *peerConnection, count int) (*fetchRequest, bo return q.reserveHeaders(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, receiptType) } +func (q *queue) ReserveBALs(p *peerConnection, count int) (*fetchRequest, bool, bool) { + q.lock.Lock() + defer q.lock.Unlock() + + return q.reserveHeaders(p, count, q.balTaskPool, q.balTaskQueue, q.balPendPool, balType) +} + // reserveHeaders reserves a set of data download operations for a given peer, // skipping any previously failed ones. This method is a generic version used // by the individual special reservation functions. @@ -424,8 +485,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common // we can ask the resultcache if this header is within the // "prioritized" segment of blocks. If it is not, we need to throttle - - stale, throttle, item, err := q.resultCache.AddFetch(header, q.mode == ethconfig.SnapSync) + stale, throttle, item, err := q.resultCache.AddFetch(header, q.mode == ethconfig.SnapSync, q.fetchBAL) if stale { // Don't put back in the task queue, this item has already been // delivered upstream @@ -505,6 +565,12 @@ func (q *queue) Revoke(peerID string) { } delete(q.receiptPendPool, peerID) } + if request, ok := q.balPendPool[peerID]; ok { + for _, header := range request.Headers { + q.balTaskQueue.Push(header, -int64(header.Number.Uint64())) + } + delete(q.balPendPool, peerID) + } } // ExpireBodies checks for in flight block body requests that exceeded a timeout @@ -527,6 +593,16 @@ func (q *queue) ExpireReceipts(peer string) int { return q.expire(peer, q.receiptPendPool, q.receiptTaskQueue) } +// ExpireBALs checks for in flight bal requests that exceeded a timeout +// allowance, canceling them and returning the responsible peers for penalisation. +func (q *queue) ExpireBALs(peer string) int { + q.lock.Lock() + defer q.lock.Unlock() + + balTimeoutMeter.Mark(1) + return q.expire(peer, q.balPendPool, q.balTaskPool) +} + // expire is the generic check that moves a specific expired task from a pending // pool back into a task pool. The syntax on the passed taskQueue is a bit weird // as we would need a generic expire method to handle both types, but that is not @@ -642,6 +718,31 @@ func (q *queue) DeliverReceipts(id string, receiptList []rlp.RawValue, receiptLi receiptReqTimer, receiptInMeter, receiptDropMeter, len(receiptList), validate, reconstruct) } +func (q *queue) DeliverBALs(id string, rawBALs []eth.RawBlockAccessList, balHashes []common.Hash) (int, error) { + q.lock.Lock() + defer q.lock.Unlock() + + var bals []*bal.BlockAccessList + validate := func(index int, header *types.Header) error { + if balHashes[index] != *header.BlockAccessListHash { + return errInvalidBAL + } + b, err := rawBALs[index].Items() + if err != nil { + return fmt.Errorf("%w: bad bal: %v", errInvalidBAL, err) + } + blockBAL := bal.BlockAccessList(b) + bals = append(bals, &blockBAL) + return nil + } + reconstruct := func(index int, result *fetchResult) { + result.BAL = bals[index] + result.SetBALDone() + } + return q.deliver(id, q.balTaskPool, q.balTaskQueue, q.balPendPool, + balReqTimer, balInMeter, balDropMeter, len(rawBALs), validate, reconstruct) +} + // deliver injects a data retrieval response into the results queue. // // Note, this method expects the queue lock to be already held for writing. The @@ -725,11 +826,12 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, // Prepare configures the result cache to allow accepting and caching inbound // fetch results. -func (q *queue) Prepare(offset uint64, mode SyncMode) { +func (q *queue) Prepare(offset uint64, mode SyncMode, fetchBAL bool) { q.lock.Lock() defer q.lock.Unlock() // Prepare the queue for sync results q.resultCache.Prepare(offset) q.mode = mode + q.fetchBAL = fetchBAL } diff --git a/eth/downloader/queue_test.go b/eth/downloader/queue_test.go index c7e8a0d1d6..82d75eeb3e 100644 --- a/eth/downloader/queue_test.go +++ b/eth/downloader/queue_test.go @@ -103,7 +103,7 @@ func TestBasics(t *testing.T) { if !q.Idle() { t.Errorf("new queue should be idle") } - q.Prepare(1, SnapSync) + q.Prepare(1, SnapSync, false) if res := q.Results(false); len(res) != 0 { t.Fatal("new queue should have 0 results") } @@ -200,7 +200,7 @@ func TestEmptyBlocks(t *testing.T) { q := newQueue(10, 10) - q.Prepare(1, SnapSync) + q.Prepare(1, SnapSync, false) // Schedule a batch of headers headers := emptyChain.headers() @@ -279,7 +279,7 @@ func XTestDelivery(t *testing.T) { } q := newQueue(10, 10) var wg sync.WaitGroup - q.Prepare(1, SnapSync) + q.Prepare(1, SnapSync, false) wg.Add(1) go func() { // deliver headers diff --git a/eth/downloader/resultstore.go b/eth/downloader/resultstore.go index 36c382fefc..e27df98b64 100644 --- a/eth/downloader/resultstore.go +++ b/eth/downloader/resultstore.go @@ -76,7 +76,7 @@ func (r *resultStore) SetThrottleThreshold(threshold uint64) uint64 { // throttled - if true, the store is at capacity, this particular header is not prio now // item - the result to store data into // err - any error that occurred -func (r *resultStore) AddFetch(header *types.Header, snapSync bool) (stale, throttled bool, item *fetchResult, err error) { +func (r *resultStore) AddFetch(header *types.Header, snapSync bool, fetchBAL bool) (stale, throttled bool, item *fetchResult, err error) { r.lock.Lock() defer r.lock.Unlock() @@ -86,7 +86,7 @@ func (r *resultStore) AddFetch(header *types.Header, snapSync bool) (stale, thro return stale, throttled, item, err } if item == nil { - item = newFetchResult(header, snapSync) + item = newFetchResult(header, snapSync, fetchBAL) r.items[index] = item } return stale, throttled, item, err diff --git a/eth/downloader/skeleton_test.go b/eth/downloader/skeleton_test.go index b9e7dba6ee..efe1b99e9f 100644 --- a/eth/downloader/skeleton_test.go +++ b/eth/downloader/skeleton_test.go @@ -212,6 +212,10 @@ func (p *skeletonTestPeer) RequestReceipts([]common.Hash, []uint64, []uint64, ch panic("skeleton sync must not request receipts") } +func (p *skeletonTestPeer) RequestBALs([]common.Hash, chan *eth.Response) (*eth.Request, error) { + panic("skeleton sync must not request block access lists") +} + // Tests various sync initializations based on previous leftovers in the database // and announced heads. func TestSkeletonSyncInit(t *testing.T) { diff --git a/eth/protocols/eth/handler.go b/eth/protocols/eth/handler.go index f7d25bd8ca..154b75130c 100644 --- a/eth/protocols/eth/handler.go +++ b/eth/protocols/eth/handler.go @@ -53,6 +53,9 @@ const ( // containing 200+ transactions nowadays, the practical limit will always // be softResponseLimit. maxReceiptsServe = 1024 + + // maxBALsServe is the maximum number of block access lists to serve. + maxBALsServe = 1024 ) // Handler is a callback to invoke from an outside runner after the boilerplate @@ -197,6 +200,22 @@ var eth70 = map[uint64]msgHandler{ BlockRangeUpdateMsg: handleBlockRangeUpdate, } +var eth71 = map[uint64]msgHandler{ + TransactionsMsg: handleTransactions, + NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes, + GetBlockHeadersMsg: handleGetBlockHeaders, + BlockHeadersMsg: handleBlockHeaders, + GetBlockBodiesMsg: handleGetBlockBodies, + BlockBodiesMsg: handleBlockBodies, + GetReceiptsMsg: handleGetReceipts70, + ReceiptsMsg: handleReceipts70, + GetPooledTransactionsMsg: handleGetPooledTransactions, + PooledTransactionsMsg: handlePooledTransactions, + BlockRangeUpdateMsg: handleBlockRangeUpdate, + GetBlockAccessListsMsg: handleGetBlockAccessLists, + BlockAccessListsMsg: handleBlockAccessLists, +} + // handleMessage is invoked whenever an inbound message is received from a remote // peer. The remote connection is torn down upon returning any error. func handleMessage(backend Backend, peer *Peer) error { @@ -216,6 +235,8 @@ func handleMessage(backend Backend, peer *Peer) error { handlers = eth69 case ETH70: handlers = eth70 + case ETH71: + handlers = eth71 default: return fmt.Errorf("unknown eth protocol version: %v", peer.version) } diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index 7556df9af2..4638b01e0d 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -663,3 +663,66 @@ func handleBlockRangeUpdate(backend Backend, msg Decoder, peer *Peer) error { peer.lastRange.Store(&update) return nil } + +// handleGetBlockAccessLists serves a GetBlockAccessLists request. +func handleGetBlockAccessLists(backend Backend, msg Decoder, peer *Peer) error { + var query GetBlockAccessListsPacket + if err := msg.Decode(&query); err != nil { + return err + } + response := serviceGetBlockAccessListsQuery(backend.Chain(), query.GetBlockAccessListsRequest) + return peer.ReplyBlockAccessLists(query.RequestId, response) +} + +// serviceGetBlockAccessListsQuery assembles the response to a BAL query. +// Unavailable BALs are returned as empty list entries. +func serviceGetBlockAccessListsQuery(chain *core.BlockChain, query GetBlockAccessListsRequest) rlp.RawList[RawBlockAccessList] { + var ( + bytes int + bals rlp.RawList[RawBlockAccessList] + ) + for _, hash := range query { + if bytes >= softResponseLimit || bals.Len() >= maxBALsServe { + break + } + data := chain.GetAccessListRLP(hash) + if len(data) == 0 { + bals.AppendRaw([]byte{0xC0}) + continue + } + bals.AppendRaw(data) + bytes += len(data) + } + return bals +} + +// handleBlockAccessLists processes an incoming BlockAccessLists response, +// validates it against the request tracker, and dispatches it to the waiting caller. +func handleBlockAccessLists(backend Backend, msg Decoder, peer *Peer) error { + res := new(BlockAccessListPacket) + if err := msg.Decode(res); err != nil { + return err + } + tresp := tracker.Response{ID: res.RequestId, MsgCode: BlockAccessListsMsg, Size: res.List.Len()} + if err := peer.tracker.Fulfil(tresp); err != nil { + return fmt.Errorf("BlockAccessLists: %w", err) + } + bals, err := res.List.Items() + if err != nil { + return fmt.Errorf("BlockAccessLists: %w", err) + } + + metadata := func() interface{} { + hashes := make([]common.Hash, len(bals)) + for i := range bals { + hashes[i] = crypto.Keccak256Hash(bals[i].Bytes()) + } + return hashes + } + + return peer.dispatchResponse(&Response{ + id: res.RequestId, + code: BlockAccessListsMsg, + Res: (*BlockAccessListResponse)(&bals), + }, metadata) +} diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index 754fd02be3..2d7079fa12 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -251,6 +251,36 @@ func (p *Peer) ReplyReceiptsRLP70(id uint64, receipts rlp.RawList[*ReceiptList], }) } +// ReplyBlockAccessLists is the response to GetBlockAccessLists (EIP-8159). +func (p *Peer) ReplyBlockAccessLists(id uint64, list rlp.RawList[RawBlockAccessList]) error { + return p2p.Send(p.rw, BlockAccessListsMsg, &BlockAccessListPacket{ + RequestId: id, + List: list, + }) +} + +// RequestBALs fetches block access lists for the given block hashes (EIP-8159) +func (p *Peer) RequestBALs(hashes []common.Hash, sink chan *Response) (*Request, error) { + p.Log().Debug("Fetching block access lists", "count", len(hashes)) + id := rand.Uint64() + + req := &Request{ + id: id, + sink: sink, + code: GetBlockAccessListsMsg, + want: BlockAccessListsMsg, + numItems: len(hashes), + data: &GetBlockAccessListsPacket{ + RequestId: id, + GetBlockAccessListsRequest: hashes, + }, + } + if err := p.dispatchRequest(req); err != nil { + return nil, err + } + return req, nil +} + // RequestOneHeader is a wrapper around the header query functions to fetch a // single header. It is used solely by the fetcher. func (p *Peer) RequestOneHeader(hash common.Hash, sink chan *Response) (*Request, error) { diff --git a/eth/protocols/eth/protocol.go b/eth/protocols/eth/protocol.go index 0df0776c27..a6c45f83ec 100644 --- a/eth/protocols/eth/protocol.go +++ b/eth/protocols/eth/protocol.go @@ -24,6 +24,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/forkid" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/types/bal" "github.com/ethereum/go-ethereum/rlp" ) @@ -31,6 +32,7 @@ import ( const ( ETH69 = 69 ETH70 = 70 + ETH71 = 71 ) // ProtocolName is the official short name of the `eth` protocol used during @@ -39,11 +41,11 @@ const ProtocolName = "eth" // ProtocolVersions are the supported versions of the `eth` protocol (first // is primary). -var ProtocolVersions = []uint{ETH70, ETH69} +var ProtocolVersions = []uint{ETH71, ETH70, ETH69} // protocolLengths are the number of implemented message corresponding to // different protocol versions. -var protocolLengths = map[uint]uint64{ETH69: 18, ETH70: 18} +var protocolLengths = map[uint]uint64{ETH71: 20, ETH69: 18, ETH70: 18} // maxMessageSize is the maximum cap on the size of a protocol message. const maxMessageSize = 10 * 1024 * 1024 @@ -66,6 +68,8 @@ const ( GetReceiptsMsg = 0x0f ReceiptsMsg = 0x10 BlockRangeUpdateMsg = 0x11 + GetBlockAccessListsMsg = 0x12 + BlockAccessListsMsg = 0x13 ) var ( @@ -288,6 +292,24 @@ type BlockRangeUpdatePacket struct { LatestBlockHash common.Hash } +type GetBlockAccessListsRequest []common.Hash + +type GetBlockAccessListsPacket struct { + RequestId uint64 + GetBlockAccessListsRequest +} + +type RawBlockAccessList struct { + rlp.RawList[bal.AccountAccess] +} + +type BlockAccessListResponse []RawBlockAccessList + +type BlockAccessListPacket struct { + RequestId uint64 + List rlp.RawList[RawBlockAccessList] +} + func (*StatusPacket) Name() string { return "Status" } func (*StatusPacket) Kind() byte { return StatusMsg } @@ -326,3 +348,9 @@ func (*ReceiptsRLPResponse) Kind() byte { return ReceiptsMsg } func (*BlockRangeUpdatePacket) Name() string { return "BlockRangeUpdate" } func (*BlockRangeUpdatePacket) Kind() byte { return BlockRangeUpdateMsg } + +func (*GetBlockAccessListsRequest) Name() string { return "GetBlockAccessLists" } +func (*GetBlockAccessListsRequest) Kind() byte { return GetBlockAccessListsMsg } + +func (*BlockAccessListResponse) Name() string { return "BlockAccessLists" } +func (*BlockAccessListResponse) Kind() byte { return BlockAccessListsMsg }