mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-02-26 07:37:20 +00:00
This changes the p2p protocol handlers to delay message decoding. It's the first part of a larger change that will delay decoding all the way through message processing. For responses, we delay the decoding until it is confirmed that the response matches an active request and does not exceed its limits. In order to make this work, all messages have been changed to use rlp.RawList instead of a slice of the decoded item type. For block bodies specifically, the decoding has been delayed all the way until after verification of the response hash. The role of p2p/tracker.Tracker changes significantly in this PR. The Tracker's original purpose was to maintain metrics about requests and responses in the peer-to-peer protocols. Each protocol maintained a single global Tracker instance. As of this change, the Tracker is now always active (regardless of metrics collection), and there is a separate instance of it for each peer. Whenever a response arrives, it is first verified that a request exists for it in the tracker. The tracker is also the place where limits are kept.
263 lines
8 KiB
Go
263 lines
8 KiB
Go
// Copyright 2021 The go-ethereum Authors
|
|
// This file is part of the go-ethereum library.
|
|
//
|
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Lesser General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Lesser General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Lesser General Public License
|
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package tracker
|
|
|
|
import (
|
|
"container/list"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/log"
|
|
"github.com/ethereum/go-ethereum/metrics"
|
|
"github.com/ethereum/go-ethereum/p2p"
|
|
)
|
|
|
|
const (
|
|
trackedGaugeName = "p2p/tracked"
|
|
lostMeterName = "p2p/lost"
|
|
staleMeterName = "p2p/stale"
|
|
waitHistName = "p2p/wait"
|
|
|
|
// maxTrackedPackets is a huge number to act as a failsafe on the number of
|
|
// pending requests the node will track. It should never be hit unless an
|
|
// attacker figures out a way to spin requests.
|
|
maxTrackedPackets = 10000
|
|
)
|
|
|
|
var (
|
|
ErrNoMatchingRequest = errors.New("no matching request")
|
|
ErrTooManyItems = errors.New("response is larger than request allows")
|
|
ErrCollision = errors.New("request ID collision")
|
|
ErrCodeMismatch = errors.New("wrong response code for request")
|
|
ErrLimitReached = errors.New("request limit reached")
|
|
ErrStopped = errors.New("tracker stopped")
|
|
)
|
|
|
|
// Request tracks sent network requests which have not yet received a response.
|
|
type Request struct {
|
|
ID uint64 // Request ID
|
|
Size int // Number/size of requested items
|
|
ReqCode uint64 // Protocol message code of the request
|
|
RespCode uint64 // Protocol message code of the expected response
|
|
|
|
time time.Time // Timestamp when the request was made
|
|
expire *list.Element // Expiration marker to untrack it
|
|
}
|
|
|
|
type Response struct {
|
|
ID uint64 // Request ID of the response
|
|
MsgCode uint64 // Protocol message code
|
|
Size int // number/size of items in response
|
|
}
|
|
|
|
// Tracker is a pending network request tracker to measure how much time it takes
|
|
// a remote peer to respond.
|
|
type Tracker struct {
|
|
cap p2p.Cap // Protocol capability identifier for the metrics
|
|
|
|
peer string // Peer ID
|
|
timeout time.Duration // Global timeout after which to drop a tracked packet
|
|
|
|
pending map[uint64]*Request // Currently pending requests
|
|
expire *list.List // Linked list tracking the expiration order
|
|
wake *time.Timer // Timer tracking the expiration of the next item
|
|
|
|
lock sync.Mutex // Lock protecting from concurrent updates
|
|
}
|
|
|
|
// New creates a new network request tracker to monitor how much time it takes to
|
|
// fill certain requests and how individual peers perform.
|
|
func New(cap p2p.Cap, peerID string, timeout time.Duration) *Tracker {
|
|
return &Tracker{
|
|
cap: cap,
|
|
peer: peerID,
|
|
timeout: timeout,
|
|
pending: make(map[uint64]*Request),
|
|
expire: list.New(),
|
|
}
|
|
}
|
|
|
|
// Track adds a network request to the tracker to wait for a response to arrive
|
|
// or until the request it cancelled or times out.
|
|
func (t *Tracker) Track(req Request) error {
|
|
t.lock.Lock()
|
|
defer t.lock.Unlock()
|
|
|
|
if t.expire == nil {
|
|
return ErrStopped
|
|
}
|
|
|
|
// If there's a duplicate request, we've just random-collided (or more probably,
|
|
// we have a bug), report it. We could also add a metric, but we're not really
|
|
// expecting ourselves to be buggy, so a noisy warning should be enough.
|
|
if _, ok := t.pending[req.ID]; ok {
|
|
log.Error("Network request id collision", "cap", t.cap, "code", req.ReqCode, "id", req.ID)
|
|
return ErrCollision
|
|
}
|
|
// If we have too many pending requests, bail out instead of leaking memory
|
|
if pending := len(t.pending); pending >= maxTrackedPackets {
|
|
log.Error("Request tracker exceeded allowance", "pending", pending, "peer", t.peer, "cap", t.cap, "code", req.ReqCode)
|
|
return ErrLimitReached
|
|
}
|
|
|
|
// Id doesn't exist yet, start tracking it
|
|
req.time = time.Now()
|
|
req.expire = t.expire.PushBack(req.ID)
|
|
t.pending[req.ID] = &req
|
|
|
|
if metrics.Enabled() {
|
|
t.trackedGauge(req.ReqCode).Inc(1)
|
|
}
|
|
|
|
// If we've just inserted the first item, start the expiration timer
|
|
if t.wake == nil {
|
|
t.wake = time.AfterFunc(t.timeout, t.clean)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// clean is called automatically when a preset time passes without a response
|
|
// being delivered for the first network request.
|
|
func (t *Tracker) clean() {
|
|
t.lock.Lock()
|
|
defer t.lock.Unlock()
|
|
|
|
// Expire anything within a certain threshold (might be no items at all if
|
|
// we raced with the delivery)
|
|
for t.expire.Len() > 0 {
|
|
// Stop iterating if the next pending request is still alive
|
|
var (
|
|
head = t.expire.Front()
|
|
id = head.Value.(uint64)
|
|
req = t.pending[id]
|
|
)
|
|
if time.Since(req.time) < t.timeout+5*time.Millisecond {
|
|
break
|
|
}
|
|
// Nope, dead, drop it
|
|
t.expire.Remove(head)
|
|
delete(t.pending, id)
|
|
|
|
if metrics.Enabled() {
|
|
t.trackedGauge(req.ReqCode).Dec(1)
|
|
t.lostMeter(req.ReqCode).Mark(1)
|
|
}
|
|
}
|
|
t.schedule()
|
|
}
|
|
|
|
// schedule starts a timer to trigger on the expiration of the first network
|
|
// packet.
|
|
func (t *Tracker) schedule() {
|
|
if t.expire.Len() == 0 {
|
|
t.wake = nil
|
|
return
|
|
}
|
|
t.wake = time.AfterFunc(time.Until(t.pending[t.expire.Front().Value.(uint64)].time.Add(t.timeout)), t.clean)
|
|
}
|
|
|
|
// Stop reclaims resources of the tracker.
|
|
func (t *Tracker) Stop() {
|
|
t.lock.Lock()
|
|
defer t.lock.Unlock()
|
|
|
|
if t.wake != nil {
|
|
t.wake.Stop()
|
|
t.wake = nil
|
|
}
|
|
if metrics.Enabled() {
|
|
// Ensure metrics are decremented for pending requests.
|
|
counts := make(map[uint64]int64)
|
|
for _, req := range t.pending {
|
|
counts[req.ReqCode]++
|
|
}
|
|
for code, count := range counts {
|
|
t.trackedGauge(code).Dec(count)
|
|
}
|
|
}
|
|
clear(t.pending)
|
|
t.expire = nil
|
|
}
|
|
|
|
// Fulfil fills a pending request, if any is available, reporting on various metrics.
|
|
func (t *Tracker) Fulfil(resp Response) error {
|
|
t.lock.Lock()
|
|
defer t.lock.Unlock()
|
|
|
|
// If it's a non existing request, track as stale response
|
|
req, ok := t.pending[resp.ID]
|
|
if !ok {
|
|
if metrics.Enabled() {
|
|
t.staleMeter(resp.MsgCode).Mark(1)
|
|
}
|
|
return ErrNoMatchingRequest
|
|
}
|
|
|
|
// If the response is funky, it might be some active attack
|
|
if req.RespCode != resp.MsgCode {
|
|
log.Warn("Network response code collision",
|
|
"have", fmt.Sprintf("%s:%s/%d:%d", t.peer, t.cap.Name, t.cap.Version, resp.MsgCode),
|
|
"want", fmt.Sprintf("%s:%s/%d:%d", t.peer, t.cap.Name, t.cap.Version, req.RespCode),
|
|
)
|
|
return ErrCodeMismatch
|
|
}
|
|
if resp.Size > req.Size {
|
|
return ErrTooManyItems
|
|
}
|
|
|
|
// Everything matches, mark the request serviced and meter it
|
|
wasHead := req.expire.Prev() == nil
|
|
t.expire.Remove(req.expire)
|
|
delete(t.pending, req.ID)
|
|
if wasHead {
|
|
if t.wake.Stop() {
|
|
t.schedule()
|
|
}
|
|
}
|
|
|
|
// Update request metrics.
|
|
if metrics.Enabled() {
|
|
t.trackedGauge(req.ReqCode).Dec(1)
|
|
t.waitHistogram(req.ReqCode).Update(time.Since(req.time).Microseconds())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (t *Tracker) trackedGauge(code uint64) *metrics.Gauge {
|
|
name := fmt.Sprintf("%s/%s/%d/%#02x", trackedGaugeName, t.cap.Name, t.cap.Version, code)
|
|
return metrics.GetOrRegisterGauge(name, nil)
|
|
}
|
|
|
|
func (t *Tracker) lostMeter(code uint64) *metrics.Meter {
|
|
name := fmt.Sprintf("%s/%s/%d/%#02x", lostMeterName, t.cap.Name, t.cap.Version, code)
|
|
return metrics.GetOrRegisterMeter(name, nil)
|
|
}
|
|
|
|
func (t *Tracker) staleMeter(code uint64) *metrics.Meter {
|
|
name := fmt.Sprintf("%s/%s/%d/%#02x", staleMeterName, t.cap.Name, t.cap.Version, code)
|
|
return metrics.GetOrRegisterMeter(name, nil)
|
|
}
|
|
|
|
func (t *Tracker) waitHistogram(code uint64) metrics.Histogram {
|
|
name := fmt.Sprintf("%s/%s/%d/%#02x", waitHistName, t.cap.Name, t.cap.Version, code)
|
|
sampler := func() metrics.Sample {
|
|
return metrics.ResettingSample(metrics.NewExpDecaySample(1028, 0.015))
|
|
}
|
|
return metrics.GetOrRegisterHistogramLazy(name, nil, sampler)
|
|
}
|