From 429066f2d9d8a3dac42d94f590c4854f9d0f5db4 Mon Sep 17 00:00:00 2001 From: healthykim Date: Sat, 8 Nov 2025 17:28:43 +0900 Subject: [PATCH] feat: add handleReceipts70 --- eth/protocols/eth/dispatcher.go | 5 ++- eth/protocols/eth/handler.go | 4 +- eth/protocols/eth/handlers.go | 73 ++++++++++++++++++++++++++++++++- eth/protocols/eth/peer.go | 31 ++++++++------ eth/protocols/eth/protocol.go | 6 +++ 5 files changed, 103 insertions(+), 16 deletions(-) diff --git a/eth/protocols/eth/dispatcher.go b/eth/protocols/eth/dispatcher.go index cba40596fc..347d320884 100644 --- a/eth/protocols/eth/dispatcher.go +++ b/eth/protocols/eth/dispatcher.go @@ -201,7 +201,10 @@ func (p *Peer) dispatcher() { reqOp.fail <- err if err == nil { - pending[req.id] = req + // do not overwrite if it is re-request + if _, ok := pending[req.id]; !ok { + pending[req.id] = req + } } case cancelOp := <-p.reqCancel: diff --git a/eth/protocols/eth/handler.go b/eth/protocols/eth/handler.go index 2467e0c713..25cef94374 100644 --- a/eth/protocols/eth/handler.go +++ b/eth/protocols/eth/handler.go @@ -176,7 +176,7 @@ var eth68 = map[uint64]msgHandler{ GetBlockBodiesMsg: handleGetBlockBodies, BlockBodiesMsg: handleBlockBodies, GetReceiptsMsg: handleGetReceipts68, - ReceiptsMsg: handleReceipts[*ReceiptList68], + ReceiptsMsg: handleReceipts69[*ReceiptList68], GetPooledTransactionsMsg: handleGetPooledTransactions, PooledTransactionsMsg: handlePooledTransactions, } @@ -189,7 +189,7 @@ var eth69 = map[uint64]msgHandler{ GetBlockBodiesMsg: handleGetBlockBodies, BlockBodiesMsg: handleBlockBodies, GetReceiptsMsg: handleGetReceipts69, - ReceiptsMsg: handleReceipts[*ReceiptList69], + ReceiptsMsg: handleReceipts69[*ReceiptList69], GetPooledTransactionsMsg: handleGetPooledTransactions, PooledTransactionsMsg: handlePooledTransactions, BlockRangeUpdateMsg: handleBlockRangeUpdate, diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index aad3353d88..713b49a5ae 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -399,7 +399,7 @@ func handleBlockBodies(backend Backend, msg Decoder, peer *Peer) error { }, metadata) } -func handleReceipts[L ReceiptsList](backend Backend, msg Decoder, peer *Peer) error { +func handleReceipts69[L ReceiptsList](backend Backend, msg Decoder, peer *Peer) error { // A batch of receipts arrived to one of our previous requests res := new(ReceiptsPacket[L]) if err := msg.Decode(res); err != nil { @@ -431,6 +431,77 @@ func handleReceipts[L ReceiptsList](backend Backend, msg Decoder, peer *Peer) er }, metadata) } +func handleReceipts70(backend Backend, msg Decoder, peer *Peer) error { + res := new(ReceiptsPacket70) + if err := msg.Decode(res); err != nil { + return err + } + + if res.LastBlockIncomplete { + return handlePartialReceipts(peer, res) + } + + if buf, ok := peer.receiptBuffer[res.RequestId]; ok { + res.List = append(buf, res.List...) + delete(peer.receiptBuffer, res.RequestId) + delete(peer.requestedReceipts, res.RequestId) + } + + // Assign buffers shared between list elements + buffers := new(receiptListBuffers) + for i := range res.List { + res.List[i].setBuffers(buffers) + } + + metadata := func() interface{} { + hasher := trie.NewStackTrie(nil) + hashes := make([]common.Hash, len(res.List)) + for i := range res.List { + hashes[i] = types.DeriveSha(res.List[i], hasher) + } + return hashes + } + + var enc ReceiptsRLPResponse + for i := range res.List { + enc = append(enc, res.List[i].EncodeForStorage()) + } + + return peer.dispatchResponse(&Response{ + id: res.RequestId, + code: ReceiptsMsg, + Res: &enc, + }, metadata) +} + +func handlePartialReceipts(peer *Peer, res *ReceiptsPacket70) error { + id := res.RequestId + + peer.receiptBuffer[id] = append(peer.receiptBuffer[id], res.List...) + + last := res.List[len(res.List)-1] + if !validatePartialReceipt(last) { + return fmt.Errorf("Receipts: validation error, should drop the peer") + } + + req := &Request{ + id: id, + sink: nil, + code: GetReceiptsMsg, + want: ReceiptsMsg, + data: &GetReceiptsPacket{ + RequestId: id, + GetReceiptsRequest: peer.requestedReceipts[id][len(res.List)-1:], + }, + } + return peer.dispatchRequest(req) +} + +// TODO: position? +func validatePartialReceipt(receipt *ReceiptList69) bool { + return true +} + func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) error { // New transaction announcement arrived, make sure we have // a valid and fresh chain to handle them diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index 40c54a3570..2d31ccc81e 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -59,6 +59,9 @@ type Peer struct { reqCancel chan *cancel // Dispatch channel to cancel pending requests and untrack them resDispatch chan *response // Dispatch channel to fulfil pending requests and untrack them + requestedReceipts map[uint64][]common.Hash + receiptBuffer map[uint64][]*ReceiptList69 + term chan struct{} // Termination channel to stop the broadcasters } @@ -66,18 +69,20 @@ type Peer struct { // version. func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Peer { peer := &Peer{ - id: p.ID().String(), - Peer: p, - rw: rw, - version: version, - knownTxs: newKnownCache(maxKnownTxs), - txBroadcast: make(chan []common.Hash), - txAnnounce: make(chan []common.Hash), - reqDispatch: make(chan *request), - reqCancel: make(chan *cancel), - resDispatch: make(chan *response), - txpool: txpool, - term: make(chan struct{}), + id: p.ID().String(), + Peer: p, + rw: rw, + version: version, + knownTxs: newKnownCache(maxKnownTxs), + txBroadcast: make(chan []common.Hash), + txAnnounce: make(chan []common.Hash), + reqDispatch: make(chan *request), + reqCancel: make(chan *cancel), + resDispatch: make(chan *response), + txpool: txpool, + requestedReceipts: make(map[uint64][]common.Hash), + receiptBuffer: make(map[uint64][]*ReceiptList69), + term: make(chan struct{}), } // Start up all the broadcasters go peer.broadcastTransactions() @@ -336,6 +341,8 @@ func (p *Peer) RequestReceipts(hashes []common.Hash, sink chan *Response) (*Requ if err := p.dispatchRequest(req); err != nil { return nil, err } + + p.requestedReceipts[id] = hashes return req, nil } diff --git a/eth/protocols/eth/protocol.go b/eth/protocols/eth/protocol.go index 7c41e7a996..067688816c 100644 --- a/eth/protocols/eth/protocol.go +++ b/eth/protocols/eth/protocol.go @@ -282,6 +282,12 @@ type ReceiptsPacket[L ReceiptsList] struct { List []L } +type ReceiptsPacket70 struct { + RequestId uint64 + List []*ReceiptList69 + LastBlockIncomplete bool +} + // ReceiptsRLPResponse is used for receipts, when we already have it encoded type ReceiptsRLPResponse []rlp.RawValue