fix: remove partial sink

This commit is contained in:
healthykim 2025-11-23 17:59:03 -05:00
parent 1f32d8959a
commit a62ec06a1d
8 changed files with 94 additions and 300 deletions

View file

@ -307,15 +307,7 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error {
// reschedule the timeout timer. // reschedule the timeout timer.
index, live := ordering[res.Req] index, live := ordering[res.Req]
if live { if live {
req := timeouts.Remove(index) timeouts.Remove(index)
delete(ordering, res.Req)
if res.Partial {
ttl := d.peers.rates.TargetTimeout()
ordering[req] = timeouts.Size()
timeouts.Push(req, -time.Now().Add(ttl).UnixNano())
}
if index == 0 { if index == 0 {
if !timeout.Stop() { if !timeout.Stop() {
<-timeout.C <-timeout.C
@ -325,17 +317,16 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error {
timeout.Reset(time.Until(time.Unix(0, -exp))) timeout.Reset(time.Until(time.Unix(0, -exp)))
} }
} }
delete(ordering, res.Req)
} }
if !res.Partial { // Delete the pending request (if it still exists) and mark the peer idle
// Delete the pending request (if it still exists) and mark the peer idle delete(pending, res.Req.Peer)
delete(pending, res.Req.Peer) delete(stales, res.Req.Peer)
delete(stales, res.Req.Peer)
res.Req.Close()
}
// Signal the dispatcher that the round trip is done. We'll drop the // Signal the dispatcher that the round trip is done. We'll drop the
// peer if the data turns out to be junk. // peer if the data turns out to be junk.
res.Done <- nil res.Done <- nil
res.Req.Close()
// If the peer was previously banned and failed to deliver its pack // If the peer was previously banned and failed to deliver its pack
// in a reasonable time frame, ignore its message. // in a reasonable time frame, ignore its message.

View file

@ -91,7 +91,7 @@ func (q *receiptQueue) deliver(peer *peerConnection, packet *eth.Response) (int,
receipts := *packet.Res.(*eth.ReceiptsRLPResponse) receipts := *packet.Res.(*eth.ReceiptsRLPResponse)
hashes := packet.Meta.([]common.Hash) // {receipt hashes} hashes := packet.Meta.([]common.Hash) // {receipt hashes}
accepted, err := q.queue.DeliverReceipts(peer.id, receipts, hashes, packet.Partial, packet.From) accepted, err := q.queue.DeliverReceipts(peer.id, receipts, hashes)
switch { switch {
case err == nil && len(receipts) == 0: case err == nil && len(receipts) == 0:
peer.log.Trace("Requested receipts delivered") peer.log.Trace("Requested receipts delivered")

View file

@ -629,13 +629,13 @@ func (q *queue) DeliverBodies(id string, txLists [][]*types.Transaction, txListH
result.SetBodyDone() result.SetBodyDone()
} }
return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool, return q.deliver(id, q.blockTaskPool, q.blockTaskQueue, q.blockPendPool,
bodyReqTimer, bodyInMeter, bodyDropMeter, len(txLists), validate, reconstruct, false, 0) bodyReqTimer, bodyInMeter, bodyDropMeter, len(txLists), validate, reconstruct)
} }
// DeliverReceipts injects a receipt retrieval response into the results queue. // DeliverReceipts injects a receipt retrieval response into the results queue.
// The method returns the number of transaction receipts accepted from the delivery // The method returns the number of transaction receipts accepted from the delivery
// and also wakes any threads waiting for data delivery. // and also wakes any threads waiting for data delivery.
func (q *queue) DeliverReceipts(id string, receiptList []rlp.RawValue, receiptListHashes []common.Hash, incomplete bool, from int) (int, error) { func (q *queue) DeliverReceipts(id string, receiptList []rlp.RawValue, receiptListHashes []common.Hash) (int, error) {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
@ -650,7 +650,7 @@ func (q *queue) DeliverReceipts(id string, receiptList []rlp.RawValue, receiptLi
result.SetReceiptsDone() result.SetReceiptsDone()
} }
return q.deliver(id, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool, return q.deliver(id, q.receiptTaskPool, q.receiptTaskQueue, q.receiptPendPool,
receiptReqTimer, receiptInMeter, receiptDropMeter, len(receiptList), validate, reconstruct, incomplete, from) receiptReqTimer, receiptInMeter, receiptDropMeter, len(receiptList), validate, reconstruct)
} }
// deliver injects a data retrieval response into the results queue. // deliver injects a data retrieval response into the results queue.
@ -662,16 +662,14 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
taskQueue *prque.Prque[int64, *types.Header], pendPool map[string]*fetchRequest, taskQueue *prque.Prque[int64, *types.Header], pendPool map[string]*fetchRequest,
reqTimer *metrics.Timer, resInMeter, resDropMeter *metrics.Meter, reqTimer *metrics.Timer, resInMeter, resDropMeter *metrics.Meter,
results int, validate func(index int, header *types.Header) error, results int, validate func(index int, header *types.Header) error,
reconstruct func(index int, result *fetchResult), incomplete bool, from int) (int, error) { reconstruct func(index int, result *fetchResult)) (int, error) {
// Short circuit if the data was never requested // Short circuit if the data was never requested
request := pendPool[id] request := pendPool[id]
if request == nil { if request == nil {
resDropMeter.Mark(int64(results)) resDropMeter.Mark(int64(results))
return 0, errNoFetchesPending return 0, errNoFetchesPending
} }
if !incomplete { delete(pendPool, id)
delete(pendPool, id)
}
reqTimer.UpdateSince(request.Time) reqTimer.UpdateSince(request.Time)
resInMeter.Mark(int64(results)) resInMeter.Mark(int64(results))
@ -689,7 +687,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
i int i int
hashes []common.Hash hashes []common.Hash
) )
for _, header := range request.Headers[from:] { for _, header := range request.Headers {
// Short circuit assembly if no more fetch results are found // Short circuit assembly if no more fetch results are found
if i >= results { if i >= results {
break break
@ -703,7 +701,7 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
i++ i++
} }
for _, header := range request.Headers[from : from+i] { for _, header := range request.Headers[:i] {
if res, stale, err := q.resultCache.GetDeliverySlot(header.Number.Uint64()); err == nil && !stale { if res, stale, err := q.resultCache.GetDeliverySlot(header.Number.Uint64()); err == nil && !stale {
reconstruct(accepted, res) reconstruct(accepted, res)
} else { } else {
@ -720,14 +718,8 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
resDropMeter.Mark(int64(results - accepted)) resDropMeter.Mark(int64(results - accepted))
// Return all failed or missing fetches to the queue // Return all failed or missing fetches to the queue
if incomplete { for _, header := range request.Headers[accepted:] {
for _, header := range request.Headers[from+accepted : from+results] { taskQueue.Push(header, -int64(header.Number.Uint64()))
taskQueue.Push(header, -int64(header.Number.Uint64()))
}
} else {
for _, header := range request.Headers[from+accepted:] {
taskQueue.Push(header, -int64(header.Number.Uint64()))
}
} }
// Wake up Results // Wake up Results
if accepted > 0 { if accepted > 0 {

View file

@ -32,45 +32,32 @@ import (
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie"
) )
type blockConfig struct {
txPeriod int
txCount int
}
var emptyBlock = blockConfig{txPeriod: 0, txCount: 0}
var defaultBlock = blockConfig{txPeriod: 2, txCount: 1}
// makeChain creates a chain of n blocks starting at and including parent. // makeChain creates a chain of n blocks starting at and including parent.
// The returned hash chain is ordered head->parent. // The returned hash chain is ordered head->parent.
// If empty is false, every second block (i%2==0) contains one transaction. // If empty is false, every second block (i%2==0) contains one transaction.
// If config.txCount > 0, every config.txPeriod-th block contains config.txCount transactions.
// No uncles are added. // No uncles are added.
func makeChain(n int, seed byte, parent *types.Block, config blockConfig) ([]*types.Block, []types.Receipts) { func makeChain(n int, seed byte, parent *types.Block, empty bool) ([]*types.Block, []types.Receipts) {
blocks, receipts := core.GenerateChain(params.TestChainConfig, parent, ethash.NewFaker(), testDB, n, func(i int, block *core.BlockGen) { blocks, receipts := core.GenerateChain(params.TestChainConfig, parent, ethash.NewFaker(), testDB, n, func(i int, block *core.BlockGen) {
block.SetCoinbase(common.Address{seed}) block.SetCoinbase(common.Address{seed})
// Add transactions according to config // Add one tx to every second block
if config.txCount > 0 && i%config.txPeriod == 0 { if !empty && i%2 == 0 {
for range config.txCount { signer := types.MakeSigner(params.TestChainConfig, block.Number(), block.Timestamp())
signer := types.MakeSigner(params.TestChainConfig, block.Number(), block.Timestamp()) tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, block.BaseFee(), nil), signer, testKey)
tx, err := types.SignTx(types.NewTransaction(block.TxNonce(testAddress), common.Address{seed}, big.NewInt(1000), params.TxGas, block.BaseFee(), nil), signer, testKey) if err != nil {
if err != nil { panic(err)
panic(err)
}
block.AddTx(tx)
} }
block.AddTx(tx)
} }
}) })
return blocks, receipts return blocks, receipts
} }
type chainData struct { type chainData struct {
blocks []*types.Block blocks []*types.Block
receipts []types.Receipts offset int
offset int
} }
var chain *chainData var chain *chainData
@ -79,11 +66,11 @@ var emptyChain *chainData
func init() { func init() {
// Create a chain of blocks to import // Create a chain of blocks to import
targetBlocks := 128 targetBlocks := 128
blocks, receipts := makeChain(targetBlocks, 0, testGenesis, defaultBlock) blocks, _ := makeChain(targetBlocks, 0, testGenesis, false)
chain = &chainData{blocks, receipts, 0} chain = &chainData{blocks, 0}
blocks, receipts = makeChain(targetBlocks, 0, testGenesis, emptyBlock) blocks, _ = makeChain(targetBlocks, 0, testGenesis, true)
emptyChain = &chainData{blocks, receipts, 0} emptyChain = &chainData{blocks, 0}
} }
func (chain *chainData) headers() []*types.Header { func (chain *chainData) headers() []*types.Header {
@ -274,149 +261,13 @@ func TestEmptyBlocks(t *testing.T) {
} }
} }
// TestPartialReceiptDelivery checks two points:
// 1. Receipts that fail validation should be re-requested from other peers.
// 2. Partial delivery should not expire.
func TestPartialReceiptDelivery(t *testing.T) {
blocks, receipts := makeChain(64, 0, testGenesis, blockConfig{txPeriod: 1, txCount: 5})
chain := chainData{blocks: blocks, receipts: receipts, offset: 0}
numBlock := len(chain.blocks)
q := newQueue(10, 10)
if !q.Idle() {
t.Errorf("new queue should be idle")
}
q.Prepare(1, SnapSync)
if res := q.Results(false); len(res) != 0 {
t.Fatal("new queue should have 0 results")
}
// Schedule a batch of headers
headers := chain.headers()
hashes := make([]common.Hash, len(headers))
for i, header := range headers {
hashes[i] = header.Hash()
}
q.Schedule(headers, hashes, 1)
peer := dummyPeer("peer-1")
req, _, _ := q.ReserveReceipts(peer, numBlock)
t.Logf("request: length %d", len(req.Headers))
// 1. Deliver a partial receipt: this must not clear the remaining receipts from the pending list
firstCutoff := len(req.Headers) / 3
receiptRLP, rcHashes := getPartialReceiptsDelivery(0, firstCutoff, receipts)
accepted, err := q.DeliverReceipts(peer.id, receiptRLP, rcHashes, true, 0)
if err != nil || accepted != firstCutoff {
t.Fatalf("delivery failed: err %v, accepted %d\n", err, accepted)
}
if pending := q.PendingReceipts(); pending != numBlock-len(req.Headers) {
t.Fatalf("wrong pending receipt length, got %d, exp %d", pending, numBlock-len(req.Headers))
}
for i := range firstCutoff {
headerNumber := req.Headers[i].Number.Uint64()
res, _, _, _, err := q.resultCache.getFetchResult(headerNumber)
if err != nil {
t.Fatalf("fetch result get failed: err %v", err)
}
if res == nil {
t.Fatalf("fetch result is nil: header number %d", headerNumber)
}
if !res.Done(receiptType) {
t.Fatalf("wrong result, block %d receipt not done", headerNumber)
}
}
if flight := q.InFlightReceipts(); !flight {
t.Fatalf("there should be in flight receipts")
}
// 2. Deliver a partial receipt containing an invalid entry: the invalid receipt should be removed from the pending list
secondCutoff := firstCutoff + len(req.Headers)/3
receiptRLP, rcHashes = getPartialReceiptsDelivery(firstCutoff, secondCutoff, receipts)
// one invalid receipt
rcHashes[len(rcHashes)-1] = common.Hash{}
accepted, err = q.DeliverReceipts(peer.id, receiptRLP, rcHashes, true, firstCutoff)
if accepted != len(rcHashes)-1 {
t.Fatalf("wrong accepted, got %d, exp %d", accepted, len(rcHashes)-1)
}
if err == nil {
t.Fatalf("delivery should fail")
}
// The invalid receipt should be returned to the pending pool
if pending := q.PendingReceipts(); pending != numBlock-len(req.Headers)+1 {
t.Fatalf("wrong pending receipt length, got %d, exp %d", pending, numBlock-len(req.Headers))
}
for i := range len(rcHashes) - 1 {
headerNumber := req.Headers[firstCutoff+i].Number.Uint64()
res, _, _, _, err := q.resultCache.getFetchResult(headerNumber)
if err != nil {
t.Fatalf("fetch result get failed: err %v", err)
}
if res == nil {
t.Fatalf("fetch result is nil: header number %d", headerNumber)
}
if !res.Done(receiptType) {
t.Fatalf("wrong result, block %d receipt not done", headerNumber)
}
}
// 3. Deliver the remaining receipts to complete the request
thirdCutoff := len(req.Headers)
receiptRLP, rcHashes = getPartialReceiptsDelivery(secondCutoff, thirdCutoff, receipts)
accepted, err = q.DeliverReceipts(peer.id, receiptRLP, rcHashes, false, secondCutoff)
if accepted != len(rcHashes) {
t.Fatalf("wrong accepted, got %d, exp %d", accepted, len(rcHashes)-1)
}
if err != nil || accepted != thirdCutoff-secondCutoff {
t.Fatalf("delivery failed: err %v, accepted %d\n", err, accepted)
}
for i := range len(rcHashes) {
headerNumber := req.Headers[secondCutoff+i].Number.Uint64()
res, _, _, _, err := q.resultCache.getFetchResult(headerNumber)
if err != nil {
t.Fatalf("fetch result get failed: err %v", err)
}
if res == nil {
t.Fatalf("fetch result is nil: header number %d", headerNumber)
}
if !res.Done(receiptType) {
t.Fatalf("wrong result, block %d receipt not done", headerNumber)
}
}
if q.InFlightReceipts() {
t.Fatal("there shouldn't be any remaning in-flight receipts")
}
}
func getPartialReceiptsDelivery(from int, to int, receipts []types.Receipts) ([]rlp.RawValue, []common.Hash) {
if from < 0 {
from = 0
}
if to > len(receipts) {
to = len(receipts)
}
hasher := trie.NewStackTrie(nil)
rcHashes := make([]common.Hash, to-from)
for i, rc := range receipts[from:to] {
rcHashes[i] = types.DeriveSha(rc, hasher)
}
return types.EncodeBlockReceiptLists(receipts[from:to]), rcHashes
}
// XTestDelivery does some more extensive testing of events that happen, // XTestDelivery does some more extensive testing of events that happen,
// blocks that become known and peers that make reservations and deliveries. // blocks that become known and peers that make reservations and deliveries.
// disabled since it's not really a unit-test, but can be executed to test // disabled since it's not really a unit-test, but can be executed to test
// some more advanced scenarios // some more advanced scenarios
func XTestDelivery(t *testing.T) { func XTestDelivery(t *testing.T) {
// the outside network, holding blocks // the outside network, holding blocks
blo, rec := makeChain(128, 0, testGenesis, defaultBlock) blo, rec := makeChain(128, 0, testGenesis, false)
world := newNetwork() world := newNetwork()
world.receipts = rec world.receipts = rec
world.chain = blo world.chain = blo
@ -517,7 +368,7 @@ func XTestDelivery(t *testing.T) {
for i, receipt := range rcs { for i, receipt := range rcs {
hashes[i] = types.DeriveSha(receipt, hasher) hashes[i] = types.DeriveSha(receipt, hasher)
} }
_, err := q.DeliverReceipts(peer.id, types.EncodeBlockReceiptLists(rcs), hashes, false, 0) _, err := q.DeliverReceipts(peer.id, types.EncodeBlockReceiptLists(rcs), hashes)
if err != nil { if err != nil {
fmt.Printf("delivered %d receipts %v\n", len(rcs), err) fmt.Printf("delivered %d receipts %v\n", len(rcs), err)
} }
@ -593,7 +444,7 @@ func (n *network) progress(numBlocks int) {
n.lock.Lock() n.lock.Lock()
defer n.lock.Unlock() defer n.lock.Unlock()
//fmt.Printf("progressing...\n") //fmt.Printf("progressing...\n")
newBlocks, newR := makeChain(numBlocks, 0, n.chain[len(n.chain)-1], emptyBlock) newBlocks, newR := makeChain(numBlocks, 0, n.chain[len(n.chain)-1], false)
n.chain = append(n.chain, newBlocks...) n.chain = append(n.chain, newBlocks...)
n.receipts = append(n.receipts, newR...) n.receipts = append(n.receipts, newR...)
n.cond.Broadcast() n.cond.Broadcast()

View file

@ -54,8 +54,7 @@ type Request struct {
Peer string // Demultiplexer if cross-peer requests are batched together Peer string // Demultiplexer if cross-peer requests are batched together
Sent time.Time // Timestamp when the request was sent Sent time.Time // Timestamp when the request was sent
reRequest bool continued bool
previous uint64 // id of previous index (to find sink)
} }
// Close aborts an in-flight request. Although there's no way to notify the // Close aborts an in-flight request. Although there's no way to notify the
@ -108,9 +107,6 @@ type Response struct {
Meta interface{} // Metadata generated locally on the receiver thread Meta interface{} // Metadata generated locally on the receiver thread
Time time.Duration // Time it took for the request to be served Time time.Duration // Time it took for the request to be served
Done chan error // Channel to signal message handling to the reader Done chan error // Channel to signal message handling to the reader
From int
Partial bool
} }
// response is a wrapper around a remote Response that has an error channel to // response is a wrapper around a remote Response that has an error channel to
@ -207,17 +203,10 @@ func (p *Peer) dispatcher() {
reqOp.fail <- err reqOp.fail <- err
if err == nil { if err == nil {
// reuse sink if it is re-request // do not overwrite if it is re-request
if req.reRequest { if _, ok := pending[req.id]; !ok {
if _, ok := pending[req.previous]; ok { pending[req.id] = req
req.sink = pending[req.previous].sink
} else {
reqOp.fail <- fmt.Errorf("Cannot find previous request index")
continue
}
delete(pending, req.previous)
} }
pending[req.id] = req
} }
case cancelOp := <-p.reqCancel: case cancelOp := <-p.reqCancel:
@ -231,7 +220,7 @@ func (p *Peer) dispatcher() {
// Stop tracking the request // Stop tracking the request
delete(pending, cancelOp.id) delete(pending, cancelOp.id)
// Not sure if the request is about the receipt, but removing it anyway // Not sure if the request is about the receipt, but remove it anyway
delete(p.receiptBuffer, cancelOp.id) delete(p.receiptBuffer, cancelOp.id)
delete(p.requestedReceipts, cancelOp.id) delete(p.requestedReceipts, cancelOp.id)
@ -264,12 +253,6 @@ func (p *Peer) dispatcher() {
// it can wait for a handler response and dispatch the data. // it can wait for a handler response and dispatch the data.
res.Time = res.recv.Sub(res.Req.Sent) res.Time = res.recv.Sub(res.Req.Sent)
resOp.fail <- nil resOp.fail <- nil
// Stop tracking the request, the response dispatcher will deliver
// For partial response, pending should be removed after re-request
if res.Partial {
delete(pending, res.id)
}
} }
case <-p.term: case <-p.term:

View file

@ -527,16 +527,14 @@ func handleReceipts70(backend Backend, msg Decoder, peer *Peer) error {
return err return err
} }
from, err := peer.ReconstructReceiptsPacket(res) if err := peer.BufferReceiptsPacket(res); err != nil {
if err != nil {
return err return err
} }
if res.LastBlockIncomplete { if res.LastBlockIncomplete {
err := peer.RequestPartialReceipts(res.RequestId) if err := peer.RequestPartialReceipts(res.RequestId); err != nil {
if err != nil {
return err return err
} }
return nil
} }
// Assign buffers shared between list elements // Assign buffers shared between list elements
@ -563,9 +561,6 @@ func handleReceipts70(backend Backend, msg Decoder, peer *Peer) error {
id: res.RequestId, id: res.RequestId,
code: ReceiptsMsg, code: ReceiptsMsg,
Res: &enc, Res: &enc,
From: from,
Partial: res.LastBlockIncomplete,
}, metadata) }, metadata)
} }

View file

@ -44,9 +44,8 @@ const (
) )
type partialReceipt struct { type partialReceipt struct {
idx int // position in original request list []*ReceiptList69 // list of partially collected receipts
list *ReceiptList69 // list of partially collected receipts lastLogSize uint64 // log size of last receipt list
size uint64 // log size of list
} }
// Peer is a collection of relevant information we have about a `eth` peer. // Peer is a collection of relevant information we have about a `eth` peer.
@ -379,9 +378,13 @@ func (p *Peer) RequestReceipts(hashes []common.Hash, sink chan *Response) (*Requ
} }
// HandlePartialReceipts re-request partial receipts // HandlePartialReceipts re-request partial receipts
func (p *Peer) RequestPartialReceipts(previousId uint64) error { func (p *Peer) RequestPartialReceipts(id uint64) error {
split := p.receiptBuffer[previousId].idx if _, ok := p.receiptBuffer[id]; !ok {
id := rand.Uint64() return fmt.Errorf("No partial receipt retreival in progress with id %d", id)
}
lastBlock := len(p.receiptBuffer[id].list) - 1
lastReceipt := len(p.receiptBuffer[id].list[lastBlock].items)
req := &Request{ req := &Request{
id: id, id: id,
@ -390,90 +393,76 @@ func (p *Peer) RequestPartialReceipts(previousId uint64) error {
want: ReceiptsMsg, want: ReceiptsMsg,
data: &GetReceiptsPacket70{ data: &GetReceiptsPacket70{
RequestId: id, RequestId: id,
GetReceiptsRequest: p.requestedReceipts[previousId][split:], GetReceiptsRequest: p.requestedReceipts[id][lastBlock:],
FirstBlockReceiptIndex: uint64(len(p.receiptBuffer[previousId].list.items)), FirstBlockReceiptIndex: uint64(lastReceipt),
}, },
reRequest: true, continued: true,
previous: previousId,
} }
p.receiptBuffer[id] = p.receiptBuffer[previousId]
p.requestedReceipts[id] = p.requestedReceipts[previousId]
delete(p.receiptBuffer, previousId)
delete(p.requestedReceipts, previousId)
return p.dispatchRequest(req) return p.dispatchRequest(req)
} }
// ReconstructReceiptsPacket validates a receipt packet and checks whether a partial request is complete. // BufferReceiptsPacket validates a receipt packet and buffer the incomplete packet.
// It also mutates the packet in place, trimming the partial response or appending previously collected receipts. // If the request is completed, it appends previously collected receipts.
func (p *Peer) ReconstructReceiptsPacket(packet *ReceiptsPacket70) (int, error) { func (p *Peer) BufferReceiptsPacket(packet *ReceiptsPacket70) error {
from := 0
requestId := packet.RequestId requestId := packet.RequestId
if len(packet.List) == 0 { if len(packet.List) == 0 {
return 0, nil return nil
} }
// Process the first block // Buffer the last block when the response is incomplete.
// If the request was partially collected earlier, append the buffered data so this response completes it.
firstReceipt := packet.List[0]
if firstReceipt == nil {
return 0, fmt.Errorf("nil first receipt")
}
if _, ok := p.receiptBuffer[requestId]; ok {
// complete packet (hash validation will be performed later)
firstReceipt.items = append(p.receiptBuffer[requestId].list.items, firstReceipt.items...)
from = p.receiptBuffer[requestId].idx
delete(p.receiptBuffer, requestId)
if !packet.LastBlockIncomplete {
delete(p.requestedReceipts, requestId)
}
}
// Trim and buffer the last block when the response is incomplete.
if packet.LastBlockIncomplete { if packet.LastBlockIncomplete {
lastReceipts := packet.List[len(packet.List)-1] logSize, err := p.validateLastBlockReceipt(packet.List, requestId)
logSize, err := p.validateLastBlockReceipt(lastReceipts, packet.RequestId)
if err != nil { if err != nil {
return 0, err return err
} }
// Update the buffered data and trim the packet to exclude the incomplete block. // Update the buffered data and trim the packet to exclude the incomplete block.
if buffer, ok := p.receiptBuffer[requestId]; ok { if buffer, ok := p.receiptBuffer[requestId]; ok {
buffer.idx = buffer.idx + len(packet.List) - 1 buffer.list = append(buffer.list, packet.List...)
buffer.list.items = append(buffer.list.items, lastReceipts.items...) buffer.lastLogSize = logSize
buffer.size = buffer.size + logSize
} else { } else {
p.receiptBuffer[requestId] = &partialReceipt{ p.receiptBuffer[requestId] = &partialReceipt{
idx: len(packet.List) - 1, list: packet.List,
list: lastReceipts, lastLogSize: logSize,
size: logSize,
} }
} }
packet.List = packet.List[:len(packet.List)-1] return nil
} }
return from, nil // If the request is completed, append previously collected receipts
// to the packet and remove the buffered receipts.
if buffer, ok := p.receiptBuffer[requestId]; ok {
packet.List = append(buffer.list, packet.List...)
delete(p.receiptBuffer, requestId)
delete(p.requestedReceipts, requestId)
}
return nil
} }
// validateLastBlockReceipt validates receipts and return log size of last block receipt // validateLastBlockReceipt validates receipts and return log size of last block receipt.
func (p *Peer) validateLastBlockReceipt(lastReceipts *ReceiptList69, id uint64) (uint64, error) { // This function is called only when the `lastBlockincomplete == true`.
if lastReceipts == nil { // Note that the last receipt response (which completes receiptLists of a pending block) is not verified here.
return 0, fmt.Errorf("nil partial receipt") // Those response doesn't need hueristics below since they can be verified by its trie root.
} func (p *Peer) validateLastBlockReceipt(receiptLists []*ReceiptList69, id uint64) (uint64, error) {
lastReceipts := receiptLists[len(receiptLists)-1]
// If the receipt is in the middle of retreival, use the buffered data.
// e.g. [[receipt1], [receipt1, receipt2], incomplete = true]
// [[receipt3, receipt4], incomplete = true] <<--
// [[receipt5], [receipt1], incomplete = false]
// This case happens only if len(receiptLists) == 1 && incomplete == true && buffered before.
var previousTxs int var previousTxs int
var previousLog uint64 var previousLog uint64
var log uint64 var log uint64
if buffer, ok := p.receiptBuffer[id]; ok { if buffer, ok := p.receiptBuffer[id]; ok && len(receiptLists) == 1 {
previousTxs = len(buffer.list.items) previousTxs = len(buffer.list[len(buffer.list)-1].items)
previousLog = buffer.size previousLog = buffer.lastLogSize
} }
// 1. Verify that the total number of transactions delivered is under the limit. // 1. Verify that the total number of transactions delivered is under the limit.
if uint64(previousTxs+len(lastReceipts.items)) > lastReceipts.items[0].GasUsed/21_000 { if uint64(previousTxs+len(lastReceipts.items)) > params.MaxGasLimit/21_000 {
// should be dropped, don't clear the buffer // should be dropped, don't clear the buffer
return 0, fmt.Errorf("total number of tx exceeded limit") return 0, fmt.Errorf("total number of tx exceeded limit")
} }
@ -490,7 +479,7 @@ func (p *Peer) validateLastBlockReceipt(lastReceipts *ReceiptList69, id uint64)
return 0, fmt.Errorf("total download receipt size exceeded the limit") return 0, fmt.Errorf("total download receipt size exceeded the limit")
} }
return log, nil return previousLog + log, nil
} }
// RequestTxs fetches a batch of transactions from a remote node. // RequestTxs fetches a batch of transactions from a remote node.

