eth/fetcher: add onRequestLatency callback to tx fetcher

Adds an optional onRequestLatency(peer, latency) callback to the tx
fetcher constructor, fired exactly once per completed or timed-out
request:

- On in-time delivery: the actual round-trip latency (clock.Now() - req.time).
- On timeout (req.time + txFetchTimeout exceeded): the timeout value
  itself, so slow peers contribute samples instead of being silently
  omitted from the downstream EMA.

Late deliveries for requests already counted as timeouts do not
double-record. Existing callers (handler.go, fuzzer, tests) pass nil
for the new parameter; handler wiring to txTracker follows in a
separate commit.
This commit is contained in:
Csaba Kiraly 2026-04-13 21:05:45 +02:00
parent 111d90aef8
commit b0a266d3c8
4 changed files with 152 additions and 34 deletions

View file

@ -180,11 +180,12 @@ type TxFetcher struct {
alternates map[common.Hash]map[string]struct{} // In-flight transaction alternate origins if retrieval fails
// Callbacks
validateMeta func(common.Hash, byte) error // Validate a tx metadata based on the local txpool
addTxs func([]*types.Transaction) []error // Insert a batch of transactions into local txpool
fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer
dropPeer func(string) // Drops a peer in case of announcement violation
onAccepted func(peer string, hashes []common.Hash) // Optional: notified with accepted tx hashes per peer
validateMeta func(common.Hash, byte) error // Validate a tx metadata based on the local txpool
addTxs func([]*types.Transaction) []error // Insert a batch of transactions into local txpool
fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer
dropPeer func(string) // Drops a peer in case of announcement violation
onAccepted func(peer string, hashes []common.Hash) // Optional: notified with accepted tx hashes per peer
onRequestLatency func(peer string, latency time.Duration) // Optional: notified once per completed/timed-out tx request
step chan struct{} // Notification channel when the fetcher loop iterates
clock mclock.Clock // Monotonic clock or simulated clock for tests
@ -195,40 +196,41 @@ type TxFetcher struct {
// NewTxFetcher creates a transaction fetcher to retrieve transactions
// based on hash announcements.
// Chain can be nil to disable on-chain checks.
// onRequestLatency is optional and may be nil. It is forwarded unchanged to
// NewTxFetcherForTests, together with the system clock, time.Now and a nil
// random source.
func NewTxFetcher(chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), onAccepted func(string, []common.Hash)) *TxFetcher {
return NewTxFetcherForTests(chain, validateMeta, addTxs, fetchTxs, dropPeer, onAccepted, mclock.System{}, time.Now, nil)
func NewTxFetcher(chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), onAccepted func(string, []common.Hash), onRequestLatency func(string, time.Duration)) *TxFetcher {
return NewTxFetcherForTests(chain, validateMeta, addTxs, fetchTxs, dropPeer, onAccepted, onRequestLatency, mclock.System{}, time.Now, nil)
}
// NewTxFetcherForTests is a testing constructor that lets the caller replace
// the realtime clock with a simulated one and the internal randomness with a
// deterministic source.
// Chain can be nil to disable on-chain checks.
// onRequestLatency is optional and may be nil; it is stored on the fetcher
// as-is, next to the other callbacks. All internal channels, tracking maps
// and LRU caches are allocated here.
func NewTxFetcherForTests(
chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), onAccepted func(string, []common.Hash),
chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), onAccepted func(string, []common.Hash), onRequestLatency func(string, time.Duration),
clock mclock.Clock, realTime func() time.Time, rand *mrand.Rand) *TxFetcher {
return &TxFetcher{
notify: make(chan *txAnnounce),
cleanup: make(chan *txDelivery),
drop: make(chan *txDrop),
quit: make(chan struct{}),
waitlist: make(map[common.Hash]map[string]struct{}),
waittime: make(map[common.Hash]mclock.AbsTime),
waitslots: make(map[string]map[common.Hash]*txMetadataWithSeq),
announces: make(map[string]map[common.Hash]*txMetadataWithSeq),
announced: make(map[common.Hash]map[string]struct{}),
fetching: make(map[common.Hash]string),
requests: make(map[string]*txRequest),
alternates: make(map[common.Hash]map[string]struct{}),
underpriced: lru.NewCache[common.Hash, time.Time](maxTxUnderpricedSetSize),
txOnChainCache: lru.NewCache[common.Hash, struct{}](txOnChainCacheLimit),
chain: chain,
validateMeta: validateMeta,
addTxs: addTxs,
fetchTxs: fetchTxs,
dropPeer: dropPeer,
onAccepted: onAccepted,
clock: clock,
realTime: realTime,
rand: rand,
notify: make(chan *txAnnounce),
cleanup: make(chan *txDelivery),
drop: make(chan *txDrop),
quit: make(chan struct{}),
waitlist: make(map[common.Hash]map[string]struct{}),
waittime: make(map[common.Hash]mclock.AbsTime),
waitslots: make(map[string]map[common.Hash]*txMetadataWithSeq),
announces: make(map[string]map[common.Hash]*txMetadataWithSeq),
announced: make(map[common.Hash]map[string]struct{}),
fetching: make(map[common.Hash]string),
requests: make(map[string]*txRequest),
alternates: make(map[common.Hash]map[string]struct{}),
underpriced: lru.NewCache[common.Hash, time.Time](maxTxUnderpricedSetSize),
txOnChainCache: lru.NewCache[common.Hash, struct{}](txOnChainCacheLimit),
chain: chain,
validateMeta: validateMeta,
addTxs: addTxs,
fetchTxs: fetchTxs,
dropPeer: dropPeer,
onAccepted: onAccepted,
onRequestLatency: onRequestLatency,
clock: clock,
realTime: realTime,
rand: rand,
}
}
@ -673,6 +675,14 @@ func (f *TxFetcher) loop() {
// Keep track of the request as dangling, but never expire
f.requests[peer].hashes = nil
txFetcherSlowPeers.Inc(1)
// Record the request as a timeout-latency sample. The slow
// EMA in the consumer counts timeouts as the timeout value
// itself, so a peer that times out repeatedly drags its
// score down without us having to wait for an eventual
// (possibly never-arriving) reply.
if f.onRequestLatency != nil {
f.onRequestLatency(peer, txFetchTimeout)
}
}
}
// Schedule a new transaction retrieval
@ -769,6 +779,11 @@ func (f *TxFetcher) loop() {
if req.hashes == nil {
txFetcherSlowPeers.Dec(1)
txFetcherSlowWait.Update(time.Duration(f.clock.Now() - req.time).Nanoseconds())
// Already counted as a timeout sample at the timeout site;
// don't double-record on eventual delivery.
} else if f.onRequestLatency != nil {
// Normal in-time delivery. Record the actual round-trip.
f.onRequestLatency(delivery.origin, time.Duration(f.clock.Now()-req.time))
}
delete(f.requests, delivery.origin)

View file

@ -22,6 +22,7 @@ import (
"math/big"
"math/rand"
"slices"
"sync"
"testing"
"time"
@ -97,7 +98,7 @@ func newTestTxFetcher() *TxFetcher {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil, nil,
nil, nil, nil,
)
}
@ -2204,6 +2205,7 @@ func TestTransactionForgotten(t *testing.T) {
func(string, []common.Hash) error { return nil },
func(string) {},
nil,
nil,
mockClock,
mockTime,
rand.New(rand.NewSource(0)), // Use fixed seed for deterministic behavior
@ -2284,3 +2286,104 @@ func TestTransactionForgotten(t *testing.T) {
t.Errorf("wrong final underpriced cache size: got %d, want 1", size)
}
}
// latencySample is one observation delivered through an onRequestLatency
// callback: the peer identifier and the latency value reported with it.
type latencySample struct {
	peer    string
	latency time.Duration
}

// latencyRecorder accumulates onRequestLatency callbacks so a test can
// inspect them afterwards. Every access to samples is guarded by mu, so
// record and snapshot may be called from different goroutines.
type latencyRecorder struct {
	mu      sync.Mutex
	samples []latencySample
}

// record appends a single (peer, latency) observation to the recorder.
func (r *latencyRecorder) record(peer string, latency time.Duration) {
	entry := latencySample{peer: peer, latency: latency}

	r.mu.Lock()
	r.samples = append(r.samples, entry)
	r.mu.Unlock()
}

// snapshot returns a copy of everything recorded so far. The returned slice
// has its own backing array, so the caller may keep or modify it without
// affecting the recorder, and later record calls do not change it.
func (r *latencyRecorder) snapshot() []latencySample {
	r.mu.Lock()
	defer r.mu.Unlock()

	return append(make([]latencySample, 0, len(r.samples)), r.samples...)
}
// TestTransactionFetcherRequestLatencyOnDelivery checks the in-time path:
// when a requested transaction is delivered directly by the peer it was
// requested from, onRequestLatency must be invoked exactly once, carrying
// that peer and the elapsed time between request and delivery.
func TestTransactionFetcherRequestLatencyOnDelivery(t *testing.T) {
	// Simulated delay between the request going out and the reply arriving.
	const roundTrip = 200 * time.Millisecond

	var (
		recorder = new(latencyRecorder)
		tx       = testTxs[0]
		hash     = testTxsHashes[0]
	)
	// Build a stock test fetcher and hook the recorder into it.
	setup := func() *TxFetcher {
		tf := newTestTxFetcher()
		tf.onRequestLatency = recorder.record
		return tf
	}
	// Exactly one sample, from peer A, equal to the simulated round-trip.
	verify := doFunc(func() {
		got := recorder.snapshot()
		if n := len(got); n != 1 {
			t.Fatalf("expected 1 latency sample, got %d (%v)", n, got)
		}
		sample := got[0]
		if sample.peer != "A" {
			t.Errorf("peer mismatch: got %q, want A", sample.peer)
		}
		if sample.latency != roundTrip {
			t.Errorf("latency mismatch: got %v, want 200ms", sample.latency)
		}
	})
	steps := []interface{}{
		// Peer A announces the transaction.
		doTxNotify{peer: "A", hashes: []common.Hash{hash}, types: []byte{tx.Type()}, sizes: []uint32{uint32(tx.Size())}},
		// Let the announce-arrival timer elapse; the request is dispatched here.
		doWait{time: txArriveTimeout, step: true},
		// Advance the clock by the round-trip before the response shows up.
		doWait{time: roundTrip, step: false},
		// Peer A delivers the transaction directly.
		doTxEnqueue{peer: "A", txs: []*types.Transaction{tx}, direct: true},
		verify,
	}
	testTransactionFetcherParallel(t, txFetcherTest{init: setup, steps: steps})
}
// TestTransactionFetcherRequestLatencyOnTimeout checks the timeout path: if
// no reply arrives within txFetchTimeout, onRequestLatency must be invoked
// once with txFetchTimeout as the latency, and a reply that arrives after
// the timeout must not add a second sample.
func TestTransactionFetcherRequestLatencyOnTimeout(t *testing.T) {
	var (
		recorder = new(latencyRecorder)
		tx       = testTxs[0]
		hash     = testTxsHashes[0]
	)
	// Build a stock test fetcher and hook the recorder into it.
	setup := func() *TxFetcher {
		tf := newTestTxFetcher()
		tf.onRequestLatency = recorder.record
		return tf
	}
	// After the deadline: one sample, from peer A, valued at the timeout.
	checkTimeoutSample := doFunc(func() {
		got := recorder.snapshot()
		if n := len(got); n != 1 {
			t.Fatalf("expected 1 timeout sample, got %d (%v)", n, got)
		}
		sample := got[0]
		if sample.peer != "A" {
			t.Errorf("peer mismatch: got %q, want A", sample.peer)
		}
		if sample.latency != txFetchTimeout {
			t.Errorf("latency mismatch: got %v, want %v", sample.latency, txFetchTimeout)
		}
	})
	// After the late reply: the sample count must still be one.
	checkNoDuplicate := doFunc(func() {
		if n := len(recorder.snapshot()); n != 1 {
			t.Fatalf("late delivery double-counted latency: got %d samples, want 1", n)
		}
	})
	steps := []interface{}{
		// Peer A announces the transaction.
		doTxNotify{peer: "A", hashes: []common.Hash{hash}, types: []byte{tx.Type()}, sizes: []uint32{uint32(tx.Size())}},
		// Let the announce-arrival timer elapse so the request goes out.
		doWait{time: txArriveTimeout, step: true},
		// Move the clock past the request deadline; the timeout handler
		// is expected to record a single timeout-valued sample.
		doWait{time: txFetchTimeout, step: true},
		checkTimeoutSample,
		// The slow peer finally answers; this must not be counted again.
		doTxEnqueue{peer: "A", txs: []*types.Transaction{tx}, direct: true},
		checkNoDuplicate,
	}
	testTransactionFetcherParallel(t, txFetcherTest{init: setup, steps: steps})
}

View file

@ -191,7 +191,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
return nil
}
h.txTracker = txtracker.New()
h.txFetcher = fetcher.NewTxFetcher(h.chain, validateMeta, addTxs, fetchTx, h.removePeer, h.txTracker.NotifyAccepted)
h.txFetcher = fetcher.NewTxFetcher(h.chain, validateMeta, addTxs, fetchTx, h.removePeer, h.txTracker.NotifyAccepted, nil)
return h, nil
}

View file

@ -84,7 +84,7 @@ func fuzz(input []byte) int {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil, nil,
nil, nil, nil,
clock,
func() time.Time {
nanoTime := int64(clock.Now())