buffer gasUsed in receiptRequest

This commit is contained in:
healthykim 2026-02-24 17:34:46 +09:00
parent 9a5f255c27
commit be6aa56569
6 changed files with 36 additions and 19 deletions

View file

@ -260,7 +260,7 @@ func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash, sink chan *et
// RequestReceipts constructs a getReceipts method associated with a particular // RequestReceipts constructs a getReceipts method associated with a particular
// peer in the download tester. The returned function can be used to retrieve // peer in the download tester. The returned function can be used to retrieve
// batches of block receipts from the particularly requested peer. // batches of block receipts from the particularly requested peer.
func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash, sink chan *eth.Response) (*eth.Request, error) { func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash, gasUsed []uint64, sink chan *eth.Response) (*eth.Request, error) {
blobs := eth.ServiceGetReceiptsQuery68(dlp.chain, hashes) blobs := eth.ServiceGetReceiptsQuery68(dlp.chain, hashes)
receipts := make([]types.Receipts, len(blobs)) receipts := make([]types.Receipts, len(blobs))

View file

@ -78,11 +78,15 @@ func (q *receiptQueue) request(peer *peerConnection, req *fetchRequest, resCh ch
if q.receiptFetchHook != nil { if q.receiptFetchHook != nil {
q.receiptFetchHook(req.Headers) q.receiptFetchHook(req.Headers)
} }
hashes := make([]common.Hash, 0, len(req.Headers)) var (
gasUsed = make([]uint64, 0, len(req.Headers))
hashes = make([]common.Hash, 0, len(req.Headers))
)
for _, header := range req.Headers { for _, header := range req.Headers {
hashes = append(hashes, header.Hash()) hashes = append(hashes, header.Hash())
gasUsed = append(gasUsed, header.GasUsed)
} }
return peer.peer.RequestReceipts(hashes, resCh) return peer.peer.RequestReceipts(hashes, gasUsed, resCh)
} }
// deliver is responsible for taking a generic response packet from the concurrent // deliver is responsible for taking a generic response packet from the concurrent

View file

@ -60,7 +60,7 @@ type Peer interface {
RequestHeadersByNumber(uint64, int, int, bool, chan *eth.Response) (*eth.Request, error) RequestHeadersByNumber(uint64, int, int, bool, chan *eth.Response) (*eth.Request, error)
RequestBodies([]common.Hash, chan *eth.Response) (*eth.Request, error) RequestBodies([]common.Hash, chan *eth.Response) (*eth.Request, error)
RequestReceipts([]common.Hash, chan *eth.Response) (*eth.Request, error) RequestReceipts([]common.Hash, []uint64, chan *eth.Response) (*eth.Request, error)
} }
// newPeerConnection creates a new downloader peer. // newPeerConnection creates a new downloader peer.

View file

@ -208,7 +208,7 @@ func (p *skeletonTestPeer) RequestBodies([]common.Hash, chan *eth.Response) (*et
panic("skeleton sync must not request block bodies") panic("skeleton sync must not request block bodies")
} }
func (p *skeletonTestPeer) RequestReceipts([]common.Hash, chan *eth.Response) (*eth.Request, error) { func (p *skeletonTestPeer) RequestReceipts([]common.Hash, []uint64, chan *eth.Response) (*eth.Request, error) {
panic("skeleton sync must not request receipts") panic("skeleton sync must not request receipts")
} }

View file

