feat: add handleReceipts70

This commit is contained in:
healthykim 2025-11-08 17:28:43 +09:00
parent 653f8d4994
commit 429066f2d9
5 changed files with 103 additions and 16 deletions

View file

@ -201,7 +201,10 @@ func (p *Peer) dispatcher() {
reqOp.fail <- err
if err == nil {
pending[req.id] = req
// do not overwrite if it is re-request
if _, ok := pending[req.id]; !ok {
pending[req.id] = req
}
}
case cancelOp := <-p.reqCancel:

View file

@ -176,7 +176,7 @@ var eth68 = map[uint64]msgHandler{
GetBlockBodiesMsg: handleGetBlockBodies,
BlockBodiesMsg: handleBlockBodies,
GetReceiptsMsg: handleGetReceipts68,
ReceiptsMsg: handleReceipts[*ReceiptList68],
ReceiptsMsg: handleReceipts69[*ReceiptList68],
GetPooledTransactionsMsg: handleGetPooledTransactions,
PooledTransactionsMsg: handlePooledTransactions,
}
@ -189,7 +189,7 @@ var eth69 = map[uint64]msgHandler{
GetBlockBodiesMsg: handleGetBlockBodies,
BlockBodiesMsg: handleBlockBodies,
GetReceiptsMsg: handleGetReceipts69,
ReceiptsMsg: handleReceipts[*ReceiptList69],
ReceiptsMsg: handleReceipts69[*ReceiptList69],
GetPooledTransactionsMsg: handleGetPooledTransactions,
PooledTransactionsMsg: handlePooledTransactions,
BlockRangeUpdateMsg: handleBlockRangeUpdate,

View file

@ -399,7 +399,7 @@ func handleBlockBodies(backend Backend, msg Decoder, peer *Peer) error {
}, metadata)
}
func handleReceipts[L ReceiptsList](backend Backend, msg Decoder, peer *Peer) error {
func handleReceipts69[L ReceiptsList](backend Backend, msg Decoder, peer *Peer) error {
// A batch of receipts arrived to one of our previous requests
res := new(ReceiptsPacket[L])
if err := msg.Decode(res); err != nil {
@ -431,6 +431,77 @@ func handleReceipts[L ReceiptsList](backend Backend, msg Decoder, peer *Peer) er
}, metadata)
}
func handleReceipts70(backend Backend, msg Decoder, peer *Peer) error {
res := new(ReceiptsPacket70)
if err := msg.Decode(res); err != nil {
return err
}
if res.LastBlockIncomplete {
return handlePartialReceipts(peer, res)
}
if buf, ok := peer.receiptBuffer[res.RequestId]; ok {
res.List = append(buf, res.List...)
delete(peer.receiptBuffer, res.RequestId)
delete(peer.requestedReceipts, res.RequestId)
}
// Assign buffers shared between list elements
buffers := new(receiptListBuffers)
for i := range res.List {
res.List[i].setBuffers(buffers)
}
metadata := func() interface{} {
hasher := trie.NewStackTrie(nil)
hashes := make([]common.Hash, len(res.List))
for i := range res.List {
hashes[i] = types.DeriveSha(res.List[i], hasher)
}
return hashes
}
var enc ReceiptsRLPResponse
for i := range res.List {
enc = append(enc, res.List[i].EncodeForStorage())
}
return peer.dispatchResponse(&Response{
id: res.RequestId,
code: ReceiptsMsg,
Res: &enc,
}, metadata)
}
func handlePartialReceipts(peer *Peer, res *ReceiptsPacket70) error {
id := res.RequestId
peer.receiptBuffer[id] = append(peer.receiptBuffer[id], res.List...)
last := res.List[len(res.List)-1]
if !validatePartialReceipt(last) {
return fmt.Errorf("Receipts: validation error, should drop the peer")
}
req := &Request{
id: id,
sink: nil,
code: GetReceiptsMsg,
want: ReceiptsMsg,
data: &GetReceiptsPacket{
RequestId: id,
GetReceiptsRequest: peer.requestedReceipts[id][len(res.List)-1:],
},
}
return peer.dispatchRequest(req)
}
// TODO: position?
func validatePartialReceipt(receipt *ReceiptList69) bool {
return true
}
func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) error {
// New transaction announcement arrived, make sure we have
// a valid and fresh chain to handle them

View file

@ -59,6 +59,9 @@ type Peer struct {
reqCancel chan *cancel // Dispatch channel to cancel pending requests and untrack them
resDispatch chan *response // Dispatch channel to fulfil pending requests and untrack them
requestedReceipts map[uint64][]common.Hash
receiptBuffer map[uint64][]*ReceiptList69
term chan struct{} // Termination channel to stop the broadcasters
}
@ -66,18 +69,20 @@ type Peer struct {
// version.
func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Peer {
peer := &Peer{
id: p.ID().String(),
Peer: p,
rw: rw,
version: version,
knownTxs: newKnownCache(maxKnownTxs),
txBroadcast: make(chan []common.Hash),
txAnnounce: make(chan []common.Hash),
reqDispatch: make(chan *request),
reqCancel: make(chan *cancel),
resDispatch: make(chan *response),
txpool: txpool,
term: make(chan struct{}),
id: p.ID().String(),
Peer: p,
rw: rw,
version: version,
knownTxs: newKnownCache(maxKnownTxs),
txBroadcast: make(chan []common.Hash),
txAnnounce: make(chan []common.Hash),
reqDispatch: make(chan *request),
reqCancel: make(chan *cancel),
resDispatch: make(chan *response),
txpool: txpool,
requestedReceipts: make(map[uint64][]common.Hash),
receiptBuffer: make(map[uint64][]*ReceiptList69),
term: make(chan struct{}),
}
// Start up all the broadcasters
go peer.broadcastTransactions()
@ -336,6 +341,8 @@ func (p *Peer) RequestReceipts(hashes []common.Hash, sink chan *Response) (*Requ
if err := p.dispatchRequest(req); err != nil {
return nil, err
}
p.requestedReceipts[id] = hashes
return req, nil
}

View file

@ -282,6 +282,12 @@ type ReceiptsPacket[L ReceiptsList] struct {
List []L
}
type ReceiptsPacket70 struct {
RequestId uint64
List []*ReceiptList69
LastBlockIncomplete bool
}
// ReceiptsRLPResponse is used for receipts, when we already have it encoded
type ReceiptsRLPResponse []rlp.RawValue