eth/fetcher: pass BlobFetcher callbacks in struct

This commit is contained in:
Felix Lange 2026-05-14 20:16:18 +02:00
parent 7aa045c60c
commit 513a834d0a
3 changed files with 135 additions and 103 deletions

View file

@ -89,6 +89,13 @@ type fetchStatus struct {
blobCount int // Number of blobs in this tx (set on first delivery) blobCount int // Number of blobs in this tx (set on first delivery)
} }
type BlobFetcherFunctions struct {
HasPayload func(common.Hash) bool
AddCells func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error
FetchPayloads func(string, []common.Hash, *types.CustodyBitmap) error
DropPeer func(string)
}
// BlobFetcher is responsible for managing type 3 transactions based on peer announcements. // BlobFetcher is responsible for managing type 3 transactions based on peer announcements.
// //
// BlobFetcher manages three buffers: // BlobFetcher manages three buffers:
@ -128,11 +135,7 @@ type BlobFetcher struct {
// todo simplify // todo simplify
alternates map[common.Hash]map[string]*types.CustodyBitmap // In-flight transaction alternate origins (in case the peer is dropped) alternates map[common.Hash]map[string]*types.CustodyBitmap // In-flight transaction alternate origins (in case the peer is dropped)
// Callbacks fn BlobFetcherFunctions // callbacks
hasPayload func(common.Hash) bool
addCells func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error
fetchPayloads func(string, []common.Hash, *types.CustodyBitmap) error
dropPeer func(string)
step chan struct{} // Notification channel when the fetcher loop iterates step chan struct{} // Notification channel when the fetcher loop iterates
clock mclock.Clock // Monotonic clock or simulated clock for tests clock mclock.Clock // Monotonic clock or simulated clock for tests
@ -140,33 +143,26 @@ type BlobFetcher struct {
rand random // Randomizer rand random // Randomizer
} }
func NewBlobFetcher( func NewBlobFetcher(fn BlobFetcherFunctions, custody *types.CustodyBitmap, rand random) *BlobFetcher {
hasPayload func(common.Hash) bool,
addCells func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error,
fetchPayloads func(string, []common.Hash, *types.CustodyBitmap) error, dropPeer func(string),
custody *types.CustodyBitmap, rand random) *BlobFetcher {
return &BlobFetcher{ return &BlobFetcher{
notify: make(chan *blobTxAnnounce), notify: make(chan *blobTxAnnounce),
cleanup: make(chan *payloadDelivery), cleanup: make(chan *payloadDelivery),
drop: make(chan *txDrop), drop: make(chan *txDrop),
quit: make(chan struct{}), quit: make(chan struct{}),
full: make(map[common.Hash]struct{}), full: make(map[common.Hash]struct{}),
partial: make(map[common.Hash]struct{}), partial: make(map[common.Hash]struct{}),
waitlist: make(map[common.Hash]map[string]struct{}), waitlist: make(map[common.Hash]map[string]struct{}),
waittime: make(map[common.Hash]mclock.AbsTime), waittime: make(map[common.Hash]mclock.AbsTime),
waitslots: make(map[string]map[common.Hash]struct{}), waitslots: make(map[string]map[common.Hash]struct{}),
announces: make(map[string]map[common.Hash]*cellWithSeq), announces: make(map[string]map[common.Hash]*cellWithSeq),
fetches: make(map[common.Hash]*fetchStatus), fetches: make(map[common.Hash]*fetchStatus),
requests: make(map[string][]*cellRequest), requests: make(map[string][]*cellRequest),
alternates: make(map[common.Hash]map[string]*types.CustodyBitmap), alternates: make(map[common.Hash]map[string]*types.CustodyBitmap),
hasPayload: hasPayload, fn: fn,
addCells: addCells, custody: custody,
fetchPayloads: fetchPayloads, clock: mclock.System{},
dropPeer: dropPeer, realTime: time.Now,
custody: custody, rand: rand,
clock: mclock.System{},
realTime: time.Now,
rand: rand,
} }
} }
@ -175,7 +171,7 @@ func (f *BlobFetcher) Notify(peer string, txs []common.Hash, cells types.Custody
blobAnnounceInMeter.Mark(int64(len(txs))) blobAnnounceInMeter.Mark(int64(len(txs)))
anns := make([]common.Hash, 0) anns := make([]common.Hash, 0)
for _, tx := range txs { for _, tx := range txs {
if f.hasPayload(tx) { if f.fn.HasPayload(tx) {
continue continue
} }
anns = append(anns, tx) anns = append(anns, tx)
@ -521,7 +517,7 @@ func (f *BlobFetcher) loop() {
blobFetcherFetchTime.Update(int64(time.Duration(f.clock.Now() - request.time))) blobFetcherFetchTime.Update(int64(time.Duration(f.clock.Now() - request.time)))
status := f.fetches[hash] status := f.fetches[hash]
collectedCustody := types.NewCustodyBitmap(status.fetched) collectedCustody := types.NewCustodyBitmap(status.fetched)
f.addCells(hash, status.deliveries, &collectedCustody) f.fn.AddCells(hash, status.deliveries, &collectedCustody)
for peer, txset := range f.announces { for peer, txset := range f.announces {
delete(txset, hash) delete(txset, hash)
@ -770,7 +766,7 @@ func (f *BlobFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{}
go func(peer string, request []*cellRequest) { go func(peer string, request []*cellRequest) {
for _, req := range request { for _, req := range request {
blobRequestOutMeter.Mark(int64(len(req.txs))) blobRequestOutMeter.Mark(int64(len(req.txs)))
if err := f.fetchPayloads(peer, req.txs, req.cells); err != nil { if err := f.fn.FetchPayloads(peer, req.txs, req.cells); err != nil {
blobRequestFailMeter.Mark(int64(len(req.txs))) blobRequestFailMeter.Mark(int64(len(req.txs)))
f.Drop(peer) f.Drop(peer)
break break

View file

@ -147,12 +147,16 @@ func TestBlobFetcherFullFetch(t *testing.T) {
testBlobFetcher(t, blobFetcherTest{ testBlobFetcher(t, blobFetcherTest{
init: func() *BlobFetcher { init: func() *BlobFetcher {
return NewBlobFetcher( return NewBlobFetcher(
func(common.Hash) bool { return false }, BlobFetcherFunctions{
func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { HasPayload: func(common.Hash) bool { return false },
return nil AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error {
return nil
},
FetchPayloads: func(string, []common.Hash, *types.CustodyBitmap) error {
return nil
},
DropPeer: func(string) {},
}, },
func(string, []common.Hash, *types.CustodyBitmap) error { return nil },
func(string) {},
&custody, &custody,
&mockRand{value: 5}, // Force full requests (5 < fetchProbability) &mockRand{value: 5}, // Force full requests (5 < fetchProbability)
) )
@ -236,12 +240,16 @@ func TestBlobFetcherPartialFetch(t *testing.T) {
testBlobFetcher(t, blobFetcherTest{ testBlobFetcher(t, blobFetcherTest{
init: func() *BlobFetcher { init: func() *BlobFetcher {
return NewBlobFetcher( return NewBlobFetcher(
func(common.Hash) bool { return false }, BlobFetcherFunctions{
func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { HasPayload: func(common.Hash) bool { return false },
return nil AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error {
return nil
},
FetchPayloads: func(string, []common.Hash, *types.CustodyBitmap) error {
return nil
},
DropPeer: func(string) {},
}, },
func(string, []common.Hash, *types.CustodyBitmap) error { return nil },
func(string) {},
&custody, &custody,
&mockRand{value: 60}, // Force partial requests (20 >= 15) &mockRand{value: 60}, // Force partial requests (20 >= 15)
) )
@ -329,12 +337,16 @@ func TestBlobFetcherFullDelivery(t *testing.T) {
testBlobFetcher(t, blobFetcherTest{ testBlobFetcher(t, blobFetcherTest{
init: func() *BlobFetcher { init: func() *BlobFetcher {
return NewBlobFetcher( return NewBlobFetcher(
func(common.Hash) bool { return false }, BlobFetcherFunctions{
func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { HasPayload: func(common.Hash) bool { return false },
return nil AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error {
return nil
},
FetchPayloads: func(string, []common.Hash, *types.CustodyBitmap) error {
return nil
},
DropPeer: func(string) {},
}, },
func(string, []common.Hash, *types.CustodyBitmap) error { return nil },
func(string) {},
&custody, &custody,
&mockRand{value: 5}, // Force full requests for simplicity &mockRand{value: 5}, // Force full requests for simplicity
) )
@ -375,12 +387,16 @@ func TestBlobFetcherPartialDelivery(t *testing.T) {
testBlobFetcher(t, blobFetcherTest{ testBlobFetcher(t, blobFetcherTest{
init: func() *BlobFetcher { init: func() *BlobFetcher {
return NewBlobFetcher( return NewBlobFetcher(
func(common.Hash) bool { return false }, BlobFetcherFunctions{
func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { HasPayload: func(common.Hash) bool { return false },
return nil AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error {
return nil
},
FetchPayloads: func(string, []common.Hash, *types.CustodyBitmap) error {
return nil
},
DropPeer: func(string) {},
}, },
func(string, []common.Hash, *types.CustodyBitmap) error { return nil },
func(string) {},
&custody, &custody,
&mockRand{value: 60}, &mockRand{value: 60},
) )
@ -509,12 +525,16 @@ func TestBlobFetcherAvailabilityTimeout(t *testing.T) {
testBlobFetcher(t, blobFetcherTest{ testBlobFetcher(t, blobFetcherTest{
init: func() *BlobFetcher { init: func() *BlobFetcher {
return NewBlobFetcher( return NewBlobFetcher(
func(common.Hash) bool { return false }, BlobFetcherFunctions{
func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { HasPayload: func(common.Hash) bool { return false },
return nil AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error {
return nil
},
FetchPayloads: func(string, []common.Hash, *types.CustodyBitmap) error {
return nil
},
DropPeer: func(string) {},
}, },
func(string, []common.Hash, *types.CustodyBitmap) error { return nil },
func(string) {},
&custody, &custody,
&mockRand{value: 60}, &mockRand{value: 60},
) )
@ -549,12 +569,16 @@ func TestBlobFetcherPeerDrop(t *testing.T) {
testBlobFetcher(t, blobFetcherTest{ testBlobFetcher(t, blobFetcherTest{
init: func() *BlobFetcher { init: func() *BlobFetcher {
return NewBlobFetcher( return NewBlobFetcher(
func(common.Hash) bool { return false }, BlobFetcherFunctions{
func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { HasPayload: func(common.Hash) bool { return false },
return nil AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error {
return nil
},
FetchPayloads: func(string, []common.Hash, *types.CustodyBitmap) error {
return nil
},
DropPeer: func(string) {},
}, },
func(string, []common.Hash, *types.CustodyBitmap) error { return nil },
func(string) {},
&custody, &custody,
&mockRand{value: 5}, &mockRand{value: 5},
) )
@ -624,12 +648,16 @@ func TestBlobFetcherFetchTimeout(t *testing.T) {
testBlobFetcher(t, blobFetcherTest{ testBlobFetcher(t, blobFetcherTest{
init: func() *BlobFetcher { init: func() *BlobFetcher {
return NewBlobFetcher( return NewBlobFetcher(
func(common.Hash) bool { return false }, BlobFetcherFunctions{
func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { HasPayload: func(common.Hash) bool { return false },
return nil AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error {
return nil
},
FetchPayloads: func(string, []common.Hash, *types.CustodyBitmap) error {
return nil
},
DropPeer: func(string) {},
}, },
func(string, []common.Hash, *types.CustodyBitmap) error { return nil },
func(string) {},
&custody, &custody,
&mockRand{value: 5}, &mockRand{value: 5},
) )
@ -1010,25 +1038,29 @@ func TestMultiBlobDeliveryVerification(t *testing.T) {
testBlobFetcher(t, blobFetcherTest{ testBlobFetcher(t, blobFetcherTest{
init: func() *BlobFetcher { init: func() *BlobFetcher {
return NewBlobFetcher( return NewBlobFetcher(
func(common.Hash) bool { return false }, BlobFetcherFunctions{
func(hash common.Hash, deliveries map[string]*PeerCellDelivery, cst *types.CustodyBitmap) error { HasPayload: func(common.Hash) bool { return false },
// Verify each peer's delivered cells pass KZG cell proof verification AddCells: func(h common.Hash, deliveries map[string]*PeerCellDelivery, custody *types.CustodyBitmap) error {
for _, d := range deliveries { // Verify each peer's delivered cells pass KZG cell proof verification
var cellProofs []kzg4844.Proof for _, d := range deliveries {
for blobIdx := 0; blobIdx < len(sidecar.Commitments); blobIdx++ { var cellProofs []kzg4844.Proof
for _, idx := range d.Indices { for blobIdx := 0; blobIdx < len(sidecar.Commitments); blobIdx++ {
cellProofs = append(cellProofs, sidecar.Proofs[blobIdx*kzg4844.CellProofsPerBlob+int(idx)]) for _, idx := range d.Indices {
cellProofs = append(cellProofs, sidecar.Proofs[blobIdx*kzg4844.CellProofsPerBlob+int(idx)])
}
}
verifyErr = kzg4844.VerifyCells(d.Cells, sidecar.Commitments, cellProofs, d.Indices)
if verifyErr != nil {
return verifyErr
} }
} }
verifyErr = kzg4844.VerifyCells(d.Cells, sidecar.Commitments, cellProofs, d.Indices) return nil
if verifyErr != nil { },
return verifyErr FetchPayloads: func(string, []common.Hash, *types.CustodyBitmap) error {
} return nil
} },
return nil DropPeer: func(string) {},
}, },
func(string, []common.Hash, *types.CustodyBitmap) error { return nil },
func(string) {},
&custody, &custody,
&mockRand{value: 60}, // Force partial requests (60 >= fetchProbability) &mockRand{value: 60}, // Force partial requests (60 >= fetchProbability)
) )

View file

@ -195,7 +195,8 @@ func newHandler(config *handlerConfig) (*handler, error) {
} }
return p.RequestTxs(hashes) return p.RequestTxs(hashes)
} }
// Construct the blob buffer for assembling blob txs from separate tx and cell deliveries
// Construct the blob buffer for assembling blob txs from separate tx and cell deliveries.
h.blobBuffer = blobpool.NewBlobBuffer(h.blobpool.AddPooledTx, h.removePeer) h.blobBuffer = blobpool.NewBlobBuffer(h.blobpool.AddPooledTx, h.removePeer)
addTxs := func(peer string, txs []*types.Transaction) []error { addTxs := func(peer string, txs []*types.Transaction) []error {
@ -233,24 +234,27 @@ func newHandler(config *handlerConfig) (*handler, error) {
h.txFetcher = fetcher.NewTxFetcher(h.chain, validateMeta, addTxs, fetchTx, h.removePeer) h.txFetcher = fetcher.NewTxFetcher(h.chain, validateMeta, addTxs, fetchTx, h.removePeer)
// Construct the blob fetcher for cell-based blob data availability // Construct the blob fetcher for cell-based blob data availability
fetchPayloads := func(peer string, hashes []common.Hash, cells *types.CustodyBitmap) error { blobCallbacks := fetcher.BlobFetcherFunctions{
p := h.peers.peer(peer) FetchPayloads: func(peer string, hashes []common.Hash, cells *types.CustodyBitmap) error {
if p == nil { p := h.peers.peer(peer)
return errors.New("unknown peer") if p == nil {
} return errors.New("unknown peer")
return p.RequestPayload(hashes, cells) }
return p.RequestPayload(hashes, cells)
},
HasPayload: func(hash common.Hash) bool {
return h.blobpool.HasPayload(hash) || h.blobBuffer.HasCells(hash)
},
AddCells: func(hash common.Hash, deliveries map[string]*fetcher.PeerCellDelivery, custody *types.CustodyBitmap) error {
converted := make(map[string]*blobpool.PeerDelivery, len(deliveries))
for peer, d := range deliveries {
converted[peer] = &blobpool.PeerDelivery{Cells: d.Cells, Indices: d.Indices}
}
return h.blobBuffer.AddCells(hash, converted, custody)
},
DropPeer: h.removePeer,
} }
hasPayload := func(hash common.Hash) bool { h.blobFetcher = fetcher.NewBlobFetcher(blobCallbacks, &config.Custody, nil)
return h.blobpool.HasPayload(hash) || h.blobBuffer.HasCells(hash)
}
addCells := func(hash common.Hash, deliveries map[string]*fetcher.PeerCellDelivery, custody *types.CustodyBitmap) error {
converted := make(map[string]*blobpool.PeerDelivery, len(deliveries))
for peer, d := range deliveries {
converted[peer] = &blobpool.PeerDelivery{Cells: d.Cells, Indices: d.Indices}
}
return h.blobBuffer.AddCells(hash, converted, custody)
}
h.blobFetcher = fetcher.NewBlobFetcher(hasPayload, addCells, fetchPayloads, h.removePeer, &config.Custody, nil)
return h, nil return h, nil
} }