From 65debe9317e4ffba1501a4c93b9059d3300c6b4a Mon Sep 17 00:00:00 2001 From: healthykim Date: Thu, 13 Nov 2025 22:58:03 +0900 Subject: [PATCH] fix: add tests and fix errors --- eth/protocols/eth/handlers.go | 7 +- eth/protocols/eth/peer.go | 56 ++++++++++---- eth/protocols/eth/peer_test.go | 130 +++++++++++++++++++++++++++++++++ 3 files changed, 175 insertions(+), 18 deletions(-) diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index 6bba81fa79..55423be64e 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -527,13 +527,16 @@ func handleReceipts70(backend Backend, msg Decoder, peer *Peer) error { return err } - from, err := peer.ValidateReceipt(res) + from, err := peer.ReconstructReceiptsPacket(res) if err != nil { return err } if res.LastBlockIncomplete { - peer.HandlePartialReceipts(res.RequestId) + err := peer.RequestPartialReceipts(res.RequestId) + if err != nil { + return err + } } // Assign buffers shared between list elements diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index 0f6d54c444..645e286cdc 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -67,8 +67,8 @@ type Peer struct { reqCancel chan *cancel // Dispatch channel to cancel pending requests and untrack them resDispatch chan *response // Dispatch channel to fulfil pending requests and untrack them - requestedReceipts map[uint64][]common.Hash // requestId -> requested receipts map (can be removed if one peer cannot have more than one request in flight) - receiptBuffer map[uint64]*partialReceipt // requestId -> receiptlist map + requestedReceipts map[uint64][]common.Hash // requested receipts list + receiptBuffer map[uint64]*partialReceipt // requestId to receiptlist map term chan struct{} // Termination channel to stop the broadcasters } @@ -345,15 +345,30 @@ func (p *Peer) RequestReceipts(hashes []common.Hash, sink chan *Response) (*Requ p.Log().Debug("Fetching batch of receipts", "count", len(hashes)) id := rand.Uint64() - req := &Request{ - id: id, - sink: sink, - code: GetReceiptsMsg, - want: ReceiptsMsg, - data: &GetReceiptsPacket69{ - RequestId: id, - GetReceiptsRequest: hashes, - }, + var req *Request + if p.version > ETH69 { + req = &Request{ + id: id, + sink: sink, + code: GetReceiptsMsg, + want: ReceiptsMsg, + data: &GetReceiptsPacket70{ + RequestId: id, + GetReceiptsRequest: hashes, + FirstBlockReceiptIndex: 0, + }, + } + } else { + req = &Request{ + id: id, + sink: sink, + code: GetReceiptsMsg, + want: ReceiptsMsg, + data: &GetReceiptsPacket69{ + RequestId: id, + GetReceiptsRequest: hashes, + }, + } } if err := p.dispatchRequest(req); err != nil { return nil, err @@ -364,7 +379,7 @@ func (p *Peer) RequestReceipts(hashes []common.Hash, sink chan *Response) (*Requ } // HandlePartialReceipts re-request partial receipts -func (p *Peer) HandlePartialReceipts(previousId uint64) error { +func (p *Peer) RequestPartialReceipts(previousId uint64) error { split := p.receiptBuffer[previousId].idx id := rand.Uint64() @@ -382,16 +397,22 @@ func (p *Peer) HandlePartialReceipts(previousId uint64) error { previous: previousId, } + p.receiptBuffer[id] = p.receiptBuffer[previousId] + p.requestedReceipts[id] = p.requestedReceipts[previousId] + + delete(p.receiptBuffer, previousId) + delete(p.requestedReceipts, previousId) + return p.dispatchRequest(req) } -// ValidateReceipt validates a receipt packet and checks whether a partial request is complete. +// ReconstructReceiptsPacket validates a receipt packet and checks whether a partial request is complete. // It also mutates the packet in place, trimming the partial response or appending previously collected receipts. -func (p *Peer) ValidateReceipt(packet *ReceiptsPacket70) (int, error) { +func (p *Peer) ReconstructReceiptsPacket(packet *ReceiptsPacket70) (int, error) { from := 0 requestId := packet.RequestId if len(packet.List) == 0 { - return 0, fmt.Errorf("receipt list size 0") + return 0, nil } // Process the first block @@ -400,11 +421,14 @@ func (p *Peer) ValidateReceipt(packet *ReceiptsPacket70) (int, error) { if firstReceipt == nil { return 0, fmt.Errorf("nil first receipt") } - if _, ok := p.receiptBuffer[requestId]; !ok { + if _, ok := p.receiptBuffer[requestId]; ok { // complete packet (hash validation will be performed later) firstReceipt.items = append(p.receiptBuffer[requestId].list.items, firstReceipt.items...) from = p.receiptBuffer[requestId].idx delete(p.receiptBuffer, requestId) + if !packet.LastBlockIncomplete { + delete(p.requestedReceipts, requestId) + } } // Trim and buffer the last block when the response is incomplete. diff --git a/eth/protocols/eth/peer_test.go b/eth/protocols/eth/peer_test.go index efbbbc6fff..b3ca35a343 100644 --- a/eth/protocols/eth/peer_test.go +++ b/eth/protocols/eth/peer_test.go @@ -22,10 +22,12 @@ package eth import ( "crypto/rand" "testing" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/ethereum/go-ethereum/rlp" ) // testPeer is a simulated peer to allow testing direct network calls. @@ -88,3 +90,131 @@ func TestPeerSet(t *testing.T) { t.Fatalf("bad size") } } + +func TestPartialReceipt(t *testing.T) { + app, net := p2p.MsgPipe() + var id enode.ID + if _, err := rand.Read(id[:]); err != nil { + t.Fatalf("failed to create random peer: %v", err) + } + + peer := NewPeer(ETH70, p2p.NewPeer(id, "peer", nil), net, nil) + + packetCh := make(chan *GetReceiptsPacket70, 1) + go func() { + for { + msg, err := app.ReadMsg() + if err != nil { + return + } + if msg.Code == GetReceiptsMsg { + var pkt GetReceiptsPacket70 + if err := msg.Decode(&pkt); err == nil { + select { + case packetCh <- &pkt: + default: + } + } + } + msg.Discard() + } + }() + + hashes := []common.Hash{ + common.HexToHash("0xaa"), + common.HexToHash("0xbb"), + common.HexToHash("0xcc"), + common.HexToHash("0xdd"), + } + + sink := make(chan *Response, 1) + req, err := peer.RequestReceipts(hashes, sink) + if err != nil { + t.Fatalf("RequestReceipts failed: %v", err) + } + select { + case _ = <-packetCh: + case <-time.After(2 * time.Second): + t.Fatalf("timeout waiting for request packet") + } + + delivery := &ReceiptsPacket70{ + RequestId: req.id, + LastBlockIncomplete: true, + List: []*ReceiptList69{ + { + items: []Receipt{ + {GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))}, + }, + }, + { + items: []Receipt{ + {GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 2))}, + }, + }, + }, + } + if _, err := peer.ReconstructReceiptsPacket(delivery); err != nil { + t.Fatalf("first ReconstructReceiptsPacket failed: %v", err) + } + + if err := peer.RequestPartialReceipts(req.id); err != nil { + t.Fatalf("RequestPartialReceipts failed: %v", err) + } + + var rereq *GetReceiptsPacket70 + select { + case rereq = <-packetCh: + case <-time.After(2 * time.Second): + t.Fatalf("timeout waiting for re-request packet") + } + + if _, ok := peer.receiptBuffer[req.id]; ok { + t.Fatalf("receiptBuffer has stale request id") + } + if _, ok := peer.requestedReceipts[req.id]; ok { + t.Fatalf("requestedReceipts has stale request id") + } + + buffer, ok := peer.receiptBuffer[rereq.RequestId] + if !ok { + t.Fatalf("receiptBuffer should buffer incomplete receipts") + } + if rereq.FirstBlockReceiptIndex != uint64(len(buffer.list.items)) { + t.Fatalf("unexpected FirstBlockReceiptIndex, got %d want %d", rereq.FirstBlockReceiptIndex, len(buffer.list.items)) + } + if _, ok := peer.requestedReceipts[rereq.RequestId]; !ok { + t.Fatalf("requestedReceipts should buffer receipt hashes") + } + + delivery = &ReceiptsPacket70{ + RequestId: rereq.RequestId, + LastBlockIncomplete: false, + List: []*ReceiptList69{ + { + items: []Receipt{ + {GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))}, + }, + }, + { + items: []Receipt{ + {GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))}, + }, + }, + { + items: []Receipt{ + {GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))}, + }, + }, + }, + } + if _, err := peer.ReconstructReceiptsPacket(delivery); err != nil { + t.Fatalf("second ReconstructReceiptsPacket failed: %v", err) + } + if _, ok := peer.receiptBuffer[rereq.RequestId]; ok { + t.Fatalf("receiptBuffer should be cleared after delivery") + } + if _, ok := peer.requestedReceipts[rereq.RequestId]; ok { + t.Fatalf("requestedReceipts should be cleared after delivery") + } +}