View file

@ -154,7 +154,7 @@ func TestPartialReceipt(t *testing.T) {
}, },
}, },
} }
if _, err := peer.ReconstructReceiptsPacket(delivery); err != nil { if err := peer.BufferReceiptsPacket(delivery); err != nil {
t.Fatalf("first ReconstructReceiptsPacket failed: %v", err) t.Fatalf("first ReconstructReceiptsPacket failed: %v", err)
} }
@ -169,19 +169,12 @@ func TestPartialReceipt(t *testing.T) {
t.Fatalf("timeout waiting for re-request packet") t.Fatalf("timeout waiting for re-request packet")
} }
if _, ok := peer.receiptBuffer[req.id]; ok {
t.Fatalf("receiptBuffer has stale request id")
}
if _, ok := peer.requestedReceipts[req.id]; ok {
t.Fatalf("requestedReceipts has stale request id")
}
buffer, ok := peer.receiptBuffer[rereq.RequestId] buffer, ok := peer.receiptBuffer[rereq.RequestId]
if !ok { if !ok {
t.Fatalf("receiptBuffer should buffer incomplete receipts") t.Fatalf("receiptBuffer should buffer incomplete receipts")
} }
if rereq.FirstBlockReceiptIndex != uint64(len(buffer.list.items)) { if rereq.FirstBlockReceiptIndex != uint64(len(buffer.list[len(buffer.list)-1].items)) {
t.Fatalf("unexpected FirstBlockReceiptIndex, got %d want %d", rereq.FirstBlockReceiptIndex, len(buffer.list.items)) t.Fatalf("unexpected FirstBlockReceiptIndex, got %d want %d", rereq.FirstBlockReceiptIndex, len(buffer.list[len(buffer.list)-1].items))
} }
if _, ok := peer.requestedReceipts[rereq.RequestId]; !ok { if _, ok := peer.requestedReceipts[rereq.RequestId]; !ok {
t.Fatalf("requestedReceipts should buffer receipt hashes") t.Fatalf("requestedReceipts should buffer receipt hashes")
@ -208,7 +201,7 @@ func TestPartialReceipt(t *testing.T) {
}, },
}, },
} }
if _, err := peer.ReconstructReceiptsPacket(delivery); err != nil { if err := peer.BufferReceiptsPacket(delivery); err != nil {
t.Fatalf("second ReconstructReceiptsPacket failed: %v", err) t.Fatalf("second ReconstructReceiptsPacket failed: %v", err)
} }
if _, ok := peer.receiptBuffer[rereq.RequestId]; ok { if _, ok := peer.receiptBuffer[rereq.RequestId]; ok {