fix: remove request buffer

This commit is contained in:
healthykim 2025-11-26 23:07:32 +09:00
parent 786f876959
commit 52ede9739f
4 changed files with 101 additions and 68 deletions

View file

@ -222,7 +222,6 @@ func (p *Peer) dispatcher() {
// Not sure if the request is about the receipt, but remove it anyway
delete(p.receiptBuffer, cancelOp.id)
delete(p.requestedReceipts, cancelOp.id)
cancelOp.fail <- nil

View file

@ -527,11 +527,11 @@ func handleReceipts70(backend Backend, msg Decoder, peer *Peer) error {
return err
}
if err := peer.BufferReceiptsPacket(res, backend); err != nil {
if err := peer.bufferReceiptsPacket(res, backend); err != nil {
return err
}
if res.LastBlockIncomplete {
return peer.RequestPartialReceipts(res.RequestId)
return peer.requestPartialReceipts(res.RequestId)
}
// Assign buffers shared between list elements

View file

@ -44,6 +44,7 @@ const (
)
type partialReceipt struct {
request []common.Hash
list []*ReceiptList69 // list of partially collected receipts
lastLogSize uint64 // log size of last receipt list
}
@ -66,8 +67,7 @@ type Peer struct {
reqCancel chan *cancel // Dispatch channel to cancel pending requests and untrack them
resDispatch chan *response // Dispatch channel to fulfil pending requests and untrack them
requestedReceipts map[uint64][]common.Hash // requested receipts list
receiptBuffer map[uint64]*partialReceipt // requestId to receiptlist map
receiptBuffer map[uint64]*partialReceipt // requestId to receiptlist map
term chan struct{} // Termination channel to stop the broadcasters
}
@ -76,20 +76,19 @@ type Peer struct {
// version.
func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Peer {
peer := &Peer{
id: p.ID().String(),
Peer: p,
rw: rw,
version: version,
knownTxs: newKnownCache(maxKnownTxs),
txBroadcast: make(chan []common.Hash),
txAnnounce: make(chan []common.Hash),
reqDispatch: make(chan *request),
reqCancel: make(chan *cancel),
resDispatch: make(chan *response),
txpool: txpool,
requestedReceipts: make(map[uint64][]common.Hash),
receiptBuffer: make(map[uint64]*partialReceipt),
term: make(chan struct{}),
id: p.ID().String(),
Peer: p,
rw: rw,
version: version,
knownTxs: newKnownCache(maxKnownTxs),
txBroadcast: make(chan []common.Hash),
txAnnounce: make(chan []common.Hash),
reqDispatch: make(chan *request),
reqCancel: make(chan *cancel),
resDispatch: make(chan *response),
txpool: txpool,
receiptBuffer: make(map[uint64]*partialReceipt),
term: make(chan struct{}),
}
// Start up all the broadcasters
go peer.broadcastTransactions()
@ -357,7 +356,7 @@ func (p *Peer) RequestReceipts(hashes []common.Hash, sink chan *Response) (*Requ
FirstBlockReceiptIndex: 0,
},
}
p.requestedReceipts[id] = hashes
p.receiptBuffer[id] = &partialReceipt{request: hashes}
} else {
req = &Request{
id: id,
@ -378,7 +377,7 @@ func (p *Peer) RequestReceipts(hashes []common.Hash, sink chan *Response) (*Requ
}
// HandlePartialReceipts re-request partial receipts
func (p *Peer) RequestPartialReceipts(id uint64) error {
func (p *Peer) requestPartialReceipts(id uint64) error {
if _, ok := p.receiptBuffer[id]; !ok {
return fmt.Errorf("No partial receipt retreival in progress with id %d", id)
}
@ -393,7 +392,7 @@ func (p *Peer) RequestPartialReceipts(id uint64) error {
want: ReceiptsMsg,
data: &GetReceiptsPacket70{
RequestId: id,
GetReceiptsRequest: p.requestedReceipts[id][lastBlock:],
GetReceiptsRequest: p.receiptBuffer[id].request[lastBlock:],
FirstBlockReceiptIndex: uint64(lastReceipt),
},
continued: true,
@ -402,58 +401,56 @@ func (p *Peer) RequestPartialReceipts(id uint64) error {
return p.dispatchRequest(req)
}
// BufferReceiptsPacket validates a receipt packet and buffer the incomplete packet.
// bufferReceiptsPacket validates a receipt packet and buffer the incomplete packet.
// If the request is completed, it appends previously collected receipts.
func (p *Peer) BufferReceiptsPacket(packet *ReceiptsPacket70, backend Backend) error {
func (p *Peer) bufferReceiptsPacket(packet *ReceiptsPacket70, backend Backend) error {
requestId := packet.RequestId
buffer := p.receiptBuffer[requestId]
// Do not assign buffer to the response not requested
if _, ok := p.requestedReceipts[requestId]; !ok {
if buffer == nil {
return fmt.Errorf("No partial receipt retreival in progress with id %d", requestId)
}
if len(packet.List) == 0 {
delete(p.receiptBuffer, requestId)
delete(p.requestedReceipts, requestId)
return nil
}
// Buffer the last block when the response is incomplete.
if packet.LastBlockIncomplete {
lastBlock := len(packet.List) - 1
if _, ok := p.receiptBuffer[requestId]; ok {
lastBlock += len(p.receiptBuffer[requestId].list) - 1
if len(buffer.list) > 0 {
lastBlock += len(buffer.list) - 1
}
header := backend.Chain().GetHeaderByHash(p.requestedReceipts[requestId][lastBlock])
header := backend.Chain().GetHeaderByHash(buffer.request[lastBlock])
logSize, err := p.validateLastBlockReceipt(packet.List, requestId, header)
if err != nil {
return err
}
// Update the buffered data and trim the packet to exclude the incomplete block.
if buffer, ok := p.receiptBuffer[requestId]; ok {
if len(buffer.list) > 0 {
// If the buffer is already allocated, it means that the previous response was incomplete
// Append the first block receipts
buffer.list[len(buffer.list)-1].Append(packet.List[0])
buffer.list = append(buffer.list, packet.List[1:]...)
buffer.lastLogSize = logSize
} else {
p.receiptBuffer[requestId] = &partialReceipt{
list: packet.List,
lastLogSize: logSize,
}
buffer.list = packet.List
buffer.lastLogSize = logSize
}
return nil
}
// Request completed
if buffer, ok := p.receiptBuffer[requestId]; ok {
// If the request is completed, append previously collected receipts
// to the packet and remove the buffered receipts.
packet.List = append(buffer.list, packet.List...)
delete(p.receiptBuffer, requestId)
// If the request is completed, append previously collected receipts
// to the packet and remove the buffered receipts.
if len(buffer.list) > 0 {
buffer.list[len(buffer.list)-1].Append(packet.List[0])
packet.List = packet.List[1:]
}
delete(p.requestedReceipts, requestId)
packet.List = append(buffer.list, packet.List...)
delete(p.receiptBuffer, requestId)
return nil
}
@ -473,7 +470,7 @@ func (p *Peer) validateLastBlockReceipt(receiptLists []*ReceiptList69, id uint64
var previousTxs int
var previousLog uint64
var log uint64
if buffer, ok := p.receiptBuffer[id]; ok && len(receiptLists) == 1 {
if buffer, ok := p.receiptBuffer[id]; ok && len(buffer.list) > 0 && len(receiptLists) == 1 {
previousTxs = len(buffer.list[len(buffer.list)-1].items)
previousLog = buffer.lastLogSize
}

View file

@ -154,27 +154,31 @@ func TestPartialReceipt(t *testing.T) {
t.Fatalf("timeout waiting for request packet")
}
receipts := []Receipt{
{GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))},
}
logReceipts := []Receipt{
{GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))},
{GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))},
{GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))},
}
delivery := &ReceiptsPacket70{
RequestId: req.id,
LastBlockIncomplete: true,
List: []*ReceiptList69{
{
items: []Receipt{
{GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))},
},
items: receipts,
},
{
items: []Receipt{
{GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 2))},
},
items: receipts,
},
},
}
if err := peer.BufferReceiptsPacket(delivery, backend); err != nil {
if err := peer.bufferReceiptsPacket(delivery, backend); err != nil {
t.Fatalf("first ReconstructReceiptsPacket failed: %v", err)
}
if err := peer.RequestPartialReceipts(req.id); err != nil {
if err := peer.requestPartialReceipts(req.id); err != nil {
t.Fatalf("RequestPartialReceipts failed: %v", err)
}
@ -192,8 +196,39 @@ func TestPartialReceipt(t *testing.T) {
if rereq.FirstBlockReceiptIndex != uint64(len(buffer.list[len(buffer.list)-1].items)) {
t.Fatalf("unexpected FirstBlockReceiptIndex, got %d want %d", rereq.FirstBlockReceiptIndex, len(buffer.list[len(buffer.list)-1].items))
}
if _, ok := peer.requestedReceipts[rereq.RequestId]; !ok {
t.Fatalf("requestedReceipts should buffer receipt hashes")
delivery = &ReceiptsPacket70{
RequestId: req.id,
LastBlockIncomplete: true,
List: []*ReceiptList69{
{
items: receipts,
},
},
}
if err := peer.bufferReceiptsPacket(delivery, backend); err != nil {
t.Fatalf("first ReconstructReceiptsPacket failed: %v", err)
}
if err := peer.requestPartialReceipts(req.id); err != nil {
t.Fatalf("RequestPartialReceipts failed: %v", err)
}
select {
case rereq = <-packetCh:
case <-time.After(2 * time.Second):
t.Fatalf("timeout waiting for re-request packet")
}
buffer, ok = peer.receiptBuffer[rereq.RequestId]
if !ok {
t.Fatalf("receiptBuffer should buffer incomplete receipts")
}
if rereq.FirstBlockReceiptIndex != uint64(len(buffer.list[len(buffer.list)-1].items)) {
t.Fatalf("unexpected FirstBlockReceiptIndex, got %d want %d", rereq.FirstBlockReceiptIndex, len(buffer.list[len(buffer.list)-1].items))
}
if len(rereq.GetReceiptsRequest) != 3 {
t.Fatalf("wrong partial request range, got %d want %d", len(rereq.GetReceiptsRequest), 3)
}
delivery = &ReceiptsPacket70{
@ -201,30 +236,32 @@ func TestPartialReceipt(t *testing.T) {
LastBlockIncomplete: false,
List: []*ReceiptList69{
{
items: []Receipt{
{GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))},
},
items: receipts,
},
{
items: []Receipt{
{GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))},
},
items: receipts,
},
{
items: []Receipt{
{GasUsed: 21_000, Logs: rlp.RawValue(make([]byte, 1))},
},
items: receipts,
},
},
}
if err := peer.BufferReceiptsPacket(delivery, backend); err != nil {
if err := peer.bufferReceiptsPacket(delivery, backend); err != nil {
t.Fatalf("second ReconstructReceiptsPacket failed: %v", err)
}
if _, ok := peer.receiptBuffer[rereq.RequestId]; ok {
t.Fatalf("receiptBuffer should be cleared after delivery")
}
if _, ok := peer.requestedReceipts[rereq.RequestId]; ok {
t.Fatalf("requestedReceipts should be cleared after delivery")
for i, list := range delivery.List {
if i == 1 {
if len(list.items) != len(logReceipts) {
t.Fatalf("wrong response buffering, got %d want %d", len(list.items), len(logReceipts))
}
} else {
if len(list.items) != len(receipts) {
t.Fatalf("wrong response buffering, got %d want %d", len(list.items), len(receipts))
}
}
}
}
@ -286,7 +323,7 @@ func TestPartialReceiptFailure(t *testing.T) {
},
},
}
err := peer.BufferReceiptsPacket(delivery, backend)
err := peer.bufferReceiptsPacket(delivery, backend)
if err == nil {
t.Fatal("Unknown response should be dropped")
}
@ -324,7 +361,7 @@ func TestPartialReceiptFailure(t *testing.T) {
for range maxTxCount {
delivery.List[0].items = append(delivery.List[0].items, Receipt{Logs: rlp.RawValue(make([]byte, 1))})
}
err = peer.BufferReceiptsPacket(delivery, backend)
err = peer.bufferReceiptsPacket(delivery, backend)
if err == nil {
t.Fatal("Response with the excessive number of receipts should fail the validation")
}
@ -349,7 +386,7 @@ func TestPartialReceiptFailure(t *testing.T) {
},
}},
}
err = peer.BufferReceiptsPacket(delivery, backend)
err = peer.bufferReceiptsPacket(delivery, backend)
if err == nil {
t.Fatal("Response with the excessive number of receipts should fail the validation")
}
@ -374,7 +411,7 @@ func TestPartialReceiptFailure(t *testing.T) {
},
}},
}
err = peer.BufferReceiptsPacket(delivery, backend)
err = peer.bufferReceiptsPacket(delivery, backend)
if err == nil {
t.Fatal("Response with the excessive number of receipts should fail the validation")
}