From 52ede9739f07d8de73b8c1c368f675558d035e4b Mon Sep 17 00:00:00 2001 From: healthykim Date: Wed, 26 Nov 2025 23:07:32 +0900 Subject: [PATCH] fix: remove request buffer --- eth/protocols/eth/dispatcher.go | 1 - eth/protocols/eth/handlers.go | 4 +- eth/protocols/eth/peer.go | 75 +++++++++++++-------------- eth/protocols/eth/peer_test.go | 89 +++++++++++++++++++++++---------- 4 files changed, 101 insertions(+), 68 deletions(-) diff --git a/eth/protocols/eth/dispatcher.go b/eth/protocols/eth/dispatcher.go index 8cc41e0904..7a97a7a7ca 100644 --- a/eth/protocols/eth/dispatcher.go +++ b/eth/protocols/eth/dispatcher.go @@ -222,7 +222,6 @@ func (p *Peer) dispatcher() { // Not sure if the request is about the receipt, but remove it anyway delete(p.receiptBuffer, cancelOp.id) - delete(p.requestedReceipts, cancelOp.id) cancelOp.fail <- nil diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index abf29d9f38..37bd62d03a 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -527,11 +527,11 @@ func handleReceipts70(backend Backend, msg Decoder, peer *Peer) error { return err } - if err := peer.BufferReceiptsPacket(res, backend); err != nil { + if err := peer.bufferReceiptsPacket(res, backend); err != nil { return err } if res.LastBlockIncomplete { - return peer.RequestPartialReceipts(res.RequestId) + return peer.requestPartialReceipts(res.RequestId) } // Assign buffers shared between list elements diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index 755c1d8290..7b5ff3065b 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -44,6 +44,7 @@ const ( ) type partialReceipt struct { + request []common.Hash list []*ReceiptList69 // list of partially collected receipts lastLogSize uint64 // log size of last receipt list } @@ -66,8 +67,7 @@ type Peer struct { reqCancel chan *cancel // Dispatch channel to cancel pending requests and untrack them resDispatch chan *response // Dispatch channel to fulfil pending requests and untrack them - requestedReceipts map[uint64][]common.Hash // requested receipts list - receiptBuffer map[uint64]*partialReceipt // requestId to receiptlist map + receiptBuffer map[uint64]*partialReceipt // requestId to receiptlist map term chan struct{} // Termination channel to stop the broadcasters } @@ -76,20 +76,19 @@ type Peer struct { // version. func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Peer { peer := &Peer{ - id: p.ID().String(), - Peer: p, - rw: rw, - version: version, - knownTxs: newKnownCache(maxKnownTxs), - txBroadcast: make(chan []common.Hash), - txAnnounce: make(chan []common.Hash), - reqDispatch: make(chan *request), - reqCancel: make(chan *cancel), - resDispatch: make(chan *response), - txpool: txpool, - requestedReceipts: make(map[uint64][]common.Hash), - receiptBuffer: make(map[uint64]*partialReceipt), - term: make(chan struct{}), + id: p.ID().String(), + Peer: p, + rw: rw, + version: version, + knownTxs: newKnownCache(maxKnownTxs), + txBroadcast: make(chan []common.Hash), + txAnnounce: make(chan []common.Hash), + reqDispatch: make(chan *request), + reqCancel: make(chan *cancel), + resDispatch: make(chan *response), + txpool: txpool, + receiptBuffer: make(map[uint64]*partialReceipt), + term: make(chan struct{}), } // Start up all the broadcasters go peer.broadcastTransactions() @@ -357,7 +356,7 @@ func (p *Peer) RequestReceipts(hashes []common.Hash, sink chan *Response) (*Requ FirstBlockReceiptIndex: 0, }, } - p.requestedReceipts[id] = hashes + p.receiptBuffer[id] = &partialReceipt{request: hashes} } else { req = &Request{ id: id, @@ -378,7 +377,7 @@ func (p *Peer) RequestReceipts(hashes []common.Hash, sink chan *Response) (*Requ } // HandlePartialReceipts re-request partial receipts -func (p *Peer) RequestPartialReceipts(id uint64) error { +func (p *Peer) requestPartialReceipts(id uint64) error { if _, ok := p.receiptBuffer[id]; !ok { return fmt.Errorf("No partial receipt retreival in progress with id %d", id) } @@ -393,7 +392,7 @@ func (p *Peer) RequestPartialReceipts(id uint64) error { want: ReceiptsMsg, data: &GetReceiptsPacket70{ RequestId: id, - GetReceiptsRequest: p.requestedReceipts[id][lastBlock:], + GetReceiptsRequest: p.receiptBuffer[id].request[lastBlock:], FirstBlockReceiptIndex: uint64(lastReceipt), }, continued: true, @@ -402,58 +401,56 @@ func (p *Peer) RequestPartialReceipts(id uint64) error { return p.dispatchRequest(req) } -// BufferReceiptsPacket validates a receipt packet and buffer the incomplete packet. +// bufferReceiptsPacket validates a receipt packet and buffer the incomplete packet. // If the request is completed, it appends previously collected receipts. -func (p *Peer) BufferReceiptsPacket(packet *ReceiptsPacket70, backend Backend) error { +func (p *Peer) bufferReceiptsPacket(packet *ReceiptsPacket70, backend Backend) error { requestId := packet.RequestId + buffer := p.receiptBuffer[requestId] // Do not assign buffer to the response not requested - if _, ok := p.requestedReceipts[requestId]; !ok { + if buffer == nil { return fmt.Errorf("No partial receipt retreival in progress with id %d", requestId) } if len(packet.List) == 0 { delete(p.receiptBuffer, requestId) - delete(p.requestedReceipts, requestId) return nil } // Buffer the last block when the response is incomplete. if packet.LastBlockIncomplete { lastBlock := len(packet.List) - 1 - if _, ok := p.receiptBuffer[requestId]; ok { - lastBlock += len(p.receiptBuffer[requestId].list) - 1 + if len(buffer.list) > 0 { + lastBlock += len(buffer.list) - 1 } - header := backend.Chain().GetHeaderByHash(p.requestedReceipts[requestId][lastBlock]) + header := backend.Chain().GetHeaderByHash(buffer.request[lastBlock]) logSize, err := p.validateLastBlockReceipt(packet.List, requestId, header) if err != nil { return err } // Update the buffered data and trim the packet to exclude the incomplete block. - if buffer, ok := p.receiptBuffer[requestId]; ok { + if len(buffer.list) > 0 { // If the buffer is already allocated, it means that the previous response was incomplete // Append the first block receipts buffer.list[len(buffer.list)-1].Append(packet.List[0]) buffer.list = append(buffer.list, packet.List[1:]...) buffer.lastLogSize = logSize } else { - p.receiptBuffer[requestId] = &partialReceipt{ - list: packet.List, - lastLogSize: logSize, - } + buffer.list = packet.List + buffer.lastLogSize = logSize } return nil } - // Request completed - if buffer, ok := p.receiptBuffer[requestId]; ok { - // If the request is completed, append previously collected receipts - // to the packet and remove the buffered receipts. - packet.List = append(buffer.list, packet.List...) - delete(p.receiptBuffer, requestId) + // If the request is completed, append previously collected receipts + // to the packet and remove the buffered receipts. + if len(buffer.list) > 0 { + buffer.list[len(buffer.list)-1].Append(packet.List[0]) + packet.List = packet.List[1:] } - delete(p.requestedReceipts, requestId) + packet.List = append(buffer.list, packet.List...) + delete(p.receiptBuffer, requestId) return nil } @@ -473,7 +470,7 @@ func (p *Peer) validateLastBlockReceipt(receiptLists []*ReceiptList69, id uint64 var previousTxs int var previousLog uint64 var log uint64 - if buffer, ok := p.receiptBuffer[id]; ok && len(receiptLists) == 1 { + if buffer, ok := p.receiptBuffer[id]; ok && len(buffer.list) > 0 && len(receiptLists) == 1 { previousTxs = len(buffer.list[len(buffer.list)-1].items) previousLog = buffer.lastLogSize } diff --git a/eth/protocols/eth/peer_test.go b/eth/protocols/eth/peer_test.go index 90e0a0df78..bb5744ae01 100644 --- a/eth/protocols/eth/peer_test.go +++ b/eth/protocols/eth/peer_test.go @@ -154,27 +154,31 @@ func TestPartialReceipt(t *testing.T) { t.Fatalf("timeout waiting for request packet") } + receipts := []Receipt{ + {GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))}, + } + logReceipts := []Receipt{ + {GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))}, + {GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))}, + {GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))}, + } delivery := &ReceiptsPacket70{ RequestId: req.id, LastBlockIncomplete: true, List: []*ReceiptList69{ { - items: []Receipt{ - {GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))}, - }, + items: receipts, }, { - items: []Receipt{ - {GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 2))}, - }, + items: receipts, }, }, } - if err := peer.BufferReceiptsPacket(delivery, backend); err != nil { + if err := peer.bufferReceiptsPacket(delivery, backend); err != nil { t.Fatalf("first ReconstructReceiptsPacket failed: %v", err) } - if err := peer.RequestPartialReceipts(req.id); err != nil { + if err := peer.requestPartialReceipts(req.id); err != nil { t.Fatalf("RequestPartialReceipts failed: %v", err) } @@ -192,8 +196,39 @@ func TestPartialReceipt(t *testing.T) { if rereq.FirstBlockReceiptIndex != uint64(len(buffer.list[len(buffer.list)-1].items)) { t.Fatalf("unexpected FirstBlockReceiptIndex, got %d want %d", rereq.FirstBlockReceiptIndex, len(buffer.list[len(buffer.list)-1].items)) } - if _, ok := peer.requestedReceipts[rereq.RequestId]; !ok { - t.Fatalf("requestedReceipts should buffer receipt hashes") + + delivery = &ReceiptsPacket70{ + RequestId: req.id, + LastBlockIncomplete: true, + List: []*ReceiptList69{ + { + items: receipts, + }, + }, + } + if err := peer.bufferReceiptsPacket(delivery, backend); err != nil { + t.Fatalf("first ReconstructReceiptsPacket failed: %v", err) + } + + if err := peer.requestPartialReceipts(req.id); err != nil { + t.Fatalf("RequestPartialReceipts failed: %v", err) + } + + select { + case rereq = <-packetCh: + case <-time.After(2 * time.Second): + t.Fatalf("timeout waiting for re-request packet") + } + + buffer, ok = peer.receiptBuffer[rereq.RequestId] + if !ok { + t.Fatalf("receiptBuffer should buffer incomplete receipts") + } + if rereq.FirstBlockReceiptIndex != uint64(len(buffer.list[len(buffer.list)-1].items)) { + t.Fatalf("unexpected FirstBlockReceiptIndex, got %d want %d", rereq.FirstBlockReceiptIndex, len(buffer.list[len(buffer.list)-1].items)) + } + if len(rereq.GetReceiptsRequest) != 3 { + t.Fatalf("wrong partial request range, got %d want %d", len(rereq.GetReceiptsRequest), 3) } delivery = &ReceiptsPacket70{ @@ -201,30 +236,32 @@ func TestPartialReceipt(t *testing.T) { LastBlockIncomplete: false, List: []*ReceiptList69{ { - items: []Receipt{ - {GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))}, - }, + items: receipts, }, { - items: []Receipt{ - {GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))}, - }, + items: receipts, }, { - items: []Receipt{ - {GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))}, - }, + items: receipts, }, }, } - if err := peer.BufferReceiptsPacket(delivery, backend); err != nil { + if err := peer.bufferReceiptsPacket(delivery, backend); err != nil { t.Fatalf("second ReconstructReceiptsPacket failed: %v", err) } if _, ok := peer.receiptBuffer[rereq.RequestId]; ok { t.Fatalf("receiptBuffer should be cleared after delivery") } - if _, ok := peer.requestedReceipts[rereq.RequestId]; ok { - t.Fatalf("requestedReceipts should be cleared after delivery") + for i, list := range delivery.List { + if i == 1 { + if len(list.items) != len(logReceipts) { + t.Fatalf("wrong response buffering, got %d want %d", len(list.items), len(logReceipts)) + } + } else { + if len(list.items) != len(receipts) { + t.Fatalf("wrong response buffering, got %d want %d", len(list.items), len(receipts)) + } + } } } @@ -286,7 +323,7 @@ func TestPartialReceiptFailure(t *testing.T) { }, }, } - err := peer.BufferReceiptsPacket(delivery, backend) + err := peer.bufferReceiptsPacket(delivery, backend) if err == nil { t.Fatal("Unknown response should be dropped") } @@ -324,7 +361,7 @@ func TestPartialReceiptFailure(t *testing.T) { for range maxTxCount { delivery.List[0].items = append(delivery.List[0].items, Receipt{Logs: rlp.RawValue(make([]byte, 1))}) } - err = peer.BufferReceiptsPacket(delivery, backend) + err = peer.bufferReceiptsPacket(delivery, backend) if err == nil { t.Fatal("Response with the excessive number of receipts should fail the validation") } @@ -349,7 +386,7 @@ func TestPartialReceiptFailure(t *testing.T) { }, }}, } - err = peer.BufferReceiptsPacket(delivery, backend) + err = peer.bufferReceiptsPacket(delivery, backend) if err == nil { t.Fatal("Response with the excessive number of receipts should fail the validation") } @@ -374,7 +411,7 @@ func TestPartialReceiptFailure(t *testing.T) { }, }}, } - err = peer.BufferReceiptsPacket(delivery, backend) + err = peer.bufferReceiptsPacket(delivery, backend) if err == nil { t.Fatal("Response with the excessive number of receipts should fail the validation") }