diff --git a/eth/protocols/eth/dispatcher.go b/eth/protocols/eth/dispatcher.go index 95f897ef33..c81c43edeb 100644 --- a/eth/protocols/eth/dispatcher.go +++ b/eth/protocols/eth/dispatcher.go @@ -219,7 +219,9 @@ func (p *Peer) dispatcher() { delete(pending, cancelOp.id) // Not sure if the request is about the receipt, but remove it anyway + p.receiptBufferLock.Lock() delete(p.receiptBuffer, cancelOp.id) + p.receiptBufferLock.Unlock() cancelOp.fail <- nil diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index 0551b8e169..723d6244f7 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -526,20 +526,17 @@ func handleReceipts70(backend Backend, msg Decoder, peer *Peer) error { if err := msg.Decode(res); err != nil { return err } - if err := peer.bufferReceiptsPacket(res, backend); err != nil { return err } if res.LastBlockIncomplete { return peer.requestPartialReceipts(res.RequestId) } - // Assign buffers shared between list elements buffers := new(receiptListBuffers) for i := range res.List { res.List[i].setBuffers(buffers) } - metadata := func() interface{} { hasher := trie.NewStackTrie(nil) hashes := make([]common.Hash, len(res.List)) @@ -548,12 +545,10 @@ func handleReceipts70(backend Backend, msg Decoder, peer *Peer) error { } return hashes } - var enc ReceiptsRLPResponse for i := range res.List { enc = append(enc, res.List[i].EncodeForStorage()) } - return peer.dispatchResponse(&Response{ id: res.RequestId, code: ReceiptsMsg, diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index 5566a01e0a..e5a6edcccd 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -17,8 +17,10 @@ package eth import ( + "errors" "fmt" "math/rand" + "sync" "sync/atomic" mapset "github.com/deckarep/golang-set/v2" @@ -43,8 +45,9 @@ const ( maxQueuedTxAnns = 4096 ) +// receiptRequest tracks the state of an in-flight receipt retrieval operation. type receiptRequest struct { - request []common.Hash + request []common.Hash // block hashes corresponding to the requested receipts list []*ReceiptList69 // list of partially collected receipts lastLogSize uint64 // log size of last receipt list } @@ -67,7 +70,8 @@ type Peer struct { reqCancel chan *cancel // Dispatch channel to cancel pending requests and untrack them resDispatch chan *response // Dispatch channel to fulfil pending requests and untrack them - receiptBuffer map[uint64]*receiptRequest // Previously requested receipts to buffer partial receipts + receiptBuffer map[uint64]*receiptRequest // Previously requested receipts to buffer partial receipts + receiptBufferLock sync.RWMutex // Lock for protecting the receiptBuffer term chan struct{} // Termination channel to stop the broadcasters } @@ -218,7 +222,7 @@ func (p *Peer) ReplyBlockBodiesRLP(id uint64, bodies []rlp.RawValue) error { }) } -// ReplyReceiptsRLP is the response to GetReceipts. +// ReplyReceiptsRLP69 is the response to GetReceipts. func (p *Peer) ReplyReceiptsRLP69(id uint64, receipts []rlp.RawValue) error { return p2p.Send(p.rw, ReceiptsMsg, &ReceiptsRLPPacket69{ RequestId: id, @@ -226,7 +230,7 @@ func (p *Peer) ReplyReceiptsRLP69(id uint64, receipts []rlp.RawValue) error { }) } -// ReplyReceiptsRLP is the response to GetReceipts. +// ReplyReceiptsRLP70 is the response to GetReceipts. func (p *Peer) ReplyReceiptsRLP70(id uint64, receipts []rlp.RawValue, lastBlockIncomplete bool) error { return p2p.Send(p.rw, ReceiptsMsg, &ReceiptsRLPPacket70{ RequestId: id, @@ -356,7 +360,11 @@ func (p *Peer) RequestReceipts(hashes []common.Hash, sink chan *Response) (*Requ FirstBlockReceiptIndex: 0, }, } - p.receiptBuffer[id] = &receiptRequest{request: hashes} + p.receiptBufferLock.Lock() + p.receiptBuffer[id] = &receiptRequest{ + request: hashes, + } + p.receiptBufferLock.Unlock() } else { req = &Request{ id: id, @@ -372,16 +380,17 @@ func (p *Peer) RequestReceipts(hashes []common.Hash, sink chan *Response) (*Requ if err := p.dispatchRequest(req); err != nil { return nil, err } - return req, nil } // HandlePartialReceipts re-request partial receipts func (p *Peer) requestPartialReceipts(id uint64) error { - if _, ok := p.receiptBuffer[id]; !ok { - return fmt.Errorf("No partial receipt retreival in progress with id %d", id) - } + p.receiptBufferLock.RLock() + defer p.receiptBufferLock.RUnlock() + if _, ok := p.receiptBuffer[id]; !ok { + return fmt.Errorf("no partial receipt retreival in progress with id %d", id) + } lastBlock := len(p.receiptBuffer[id].list) - 1 lastReceipt := len(p.receiptBuffer[id].list[lastBlock].items) @@ -396,26 +405,33 @@ func (p *Peer) requestPartialReceipts(id uint64) error { FirstBlockReceiptIndex: uint64(lastReceipt), }, } - return p.dispatchRequest(req) } // bufferReceiptsPacket validates a receipt packet and buffer the incomplete packet. // If the request is completed, it appends previously collected receipts. func (p *Peer) bufferReceiptsPacket(packet *ReceiptsPacket70, backend Backend) error { + p.receiptBufferLock.Lock() + defer p.receiptBufferLock.Unlock() + requestId := packet.RequestId buffer := p.receiptBuffer[requestId] // Do not assign buffer to the response not requested if buffer == nil { - return fmt.Errorf("No partial receipt retreival in progress with id %d", requestId) + return fmt.Errorf("no partial receipt retreival in progress with id %d", requestId) } - + // If the response is empty, the peer likely does not have the requested receipts. + // Forward the empty response to the internal handler regardless. However, note + // that an empty response marked as incomplete is considered invalid. if len(packet.List) == 0 { delete(p.receiptBuffer, requestId) + + if packet.LastBlockIncomplete { + return errors.New("invalid empty receipt response with incomplete flag") + } return nil } - // Buffer the last block when the response is incomplete. if packet.LastBlockIncomplete { lastBlock := len(packet.List) - 1 @@ -423,15 +439,17 @@ func (p *Peer) bufferReceiptsPacket(packet *ReceiptsPacket70, backend Backend) e lastBlock += len(buffer.list) - 1 } header := backend.Chain().GetHeaderByHash(buffer.request[lastBlock]) + if header == nil { + return fmt.Errorf("unknown block #%d for receipt retrieval", lastBlock) + } logSize, err := p.validateLastBlockReceipt(packet.List, requestId, header) if err != nil { return err } - // Update the buffered data and trim the packet to exclude the incomplete block. if len(buffer.list) > 0 { - // If the buffer is already allocated, it means that the previous response was incomplete - // Append the first block receipts + // If the buffer is already allocated, it means that the previous response + // was incomplete Append the first block receipts. buffer.list[len(buffer.list)-1].Append(packet.List[0]) buffer.list = append(buffer.list, packet.List[1:]...) buffer.lastLogSize = logSize @@ -441,7 +459,6 @@ func (p *Peer) bufferReceiptsPacket(packet *ReceiptsPacket70, backend Backend) e } return nil } - // If the request is completed, append previously collected receipts // to the packet and remove the buffered receipts. if len(buffer.list) > 0 { @@ -449,8 +466,8 @@ func (p *Peer) bufferReceiptsPacket(packet *ReceiptsPacket70, backend Backend) e packet.List = packet.List[1:] } packet.List = append(buffer.list, packet.List...) - delete(p.receiptBuffer, requestId) + delete(p.receiptBuffer, requestId) return nil } diff --git a/eth/protocols/eth/protocol.go b/eth/protocols/eth/protocol.go index 7b7b396166..56ce169f2e 100644 --- a/eth/protocols/eth/protocol.go +++ b/eth/protocols/eth/protocol.go @@ -259,13 +259,14 @@ func (p *BlockBodiesResponse) Unpack() ([][]*types.Transaction, [][]*types.Heade // GetReceiptsRequest represents a block receipts query. type GetReceiptsRequest []common.Hash -// GetReceiptsPacket represents a block receipts query with request ID wrapping. +// GetReceiptsPacket69 represents a block receipts query with request ID wrapping. type GetReceiptsPacket69 struct { RequestId uint64 GetReceiptsRequest } -// GetReceiptsPacket represents a block receipts query with request ID wrapping. +// GetReceiptsPacket70 represents a block receipts query with request ID and +// FirstBlockReceiptIndex wrapping. type GetReceiptsPacket70 struct { RequestId uint64 GetReceiptsRequest @@ -299,12 +300,14 @@ type ReceiptsPacket70 struct { // ReceiptsRLPResponse is used for receipts, when we already have it encoded type ReceiptsRLPResponse []rlp.RawValue -// ReceiptsRLPPacket is ReceiptsRLPResponse with request ID wrapping. +// ReceiptsRLPPacket69 is ReceiptsRLPResponse with request ID wrapping. type ReceiptsRLPPacket69 struct { RequestId uint64 ReceiptsRLPResponse } +// ReceiptsRLPPacket70 is ReceiptsRLPResponse with request ID and +// LastBlockIncomplete wrapping. type ReceiptsRLPPacket70 struct { RequestId uint64 ReceiptsRLPResponse