eth/protocols/eth: add lock protection

This commit is contained in:
Gary Rong 2025-12-09 13:19:20 +08:00
parent 74dd001911
commit b2490e3f20
4 changed files with 43 additions and 26 deletions

View file

@ -219,7 +219,9 @@ func (p *Peer) dispatcher() {
delete(pending, cancelOp.id)
// Not sure if the request is about the receipt, but remove it anyway
p.receiptBufferLock.Lock()
delete(p.receiptBuffer, cancelOp.id)
p.receiptBufferLock.Unlock()
cancelOp.fail <- nil

View file

@ -526,20 +526,17 @@ func handleReceipts70(backend Backend, msg Decoder, peer *Peer) error {
if err := msg.Decode(res); err != nil {
return err
}
if err := peer.bufferReceiptsPacket(res, backend); err != nil {
return err
}
if res.LastBlockIncomplete {
return peer.requestPartialReceipts(res.RequestId)
}
// Assign buffers shared between list elements
buffers := new(receiptListBuffers)
for i := range res.List {
res.List[i].setBuffers(buffers)
}
metadata := func() interface{} {
hasher := trie.NewStackTrie(nil)
hashes := make([]common.Hash, len(res.List))
@ -548,12 +545,10 @@ func handleReceipts70(backend Backend, msg Decoder, peer *Peer) error {
}
return hashes
}
var enc ReceiptsRLPResponse
for i := range res.List {
enc = append(enc, res.List[i].EncodeForStorage())
}
return peer.dispatchResponse(&Response{
id: res.RequestId,
code: ReceiptsMsg,

View file

@ -17,8 +17,10 @@
package eth
import (
"errors"
"fmt"
"math/rand"
"sync"
"sync/atomic"
mapset "github.com/deckarep/golang-set/v2"
@ -43,8 +45,9 @@ const (
maxQueuedTxAnns = 4096
)
// receiptRequest tracks the state of an in-flight receipt retrieval operation.
type receiptRequest struct {
request []common.Hash
request []common.Hash // block hashes corresponding to the requested receipts
list []*ReceiptList69 // list of partially collected receipts
lastLogSize uint64 // log size of last receipt list
}
@ -67,7 +70,8 @@ type Peer struct {
reqCancel chan *cancel // Dispatch channel to cancel pending requests and untrack them
resDispatch chan *response // Dispatch channel to fulfil pending requests and untrack them
receiptBuffer map[uint64]*receiptRequest // Previously requested receipts to buffer partial receipts
receiptBuffer map[uint64]*receiptRequest // Previously requested receipts to buffer partial receipts
receiptBufferLock sync.RWMutex // Lock for protecting the receiptBuffer
term chan struct{} // Termination channel to stop the broadcasters
}
@ -218,7 +222,7 @@ func (p *Peer) ReplyBlockBodiesRLP(id uint64, bodies []rlp.RawValue) error {
})
}
// ReplyReceiptsRLP is the response to GetReceipts.
// ReplyReceiptsRLP69 is the response to GetReceipts.
func (p *Peer) ReplyReceiptsRLP69(id uint64, receipts []rlp.RawValue) error {
return p2p.Send(p.rw, ReceiptsMsg, &ReceiptsRLPPacket69{
RequestId: id,
@ -226,7 +230,7 @@ func (p *Peer) ReplyReceiptsRLP69(id uint64, receipts []rlp.RawValue) error {
})
}
// ReplyReceiptsRLP is the response to GetReceipts.
// ReplyReceiptsRLP70 is the response to GetReceipts.
func (p *Peer) ReplyReceiptsRLP70(id uint64, receipts []rlp.RawValue, lastBlockIncomplete bool) error {
return p2p.Send(p.rw, ReceiptsMsg, &ReceiptsRLPPacket70{
RequestId: id,
@ -356,7 +360,11 @@ func (p *Peer) RequestReceipts(hashes []common.Hash, sink chan *Response) (*Requ
FirstBlockReceiptIndex: 0,
},
}
p.receiptBuffer[id] = &receiptRequest{request: hashes}
p.receiptBufferLock.Lock()
p.receiptBuffer[id] = &receiptRequest{
request: hashes,
}
p.receiptBufferLock.Unlock()
} else {
req = &Request{
id: id,
@ -372,16 +380,17 @@ func (p *Peer) RequestReceipts(hashes []common.Hash, sink chan *Response) (*Requ
if err := p.dispatchRequest(req); err != nil {
return nil, err
}
return req, nil
}
// HandlePartialReceipts re-request partial receipts
func (p *Peer) requestPartialReceipts(id uint64) error {
if _, ok := p.receiptBuffer[id]; !ok {
return fmt.Errorf("No partial receipt retreival in progress with id %d", id)
}
p.receiptBufferLock.RLock()
defer p.receiptBufferLock.RUnlock()
if _, ok := p.receiptBuffer[id]; !ok {
return fmt.Errorf("no partial receipt retreival in progress with id %d", id)
}
lastBlock := len(p.receiptBuffer[id].list) - 1
lastReceipt := len(p.receiptBuffer[id].list[lastBlock].items)
@ -396,26 +405,33 @@ func (p *Peer) requestPartialReceipts(id uint64) error {
FirstBlockReceiptIndex: uint64(lastReceipt),
},
}
return p.dispatchRequest(req)
}
// bufferReceiptsPacket validates a receipt packet and buffer the incomplete packet.
// If the request is completed, it appends previously collected receipts.
func (p *Peer) bufferReceiptsPacket(packet *ReceiptsPacket70, backend Backend) error {
p.receiptBufferLock.Lock()
defer p.receiptBufferLock.Unlock()
requestId := packet.RequestId
buffer := p.receiptBuffer[requestId]
// Do not assign buffer to the response not requested
if buffer == nil {
return fmt.Errorf("No partial receipt retreival in progress with id %d", requestId)
return fmt.Errorf("no partial receipt retreival in progress with id %d", requestId)
}
// If the response is empty, the peer likely does not have the requested receipts.
// Forward the empty response to the internal handler regardless. However, note
// that an empty response marked as incomplete is considered invalid.
if len(packet.List) == 0 {
delete(p.receiptBuffer, requestId)
if packet.LastBlockIncomplete {
return errors.New("invalid empty receipt response with incomplete flag")
}
return nil
}
// Buffer the last block when the response is incomplete.
if packet.LastBlockIncomplete {
lastBlock := len(packet.List) - 1
@ -423,15 +439,17 @@ func (p *Peer) bufferReceiptsPacket(packet *ReceiptsPacket70, backend Backend) e
lastBlock += len(buffer.list) - 1
}
header := backend.Chain().GetHeaderByHash(buffer.request[lastBlock])
if header == nil {
return fmt.Errorf("unknown block #%d for receipt retrieval", lastBlock)
}
logSize, err := p.validateLastBlockReceipt(packet.List, requestId, header)
if err != nil {
return err
}
// Update the buffered data and trim the packet to exclude the incomplete block.
if len(buffer.list) > 0 {
// If the buffer is already allocated, it means that the previous response was incomplete
// Append the first block receipts
// If the buffer is already allocated, it means that the previous response
// was incomplete Append the first block receipts.
buffer.list[len(buffer.list)-1].Append(packet.List[0])
buffer.list = append(buffer.list, packet.List[1:]...)
buffer.lastLogSize = logSize
@ -441,7 +459,6 @@ func (p *Peer) bufferReceiptsPacket(packet *ReceiptsPacket70, backend Backend) e
}
return nil
}
// If the request is completed, append previously collected receipts
// to the packet and remove the buffered receipts.
if len(buffer.list) > 0 {
@ -449,8 +466,8 @@ func (p *Peer) bufferReceiptsPacket(packet *ReceiptsPacket70, backend Backend) e
packet.List = packet.List[1:]
}
packet.List = append(buffer.list, packet.List...)
delete(p.receiptBuffer, requestId)
delete(p.receiptBuffer, requestId)
return nil
}

View file

@ -259,13 +259,14 @@ func (p *BlockBodiesResponse) Unpack() ([][]*types.Transaction, [][]*types.Heade
// GetReceiptsRequest represents a block receipts query.
type GetReceiptsRequest []common.Hash
// GetReceiptsPacket represents a block receipts query with request ID wrapping.
// GetReceiptsPacket69 represents a block receipts query with request ID wrapping.
type GetReceiptsPacket69 struct {
RequestId uint64
GetReceiptsRequest
}
// GetReceiptsPacket represents a block receipts query with request ID wrapping.
// GetReceiptsPacket70 represents a block receipts query with request ID and
// FirstBlockReceiptIndex wrapping.
type GetReceiptsPacket70 struct {
RequestId uint64
GetReceiptsRequest
@ -299,12 +300,14 @@ type ReceiptsPacket70 struct {
// ReceiptsRLPResponse is used for receipts, when we already have it encoded
type ReceiptsRLPResponse []rlp.RawValue
// ReceiptsRLPPacket is ReceiptsRLPResponse with request ID wrapping.
// ReceiptsRLPPacket69 is ReceiptsRLPResponse with request ID wrapping.
type ReceiptsRLPPacket69 struct {
RequestId uint64
ReceiptsRLPResponse
}
// ReceiptsRLPPacket70 is ReceiptsRLPResponse with request ID and
// LastBlockIncomplete wrapping.
type ReceiptsRLPPacket70 struct {
RequestId uint64
ReceiptsRLPResponse