diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index ba48c6cc8d..ffed7d5acf 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -180,11 +180,12 @@ type TxFetcher struct { alternates map[common.Hash]map[string]struct{} // In-flight transaction alternate origins if retrieval fails // Callbacks - validateMeta func(common.Hash, byte) error // Validate a tx metadata based on the local txpool - addTxs func([]*types.Transaction) []error // Insert a batch of transactions into local txpool - fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer - dropPeer func(string) // Drops a peer in case of announcement violation - onAccepted func(peer string, hashes []common.Hash) // Optional: notified with accepted tx hashes per peer + validateMeta func(common.Hash, byte) error // Validate a tx metadata based on the local txpool + addTxs func([]*types.Transaction) []error // Insert a batch of transactions into local txpool + fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer + dropPeer func(string) // Drops a peer in case of announcement violation + onAccepted func(peer string, hashes []common.Hash) // Optional: notified with accepted tx hashes per peer + onRequestLatency func(peer string, latency time.Duration) // Optional: notified once per completed/timed-out tx request step chan struct{} // Notification channel when the fetcher loop iterates clock mclock.Clock // Monotonic clock or simulated clock for tests @@ -195,40 +196,41 @@ type TxFetcher struct { // NewTxFetcher creates a transaction fetcher to retrieve transaction // based on hash announcements. // Chain can be nil to disable on-chain checks. -func NewTxFetcher(chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), onAccepted func(string, []common.Hash)) *TxFetcher { - return NewTxFetcherForTests(chain, validateMeta, addTxs, fetchTxs, dropPeer, onAccepted, mclock.System{}, time.Now, nil) +func NewTxFetcher(chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), onAccepted func(string, []common.Hash), onRequestLatency func(string, time.Duration)) *TxFetcher { + return NewTxFetcherForTests(chain, validateMeta, addTxs, fetchTxs, dropPeer, onAccepted, onRequestLatency, mclock.System{}, time.Now, nil) } // NewTxFetcherForTests is a testing method to mock out the realtime clock with // a simulated version and the internal randomness with a deterministic one. // Chain can be nil to disable on-chain checks. func NewTxFetcherForTests( - chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), onAccepted func(string, []common.Hash), + chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), onAccepted func(string, []common.Hash), onRequestLatency func(string, time.Duration), clock mclock.Clock, realTime func() time.Time, rand *mrand.Rand) *TxFetcher { return &TxFetcher{ - notify: make(chan *txAnnounce), - cleanup: make(chan *txDelivery), - drop: make(chan *txDrop), - quit: make(chan struct{}), - waitlist: make(map[common.Hash]map[string]struct{}), - waittime: make(map[common.Hash]mclock.AbsTime), - waitslots: make(map[string]map[common.Hash]*txMetadataWithSeq), - announces: make(map[string]map[common.Hash]*txMetadataWithSeq), - announced: make(map[common.Hash]map[string]struct{}), - fetching: make(map[common.Hash]string), - requests: make(map[string]*txRequest), - alternates: make(map[common.Hash]map[string]struct{}), - underpriced: lru.NewCache[common.Hash, time.Time](maxTxUnderpricedSetSize), - txOnChainCache: lru.NewCache[common.Hash, struct{}](txOnChainCacheLimit), - chain: chain, - validateMeta: validateMeta, - addTxs: addTxs, - fetchTxs: fetchTxs, - dropPeer: dropPeer, - onAccepted: onAccepted, - clock: clock, - realTime: realTime, - rand: rand, + notify: make(chan *txAnnounce), + cleanup: make(chan *txDelivery), + drop: make(chan *txDrop), + quit: make(chan struct{}), + waitlist: make(map[common.Hash]map[string]struct{}), + waittime: make(map[common.Hash]mclock.AbsTime), + waitslots: make(map[string]map[common.Hash]*txMetadataWithSeq), + announces: make(map[string]map[common.Hash]*txMetadataWithSeq), + announced: make(map[common.Hash]map[string]struct{}), + fetching: make(map[common.Hash]string), + requests: make(map[string]*txRequest), + alternates: make(map[common.Hash]map[string]struct{}), + underpriced: lru.NewCache[common.Hash, time.Time](maxTxUnderpricedSetSize), + txOnChainCache: lru.NewCache[common.Hash, struct{}](txOnChainCacheLimit), + chain: chain, + validateMeta: validateMeta, + addTxs: addTxs, + fetchTxs: fetchTxs, + dropPeer: dropPeer, + onAccepted: onAccepted, + onRequestLatency: onRequestLatency, + clock: clock, + realTime: realTime, + rand: rand, } } @@ -673,6 +675,14 @@ func (f *TxFetcher) loop() { // Keep track of the request as dangling, but never expire f.requests[peer].hashes = nil txFetcherSlowPeers.Inc(1) + // Record the request as a timeout-latency sample. The slow + // EMA in the consumer counts timeouts as the timeout value + // itself, so a peer that times out repeatedly drags its + // score down without us having to wait for an eventual + // (possibly never-arriving) reply. + if f.onRequestLatency != nil { + f.onRequestLatency(peer, txFetchTimeout) + } } } // Schedule a new transaction retrieval @@ -769,6 +779,11 @@ func (f *TxFetcher) loop() { if req.hashes == nil { txFetcherSlowPeers.Dec(1) txFetcherSlowWait.Update(time.Duration(f.clock.Now() - req.time).Nanoseconds()) + // Already counted as a timeout sample at the timeout site; + // don't double-record on eventual delivery. + } else if f.onRequestLatency != nil { + // Normal in-time delivery. Record the actual round-trip. + f.onRequestLatency(delivery.origin, time.Duration(f.clock.Now()-req.time)) } delete(f.requests, delivery.origin) diff --git a/eth/fetcher/tx_fetcher_test.go b/eth/fetcher/tx_fetcher_test.go index 3fe11fda21..92f3979acb 100644 --- a/eth/fetcher/tx_fetcher_test.go +++ b/eth/fetcher/tx_fetcher_test.go @@ -22,6 +22,7 @@ import ( "math/big" "math/rand" "slices" + "sync" "testing" "time" @@ -97,7 +98,7 @@ func newTestTxFetcher() *TxFetcher { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, - nil, nil, + nil, nil, nil, ) } @@ -2204,6 +2205,7 @@ func TestTransactionForgotten(t *testing.T) { func(string, []common.Hash) error { return nil }, func(string) {}, nil, + nil, mockClock, mockTime, rand.New(rand.NewSource(0)), // Use fixed seed for deterministic behavior @@ -2284,3 +2286,104 @@ func TestTransactionForgotten(t *testing.T) { t.Errorf("wrong final underpriced cache size: got %d, want 1", size) } } + +// latencyRecorder is a thread-safe recorder for onRequestLatency callbacks. +type latencyRecorder struct { + mu sync.Mutex + samples []latencySample +} + +type latencySample struct { + peer string + latency time.Duration +} + +func (r *latencyRecorder) record(peer string, latency time.Duration) { + r.mu.Lock() + defer r.mu.Unlock() + r.samples = append(r.samples, latencySample{peer, latency}) +} + +func (r *latencyRecorder) snapshot() []latencySample { + r.mu.Lock() + defer r.mu.Unlock() + out := make([]latencySample, len(r.samples)) + copy(out, r.samples) + return out +} + +// TestTransactionFetcherRequestLatencyOnDelivery asserts that an in-time +// direct delivery of a requested batch fires the onRequestLatency callback +// exactly once with the actual round-trip latency. +func TestTransactionFetcherRequestLatencyOnDelivery(t *testing.T) { + rec := &latencyRecorder{} + testTransactionFetcherParallel(t, txFetcherTest{ + init: func() *TxFetcher { + f := newTestTxFetcher() + f.onRequestLatency = rec.record + return f + }, + steps: []interface{}{ + doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}}, + // Wait for the announce-arrival timer; request is dispatched at this point. + doWait{time: txArriveTimeout, step: true}, + // Simulate 200ms round-trip before the response arrives. + doWait{time: 200 * time.Millisecond, step: false}, + doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}, direct: true}, + doFunc(func() { + samples := rec.snapshot() + if len(samples) != 1 { + t.Fatalf("expected 1 latency sample, got %d (%v)", len(samples), samples) + } + if samples[0].peer != "A" { + t.Errorf("peer mismatch: got %q, want A", samples[0].peer) + } + if samples[0].latency != 200*time.Millisecond { + t.Errorf("latency mismatch: got %v, want 200ms", samples[0].latency) + } + }), + }, + }) +} + +// TestTransactionFetcherRequestLatencyOnTimeout asserts that when a request +// times out (no reply within txFetchTimeout), onRequestLatency fires once +// with the timeout value, and a subsequent (late) delivery does not fire +// a duplicate sample. +func TestTransactionFetcherRequestLatencyOnTimeout(t *testing.T) { + rec := &latencyRecorder{} + testTransactionFetcherParallel(t, txFetcherTest{ + init: func() *TxFetcher { + f := newTestTxFetcher() + f.onRequestLatency = rec.record + return f + }, + steps: []interface{}{ + doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}}, + doWait{time: txArriveTimeout, step: true}, + // Push the clock past the request deadline; the timeout handler + // should fire and record a single timeout-valued sample. + doWait{time: txFetchTimeout, step: true}, + doFunc(func() { + samples := rec.snapshot() + if len(samples) != 1 { + t.Fatalf("expected 1 timeout sample, got %d (%v)", len(samples), samples) + } + if samples[0].peer != "A" { + t.Errorf("peer mismatch: got %q, want A", samples[0].peer) + } + if samples[0].latency != txFetchTimeout { + t.Errorf("latency mismatch: got %v, want %v", samples[0].latency, txFetchTimeout) + } + }), + // A late reply from the slow peer must not produce a second sample. + doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}, direct: true}, + doFunc(func() { + samples := rec.snapshot() + if len(samples) != 1 { + t.Fatalf("late delivery double-counted latency: got %d samples, want 1", len(samples)) + } + }), + }, + }) +} diff --git a/eth/handler.go b/eth/handler.go index d809053533..6e6b0bb8ac 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -191,7 +191,7 @@ func newHandler(config *handlerConfig) (*handler, error) { return nil } h.txTracker = txtracker.New() - h.txFetcher = fetcher.NewTxFetcher(h.chain, validateMeta, addTxs, fetchTx, h.removePeer, h.txTracker.NotifyAccepted) + h.txFetcher = fetcher.NewTxFetcher(h.chain, validateMeta, addTxs, fetchTx, h.removePeer, h.txTracker.NotifyAccepted, nil) return h, nil } diff --git a/tests/fuzzers/txfetcher/txfetcher_fuzzer.go b/tests/fuzzers/txfetcher/txfetcher_fuzzer.go index 69674d0e62..10505af4af 100644 --- a/tests/fuzzers/txfetcher/txfetcher_fuzzer.go +++ b/tests/fuzzers/txfetcher/txfetcher_fuzzer.go @@ -84,7 +84,7 @@ func fuzz(input []byte) int { return make([]error, len(txs)) }, func(string, []common.Hash) error { return nil }, - nil, nil, + nil, nil, nil, clock, func() time.Time { nanoTime := int64(clock.Now())