core, eth: move eth71 changes on master

This commit is contained in:
healthykim 2026-05-05 11:22:27 +02:00
parent b9c5fe6d26
commit 35781edb8f
15 changed files with 466 additions and 19 deletions

View file

@ -296,6 +296,7 @@ func (bc *BlockChain) GetReceiptsRLP(hash common.Hash) rlp.RawValue {
return rawdb.ReadReceiptsRLP(bc.db, hash, number)
}
// GetAccessListRLP retrieves the block access list of a block in RLP encoding.
func (bc *BlockChain) GetAccessListRLP(hash common.Hash) rlp.RawValue {
number, ok := rawdb.ReadHeaderNumber(bc.db, hash)
if !ok {

View file

@ -104,6 +104,20 @@ func (e *BlockAccessList) Hash() common.Hash {
return crypto.Keccak256Hash(enc.Bytes())
}
// EncodedSize returns the size of the RLP-encoded block access list. It is
// used by the downloader to estimate cache footprint of fetched results.
// Returns 0 for a nil receiver to keep size accounting code branch-free.
func (e *BlockAccessList) EncodedSize() int {
if e == nil {
return 0
}
var enc bytes.Buffer
if err := e.EncodeRLP(&enc); err != nil {
return 0
}
return enc.Len()
}
// encodingBalanceChange is the encoding format of BalanceChange.
type encodingBalanceChange struct {
TxIdx uint32

View file

@ -44,6 +44,7 @@ var (
MaxBlockFetch = 128 // Number of blocks to be fetched per retrieval request
MaxHeaderFetch = 192 // Number of block headers to be fetched per retrieval request
MaxReceiptFetch = 256 // Number of transaction receipts to allow fetching per request
MaxBALFetch = 128 // Number of transaction bals to allow fetching per request
maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
maxHeadersProcess = 2048 // Number of header download results to import at once into the chain
@ -52,6 +53,8 @@ var (
reorgProtHeaderDelay = 2 // Number of headers to delay delivering to cover mini reorgs
maxBALSyncGap uint64 = 128 // Maximum sync gap to enable BAL fetching
fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected
fsHeaderContCheck = 3 * time.Second // Time interval to check for header continuations during state download
fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in snap sync
@ -65,6 +68,7 @@ var (
errInvalidChain = errors.New("retrieved hash chain is invalid")
errInvalidBody = errors.New("retrieved block body is invalid")
errInvalidReceipt = errors.New("retrieved receipt is invalid")
errInvalidBAL = errors.New("retrieved bal is invalid")
errCancelStateFetch = errors.New("state data download canceled (requested)")
errCancelContentProcessing = errors.New("content processing canceled (requested)")
errCanceled = errors.New("syncing canceled (requested)")
@ -153,6 +157,7 @@ type Downloader struct {
// Testing hooks
bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
balFetchHook func([]*types.Header) // Method to call upon starting a bal fetch
chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
// Progress reporting metrics
@ -386,7 +391,7 @@ func (d *Downloader) synchronise(beaconPing chan struct{}) (err error) {
d.queue.Reset(blockCacheMaxItems, blockCacheInitialItems)
d.peers.Reset()
for _, ch := range []chan bool{d.queue.blockWakeCh, d.queue.receiptWakeCh} {
for _, ch := range []chan bool{d.queue.blockWakeCh, d.queue.receiptWakeCh, d.queue.balWakeCh} {
select {
case <-ch:
default:
@ -581,14 +586,19 @@ func (d *Downloader) syncToHead() (err error) {
log.Info("Skip chain segment before cutoff", "origin", origin, "cutoff", d.chainCutoffNumber)
}
}
// BAL fetching is only enabled for small sync gaps. Actual fetch
// is done only for blocks whose headers contain a BAL hash.
fetchBAL := mode == ethconfig.FullSync && height-origin <= maxBALSyncGap
// Initiate the sync using a concurrent header and content retrieval algorithm
d.queue.Prepare(chainOffset, mode)
d.queue.Prepare(chainOffset, mode, fetchBAL)
// In beacon mode, headers are served by the skeleton syncer
fetchers := []func() error{
func() error { return d.fetchHeaders(origin + 1) }, // Headers are always retrieved
func() error { return d.fetchBodies(chainOffset) }, // Bodies are retrieved during normal and snap sync
func() error { return d.fetchReceipts(chainOffset) }, // Receipts are retrieved during snap sync
func() error { return d.fetchBALs(chainOffset) },
func() error { return d.processHeaders(origin + 1) },
}
if mode == ethconfig.SnapSync {
@ -700,6 +710,17 @@ func (d *Downloader) fetchReceipts(from uint64) error {
return err
}
// fetchReceipts iteratively downloads the scheduled bals, taking any
// available peers, reserving a chunk of bals for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchBALs(from uint64) error {
log.Debug("Downloading bals", "origin", from)
err := d.concurrentFetch((*balQueue)(d))
log.Debug("BAL download terminated", "err", err)
return err
}
// processHeaders takes batches of retrieved headers from an input channel and
// keeps processing and scheduling them into the header chain and downloader's
// queue until the stream ends or a failure occurs.
@ -719,7 +740,7 @@ func (d *Downloader) processHeaders(origin uint64) error {
// Terminate header processing if we synced up
if task == nil || len(task.headers) == 0 {
// Notify everyone that headers are fully processed
for _, ch := range []chan bool{d.queue.blockWakeCh, d.queue.receiptWakeCh} {
for _, ch := range []chan bool{d.queue.blockWakeCh, d.queue.receiptWakeCh, d.queue.balWakeCh} {
select {
case ch <- false:
case <-d.cancelCh:
@ -798,7 +819,7 @@ func (d *Downloader) processHeaders(origin uint64) error {
// Signal the downloader of the availability of new tasks
if scheduled {
for _, ch := range []chan bool{d.queue.blockWakeCh, d.queue.receiptWakeCh} {
for _, ch := range []chan bool{d.queue.blockWakeCh, d.queue.receiptWakeCh, d.queue.balWakeCh} {
select {
case ch <- true:
default:
@ -843,7 +864,11 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error {
)
blocks := make([]*types.Block, len(results))
for i, result := range results {
blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.body())
block := types.NewBlockWithHeader(result.Header).WithBody(result.body())
if result.BAL != nil {
block = block.WithAccessList(result.BAL)
}
blocks[i] = block
}
// Downloaded blocks are always regarded as trusted after the
// transition. Because the downloaded chain is guided by the

View file

@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/eth/protocols/snap"
@ -304,6 +305,36 @@ func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash, gasUsed []u
return res.Req, nil
}
// RequestBALs constructs a getBlockAccessLists method associated with a
// particular peer in the download tester. The returned function can be used to
// retrieve batches of block access lists from the particularly requested peer.
func (dlp *downloadTesterPeer) RequestBALs(hashes []common.Hash, sink chan *eth.Response) (*eth.Request, error) {
bals := make(eth.BlockAccessListResponse, 0, len(hashes))
respHashes := make([]common.Hash, 0, len(hashes))
for _, hash := range hashes {
var entry eth.RawBlockAccessList
data := dlp.chain.GetAccessListRLP(hash)
if len(data) == 0 {
_ = entry.AppendRaw([]byte{0xC0})
} else if err := rlp.DecodeBytes(data, &entry); err != nil {
return nil, err
}
bals = append(bals, entry)
respHashes = append(respHashes, crypto.Keccak256Hash(entry.Bytes()))
}
res := &eth.Response{
Req: &eth.Request{Peer: dlp.id},
Res: &bals,
Meta: respHashes,
Time: 1,
Done: make(chan error, 1),
}
go func() {
sink <- res
}()
return res.Req, nil
}
// ID retrieves the peer's unique identifier.
func (dlp *downloadTesterPeer) ID() string {
return dlp.id

View file

@ -0,0 +1,106 @@
// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package downloader
import (
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/log"
)
// balQueue implements typedQueue and is a type adapter between the generic
// concurrent fetcher and the downloader.
type balQueue Downloader
// waker returns a notification channel that gets pinged in case more bal
// fetches have been queued up, so the fetcher might assign it to idle peers.
func (q *balQueue) waker() chan bool {
return q.queue.balWakeCh
}
// pending returns the number of bal that are currently queued for fetching
// by the concurrent downloader.
func (q *balQueue) pending() int {
return q.queue.PendingBALs()
}
// capacity is responsible for calculating how many bals a particular peer is
// estimated to be able to retrieve within the allotted round trip time.
func (q *balQueue) capacity(peer *peerConnection, rtt time.Duration) int {
return peer.BALCapacity(rtt)
}
// updateCapacity is responsible for updating how many bals a particular peer
// is estimated to be able to retrieve in a unit time.
func (q *balQueue) updateCapacity(peer *peerConnection, items int, span time.Duration) {
peer.UpdateBALRate(items, span)
}
// reserve is responsible for allocating a requested number of pending bals
// from the download queue to the specified peer.
func (q *balQueue) reserve(peer *peerConnection, items int) (*fetchRequest, bool, bool) {
return q.queue.ReserveBALs(peer, items)
}
// unreserve is responsible for removing the current bal retrieval allocation
// assigned to a specific peer and placing it back into the pool to allow
// reassigning to some other peer.
func (q *balQueue) unreserve(peer string) int {
fails := q.queue.ExpireBALs(peer)
if fails > 2 {
log.Trace("BAL delivery timed out", "peer", peer)
} else {
log.Debug("BAL delivery stalling", "peer", peer)
}
return fails
}
// request is responsible for converting a generic fetch request into a bal
// one and sending it to the remote peer for fulfillment.
func (q *balQueue) request(peer *peerConnection, req *fetchRequest, resCh chan *eth.Response) (*eth.Request, error) {
peer.log.Trace("Requesting new batch of bals", "count", len(req.Headers), "from", req.Headers[0].Number)
if q.balFetchHook != nil {
q.balFetchHook(req.Headers)
}
var (
hashes = make([]common.Hash, 0, len(req.Headers))
)
for _, header := range req.Headers {
hashes = append(hashes, header.Hash())
}
return peer.peer.RequestBALs(hashes, resCh)
}
// deliver is responsible for taking a generic response packet from the concurrent
// fetcher, unpacking the bal data and delivering it to the downloader's queue.
func (q *balQueue) deliver(peer *peerConnection, packet *eth.Response) (int, error) {
bals := *packet.Res.(*eth.BlockAccessListResponse)
hashes := packet.Meta.([]common.Hash) // {bal hashes}
accepted, err := q.queue.DeliverBALs(peer.id, bals, hashes)
switch {
case err == nil && len(bals) == 0:
peer.log.Trace("Requested bals delivered")
case err == nil:
peer.log.Trace("Delivered new batch of bals", "count", len(bals), "accepted", accepted)
default:
peer.log.Debug("Failed to deliver retrieved bals", "err", err)
}
return accepted, err
}

View file

@ -37,5 +37,10 @@ var (
receiptDropMeter = metrics.NewRegisteredMeter("eth/downloader/receipts/drop", nil)
receiptTimeoutMeter = metrics.NewRegisteredMeter("eth/downloader/receipts/timeout", nil)
balInMeter = metrics.NewRegisteredMeter("eth/downloader/bals/in", nil)
balReqTimer = metrics.NewRegisteredTimer("eth/downloader/bals/req", nil)
balDropMeter = metrics.NewRegisteredMeter("eth/downloader/bals/drop", nil)
balTimeoutMeter = metrics.NewRegisteredMeter("eth/downloader/bals/timeout", nil)
throttleCounter = metrics.NewRegisteredCounter("eth/downloader/throttle", nil)
)

View file

@ -61,6 +61,7 @@ type Peer interface {
RequestBodies([]common.Hash, chan *eth.Response) (*eth.Request, error)
RequestReceipts([]common.Hash, []uint64, []uint64, chan *eth.Response) (*eth.Request, error)
RequestBALs([]common.Hash, chan *eth.Response) (*eth.Request, error)
}
// newPeerConnection creates a new downloader peer.
@ -100,6 +101,12 @@ func (p *peerConnection) UpdateReceiptRate(delivered int, elapsed time.Duration)
p.rates.Update(eth.ReceiptsMsg, elapsed, delivered)
}
// UpdateBALRate updates the peer's estimated bal retrieval throughput
// with the current measurement.
func (p *peerConnection) UpdateBALRate(delivered int, elapsed time.Duration) {
p.rates.Update(eth.ReceiptsMsg, elapsed, delivered)
}
// HeaderCapacity retrieves the peer's header download allowance based on its
// previously discovered throughput.
func (p *peerConnection) HeaderCapacity(targetRTT time.Duration) int {
@ -130,6 +137,16 @@ func (p *peerConnection) ReceiptCapacity(targetRTT time.Duration) int {
return cap
}
// BALCapacity retrieves the peers bal download allowance based on its
// previously discovered throughput.
func (p *peerConnection) BALCapacity(targetRTT time.Duration) int {
cap := p.rates.Capacity(eth.BlockAccessListsMsg, targetRTT)
if cap > MaxBALFetch {
cap = MaxBALFetch
}
return cap
}
// MarkLacking appends a new entity to the set of items (blocks, receipts, states)
// that a peer is known not to have (i.e. have been requested before). If the
// set reaches its maximum allowed capacity, items are randomly dropped off.

View file

@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/prque"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/types/bal"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/log"
@ -39,6 +40,7 @@ import (
const (
bodyType = uint(0)
receiptType = uint(1)
balType = uint(2)
)
var (
@ -71,9 +73,10 @@ type fetchResult struct {
Transactions types.Transactions
Receipts rlp.RawValue
Withdrawals types.Withdrawals
BAL *bal.BlockAccessList
}
func newFetchResult(header *types.Header, snapSync bool) *fetchResult {
func newFetchResult(header *types.Header, snapSync bool, fetchBAL bool) *fetchResult {
item := &fetchResult{
Header: header,
}
@ -89,6 +92,10 @@ func newFetchResult(header *types.Header, snapSync bool) *fetchResult {
} else {
item.pending.Store(item.pending.Load() | (1 << receiptType))
}
} else {
if header.BlockAccessListHash != nil && fetchBAL {
item.pending.Store(item.pending.Load() | (1 << balType))
}
}
return item
}
@ -121,6 +128,13 @@ func (f *fetchResult) SetReceiptsDone() {
}
}
// SetBALsDone flags the bals as finished.
func (f *fetchResult) SetBALDone() {
if v := f.pending.Load(); (v & (1 << balType)) != 0 {
f.pending.Add(-4)
}
}
// Done checks if the given type is done already
func (f *fetchResult) Done(kind uint) bool {
v := f.pending.Load()
@ -143,6 +157,13 @@ type queue struct {
receiptPendPool map[string]*fetchRequest // Currently pending receipt retrieval operations
receiptWakeCh chan bool // Channel to notify when receipt fetcher of new tasks
balTaskPool map[common.Hash]*types.Header // Pending bal retrieval tasks, mapping hashes to headers
balTaskQueue *prque.Prque[int64, *types.Header] // Priority queue of the headers to fetch the bals for
balPendPool map[string]*fetchRequest // Currently pending bal retrieval operations
balWakeCh chan bool // Channel to notify when bal fetcher of new tasks
fetchBAL bool // Whether to fetch BALs (only for small sync gaps)
resultCache *resultStore // Downloaded but not yet delivered fetch results
resultSize common.StorageSize // Approximate size of a block (exponential moving average)
@ -161,6 +182,8 @@ func newQueue(blockCacheLimit int, thresholdInitialSize int) *queue {
blockWakeCh: make(chan bool, 1),
receiptTaskQueue: prque.New[int64, *types.Header](nil),
receiptWakeCh: make(chan bool, 1),
balTaskQueue: prque.New[int64, *types.Header](nil),
balWakeCh: make(chan bool, 1),
active: sync.NewCond(lock),
lock: lock,
}
@ -185,6 +208,10 @@ func (q *queue) Reset(blockCacheLimit int, thresholdInitialSize int) {
q.receiptTaskQueue.Reset()
q.receiptPendPool = make(map[string]*fetchRequest)
q.balTaskPool = make(map[common.Hash]*types.Header)
q.balTaskQueue.Reset()
q.balPendPool = make(map[string]*fetchRequest)
q.resultCache = newResultStore(blockCacheLimit)
q.resultCache.SetThrottleThreshold(uint64(thresholdInitialSize))
}
@ -214,6 +241,14 @@ func (q *queue) PendingReceipts() int {
return q.receiptTaskQueue.Size()
}
// PendingBALs retrieves the number of bals pending for retrieval.
func (q *queue) PendingBALs() int {
q.lock.Lock()
defer q.lock.Unlock()
return q.balTaskQueue.Size()
}
// InFlightBlocks retrieves whether there are block fetch requests currently in
// flight.
func (q *queue) InFlightBlocks() bool {
@ -232,13 +267,22 @@ func (q *queue) InFlightReceipts() bool {
return len(q.receiptPendPool) > 0
}
// InFlightBALs retrieves whether there are bal fetch requests currently
// in flight.
func (q *queue) InFlightBALs() bool {
q.lock.Lock()
defer q.lock.Unlock()
return len(q.balPendPool) > 0
}
// Idle returns if the queue is fully idle or has some data still inside.
func (q *queue) Idle() bool {
q.lock.Lock()
defer q.lock.Unlock()
queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size()
pending := len(q.blockPendPool) + len(q.receiptPendPool)
queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.balTaskQueue.Size()
pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.balPendPool)
return (queued + pending) == 0
}
@ -280,6 +324,14 @@ func (q *queue) Schedule(headers []*types.Header, hashes []common.Hash, from uin
q.receiptTaskQueue.Push(header, -int64(header.Number.Uint64()))
}
}
if q.mode == ethconfig.FullSync && q.fetchBAL && header.BlockAccessListHash != nil {
if _, ok := q.balTaskPool[hash]; ok {
log.Warn("Header already scheduled for BAL fetch", "number", header.Number, "hash", hash)
} else {
q.balTaskPool[hash] = header
q.balTaskQueue.Push(header, -int64(header.Number.Uint64()))
}
}
inserts++
q.headerHead = hash
from++
@ -328,6 +380,7 @@ func (q *queue) Results(block bool) []*fetchResult {
size += common.StorageSize(tx.Size())
}
size += common.StorageSize(result.Withdrawals.Size())
size += common.StorageSize(result.BAL.EncodedSize())
q.resultSize = common.StorageSize(blockCacheSizeWeight)*size +
(1-common.StorageSize(blockCacheSizeWeight))*q.resultSize
}
@ -337,7 +390,7 @@ func (q *queue) Results(block bool) []*fetchResult {
throttleThreshold = q.resultCache.SetThrottleThreshold(throttleThreshold)
// With results removed from the cache, wake throttled fetchers
for _, ch := range []chan bool{q.blockWakeCh, q.receiptWakeCh} {
for _, ch := range []chan bool{q.blockWakeCh, q.receiptWakeCh, q.balWakeCh} {
select {
case ch <- true:
default:
@ -365,6 +418,7 @@ func (q *queue) stats() []interface{} {
return []interface{}{
"receiptTasks", q.receiptTaskQueue.Size(),
"blockTasks", q.blockTaskQueue.Size(),
"balTasks", q.balTaskQueue.Size(),
"itemSize", q.resultSize,
}
}
@ -389,6 +443,13 @@ func (q *queue) ReserveReceipts(p *peerConnection, count int) (*fetchRequest, bo
return q.reserveHeaders(p, count, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, receiptType)
}
func (q *queue) ReserveBALs(p *peerConnection, count int) (*fetchRequest, bool, bool) {
q.lock.Lock()
defer q.lock.Unlock()
return q.reserveHeaders(p, count, q.balTaskPool, q.balTaskQueue, q.balPendPool, balType)
}
// reserveHeaders reserves a set of data download operations for a given peer,
// skipping any previously failed ones. This method is a generic version used
// by the individual special reservation functions.
@ -424,8 +485,7 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
// we can ask the resultcache if this header is within the
// "prioritized" segment of blocks. If it is not, we need to throttle
stale, throttle, item, err := q.resultCache.AddFetch(header, q.mode == ethconfig.SnapSync)
stale, throttle, item, err := q.resultCache.AddFetch(header, q.mode == ethconfig.SnapSync, q.fetchBAL)
if stale {
// Don't put back in the task queue, this item has already been
// delivered upstream
@ -505,6 +565,12 @@ func (q *queue) Revoke(peerID string) {
}
delete(q.receiptPendPool, peerID)
}
if request, ok := q.balPendPool[peerID]; ok {
for _, header := range request.Headers {
q.balTaskQueue.Push(header, -int64(header.Number.Uint64()))
}
delete(q.balPendPool, peerID)
}
}
// ExpireBodies checks for in flight block body requests that exceeded a timeout
@ -527,6 +593,16 @@ func (q *queue) ExpireReceipts(peer string) int {
return q.expire(peer, q.receiptPendPool, q.receiptTaskQueue)
}
// ExpireBALs checks for in flight bal requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalisation.
func (q *queue) ExpireBALs(peer string) int {
q.lock.Lock()
defer q.lock.Unlock()
balTimeoutMeter.Mark(1)
return q.expire(peer, q.balPendPool, q.balTaskPool)
}
// expire is the generic check that moves a specific expired task from a pending
// pool back into a task pool. The syntax on the passed taskQueue is a bit weird
// as we would need a generic expire method to handle both types, but that is not
@ -642,6 +718,31 @@ func (q *queue) DeliverReceipts(id string, receiptList []rlp.RawValue, receiptLi
receiptReqTimer, receiptInMeter, receiptDropMeter, len(receiptList), validate, reconstruct)
}
func (q *queue) DeliverBALs(id string, rawBALs []eth.RawBlockAccessList, balHashes []common.Hash) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
var bals []*bal.BlockAccessList
validate := func(index int, header *types.Header) error {
if balHashes[index] != *header.BlockAccessListHash {
return errInvalidBAL
}
b, err := rawBALs[index].Items()
if err != nil {
return fmt.Errorf("%w: bad bal: %v", errInvalidBAL, err)
}
blockBAL := bal.BlockAccessList(b)
bals = append(bals, &blockBAL)
return nil
}
reconstruct := func(index int, result *fetchResult) {
result.BAL = bals[index]
result.SetBALDone()
}
return q.deliver(id, q.balTaskPool, q.balTaskQueue, q.balPendPool,
balReqTimer, balInMeter, balDropMeter, len(rawBALs), validate, reconstruct)
}
// deliver injects a data retrieval response into the results queue.
//
// Note, this method expects the queue lock to be already held for writing. The
@ -725,11 +826,12 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
// Prepare configures the result cache to allow accepting and caching inbound
// fetch results.
func (q *queue) Prepare(offset uint64, mode SyncMode) {
func (q *queue) Prepare(offset uint64, mode SyncMode, fetchBAL bool) {
q.lock.Lock()
defer q.lock.Unlock()
// Prepare the queue for sync results
q.resultCache.Prepare(offset)
q.mode = mode
q.fetchBAL = fetchBAL
}

View file

@ -103,7 +103,7 @@ func TestBasics(t *testing.T) {
if !q.Idle() {
t.Errorf("new queue should be idle")
}
q.Prepare(1, SnapSync)
q.Prepare(1, SnapSync, false)
if res := q.Results(false); len(res) != 0 {
t.Fatal("new queue should have 0 results")
}
@ -200,7 +200,7 @@ func TestEmptyBlocks(t *testing.T) {
q := newQueue(10, 10)
q.Prepare(1, SnapSync)
q.Prepare(1, SnapSync, false)
// Schedule a batch of headers
headers := emptyChain.headers()
@ -279,7 +279,7 @@ func XTestDelivery(t *testing.T) {
}
q := newQueue(10, 10)
var wg sync.WaitGroup
q.Prepare(1, SnapSync)
q.Prepare(1, SnapSync, false)
wg.Add(1)
go func() {
// deliver headers

View file

@ -76,7 +76,7 @@ func (r *resultStore) SetThrottleThreshold(threshold uint64) uint64 {
// throttled - if true, the store is at capacity, this particular header is not prio now
// item - the result to store data into
// err - any error that occurred
func (r *resultStore) AddFetch(header *types.Header, snapSync bool) (stale, throttled bool, item *fetchResult, err error) {
func (r *resultStore) AddFetch(header *types.Header, snapSync bool, fetchBAL bool) (stale, throttled bool, item *fetchResult, err error) {
r.lock.Lock()
defer r.lock.Unlock()
@ -86,7 +86,7 @@ func (r *resultStore) AddFetch(header *types.Header, snapSync bool) (stale, thro
return stale, throttled, item, err
}
if item == nil {
item = newFetchResult(header, snapSync)
item = newFetchResult(header, snapSync, fetchBAL)
r.items[index] = item
}
return stale, throttled, item, err

View file

@ -212,6 +212,10 @@ func (p *skeletonTestPeer) RequestReceipts([]common.Hash, []uint64, []uint64, ch
panic("skeleton sync must not request receipts")
}
func (p *skeletonTestPeer) RequestBALs([]common.Hash, chan *eth.Response) (*eth.Request, error) {
panic("skeleton sync must not request block access lists")
}
// Tests various sync initializations based on previous leftovers in the database
// and announced heads.
func TestSkeletonSyncInit(t *testing.T) {

View file

@ -53,6 +53,9 @@ const (
// containing 200+ transactions nowadays, the practical limit will always
// be softResponseLimit.
maxReceiptsServe = 1024
// maxBALsServe is the maximum number of block access lists to serve.
maxBALsServe = 1024
)
// Handler is a callback to invoke from an outside runner after the boilerplate
@ -197,6 +200,22 @@ var eth70 = map[uint64]msgHandler{
BlockRangeUpdateMsg: handleBlockRangeUpdate,
}
var eth71 = map[uint64]msgHandler{
TransactionsMsg: handleTransactions,
NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes,
GetBlockHeadersMsg: handleGetBlockHeaders,
BlockHeadersMsg: handleBlockHeaders,
GetBlockBodiesMsg: handleGetBlockBodies,
BlockBodiesMsg: handleBlockBodies,
GetReceiptsMsg: handleGetReceipts70,
ReceiptsMsg: handleReceipts70,
GetPooledTransactionsMsg: handleGetPooledTransactions,
PooledTransactionsMsg: handlePooledTransactions,
BlockRangeUpdateMsg: handleBlockRangeUpdate,
GetBlockAccessListsMsg: handleGetBlockAccessLists,
BlockAccessListsMsg: handleBlockAccessLists,
}
// handleMessage is invoked whenever an inbound message is received from a remote
// peer. The remote connection is torn down upon returning any error.
func handleMessage(backend Backend, peer *Peer) error {
@ -216,6 +235,8 @@ func handleMessage(backend Backend, peer *Peer) error {
handlers = eth69
case ETH70:
handlers = eth70
case ETH71:
handlers = eth71
default:
return fmt.Errorf("unknown eth protocol version: %v", peer.version)
}

View file

@ -663,3 +663,66 @@ func handleBlockRangeUpdate(backend Backend, msg Decoder, peer *Peer) error {
peer.lastRange.Store(&update)
return nil
}
// handleGetBlockAccessLists serves a GetBlockAccessLists request.
func handleGetBlockAccessLists(backend Backend, msg Decoder, peer *Peer) error {
var query GetBlockAccessListsPacket
if err := msg.Decode(&query); err != nil {
return err
}
response := serviceGetBlockAccessListsQuery(backend.Chain(), query.GetBlockAccessListsRequest)
return peer.ReplyBlockAccessLists(query.RequestId, response)
}
// serviceGetBlockAccessListsQuery assembles the response to a BAL query.
// Unavailable BALs are returned as empty list entries.
func serviceGetBlockAccessListsQuery(chain *core.BlockChain, query GetBlockAccessListsRequest) rlp.RawList[RawBlockAccessList] {
var (
bytes int
bals rlp.RawList[RawBlockAccessList]
)
for _, hash := range query {
if bytes >= softResponseLimit || bals.Len() >= maxBALsServe {
break
}
data := chain.GetAccessListRLP(hash)
if len(data) == 0 {
bals.AppendRaw([]byte{0xC0})
continue
}
bals.AppendRaw(data)
bytes += len(data)
}
return bals
}
// handleBlockAccessLists processes an incoming BlockAccessLists response,
// validates it against the request tracker, and dispatches it to the waiting caller.
func handleBlockAccessLists(backend Backend, msg Decoder, peer *Peer) error {
res := new(BlockAccessListPacket)
if err := msg.Decode(res); err != nil {
return err
}
tresp := tracker.Response{ID: res.RequestId, MsgCode: BlockAccessListsMsg, Size: res.List.Len()}
if err := peer.tracker.Fulfil(tresp); err != nil {
return fmt.Errorf("BlockAccessLists: %w", err)
}
bals, err := res.List.Items()
if err != nil {
return fmt.Errorf("BlockAccessLists: %w", err)
}
metadata := func() interface{} {
hashes := make([]common.Hash, len(bals))
for i := range bals {
hashes[i] = crypto.Keccak256Hash(bals[i].Bytes())
}
return hashes
}
return peer.dispatchResponse(&Response{
id: res.RequestId,
code: BlockAccessListsMsg,
Res: (*BlockAccessListResponse)(&bals),
}, metadata)
}

View file

@ -251,6 +251,36 @@ func (p *Peer) ReplyReceiptsRLP70(id uint64, receipts rlp.RawList[*ReceiptList],
})
}
// ReplyBlockAccessLists is the response to GetBlockAccessLists (EIP-8159).
func (p *Peer) ReplyBlockAccessLists(id uint64, list rlp.RawList[RawBlockAccessList]) error {
return p2p.Send(p.rw, BlockAccessListsMsg, &BlockAccessListPacket{
RequestId: id,
List: list,
})
}
// RequestBALs fetches block access lists for the given block hashes (EIP-8159)
func (p *Peer) RequestBALs(hashes []common.Hash, sink chan *Response) (*Request, error) {
p.Log().Debug("Fetching block access lists", "count", len(hashes))
id := rand.Uint64()
req := &Request{
id: id,
sink: sink,
code: GetBlockAccessListsMsg,
want: BlockAccessListsMsg,
numItems: len(hashes),
data: &GetBlockAccessListsPacket{
RequestId: id,
GetBlockAccessListsRequest: hashes,
},
}
if err := p.dispatchRequest(req); err != nil {
return nil, err
}
return req, nil
}
// RequestOneHeader is a wrapper around the header query functions to fetch a
// single header. It is used solely by the fetcher.
func (p *Peer) RequestOneHeader(hash common.Hash, sink chan *Response) (*Request, error) {

View file

@ -24,6 +24,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/forkid"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/types/bal"
"github.com/ethereum/go-ethereum/rlp"
)
@ -31,6 +32,7 @@ import (
const (
ETH69 = 69
ETH70 = 70
ETH71 = 71
)
// ProtocolName is the official short name of the `eth` protocol used during
@ -39,11 +41,11 @@ const ProtocolName = "eth"
// ProtocolVersions are the supported versions of the `eth` protocol (first
// is primary).
var ProtocolVersions = []uint{ETH70, ETH69}
var ProtocolVersions = []uint{ETH71, ETH70, ETH69}
// protocolLengths are the number of implemented message corresponding to
// different protocol versions.
var protocolLengths = map[uint]uint64{ETH69: 18, ETH70: 18}
var protocolLengths = map[uint]uint64{ETH71: 20, ETH69: 18, ETH70: 18}
// maxMessageSize is the maximum cap on the size of a protocol message.
const maxMessageSize = 10 * 1024 * 1024
@ -66,6 +68,8 @@ const (
GetReceiptsMsg = 0x0f
ReceiptsMsg = 0x10
BlockRangeUpdateMsg = 0x11
GetBlockAccessListsMsg = 0x12
BlockAccessListsMsg = 0x13
)
var (
@ -288,6 +292,24 @@ type BlockRangeUpdatePacket struct {
LatestBlockHash common.Hash
}
type GetBlockAccessListsRequest []common.Hash
type GetBlockAccessListsPacket struct {
RequestId uint64
GetBlockAccessListsRequest
}
type RawBlockAccessList struct {
rlp.RawList[bal.AccountAccess]
}
type BlockAccessListResponse []RawBlockAccessList
type BlockAccessListPacket struct {
RequestId uint64
List rlp.RawList[RawBlockAccessList]
}
func (*StatusPacket) Name() string { return "Status" }
func (*StatusPacket) Kind() byte { return StatusMsg }
@ -326,3 +348,9 @@ func (*ReceiptsRLPResponse) Kind() byte { return ReceiptsMsg }
func (*BlockRangeUpdatePacket) Name() string { return "BlockRangeUpdate" }
func (*BlockRangeUpdatePacket) Kind() byte { return BlockRangeUpdateMsg }
func (*GetBlockAccessListsRequest) Name() string { return "GetBlockAccessLists" }
func (*GetBlockAccessListsRequest) Kind() byte { return GetBlockAccessListsMsg }
func (*BlockAccessListResponse) Name() string { return "BlockAccessLists" }
func (*BlockAccessListResponse) Kind() byte { return BlockAccessListsMsg }