diff --git a/eth/fetcher/blob_fetcher.go b/eth/fetcher/blob_fetcher.go index 7ceb019dcd..089a946b1a 100644 --- a/eth/fetcher/blob_fetcher.go +++ b/eth/fetcher/blob_fetcher.go @@ -89,6 +89,13 @@ type fetchStatus struct { blobCount int // Number of blobs in this tx (set on first delivery) } +type BlobFetcherFunctions struct { + HasPayload func(common.Hash) bool + AddCells func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error + FetchPayloads func(string, []common.Hash, *types.CustodyBitmap) error + DropPeer func(string) +} + // BlobFetcher is responsible for managing type 3 transactions based on peer announcements. // // BlobFetcher manages three buffers: @@ -128,11 +135,7 @@ type BlobFetcher struct { // todo simplify alternates map[common.Hash]map[string]*types.CustodyBitmap // In-flight transaction alternate origins (in case the peer is dropped) - // Callbacks - hasPayload func(common.Hash) bool - addCells func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error - fetchPayloads func(string, []common.Hash, *types.CustodyBitmap) error - dropPeer func(string) + fn BlobFetcherFunctions // callbacks step chan struct{} // Notification channel when the fetcher loop iterates clock mclock.Clock // Monotonic clock or simulated clock for tests @@ -140,33 +143,26 @@ type BlobFetcher struct { rand random // Randomizer } -func NewBlobFetcher( - hasPayload func(common.Hash) bool, - addCells func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error, - fetchPayloads func(string, []common.Hash, *types.CustodyBitmap) error, dropPeer func(string), - custody *types.CustodyBitmap, rand random) *BlobFetcher { +func NewBlobFetcher(fn BlobFetcherFunctions, custody *types.CustodyBitmap, rand random) *BlobFetcher { return &BlobFetcher{ - notify: make(chan *blobTxAnnounce), - cleanup: make(chan *payloadDelivery), - drop: make(chan *txDrop), - quit: make(chan struct{}), - full: make(map[common.Hash]struct{}), - partial: make(map[common.Hash]struct{}), - waitlist: make(map[common.Hash]map[string]struct{}), - waittime: make(map[common.Hash]mclock.AbsTime), - waitslots: make(map[string]map[common.Hash]struct{}), - announces: make(map[string]map[common.Hash]*cellWithSeq), - fetches: make(map[common.Hash]*fetchStatus), - requests: make(map[string][]*cellRequest), - alternates: make(map[common.Hash]map[string]*types.CustodyBitmap), - hasPayload: hasPayload, - addCells: addCells, - fetchPayloads: fetchPayloads, - dropPeer: dropPeer, - custody: custody, - clock: mclock.System{}, - realTime: time.Now, - rand: rand, + notify: make(chan *blobTxAnnounce), + cleanup: make(chan *payloadDelivery), + drop: make(chan *txDrop), + quit: make(chan struct{}), + full: make(map[common.Hash]struct{}), + partial: make(map[common.Hash]struct{}), + waitlist: make(map[common.Hash]map[string]struct{}), + waittime: make(map[common.Hash]mclock.AbsTime), + waitslots: make(map[string]map[common.Hash]struct{}), + announces: make(map[string]map[common.Hash]*cellWithSeq), + fetches: make(map[common.Hash]*fetchStatus), + requests: make(map[string][]*cellRequest), + alternates: make(map[common.Hash]map[string]*types.CustodyBitmap), + fn: fn, + custody: custody, + clock: mclock.System{}, + realTime: time.Now, + rand: rand, } } @@ -175,7 +171,7 @@ func (f *BlobFetcher) Notify(peer string, txs []common.Hash, cells types.Custody blobAnnounceInMeter.Mark(int64(len(txs))) anns := make([]common.Hash, 0) for _, tx := range txs { - if f.hasPayload(tx) { + if f.fn.HasPayload(tx) { continue } anns = append(anns, tx) @@ -521,7 +517,7 @@ func (f *BlobFetcher) loop() { blobFetcherFetchTime.Update(int64(time.Duration(f.clock.Now() - request.time))) status := f.fetches[hash] collectedCustody := types.NewCustodyBitmap(status.fetched) - f.addCells(hash, status.deliveries, &collectedCustody) + f.fn.AddCells(hash, status.deliveries, &collectedCustody) for peer, txset := range f.announces { delete(txset, hash) @@ -770,7 +766,7 @@ func (f *BlobFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{} go func(peer string, request []*cellRequest) { for _, req := range request { blobRequestOutMeter.Mark(int64(len(req.txs))) - if err := f.fetchPayloads(peer, req.txs, req.cells); err != nil { + if err := f.fn.FetchPayloads(peer, req.txs, req.cells); err != nil { blobRequestFailMeter.Mark(int64(len(req.txs))) f.Drop(peer) break diff --git a/eth/fetcher/blob_fetcher_test.go b/eth/fetcher/blob_fetcher_test.go index 589957bf23..e89f7602ed 100644 --- a/eth/fetcher/blob_fetcher_test.go +++ b/eth/fetcher/blob_fetcher_test.go @@ -147,12 +147,16 @@ func TestBlobFetcherFullFetch(t *testing.T) { testBlobFetcher(t, blobFetcherTest{ init: func() *BlobFetcher { return NewBlobFetcher( - func(common.Hash) bool { return false }, - func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { - return nil + BlobFetcherFunctions{ + HasPayload: func(common.Hash) bool { return false }, + AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { + return nil + }, + FetchPayloads: func(string, []common.Hash, *types.CustodyBitmap) error { + return nil + }, + DropPeer: func(string) {}, }, - func(string, []common.Hash, *types.CustodyBitmap) error { return nil }, - func(string) {}, &custody, &mockRand{value: 5}, // Force full requests (5 < fetchProbability) ) @@ -236,12 +240,16 @@ func TestBlobFetcherPartialFetch(t *testing.T) { testBlobFetcher(t, blobFetcherTest{ init: func() *BlobFetcher { return NewBlobFetcher( - func(common.Hash) bool { return false }, - func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { - return nil + BlobFetcherFunctions{ + HasPayload: func(common.Hash) bool { return false }, + AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { + return nil + }, + FetchPayloads: func(string, []common.Hash, *types.CustodyBitmap) error { + return nil + }, + DropPeer: func(string) {}, }, - func(string, []common.Hash, *types.CustodyBitmap) error { return nil }, - func(string) {}, &custody, &mockRand{value: 60}, // Force partial requests (20 >= 15) ) @@ -329,12 +337,16 @@ func TestBlobFetcherFullDelivery(t *testing.T) { testBlobFetcher(t, blobFetcherTest{ init: func() *BlobFetcher { return NewBlobFetcher( - func(common.Hash) bool { return false }, - func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { - return nil + BlobFetcherFunctions{ + HasPayload: func(common.Hash) bool { return false }, + AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { + return nil + }, + FetchPayloads: func(string, []common.Hash, *types.CustodyBitmap) error { + return nil + }, + DropPeer: func(string) {}, }, - func(string, []common.Hash, *types.CustodyBitmap) error { return nil }, - func(string) {}, &custody, &mockRand{value: 5}, // Force full requests for simplicity ) @@ -375,12 +387,16 @@ func TestBlobFetcherPartialDelivery(t *testing.T) { testBlobFetcher(t, blobFetcherTest{ init: func() *BlobFetcher { return NewBlobFetcher( - func(common.Hash) bool { return false }, - func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { - return nil + BlobFetcherFunctions{ + HasPayload: func(common.Hash) bool { return false }, + AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { + return nil + }, + FetchPayloads: func(string, []common.Hash, *types.CustodyBitmap) error { + return nil + }, + DropPeer: func(string) {}, }, - func(string, []common.Hash, *types.CustodyBitmap) error { return nil }, - func(string) {}, &custody, &mockRand{value: 60}, ) @@ -509,12 +525,16 @@ func TestBlobFetcherAvailabilityTimeout(t *testing.T) { testBlobFetcher(t, blobFetcherTest{ init: func() *BlobFetcher { return NewBlobFetcher( - func(common.Hash) bool { return false }, - func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { - return nil + BlobFetcherFunctions{ + HasPayload: func(common.Hash) bool { return false }, + AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { + return nil + }, + FetchPayloads: func(string, []common.Hash, *types.CustodyBitmap) error { + return nil + }, + DropPeer: func(string) {}, }, - func(string, []common.Hash, *types.CustodyBitmap) error { return nil }, - func(string) {}, &custody, &mockRand{value: 60}, ) @@ -549,12 +569,16 @@ func TestBlobFetcherPeerDrop(t *testing.T) { testBlobFetcher(t, blobFetcherTest{ init: func() *BlobFetcher { return NewBlobFetcher( - func(common.Hash) bool { return false }, - func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { - return nil + BlobFetcherFunctions{ + HasPayload: func(common.Hash) bool { return false }, + AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { + return nil + }, + FetchPayloads: func(string, []common.Hash, *types.CustodyBitmap) error { + return nil + }, + DropPeer: func(string) {}, }, - func(string, []common.Hash, *types.CustodyBitmap) error { return nil }, - func(string) {}, &custody, &mockRand{value: 5}, ) @@ -624,12 +648,16 @@ func TestBlobFetcherFetchTimeout(t *testing.T) { testBlobFetcher(t, blobFetcherTest{ init: func() *BlobFetcher { return NewBlobFetcher( - func(common.Hash) bool { return false }, - func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { - return nil + BlobFetcherFunctions{ + HasPayload: func(common.Hash) bool { return false }, + AddCells: func(common.Hash, map[string]*PeerCellDelivery, *types.CustodyBitmap) error { + return nil + }, + FetchPayloads: func(string, []common.Hash, *types.CustodyBitmap) error { + return nil + }, + DropPeer: func(string) {}, }, - func(string, []common.Hash, *types.CustodyBitmap) error { return nil }, - func(string) {}, &custody, &mockRand{value: 5}, ) @@ -1010,25 +1038,29 @@ func TestMultiBlobDeliveryVerification(t *testing.T) { testBlobFetcher(t, blobFetcherTest{ init: func() *BlobFetcher { return NewBlobFetcher( - func(common.Hash) bool { return false }, - func(hash common.Hash, deliveries map[string]*PeerCellDelivery, cst *types.CustodyBitmap) error { - // Verify each peer's delivered cells pass KZG cell proof verification - for _, d := range deliveries { - var cellProofs []kzg4844.Proof - for blobIdx := 0; blobIdx < len(sidecar.Commitments); blobIdx++ { - for _, idx := range d.Indices { - cellProofs = append(cellProofs, sidecar.Proofs[blobIdx*kzg4844.CellProofsPerBlob+int(idx)]) + BlobFetcherFunctions{ + HasPayload: func(common.Hash) bool { return false }, + AddCells: func(h common.Hash, deliveries map[string]*PeerCellDelivery, custody *types.CustodyBitmap) error { + // Verify each peer's delivered cells pass KZG cell proof verification + for _, d := range deliveries { + var cellProofs []kzg4844.Proof + for blobIdx := 0; blobIdx < len(sidecar.Commitments); blobIdx++ { + for _, idx := range d.Indices { + cellProofs = append(cellProofs, sidecar.Proofs[blobIdx*kzg4844.CellProofsPerBlob+int(idx)]) + } + } + verifyErr = kzg4844.VerifyCells(d.Cells, sidecar.Commitments, cellProofs, d.Indices) + if verifyErr != nil { + return verifyErr } } - verifyErr = kzg4844.VerifyCells(d.Cells, sidecar.Commitments, cellProofs, d.Indices) - if verifyErr != nil { - return verifyErr - } - } - return nil + return nil + }, + FetchPayloads: func(string, []common.Hash, *types.CustodyBitmap) error { + return nil + }, + DropPeer: func(string) {}, }, - func(string, []common.Hash, *types.CustodyBitmap) error { return nil }, - func(string) {}, &custody, &mockRand{value: 60}, // Force partial requests (60 >= fetchProbability) ) diff --git a/eth/handler.go b/eth/handler.go index 539c8a7c6d..d22c0af572 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -195,7 +195,8 @@ func newHandler(config *handlerConfig) (*handler, error) { } return p.RequestTxs(hashes) } - // Construct the blob buffer for assembling blob txs from separate tx and cell deliveries + + // Construct the blob buffer for assembling blob txs from separate tx and cell deliveries. h.blobBuffer = blobpool.NewBlobBuffer(h.blobpool.AddPooledTx, h.removePeer) addTxs := func(peer string, txs []*types.Transaction) []error { @@ -233,24 +234,27 @@ func newHandler(config *handlerConfig) (*handler, error) { h.txFetcher = fetcher.NewTxFetcher(h.chain, validateMeta, addTxs, fetchTx, h.removePeer) // Construct the blob fetcher for cell-based blob data availability - fetchPayloads := func(peer string, hashes []common.Hash, cells *types.CustodyBitmap) error { - p := h.peers.peer(peer) - if p == nil { - return errors.New("unknown peer") - } - return p.RequestPayload(hashes, cells) + blobCallbacks := fetcher.BlobFetcherFunctions{ + FetchPayloads: func(peer string, hashes []common.Hash, cells *types.CustodyBitmap) error { + p := h.peers.peer(peer) + if p == nil { + return errors.New("unknown peer") + } + return p.RequestPayload(hashes, cells) + }, + HasPayload: func(hash common.Hash) bool { + return h.blobpool.HasPayload(hash) || h.blobBuffer.HasCells(hash) + }, + AddCells: func(hash common.Hash, deliveries map[string]*fetcher.PeerCellDelivery, custody *types.CustodyBitmap) error { + converted := make(map[string]*blobpool.PeerDelivery, len(deliveries)) + for peer, d := range deliveries { + converted[peer] = &blobpool.PeerDelivery{Cells: d.Cells, Indices: d.Indices} + } + return h.blobBuffer.AddCells(hash, converted, custody) + }, + DropPeer: h.removePeer, } - hasPayload := func(hash common.Hash) bool { - return h.blobpool.HasPayload(hash) || h.blobBuffer.HasCells(hash) - } - addCells := func(hash common.Hash, deliveries map[string]*fetcher.PeerCellDelivery, custody *types.CustodyBitmap) error { - converted := make(map[string]*blobpool.PeerDelivery, len(deliveries)) - for peer, d := range deliveries { - converted[peer] = &blobpool.PeerDelivery{Cells: d.Cells, Indices: d.Indices} - } - return h.blobBuffer.AddCells(hash, converted, custody) - } - h.blobFetcher = fetcher.NewBlobFetcher(hasPayload, addCells, fetchPayloads, h.removePeer, &config.Custody, nil) + h.blobFetcher = fetcher.NewBlobFetcher(blobCallbacks, &config.Custody, nil) return h, nil }