fix: add tests and fix errors

This commit is contained in:
healthykim 2025-11-13 22:58:03 +09:00
parent cf48bd57a5
commit 65debe9317
3 changed files with 175 additions and 18 deletions

View file

@ -527,13 +527,16 @@ func handleReceipts70(backend Backend, msg Decoder, peer *Peer) error {
return err
}
from, err := peer.ValidateReceipt(res)
from, err := peer.ReconstructReceiptsPacket(res)
if err != nil {
return err
}
if res.LastBlockIncomplete {
peer.HandlePartialReceipts(res.RequestId)
err := peer.RequestPartialReceipts(res.RequestId)
if err != nil {
return err
}
}
// Assign buffers shared between list elements

View file

@ -67,8 +67,8 @@ type Peer struct {
reqCancel chan *cancel // Dispatch channel to cancel pending requests and untrack them
resDispatch chan *response // Dispatch channel to fulfil pending requests and untrack them
requestedReceipts map[uint64][]common.Hash // requestId -> requested receipts map (can be removed if one peer cannot have more than one request in flight)
receiptBuffer map[uint64]*partialReceipt // requestId -> receiptlist map
requestedReceipts map[uint64][]common.Hash // requested receipts list
receiptBuffer map[uint64]*partialReceipt // requestId to receiptlist map
term chan struct{} // Termination channel to stop the broadcasters
}
@ -345,15 +345,30 @@ func (p *Peer) RequestReceipts(hashes []common.Hash, sink chan *Response) (*Requ
p.Log().Debug("Fetching batch of receipts", "count", len(hashes))
id := rand.Uint64()
req := &Request{
id: id,
sink: sink,
code: GetReceiptsMsg,
want: ReceiptsMsg,
data: &GetReceiptsPacket69{
RequestId: id,
GetReceiptsRequest: hashes,
},
var req *Request
if p.version > ETH69 {
req = &Request{
id: id,
sink: sink,
code: GetReceiptsMsg,
want: ReceiptsMsg,
data: &GetReceiptsPacket70{
RequestId: id,
GetReceiptsRequest: hashes,
FirstBlockReceiptIndex: 0,
},
}
} else {
req = &Request{
id: id,
sink: sink,
code: GetReceiptsMsg,
want: ReceiptsMsg,
data: &GetReceiptsPacket69{
RequestId: id,
GetReceiptsRequest: hashes,
},
}
}
if err := p.dispatchRequest(req); err != nil {
return nil, err
@ -364,7 +379,7 @@ func (p *Peer) RequestReceipts(hashes []common.Hash, sink chan *Response) (*Requ
}
// HandlePartialReceipts re-request partial receipts
func (p *Peer) HandlePartialReceipts(previousId uint64) error {
func (p *Peer) RequestPartialReceipts(previousId uint64) error {
split := p.receiptBuffer[previousId].idx
id := rand.Uint64()
@ -382,16 +397,22 @@ func (p *Peer) HandlePartialReceipts(previousId uint64) error {
previous: previousId,
}
p.receiptBuffer[id] = p.receiptBuffer[previousId]
p.requestedReceipts[id] = p.requestedReceipts[previousId]
delete(p.receiptBuffer, previousId)
delete(p.requestedReceipts, previousId)
return p.dispatchRequest(req)
}
// ValidateReceipt validates a receipt packet and checks whether a partial request is complete.
// ReconstructReceiptsPacket validates a receipt packet and checks whether a partial request is complete.
// It also mutates the packet in place, trimming the partial response or appending previously collected receipts.
func (p *Peer) ValidateReceipt(packet *ReceiptsPacket70) (int, error) {
func (p *Peer) ReconstructReceiptsPacket(packet *ReceiptsPacket70) (int, error) {
from := 0
requestId := packet.RequestId
if len(packet.List) == 0 {
return 0, fmt.Errorf("receipt list size 0")
return 0, nil
}
// Process the first block
@ -400,11 +421,14 @@ func (p *Peer) ValidateReceipt(packet *ReceiptsPacket70) (int, error) {
if firstReceipt == nil {
return 0, fmt.Errorf("nil first receipt")
}
if _, ok := p.receiptBuffer[requestId]; !ok {
if _, ok := p.receiptBuffer[requestId]; ok {
// complete packet (hash validation will be performed later)
firstReceipt.items = append(p.receiptBuffer[requestId].list.items, firstReceipt.items...)
from = p.receiptBuffer[requestId].idx
delete(p.receiptBuffer, requestId)
if !packet.LastBlockIncomplete {
delete(p.requestedReceipts, requestId)
}
}
// Trim and buffer the last block when the response is incomplete.

View file

@ -22,10 +22,12 @@ package eth
import (
"crypto/rand"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp"
)
// testPeer is a simulated peer to allow testing direct network calls.
@ -88,3 +90,131 @@ func TestPeerSet(t *testing.T) {
t.Fatalf("bad size")
}
}
func TestPartialReceipt(t *testing.T) {
app, net := p2p.MsgPipe()
var id enode.ID
if _, err := rand.Read(id[:]); err != nil {
t.Fatalf("failed to create random peer: %v", err)
}
peer := NewPeer(ETH70, p2p.NewPeer(id, "peer", nil), net, nil)
packetCh := make(chan *GetReceiptsPacket70, 1)
go func() {
for {
msg, err := app.ReadMsg()
if err != nil {
return
}
if msg.Code == GetReceiptsMsg {
var pkt GetReceiptsPacket70
if err := msg.Decode(&pkt); err == nil {
select {
case packetCh <- &pkt:
default:
}
}
}
msg.Discard()
}
}()
hashes := []common.Hash{
common.HexToHash("0xaa"),
common.HexToHash("0xbb"),
common.HexToHash("0xcc"),
common.HexToHash("0xdd"),
}
sink := make(chan *Response, 1)
req, err := peer.RequestReceipts(hashes, sink)
if err != nil {
t.Fatalf("RequestReceipts failed: %v", err)
}
select {
case _ = <-packetCh:
case <-time.After(2 * time.Second):
t.Fatalf("timeout waiting for request packet")
}
delivery := &ReceiptsPacket70{
RequestId: req.id,
LastBlockIncomplete: true,
List: []*ReceiptList69{
{
items: []Receipt{
{GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))},
},
},
{
items: []Receipt{
{GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 2))},
},
},
},
}
if _, err := peer.ReconstructReceiptsPacket(delivery); err != nil {
t.Fatalf("first ReconstructReceiptsPacket failed: %v", err)
}
if err := peer.RequestPartialReceipts(req.id); err != nil {
t.Fatalf("RequestPartialReceipts failed: %v", err)
}
var rereq *GetReceiptsPacket70
select {
case rereq = <-packetCh:
case <-time.After(2 * time.Second):
t.Fatalf("timeout waiting for re-request packet")
}
if _, ok := peer.receiptBuffer[req.id]; ok {
t.Fatalf("receiptBuffer has stale request id")
}
if _, ok := peer.requestedReceipts[req.id]; ok {
t.Fatalf("requestedReceipts has stale request id")
}
buffer, ok := peer.receiptBuffer[rereq.RequestId]
if !ok {
t.Fatalf("receiptBuffer should buffer incomplete receipts")
}
if rereq.FirstBlockReceiptIndex != uint64(len(buffer.list.items)) {
t.Fatalf("unexpected FirstBlockReceiptIndex, got %d want %d", rereq.FirstBlockReceiptIndex, len(buffer.list.items))
}
if _, ok := peer.requestedReceipts[rereq.RequestId]; !ok {
t.Fatalf("requestedReceipts should buffer receipt hashes")
}
delivery = &ReceiptsPacket70{
RequestId: rereq.RequestId,
LastBlockIncomplete: false,
List: []*ReceiptList69{
{
items: []Receipt{
{GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))},
},
},
{
items: []Receipt{
{GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))},
},
},
{
items: []Receipt{
{GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))},
},
},
},
}
if _, err := peer.ReconstructReceiptsPacket(delivery); err != nil {
t.Fatalf("second ReconstructReceiptsPacket failed: %v", err)
}
if _, ok := peer.receiptBuffer[rereq.RequestId]; ok {
t.Fatalf("receiptBuffer should be cleared after delivery")
}
if _, ok := peer.requestedReceipts[rereq.RequestId]; ok {
t.Fatalf("requestedReceipts should be cleared after delivery")
}
}