@ -50,6 +50,7 @@ const (
// receiptRequest tracks the state of an in-flight receipt retrieval operation. // receiptRequest tracks the state of an in-flight receipt retrieval operation.
type receiptRequest struct { type receiptRequest struct {
request []common.Hash // block hashes corresponding to the requested receipts request []common.Hash // block hashes corresponding to the requested receipts
gasUsed []uint64 // block gas used corresponding to the requested receipts
list []*ReceiptList69 // list of partially collected receipts list []*ReceiptList69 // list of partially collected receipts
lastLogSize uint64 // log size of last receipt list lastLogSize uint64 // log size of last receipt list
} }
@ -354,7 +355,7 @@ func (p *Peer) RequestBodies(hashes []common.Hash, sink chan *Response) (*Reques
} }
// RequestReceipts fetches a batch of transaction receipts from a remote node. // RequestReceipts fetches a batch of transaction receipts from a remote node.
func (p *Peer) RequestReceipts(hashes []common.Hash, sink chan *Response) (*Request, error) { func (p *Peer) RequestReceipts(hashes []common.Hash, gasUsed []uint64, sink chan *Response) (*Request, error) {
p.Log().Debug("Fetching batch of receipts", "count", len(hashes)) p.Log().Debug("Fetching batch of receipts", "count", len(hashes))
id := rand.Uint64() id := rand.Uint64()
@ -375,6 +376,7 @@ func (p *Peer) RequestReceipts(hashes []common.Hash, sink chan *Response) (*Requ
p.receiptBufferLock.Lock() p.receiptBufferLock.Lock()
p.receiptBuffer[id] = &receiptRequest{ p.receiptBuffer[id] = &receiptRequest{
request: hashes, request: hashes,
gasUsed: gasUsed,
} }
p.receiptBufferLock.Unlock() p.receiptBufferLock.Unlock()
} else { } else {
@ -453,11 +455,8 @@ func (p *Peer) bufferReceipts(requestId uint64, receiptLists []*ReceiptList69, l
if len(buffer.list) > 0 { if len(buffer.list) > 0 {
lastBlock += len(buffer.list) - 1 lastBlock += len(buffer.list) - 1
} }
header := backend.Chain().GetHeaderByHash(buffer.request[lastBlock]) gasUsed := buffer.gasUsed[lastBlock]
if header == nil { logSize, err := p.validateLastBlockReceipt(receiptLists, requestId, gasUsed)
return fmt.Errorf("unknown block #%d for receipt retrieval", lastBlock)
}
logSize, err := p.validateLastBlockReceipt(receiptLists, requestId, header)
if err != nil { if err != nil {
return err return err
} }
@ -501,9 +500,11 @@ func (p *Peer) flushReceipts(requestId uint64) []*ReceiptList69 {
// validateLastBlockReceipt validates receipts and return log size of last block receipt. // validateLastBlockReceipt validates receipts and return log size of last block receipt.
// This function is called only when the `lastBlockincomplete == true`. // This function is called only when the `lastBlockincomplete == true`.
// Note that the last receipt response (which completes receiptLists of a pending block) is not verified here. //
// Those response doesn't need hueristics below since they can be verified by its trie root. // Note that the last receipt response (which completes receiptLists of a pending block)
func (p *Peer) validateLastBlockReceipt(receiptLists []*ReceiptList69, id uint64, header *types.Header) (uint64, error) { // is not verified here. Those response doesn't need hueristics below since they can be
// verified by its trie root.
func (p *Peer) validateLastBlockReceipt(receiptLists []*ReceiptList69, id uint64, gasUsed uint64) (uint64, error) {
lastReceipts := receiptLists[len(receiptLists)-1] lastReceipts := receiptLists[len(receiptLists)-1]
// If the receipt is in the middle of retrieval, use the buffered data. // If the receipt is in the middle of retrieval, use the buffered data.
@ -520,7 +521,7 @@ func (p *Peer) validateLastBlockReceipt(receiptLists []*ReceiptList69, id uint64
} }
// Verify that the total number of transactions delivered is under the limit. // Verify that the total number of transactions delivered is under the limit.
if uint64(previousTxs+lastReceipts.items.Len()) > header.GasUsed/21_000 { if uint64(previousTxs+lastReceipts.items.Len()) > gasUsed/21_000 {
// should be dropped, don't clear the buffer // should be dropped, don't clear the buffer
return 0, fmt.Errorf("total number of tx exceeded limit") return 0, fmt.Errorf("total number of tx exceeded limit")
} }
@ -533,7 +534,7 @@ func (p *Peer) validateLastBlockReceipt(receiptLists []*ReceiptList69, id uint64
log += uint64(len(rc.Logs)) log += uint64(len(rc.Logs))
} }
// Verify that the overall downloaded receipt size does not exceed the block gas limit. // Verify that the overall downloaded receipt size does not exceed the block gas limit.
if previousLog+log > header.GasUsed/params.LogDataGas { if previousLog+log > gasUsed/params.LogDataGas {
return 0, fmt.Errorf("total download receipt size exceeded the limit") return 0, fmt.Errorf("total download receipt size exceeded the limit")
} }
return previousLog + log, nil return previousLog + log, nil

View file

@ -143,9 +143,15 @@ func TestPartialReceipt(t *testing.T) {
backend.chain.GetBlockByNumber(3).Hash(), backend.chain.GetBlockByNumber(3).Hash(),
backend.chain.GetBlockByNumber(4).Hash(), backend.chain.GetBlockByNumber(4).Hash(),
} }
gasUsed := []uint64{
backend.chain.GetBlockByNumber(1).GasUsed(),
backend.chain.GetBlockByNumber(2).GasUsed(),
backend.chain.GetBlockByNumber(3).GasUsed(),
backend.chain.GetBlockByNumber(4).GasUsed(),
}
sink := make(chan *Response, 1) sink := make(chan *Response, 1)
req, err := peer.RequestReceipts(hashes, sink) req, err := peer.RequestReceipts(hashes, gasUsed, sink)
if err != nil { if err != nil {
t.Fatalf("RequestReceipts failed: %v", err) t.Fatalf("RequestReceipts failed: %v", err)
} }
@ -359,10 +365,16 @@ func TestPartialReceiptFailure(t *testing.T) {
backend.chain.GetBlockByNumber(3).Hash(), backend.chain.GetBlockByNumber(3).Hash(),
backend.chain.GetBlockByNumber(4).Hash(), backend.chain.GetBlockByNumber(4).Hash(),
} }
gasUsed := []uint64{
backend.chain.GetBlockByNumber(1).GasUsed(),
backend.chain.GetBlockByNumber(2).GasUsed(),
backend.chain.GetBlockByNumber(3).GasUsed(),
backend.chain.GetBlockByNumber(4).GasUsed(),
}
// Case 1 ) The number of receipts exceeds maximum tx count // Case 1 ) The number of receipts exceeds maximum tx count
sink := make(chan *Response, 1) sink := make(chan *Response, 1)
req, err := peer.RequestReceipts(hashes, sink) req, err := peer.RequestReceipts(hashes, gasUsed, sink)
if err != nil { if err != nil {
t.Fatalf("RequestReceipts failed: %v", err) t.Fatalf("RequestReceipts failed: %v", err)
} }
@ -391,7 +403,7 @@ func TestPartialReceiptFailure(t *testing.T) {
} }
// Case 2 ) Total receipt size exceeds the block gas limit // Case 2 ) Total receipt size exceeds the block gas limit
req, err = peer.RequestReceipts(hashes, sink) req, err = peer.RequestReceipts(hashes, gasUsed, sink)
if err != nil { if err != nil {
t.Fatalf("RequestReceipts failed: %v", err) t.Fatalf("RequestReceipts failed: %v", err)
} }