go-ethereum/eth/protocols/eth/handlers.go
Felix Lange 0cba803fba
Some checks failed
/ Linux Build (push) Has been cancelled
/ Linux Build (arm) (push) Has been cancelled
/ Keeper Build (push) Has been cancelled
/ Windows Build (push) Has been cancelled
/ Docker Image (push) Has been cancelled
eth/protocols/eth, eth/protocols/snap: delayed p2p message decoding (#33835)
This changes the p2p protocol handlers to delay message decoding. It's
the first part of a larger change that will delay decoding all the way
through message processing. For responses, we delay the decoding until
it is confirmed that the response matches an active request and does not
exceed its limits.

In order to make this work, all messages have been changed to use
rlp.RawList instead of a slice of the decoded item type. For block
bodies specifically, the decoding has been delayed all the way until
after verification of the response hash.

The role of p2p/tracker.Tracker changes significantly in this PR. The
Tracker's original purpose was to maintain metrics about requests and
responses in the peer-to-peer protocols. Each protocol maintained a
single global Tracker instance. As of this change, the Tracker is now
always active (regardless of metrics collection), and there is a
separate instance of it for each peer. Whenever a response arrives, it
is first verified that a request exists for it in the tracker. The
tracker is also the place where limits are kept.
2026-02-15 21:21:16 +08:00

642 lines
19 KiB
Go

// Copyright 2021 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package eth
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"math"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/tracker"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
)
func handleGetBlockHeaders(backend Backend, msg Decoder, peer *Peer) error {
// Decode the complex header query
var query GetBlockHeadersPacket
if err := msg.Decode(&query); err != nil {
return err
}
response := ServiceGetBlockHeadersQuery(backend.Chain(), query.GetBlockHeadersRequest, peer)
return peer.ReplyBlockHeadersRLP(query.RequestId, response)
}
// ServiceGetBlockHeadersQuery assembles the response to a header query. It is
// exposed to allow external packages to test protocol behavior.
func ServiceGetBlockHeadersQuery(chain *core.BlockChain, query *GetBlockHeadersRequest, peer *Peer) []rlp.RawValue {
if query.Amount == 0 {
return nil
}
if query.Skip == 0 {
// The fast path: when the request is for a contiguous segment of headers.
return serviceContiguousBlockHeaderQuery(chain, query)
} else {
return serviceNonContiguousBlockHeaderQuery(chain, query, peer)
}
}
func serviceNonContiguousBlockHeaderQuery(chain *core.BlockChain, query *GetBlockHeadersRequest, peer *Peer) []rlp.RawValue {
hashMode := query.Origin.Hash != (common.Hash{})
first := true
maxNonCanonical := uint64(100)
// Gather headers until the fetch or network limits is reached
var (
bytes common.StorageSize
headers []rlp.RawValue
unknown bool
lookups int
)
for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit &&
len(headers) < maxHeadersServe && lookups < 2*maxHeadersServe {
lookups++
// Retrieve the next header satisfying the query
var origin *types.Header
if hashMode {
if first {
first = false
origin = chain.GetHeaderByHash(query.Origin.Hash)
if origin != nil {
query.Origin.Number = origin.Number.Uint64()
}
} else {
origin = chain.GetHeader(query.Origin.Hash, query.Origin.Number)
}
} else {
origin = chain.GetHeaderByNumber(query.Origin.Number)
}
if origin == nil {
break
}
if rlpData, err := rlp.EncodeToBytes(origin); err != nil {
log.Crit("Unable to encode our own headers", "err", err)
} else {
headers = append(headers, rlp.RawValue(rlpData))
bytes += common.StorageSize(len(rlpData))
}
// Advance to the next header of the query
switch {
case hashMode && query.Reverse:
// Hash based traversal towards the genesis block
ancestor := query.Skip + 1
if ancestor == 0 {
unknown = true
} else {
query.Origin.Hash, query.Origin.Number = chain.GetAncestor(query.Origin.Hash, query.Origin.Number, ancestor, &maxNonCanonical)
unknown = (query.Origin.Hash == common.Hash{})
}
case hashMode && !query.Reverse:
// Hash based traversal towards the leaf block
var (
current = origin.Number.Uint64()
next = current + query.Skip + 1
)
if next <= current {
infos, _ := json.MarshalIndent(peer.Peer.Info(), "", " ")
peer.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos)
unknown = true
} else {
if header := chain.GetHeaderByNumber(next); header != nil {
nextHash := header.Hash()
expOldHash, _ := chain.GetAncestor(nextHash, next, query.Skip+1, &maxNonCanonical)
if expOldHash == query.Origin.Hash {
query.Origin.Hash, query.Origin.Number = nextHash, next
} else {
unknown = true
}
} else {
unknown = true
}
}
case query.Reverse:
// Number based traversal towards the genesis block
current := query.Origin.Number
ancestor := current - (query.Skip + 1)
if ancestor >= current { // check for underflow
unknown = true
} else {
query.Origin.Number = ancestor
}
case !query.Reverse:
current := query.Origin.Number
next := current + query.Skip + 1
if next <= current { // check for overflow
unknown = true
} else {
query.Origin.Number = next
}
}
}
return headers
}
func serviceContiguousBlockHeaderQuery(chain *core.BlockChain, query *GetBlockHeadersRequest) []rlp.RawValue {
count := query.Amount
if count > maxHeadersServe {
count = maxHeadersServe
}
if query.Origin.Hash == (common.Hash{}) {
// Number mode, just return the canon chain segment. The backend
// delivers in [N, N-1, N-2..] descending order, so we need to
// accommodate for that.
from := query.Origin.Number
if !query.Reverse {
from = from + count - 1
}
headers := chain.GetHeadersFrom(from, count)
if !query.Reverse {
for i, j := 0, len(headers)-1; i < j; i, j = i+1, j-1 {
headers[i], headers[j] = headers[j], headers[i]
}
}
return headers
}
// Hash mode.
var (
headers []rlp.RawValue
hash = query.Origin.Hash
header = chain.GetHeaderByHash(hash)
)
if header != nil {
rlpData, _ := rlp.EncodeToBytes(header)
headers = append(headers, rlpData)
} else {
// We don't even have the origin header
return headers
}
num := header.Number.Uint64()
if !query.Reverse {
// Theoretically, we are tasked to deliver header by hash H, and onwards.
// However, if H is not canon, we will be unable to deliver any descendants of
// H.
if canonHash := chain.GetCanonicalHash(num); canonHash != hash {
// Not canon, we can't deliver descendants
return headers
}
descendants := chain.GetHeadersFrom(num+count-1, count-1)
for i, j := 0, len(descendants)-1; i < j; i, j = i+1, j-1 {
descendants[i], descendants[j] = descendants[j], descendants[i]
}
headers = append(headers, descendants...)
return headers
}
{ // Last mode: deliver ancestors of H
for i := uint64(1); i < count; i++ {
header = chain.GetHeaderByHash(header.ParentHash)
if header == nil {
break
}
rlpData, _ := rlp.EncodeToBytes(header)
headers = append(headers, rlpData)
}
return headers
}
}
func handleGetBlockBodies(backend Backend, msg Decoder, peer *Peer) error {
// Decode the block body retrieval message
var query GetBlockBodiesPacket
if err := msg.Decode(&query); err != nil {
return err
}
response := ServiceGetBlockBodiesQuery(backend.Chain(), query.GetBlockBodiesRequest)
return peer.ReplyBlockBodiesRLP(query.RequestId, response)
}
// ServiceGetBlockBodiesQuery assembles the response to a body query. It is
// exposed to allow external packages to test protocol behavior.
func ServiceGetBlockBodiesQuery(chain *core.BlockChain, query GetBlockBodiesRequest) []rlp.RawValue {
// Gather blocks until the fetch or network limits is reached
var (
bytes int
bodies []rlp.RawValue
)
for lookups, hash := range query {
if bytes >= softResponseLimit || len(bodies) >= maxBodiesServe ||
lookups >= 2*maxBodiesServe {
break
}
if data := chain.GetBodyRLP(hash); len(data) != 0 {
bodies = append(bodies, data)
bytes += len(data)
}
}
return bodies
}
func handleGetReceipts68(backend Backend, msg Decoder, peer *Peer) error {
// Decode the block receipts retrieval message
var query GetReceiptsPacket
if err := msg.Decode(&query); err != nil {
return err
}
response := ServiceGetReceiptsQuery68(backend.Chain(), query.GetReceiptsRequest)
return peer.ReplyReceiptsRLP(query.RequestId, response)
}
func handleGetReceipts69(backend Backend, msg Decoder, peer *Peer) error {
// Decode the block receipts retrieval message
var query GetReceiptsPacket
if err := msg.Decode(&query); err != nil {
return err
}
response := serviceGetReceiptsQuery69(backend.Chain(), query.GetReceiptsRequest)
return peer.ReplyReceiptsRLP(query.RequestId, response)
}
// ServiceGetReceiptsQuery68 assembles the response to a receipt query. It is
// exposed to allow external packages to test protocol behavior.
func ServiceGetReceiptsQuery68(chain *core.BlockChain, query GetReceiptsRequest) []rlp.RawValue {
// Gather state data until the fetch or network limits is reached
var (
bytes int
receipts []rlp.RawValue
)
for lookups, hash := range query {
if bytes >= softResponseLimit || len(receipts) >= maxReceiptsServe ||
lookups >= 2*maxReceiptsServe {
break
}
// Retrieve the requested block's receipts
results := chain.GetReceiptsRLP(hash)
if results == nil {
if header := chain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
continue
}
} else {
body := chain.GetBodyRLP(hash)
if body == nil {
continue
}
var err error
results, err = blockReceiptsToNetwork68(results, body)
if err != nil {
log.Error("Error in block receipts conversion", "hash", hash, "err", err)
continue
}
}
receipts = append(receipts, results)
bytes += len(results)
}
return receipts
}
// serviceGetReceiptsQuery69 assembles the response to a receipt query.
// It does not send the bloom filters for the receipts
func serviceGetReceiptsQuery69(chain *core.BlockChain, query GetReceiptsRequest) []rlp.RawValue {
// Gather state data until the fetch or network limits is reached
var (
bytes int
receipts []rlp.RawValue
)
for lookups, hash := range query {
if bytes >= softResponseLimit || len(receipts) >= maxReceiptsServe ||
lookups >= 2*maxReceiptsServe {
break
}
// Retrieve the requested block's receipts
results := chain.GetReceiptsRLP(hash)
if results == nil {
if header := chain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
continue
}
} else {
body := chain.GetBodyRLP(hash)
if body == nil {
continue
}
var err error
results, err = blockReceiptsToNetwork69(results, body)
if err != nil {
log.Error("Error in block receipts conversion", "hash", hash, "err", err)
continue
}
}
receipts = append(receipts, results)
bytes += len(results)
}
return receipts
}
func handleNewBlockhashes(backend Backend, msg Decoder, peer *Peer) error {
return errors.New("block announcements disallowed") // We dropped support for non-merge networks
}
func handleNewBlock(backend Backend, msg Decoder, peer *Peer) error {
return errors.New("block broadcasts disallowed") // We dropped support for non-merge networks
}
func handleBlockHeaders(backend Backend, msg Decoder, peer *Peer) error {
// A batch of headers arrived to one of our previous requests
res := new(BlockHeadersPacket)
if err := msg.Decode(res); err != nil {
return err
}
tresp := tracker.Response{ID: res.RequestId, MsgCode: BlockHeadersMsg, Size: res.List.Len()}
if err := peer.tracker.Fulfil(tresp); err != nil {
return fmt.Errorf("BlockHeaders: %w", err)
}
headers, err := res.List.Items()
if err != nil {
return fmt.Errorf("BlockHeaders: %w", err)
}
metadata := func() interface{} {
hashes := make([]common.Hash, len(headers))
for i, header := range headers {
hashes[i] = header.Hash()
}
return hashes
}
return peer.dispatchResponse(&Response{
id: res.RequestId,
code: BlockHeadersMsg,
Res: (*BlockHeadersRequest)(&headers),
}, metadata)
}
func handleBlockBodies(backend Backend, msg Decoder, peer *Peer) error {
// A batch of block bodies arrived to one of our previous requests
res := new(BlockBodiesPacket)
if err := msg.Decode(res); err != nil {
return err
}
// Check against the request.
length := res.List.Len()
tresp := tracker.Response{ID: res.RequestId, MsgCode: BlockBodiesMsg, Size: length}
if err := peer.tracker.Fulfil(tresp); err != nil {
return fmt.Errorf("BlockBodies: %w", err)
}
// Collect items and dispatch.
items, err := res.List.Items()
if err != nil {
return fmt.Errorf("BlockBodies: %w", err)
}
metadata := func() any { return hashBodyParts(items) }
return peer.dispatchResponse(&Response{
id: res.RequestId,
code: BlockBodiesMsg,
Res: (*BlockBodiesResponse)(&items),
}, metadata)
}
// BlockBodyHashes contains the lists of block body part roots for a list of block bodies.
type BlockBodyHashes struct {
TransactionRoots []common.Hash
WithdrawalRoots []common.Hash
UncleHashes []common.Hash
}
func hashBodyParts(items []BlockBody) BlockBodyHashes {
h := BlockBodyHashes{
TransactionRoots: make([]common.Hash, len(items)),
WithdrawalRoots: make([]common.Hash, len(items)),
UncleHashes: make([]common.Hash, len(items)),
}
hasher := trie.NewStackTrie(nil)
for i, body := range items {
// txs
txsList := newDerivableRawList(&body.Transactions, writeTxForHash)
h.TransactionRoots[i] = types.DeriveSha(txsList, hasher)
// uncles
if body.Uncles.Len() == 0 {
h.UncleHashes[i] = types.EmptyUncleHash
} else {
h.UncleHashes[i] = crypto.Keccak256Hash(body.Uncles.Bytes())
}
// withdrawals
if body.Withdrawals != nil {
wdlist := newDerivableRawList(body.Withdrawals, nil)
h.WithdrawalRoots[i] = types.DeriveSha(wdlist, hasher)
}
}
return h
}
// derivableRawList implements types.DerivableList for a serialized RLP list.
type derivableRawList struct {
data []byte
offsets []uint32
write func([]byte, *bytes.Buffer)
}
func newDerivableRawList[T any](list *rlp.RawList[T], write func([]byte, *bytes.Buffer)) *derivableRawList {
dl := derivableRawList{data: list.Content(), write: write}
if dl.write == nil {
// default transform is identity
dl.write = func(b []byte, buf *bytes.Buffer) { buf.Write(b) }
}
// Assert to ensure 32-bit offsets are valid. This can never trigger
// unless a block body component or p2p receipt list is larger than 4GB.
if uint(len(dl.data)) > math.MaxUint32 {
panic("list data too big for derivableRawList")
}
it := list.ContentIterator()
dl.offsets = make([]uint32, list.Len())
for i := 0; it.Next(); i++ {
dl.offsets[i] = uint32(it.Offset())
}
return &dl
}
// Len returns the number of items in the list.
func (dl *derivableRawList) Len() int {
return len(dl.offsets)
}
// EncodeIndex writes the i'th item to the buffer.
func (dl *derivableRawList) EncodeIndex(i int, buf *bytes.Buffer) {
start := dl.offsets[i]
end := uint32(len(dl.data))
if i != len(dl.offsets)-1 {
end = dl.offsets[i+1]
}
dl.write(dl.data[start:end], buf)
}
// writeTxForHash changes a transaction in 'network encoding' into the format used for
// the transactions MPT.
func writeTxForHash(tx []byte, buf *bytes.Buffer) {
k, content, _, _ := rlp.Split(tx)
if k == rlp.List {
buf.Write(tx) // legacy tx
} else {
buf.Write(content) // typed tx
}
}
func handleReceipts[L ReceiptsList](backend Backend, msg Decoder, peer *Peer) error {
// A batch of receipts arrived to one of our previous requests
res := new(ReceiptsPacket[L])
if err := msg.Decode(res); err != nil {
return err
}
tresp := tracker.Response{ID: res.RequestId, MsgCode: ReceiptsMsg, Size: res.List.Len()}
if err := peer.tracker.Fulfil(tresp); err != nil {
return fmt.Errorf("Receipts: %w", err)
}
// Assign temporary hashing buffer to each list item, the same buffer is shared
// between all receipt list instances.
receiptLists, err := res.List.Items()
if err != nil {
return fmt.Errorf("Receipts: %w", err)
}
buffers := new(receiptListBuffers)
for i := range receiptLists {
receiptLists[i].setBuffers(buffers)
}
metadata := func() interface{} {
hasher := trie.NewStackTrie(nil)
hashes := make([]common.Hash, len(receiptLists))
for i := range receiptLists {
hashes[i] = types.DeriveSha(receiptLists[i].Derivable(), hasher)
}
return hashes
}
var enc ReceiptsRLPResponse
for i := range receiptLists {
encReceipts, err := receiptLists[i].EncodeForStorage()
if err != nil {
return fmt.Errorf("Receipts: invalid list %d: %v", i, err)
}
enc = append(enc, encReceipts)
}
return peer.dispatchResponse(&Response{
id: res.RequestId,
code: ReceiptsMsg,
Res: &enc,
}, metadata)
}
func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) error {
// New transaction announcement arrived, make sure we have
// a valid and fresh chain to handle them
if !backend.AcceptTxs() {
return nil
}
ann := new(NewPooledTransactionHashesPacket)
if err := msg.Decode(ann); err != nil {
return err
}
if len(ann.Hashes) != len(ann.Types) || len(ann.Hashes) != len(ann.Sizes) {
return fmt.Errorf("NewPooledTransactionHashes: invalid len of fields in %v %v %v", len(ann.Hashes), len(ann.Types), len(ann.Sizes))
}
// Schedule all the unknown hashes for retrieval
for _, hash := range ann.Hashes {
peer.MarkTransaction(hash)
}
return backend.Handle(peer, ann)
}
func handleGetPooledTransactions(backend Backend, msg Decoder, peer *Peer) error {
// Decode the pooled transactions retrieval message
var query GetPooledTransactionsPacket
if err := msg.Decode(&query); err != nil {
return err
}
hashes, txs := answerGetPooledTransactions(backend, query.GetPooledTransactionsRequest)
return peer.ReplyPooledTransactionsRLP(query.RequestId, hashes, txs)
}
func answerGetPooledTransactions(backend Backend, query GetPooledTransactionsRequest) ([]common.Hash, []rlp.RawValue) {
// Gather transactions until the fetch or network limits is reached
var (
bytes int
hashes []common.Hash
txs []rlp.RawValue
)
for _, hash := range query {
if bytes >= softResponseLimit {
break
}
// Retrieve the requested transaction, skipping if unknown to us
encoded := backend.TxPool().GetRLP(hash)
if len(encoded) == 0 {
continue
}
hashes = append(hashes, hash)
txs = append(txs, encoded)
bytes += len(encoded)
}
return hashes, txs
}
func handleTransactions(backend Backend, msg Decoder, peer *Peer) error {
// Transactions arrived, make sure we have a valid and fresh chain to handle them
if !backend.AcceptTxs() {
return nil
}
// Transactions can be processed, parse all of them and deliver to the pool
var txs TransactionsPacket
if err := msg.Decode(&txs); err != nil {
return err
}
if txs.Len() > maxTransactionAnnouncements {
return fmt.Errorf("too many transactions")
}
return backend.Handle(peer, &txs)
}
func handlePooledTransactions(backend Backend, msg Decoder, peer *Peer) error {
// Transactions arrived, make sure we have a valid and fresh chain to handle them
if !backend.AcceptTxs() {
return nil
}
// Check against request and decode.
var resp PooledTransactionsPacket
if err := msg.Decode(&resp); err != nil {
return err
}
tresp := tracker.Response{
ID: resp.RequestId,
MsgCode: PooledTransactionsMsg,
Size: resp.List.Len(),
}
if err := peer.tracker.Fulfil(tresp); err != nil {
return fmt.Errorf("PooledTransactions: %w", err)
}
return backend.Handle(peer, &resp)
}
func handleBlockRangeUpdate(backend Backend, msg Decoder, peer *Peer) error {
var update BlockRangeUpdatePacket
if err := msg.Decode(&update); err != nil {
return err
}
if err := update.Validate(); err != nil {
return err
}
// We don't do anything with these messages for now, just store them on the peer.
peer.lastRange.Store(&update)
return nil
}