mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-05-24 08:49:29 +00:00
Merge e8083ed0f7 into 12eabbd76d
This commit is contained in:
commit
76789b4742
11 changed files with 1790 additions and 58 deletions
|
|
@ -456,8 +456,13 @@ func (s *Ethereum) Start() error {
|
||||||
// Start the networking layer
|
// Start the networking layer
|
||||||
s.handler.Start(s.p2pServer.MaxPeers)
|
s.handler.Start(s.p2pServer.MaxPeers)
|
||||||
|
|
||||||
// Start the connection manager
|
// Start the transaction tracker; it emits per-block inclusion and
|
||||||
s.dropper.Start(s.p2pServer, func() bool { return !s.Synced() })
|
// finalization signals to peerStats, which the dropper queries for
|
||||||
|
// protection decisions.
|
||||||
|
s.handler.txTracker.Start(s.blockchain, s.handler.peerStats)
|
||||||
|
|
||||||
|
// Start the connection manager with inclusion-based peer protection.
|
||||||
|
s.dropper.Start(s.p2pServer, func() bool { return !s.Synced() }, s.handler.peerStats.GetAllPeerStats)
|
||||||
|
|
||||||
// start log indexer
|
// start log indexer
|
||||||
s.filterMaps.Start()
|
s.filterMaps.Start()
|
||||||
|
|
@ -581,6 +586,7 @@ func (s *Ethereum) Stop() error {
|
||||||
// Stop all the peer-related stuff first.
|
// Stop all the peer-related stuff first.
|
||||||
s.discmix.Close()
|
s.discmix.Close()
|
||||||
s.dropper.Stop()
|
s.dropper.Stop()
|
||||||
|
s.handler.txTracker.Stop()
|
||||||
s.handler.Stop()
|
s.handler.Stop()
|
||||||
|
|
||||||
// Then stop everything else.
|
// Then stop everything else.
|
||||||
|
|
|
||||||
164
eth/dropper.go
164
eth/dropper.go
|
|
@ -17,6 +17,7 @@
|
||||||
package eth
|
package eth
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"cmp"
|
||||||
mrand "math/rand"
|
mrand "math/rand"
|
||||||
"slices"
|
"slices"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
@ -24,6 +25,7 @@ import (
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/common/mclock"
|
"github.com/ethereum/go-ethereum/common/mclock"
|
||||||
|
"github.com/ethereum/go-ethereum/eth/peerstats"
|
||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
"github.com/ethereum/go-ethereum/metrics"
|
"github.com/ethereum/go-ethereum/metrics"
|
||||||
"github.com/ethereum/go-ethereum/p2p"
|
"github.com/ethereum/go-ethereum/p2p"
|
||||||
|
|
@ -40,6 +42,10 @@ const (
|
||||||
// dropping when no more peers can be added. Larger numbers result in more
|
// dropping when no more peers can be added. Larger numbers result in more
|
||||||
// aggressive drop behavior.
|
// aggressive drop behavior.
|
||||||
peerDropThreshold = 0
|
peerDropThreshold = 0
|
||||||
|
// Fraction of inbound/dialed peers to protect based on inclusion stats.
|
||||||
|
// The top inclusionProtectionFrac of each category (by score) are
|
||||||
|
// shielded from random dropping. 0.1 = top 10%.
|
||||||
|
inclusionProtectionFrac = 0.1
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
@ -47,18 +53,77 @@ var (
|
||||||
droppedInbound = metrics.NewRegisteredMeter("eth/dropper/inbound", nil)
|
droppedInbound = metrics.NewRegisteredMeter("eth/dropper/inbound", nil)
|
||||||
// droppedOutbound is the number of outbound peers dropped
|
// droppedOutbound is the number of outbound peers dropped
|
||||||
droppedOutbound = metrics.NewRegisteredMeter("eth/dropper/outbound", nil)
|
droppedOutbound = metrics.NewRegisteredMeter("eth/dropper/outbound", nil)
|
||||||
|
// dropSkipped counts times a drop was attempted but no peer was dropped,
|
||||||
|
// for any reason (pool has headroom, all candidates trusted/static/young,
|
||||||
|
// or protected by inclusion stats).
|
||||||
|
dropSkipped = metrics.NewRegisteredMeter("eth/dropper/skipped", nil)
|
||||||
)
|
)
|
||||||
|
|
||||||
// dropper monitors the state of the peer pool and makes changes as follows:
|
// Callback type to get per-peer inclusion statistics.
|
||||||
// - during sync the Downloader handles peer connections, so dropper is disabled
|
type getPeerStatsFunc func() map[string]peerstats.PeerStats
|
||||||
// - if not syncing and the peer count is close to the limit, it drops peers
|
|
||||||
// randomly every peerDropInterval to make space for new peers
|
// protectionCategory defines a peer scoring function and the fraction of peers
|
||||||
// - peers are dropped separately from the inboud pool and from the dialed pool
|
// to protect per inbound/dialed category. Multiple categories are unioned.
|
||||||
|
type protectionCategory struct {
|
||||||
|
name string
|
||||||
|
score func(peerstats.PeerStats) float64
|
||||||
|
frac float64 // fraction of max peers to protect (0.0–1.0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// protectionCategories is the list of protection criteria. Each category
|
||||||
|
// independently selects its top-N peers per pool; the union is protected.
|
||||||
|
var protectionCategories = []protectionCategory{
|
||||||
|
{"recent-finalized", func(s peerstats.PeerStats) float64 { return s.RecentFinalized }, inclusionProtectionFrac},
|
||||||
|
{"recent-included", func(s peerstats.PeerStats) float64 { return s.RecentIncluded }, inclusionProtectionFrac},
|
||||||
|
{"request-latency", func(s peerstats.PeerStats) float64 {
|
||||||
|
// Low-latency peers should rank higher. Peers with too few samples
|
||||||
|
// score 0 so the existing `score <= 0` filter excludes them — this
|
||||||
|
// prevents a single lucky-fast reply from winning protection. Peers
|
||||||
|
// whose EMA reaches the timeout also score 0 by this path because
|
||||||
|
// the reciprocal of a very large duration is tiny but positive; the
|
||||||
|
// per-pool top-N will still push faster peers ahead of them.
|
||||||
|
if s.RequestSuccesses+s.RequestTimeouts < peerstats.MinLatencySamples {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
// Freshness gate: a peer that earned a fast EMA but then went
|
||||||
|
// silent on announcements (no requests → no fresh samples) must
|
||||||
|
// not keep that score indefinitely. Ignore stale data.
|
||||||
|
if time.Since(s.LastLatencySample) > peerstats.MaxLatencyStaleness {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
if s.RequestLatencyEMA <= 0 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return 1.0 / float64(s.RequestLatencyEMA)
|
||||||
|
}, inclusionProtectionFrac},
|
||||||
|
}
|
||||||
|
|
||||||
|
// dropper monitors the state of the peer pool and introduces churn by
|
||||||
|
// periodically disconnecting a random peer to make room for new connections.
|
||||||
|
// The main goal is to allow new peers to join the network and to facilitate
|
||||||
|
// continuous topology adaptation.
|
||||||
|
//
|
||||||
|
// Behavior:
|
||||||
|
// - During sync the Downloader handles peer connections, so dropper is disabled.
|
||||||
|
// - When not syncing and a peer category (inbound or dialed) is close to its
|
||||||
|
// limit, a random peer from that category is disconnected every 3–7 minutes.
|
||||||
|
// - Trusted and static peers are never dropped.
|
||||||
|
// - Recently connected peers are also protected from dropping to give them time
|
||||||
|
// to prove their value before being at risk of disconnection.
|
||||||
|
// - Some peers are protected from dropping based on their contribution
|
||||||
|
// to the tx pool. Each pool (inbound/dialed) independently selects its
|
||||||
|
// top fraction of peers by a per-peer EMA score — a slow EMA of
|
||||||
|
// finalized inclusions (~1-day half-life, rewards sustained long-term
|
||||||
|
// contribution) and a fast EMA of recent block inclusions (rewards
|
||||||
|
// current activity). The union of all protected sets is shielded from
|
||||||
|
// random dropping, and the drop target is chosen randomly from the
|
||||||
|
// remainder.
|
||||||
type dropper struct {
|
type dropper struct {
|
||||||
maxDialPeers int // maximum number of dialed peers
|
maxDialPeers int // maximum number of dialed peers
|
||||||
maxInboundPeers int // maximum number of inbound peers
|
maxInboundPeers int // maximum number of inbound peers
|
||||||
peersFunc getPeersFunc
|
peersFunc getPeersFunc
|
||||||
syncingFunc getSyncingFunc
|
syncingFunc getSyncingFunc
|
||||||
|
peerStatsFunc getPeerStatsFunc // optional: inclusion stats for protection
|
||||||
|
|
||||||
// peerDropTimer introduces churn if we are close to limit capacity.
|
// peerDropTimer introduces churn if we are close to limit capacity.
|
||||||
// We handle Dialed and Inbound connections separately
|
// We handle Dialed and Inbound connections separately
|
||||||
|
|
@ -88,10 +153,12 @@ func newDropper(maxDialPeers, maxInboundPeers int) *dropper {
|
||||||
return cm
|
return cm
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start the dropper.
|
// Start the dropper. peerStatsFunc is optional (nil disables inclusion
|
||||||
func (cm *dropper) Start(srv *p2p.Server, syncingFunc getSyncingFunc) {
|
// protection).
|
||||||
|
func (cm *dropper) Start(srv *p2p.Server, syncingFunc getSyncingFunc, peerStatsFunc getPeerStatsFunc) {
|
||||||
cm.peersFunc = srv.Peers
|
cm.peersFunc = srv.Peers
|
||||||
cm.syncingFunc = syncingFunc
|
cm.syncingFunc = syncingFunc
|
||||||
|
cm.peerStatsFunc = peerStatsFunc
|
||||||
cm.wg.Add(1)
|
cm.wg.Add(1)
|
||||||
go cm.loop()
|
go cm.loop()
|
||||||
}
|
}
|
||||||
|
|
@ -114,18 +181,31 @@ func (cm *dropper) dropRandomPeer() bool {
|
||||||
}
|
}
|
||||||
numDialed := len(peers) - numInbound
|
numDialed := len(peers) - numInbound
|
||||||
|
|
||||||
|
// Fast path: if neither pool is near capacity, every non-trusted/non-static
|
||||||
|
// peer is already do-not-drop by pool-threshold rules. No point computing
|
||||||
|
// inclusion protection.
|
||||||
|
if cm.maxDialPeers-numDialed > peerDropThreshold &&
|
||||||
|
cm.maxInboundPeers-numInbound > peerDropThreshold {
|
||||||
|
dropSkipped.Mark(1)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compute the set of inclusion-protected peers before filtering.
|
||||||
|
protected := cm.protectedPeers(peers)
|
||||||
|
|
||||||
selectDoNotDrop := func(p *p2p.Peer) bool {
|
selectDoNotDrop := func(p *p2p.Peer) bool {
|
||||||
// Avoid dropping trusted and static peers, or recent peers.
|
|
||||||
// Only drop peers if their respective category (dialed/inbound)
|
|
||||||
// is close to limit capacity.
|
|
||||||
return p.Trusted() || p.StaticDialed() ||
|
return p.Trusted() || p.StaticDialed() ||
|
||||||
p.Lifetime() < mclock.AbsTime(doNotDropBefore) ||
|
p.Lifetime() < mclock.AbsTime(doNotDropBefore) ||
|
||||||
(p.DynDialed() && cm.maxDialPeers-numDialed > peerDropThreshold) ||
|
(p.DynDialed() && cm.maxDialPeers-numDialed > peerDropThreshold) ||
|
||||||
(p.Inbound() && cm.maxInboundPeers-numInbound > peerDropThreshold)
|
(p.Inbound() && cm.maxInboundPeers-numInbound > peerDropThreshold) ||
|
||||||
|
protected[p]
|
||||||
}
|
}
|
||||||
|
|
||||||
droppable := slices.DeleteFunc(peers, selectDoNotDrop)
|
droppable := slices.DeleteFunc(peers, selectDoNotDrop)
|
||||||
if len(droppable) > 0 {
|
if len(droppable) == 0 {
|
||||||
|
dropSkipped.Mark(1)
|
||||||
|
return false
|
||||||
|
}
|
||||||
p := droppable[mrand.Intn(len(droppable))]
|
p := droppable[mrand.Intn(len(droppable))]
|
||||||
log.Debug("Dropping random peer", "inbound", p.Inbound(),
|
log.Debug("Dropping random peer", "inbound", p.Inbound(),
|
||||||
"id", p.ID(), "duration", common.PrettyDuration(p.Lifetime()), "peercountbefore", len(peers))
|
"id", p.ID(), "duration", common.PrettyDuration(p.Lifetime()), "peercountbefore", len(peers))
|
||||||
|
|
@ -137,7 +217,65 @@ func (cm *dropper) dropRandomPeer() bool {
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
return false
|
|
||||||
|
// protectedPeers computes the set of peers that should not be dropped based
|
||||||
|
// on inclusion stats. Each protection category independently selects its
|
||||||
|
// top-N peers per inbound/dialed pool; the union is returned.
|
||||||
|
func (cm *dropper) protectedPeers(peers []*p2p.Peer) map[*p2p.Peer]bool {
|
||||||
|
if cm.peerStatsFunc == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
stats := cm.peerStatsFunc()
|
||||||
|
if len(stats) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// Split peers by direction.
|
||||||
|
var inbound, dialed []*p2p.Peer
|
||||||
|
for _, p := range peers {
|
||||||
|
if p.Inbound() {
|
||||||
|
inbound = append(inbound, p)
|
||||||
|
} else {
|
||||||
|
dialed = append(dialed, p)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
result := protectedPeersByPool(inbound, dialed, stats)
|
||||||
|
if len(result) > 0 {
|
||||||
|
log.Debug("Protecting high-value peers from drop", "protected", len(result))
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
// protectedPeersByPool selects the union of top-N peers per protection
|
||||||
|
// category across the given already-split inbound and dialed pools.
|
||||||
|
// Factored from protectedPeers so tests can exercise the per-pool
|
||||||
|
// selection logic without needing to construct direction-flagged
|
||||||
|
// *p2p.Peer instances (which require unexported p2p types).
|
||||||
|
func protectedPeersByPool(inbound, dialed []*p2p.Peer, stats map[string]peerstats.PeerStats) map[*p2p.Peer]bool {
|
||||||
|
result := make(map[*p2p.Peer]bool)
|
||||||
|
// protectPool selects the top-frac peers from pool by score and adds them to result.
|
||||||
|
protectPool := func(pool []*p2p.Peer, score func(*p2p.Peer) float64, frac float64) {
|
||||||
|
n := int(float64(len(pool)) * frac)
|
||||||
|
if n == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sorted := slices.SortedFunc(slices.Values(pool), func(a, b *p2p.Peer) int {
|
||||||
|
return cmp.Compare(score(b), score(a)) // descending
|
||||||
|
})
|
||||||
|
top := slices.DeleteFunc(sorted[:min(n, len(sorted))], func(p *p2p.Peer) bool {
|
||||||
|
return score(p) <= 0
|
||||||
|
})
|
||||||
|
for _, p := range top {
|
||||||
|
result[p] = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, cat := range protectionCategories {
|
||||||
|
score := func(p *p2p.Peer) float64 {
|
||||||
|
return cat.score(stats[p.ID().String()])
|
||||||
|
}
|
||||||
|
protectPool(inbound, score, cat.frac)
|
||||||
|
protectPool(dialed, score, cat.frac)
|
||||||
|
}
|
||||||
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
// randomDuration generates a random duration between min and max.
|
// randomDuration generates a random duration between min and max.
|
||||||
|
|
|
||||||
378
eth/dropper_test.go
Normal file
378
eth/dropper_test.go
Normal file
|
|
@ -0,0 +1,378 @@
|
||||||
|
// Copyright 2026 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package eth
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/eth/peerstats"
|
||||||
|
"github.com/ethereum/go-ethereum/p2p"
|
||||||
|
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||||
|
)
|
||||||
|
|
||||||
|
func makePeers(n int) []*p2p.Peer {
|
||||||
|
peers := make([]*p2p.Peer, n)
|
||||||
|
for i := range peers {
|
||||||
|
id := enode.ID{byte(i)}
|
||||||
|
peers[i] = p2p.NewPeer(id, fmt.Sprintf("peer%d", i), nil)
|
||||||
|
}
|
||||||
|
return peers
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestProtectedPeersNoStats(t *testing.T) {
|
||||||
|
cm := &dropper{maxDialPeers: 20, maxInboundPeers: 30}
|
||||||
|
cm.peerStatsFunc = func() map[string]peerstats.PeerStats { return nil }
|
||||||
|
|
||||||
|
peers := makePeers(10)
|
||||||
|
protected := cm.protectedPeers(peers)
|
||||||
|
if len(protected) != 0 {
|
||||||
|
t.Fatalf("expected no protected peers with nil stats, got %d", len(protected))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestProtectedPeersEmptyStats(t *testing.T) {
|
||||||
|
cm := &dropper{maxDialPeers: 20, maxInboundPeers: 30}
|
||||||
|
cm.peerStatsFunc = func() map[string]peerstats.PeerStats {
|
||||||
|
return map[string]peerstats.PeerStats{}
|
||||||
|
}
|
||||||
|
|
||||||
|
peers := makePeers(10)
|
||||||
|
protected := cm.protectedPeers(peers)
|
||||||
|
if len(protected) != 0 {
|
||||||
|
t.Fatalf("expected no protected peers with empty stats, got %d", len(protected))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestProtectedPeersTopPeer(t *testing.T) {
|
||||||
|
// 20 peers, 10% of 20 = 2 protected per category.
|
||||||
|
cm := &dropper{maxDialPeers: 20, maxInboundPeers: 30}
|
||||||
|
|
||||||
|
peers := makePeers(20)
|
||||||
|
stats := make(map[string]peerstats.PeerStats)
|
||||||
|
stats[peers[0].ID().String()] = peerstats.PeerStats{RecentFinalized: 100}
|
||||||
|
stats[peers[1].ID().String()] = peerstats.PeerStats{RecentIncluded: 5.0}
|
||||||
|
|
||||||
|
cm.peerStatsFunc = func() map[string]peerstats.PeerStats { return stats }
|
||||||
|
|
||||||
|
protected := cm.protectedPeers(peers)
|
||||||
|
if len(protected) != 2 {
|
||||||
|
t.Fatalf("expected 2 protected peers, got %d", len(protected))
|
||||||
|
}
|
||||||
|
if !protected[peers[0]] {
|
||||||
|
t.Fatal("peer 0 should be protected (top RecentFinalized)")
|
||||||
|
}
|
||||||
|
if !protected[peers[1]] {
|
||||||
|
t.Fatal("peer 1 should be protected (top RecentIncluded)")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestProtectedPeersZeroScore(t *testing.T) {
|
||||||
|
cm := &dropper{maxDialPeers: 20, maxInboundPeers: 30}
|
||||||
|
|
||||||
|
peers := makePeers(10)
|
||||||
|
stats := make(map[string]peerstats.PeerStats)
|
||||||
|
for _, p := range peers {
|
||||||
|
stats[p.ID().String()] = peerstats.PeerStats{}
|
||||||
|
}
|
||||||
|
cm.peerStatsFunc = func() map[string]peerstats.PeerStats { return stats }
|
||||||
|
|
||||||
|
protected := cm.protectedPeers(peers)
|
||||||
|
if len(protected) != 0 {
|
||||||
|
t.Fatalf("expected no protection with zero scores, got %d", len(protected))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestProtectedPeersOverlap(t *testing.T) {
|
||||||
|
// One peer is top in both categories — counted once.
|
||||||
|
cm := &dropper{maxDialPeers: 20, maxInboundPeers: 30}
|
||||||
|
|
||||||
|
peers := makePeers(20)
|
||||||
|
stats := make(map[string]peerstats.PeerStats)
|
||||||
|
stats[peers[0].ID().String()] = peerstats.PeerStats{RecentFinalized: 100, RecentIncluded: 5.0}
|
||||||
|
|
||||||
|
cm.peerStatsFunc = func() map[string]peerstats.PeerStats { return stats }
|
||||||
|
|
||||||
|
protected := cm.protectedPeers(peers)
|
||||||
|
if len(protected) != 1 {
|
||||||
|
t.Fatalf("expected 1 protected peer (overlap), got %d", len(protected))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestProtectedPeersNilFunc(t *testing.T) {
|
||||||
|
cm := &dropper{maxDialPeers: 20, maxInboundPeers: 30}
|
||||||
|
// peerStatsFunc is nil (default).
|
||||||
|
|
||||||
|
peers := makePeers(10)
|
||||||
|
protected := cm.protectedPeers(peers)
|
||||||
|
if protected != nil {
|
||||||
|
t.Fatalf("expected nil with nil stats func, got %v", protected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestProtectedByPoolPerPoolTopN verifies that the top-N selection runs
|
||||||
|
// independently in each of the inbound and dialed pools, not globally.
|
||||||
|
// With 10 peers per pool and inclusionProtectionFrac=0.1, exactly 1 peer
|
||||||
|
// is protected per pool per category — so 2 total (one per pool), both
|
||||||
|
// for the RecentFinalized category since we don't set RecentIncluded.
|
||||||
|
func TestProtectedByPoolPerPoolTopN(t *testing.T) {
|
||||||
|
inbound := makePeers(10)
|
||||||
|
dialed := makePeers(10)
|
||||||
|
// Distinguish dialed peer IDs from inbound so stats maps don't collide.
|
||||||
|
for i := range dialed {
|
||||||
|
id := enode.ID{byte(100 + i)}
|
||||||
|
dialed[i] = p2p.NewPeer(id, fmt.Sprintf("dialed%d", i), nil)
|
||||||
|
}
|
||||||
|
// Strictly increasing scores: highest wins in each pool.
|
||||||
|
stats := make(map[string]peerstats.PeerStats)
|
||||||
|
for i, p := range inbound {
|
||||||
|
stats[p.ID().String()] = peerstats.PeerStats{RecentFinalized: float64(1 + i)}
|
||||||
|
}
|
||||||
|
for i, p := range dialed {
|
||||||
|
stats[p.ID().String()] = peerstats.PeerStats{RecentFinalized: float64(1 + i)}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected := protectedPeersByPool(inbound, dialed, stats)
|
||||||
|
|
||||||
|
// Expect top 1 of inbound (inbound[9]) and top 1 of dialed (dialed[9]).
|
||||||
|
if len(protected) != 2 {
|
||||||
|
t.Fatalf("expected 2 protected peers (1 per pool), got %d", len(protected))
|
||||||
|
}
|
||||||
|
if !protected[inbound[9]] {
|
||||||
|
t.Error("expected top inbound peer to be protected")
|
||||||
|
}
|
||||||
|
if !protected[dialed[9]] {
|
||||||
|
t.Error("expected top dialed peer to be protected")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestProtectedByPoolCrossCategoryOverlap verifies that the union across
|
||||||
|
// protection categories is correctly deduplicated: a peer that wins in
|
||||||
|
// multiple categories appears once, and category winners are all
|
||||||
|
// protected. Uses a pool large enough that frac*len yields n=2 per
|
||||||
|
// category, so cross-category overlap is observable.
|
||||||
|
func TestProtectedByPoolCrossCategoryOverlap(t *testing.T) {
|
||||||
|
// 20 dialed peers so 0.1 * 20 = 2 protected per category.
|
||||||
|
dialed := makePeers(20)
|
||||||
|
// P0: high RecentFinalized only. P1: high RecentIncluded only. P2: high both.
|
||||||
|
// With n=2 per category:
|
||||||
|
// RecentFinalized winners: P2 (tie-broken-ok), P0
|
||||||
|
// RecentIncluded winners: P2, P1
|
||||||
|
// Union: {P0, P1, P2}.
|
||||||
|
stats := make(map[string]peerstats.PeerStats)
|
||||||
|
stats[dialed[0].ID().String()] = peerstats.PeerStats{RecentFinalized: 100, RecentIncluded: 0}
|
||||||
|
stats[dialed[1].ID().String()] = peerstats.PeerStats{RecentFinalized: 0, RecentIncluded: 5.0}
|
||||||
|
stats[dialed[2].ID().String()] = peerstats.PeerStats{RecentFinalized: 200, RecentIncluded: 10.0}
|
||||||
|
|
||||||
|
protected := protectedPeersByPool(nil, dialed, stats)
|
||||||
|
|
||||||
|
if len(protected) != 3 {
|
||||||
|
t.Fatalf("expected 3 protected peers (union of category winners), got %d", len(protected))
|
||||||
|
}
|
||||||
|
for _, idx := range []int{0, 1, 2} {
|
||||||
|
if !protected[dialed[idx]] {
|
||||||
|
t.Errorf("peer %d should be protected", idx)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestProtectedByPoolPerPoolIndependence locks in that selection runs
|
||||||
|
// per-pool, not globally. Every inbound peer scores higher than every
|
||||||
|
// dialed peer, so a global top-N would pick only inbound peers. Per-pool
|
||||||
|
// top-N must still protect the top dialed peers.
|
||||||
|
func TestProtectedByPoolPerPoolIndependence(t *testing.T) {
|
||||||
|
// 20 inbound, 20 dialed — frac=0.1 → 2 protected per pool per category.
|
||||||
|
// Global top-4 of RecentFinalized would be inbound[16..19] — zero dialed.
|
||||||
|
inbound := makePeers(20)
|
||||||
|
dialed := make([]*p2p.Peer, 20)
|
||||||
|
for i := range dialed {
|
||||||
|
id := enode.ID{byte(100 + i)}
|
||||||
|
dialed[i] = p2p.NewPeer(id, fmt.Sprintf("dialed%d", i), nil)
|
||||||
|
}
|
||||||
|
stats := make(map[string]peerstats.PeerStats)
|
||||||
|
// Every inbound peer outscores every dialed peer.
|
||||||
|
for i, p := range inbound {
|
||||||
|
stats[p.ID().String()] = peerstats.PeerStats{RecentFinalized: float64(1000 + i)}
|
||||||
|
}
|
||||||
|
for i, p := range dialed {
|
||||||
|
stats[p.ID().String()] = peerstats.PeerStats{RecentFinalized: float64(1 + i)}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected := protectedPeersByPool(inbound, dialed, stats)
|
||||||
|
|
||||||
|
// Per-pool top-2 of RecentFinalized:
|
||||||
|
// inbound: inbound[18], inbound[19]
|
||||||
|
// dialed: dialed[18], dialed[19]
|
||||||
|
// Global top-N would contain zero dialed peers, so asserting the top
|
||||||
|
// dialed peers are protected enforces per-pool independence.
|
||||||
|
if !protected[dialed[19]] {
|
||||||
|
t.Fatal("top dialed peer must be protected regardless of globally-higher inbound peers")
|
||||||
|
}
|
||||||
|
if !protected[dialed[18]] {
|
||||||
|
t.Fatal("second-top dialed peer must be protected regardless of globally-higher inbound peers")
|
||||||
|
}
|
||||||
|
if !protected[inbound[19]] || !protected[inbound[18]] {
|
||||||
|
t.Fatal("top inbound peers must also be protected")
|
||||||
|
}
|
||||||
|
if len(protected) != 4 {
|
||||||
|
t.Fatalf("expected 4 protected peers (top-2 of each pool), got %d", len(protected))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestProtectedByPoolRequestLatencyBasic verifies the latency protection
|
||||||
|
// category: with no competing inclusion stats, the lowest-latency peers
|
||||||
|
// (among those with enough samples) win top-N protection.
|
||||||
|
func TestProtectedByPoolRequestLatencyBasic(t *testing.T) {
|
||||||
|
dialed := makePeers(20) // frac=0.1 → n=2 per category
|
||||||
|
stats := make(map[string]peerstats.PeerStats)
|
||||||
|
// Three peers have enough samples; the two fastest should win.
|
||||||
|
stats[dialed[0].ID().String()] = peerstats.PeerStats{
|
||||||
|
RequestLatencyEMA: 50 * time.Millisecond,
|
||||||
|
RequestSuccesses: peerstats.MinLatencySamples,
|
||||||
|
LastLatencySample: time.Now(),
|
||||||
|
}
|
||||||
|
stats[dialed[1].ID().String()] = peerstats.PeerStats{
|
||||||
|
RequestLatencyEMA: 100 * time.Millisecond,
|
||||||
|
RequestSuccesses: peerstats.MinLatencySamples,
|
||||||
|
LastLatencySample: time.Now(),
|
||||||
|
}
|
||||||
|
stats[dialed[2].ID().String()] = peerstats.PeerStats{
|
||||||
|
RequestLatencyEMA: 2 * time.Second,
|
||||||
|
RequestSuccesses: peerstats.MinLatencySamples,
|
||||||
|
LastLatencySample: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
protected := protectedPeersByPool(nil, dialed, stats)
|
||||||
|
|
||||||
|
if !protected[dialed[0]] {
|
||||||
|
t.Error("fastest peer should be protected")
|
||||||
|
}
|
||||||
|
if !protected[dialed[1]] {
|
||||||
|
t.Error("second-fastest peer should be protected")
|
||||||
|
}
|
||||||
|
if protected[dialed[2]] {
|
||||||
|
t.Error("slowest peer should not be in top-2")
|
||||||
|
}
|
||||||
|
if len(protected) != 2 {
|
||||||
|
t.Fatalf("expected top-2 latency protection, got %d", len(protected))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestProtectedByPoolRequestLatencyBootstrapGuard verifies that peers with
|
||||||
|
// fewer than MinLatencySamples do not earn latency-based protection, even
|
||||||
|
// if their few samples indicate very low latency.
|
||||||
|
func TestProtectedByPoolRequestLatencyBootstrapGuard(t *testing.T) {
|
||||||
|
dialed := makePeers(20)
|
||||||
|
stats := make(map[string]peerstats.PeerStats)
|
||||||
|
// A lucky-fast peer with only 1 sample — must NOT be protected.
|
||||||
|
stats[dialed[0].ID().String()] = peerstats.PeerStats{
|
||||||
|
RequestLatencyEMA: 1 * time.Millisecond,
|
||||||
|
RequestSuccesses: 1,
|
||||||
|
}
|
||||||
|
// A warmed-up but slower peer — should be protected on latency.
|
||||||
|
stats[dialed[1].ID().String()] = peerstats.PeerStats{
|
||||||
|
RequestLatencyEMA: 500 * time.Millisecond,
|
||||||
|
RequestSuccesses: peerstats.MinLatencySamples,
|
||||||
|
LastLatencySample: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
protected := protectedPeersByPool(nil, dialed, stats)
|
||||||
|
|
||||||
|
if protected[dialed[0]] {
|
||||||
|
t.Error("under-sampled peer should not be protected (bootstrap guard)")
|
||||||
|
}
|
||||||
|
if !protected[dialed[1]] {
|
||||||
|
t.Error("warmed-up peer should be protected")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestProtectedByPoolRequestLatencyPerPool verifies that the latency
|
||||||
|
// category selects top-N per pool independently, consistent with the
|
||||||
|
// other categories. An inbound peer with lower latency does not prevent
|
||||||
|
// a dialed peer from being protected as top of the dialed pool.
|
||||||
|
func TestProtectedByPoolRequestLatencyPerPool(t *testing.T) {
|
||||||
|
inbound := makePeers(20)
|
||||||
|
dialed := make([]*p2p.Peer, 20)
|
||||||
|
for i := range dialed {
|
||||||
|
id := enode.ID{byte(100 + i)}
|
||||||
|
dialed[i] = p2p.NewPeer(id, fmt.Sprintf("dialed%d", i), nil)
|
||||||
|
}
|
||||||
|
stats := make(map[string]peerstats.PeerStats)
|
||||||
|
// All inbound peers are very fast (50ms).
|
||||||
|
for _, p := range inbound {
|
||||||
|
stats[p.ID().String()] = peerstats.PeerStats{
|
||||||
|
RequestLatencyEMA: 50 * time.Millisecond,
|
||||||
|
RequestSuccesses: peerstats.MinLatencySamples,
|
||||||
|
LastLatencySample: time.Now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Dialed peers are slower (1s) — globally they would all lose, but
|
||||||
|
// per-pool top-N should still protect two of them.
|
||||||
|
for _, p := range dialed {
|
||||||
|
stats[p.ID().String()] = peerstats.PeerStats{
|
||||||
|
RequestLatencyEMA: 1 * time.Second,
|
||||||
|
RequestSuccesses: peerstats.MinLatencySamples,
|
||||||
|
LastLatencySample: time.Now(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected := protectedPeersByPool(inbound, dialed, stats)
|
||||||
|
|
||||||
|
// 2 from inbound + 2 from dialed = 4.
|
||||||
|
var dialedProtected int
|
||||||
|
for _, p := range dialed {
|
||||||
|
if protected[p] {
|
||||||
|
dialedProtected++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if dialedProtected != 2 {
|
||||||
|
t.Fatalf("expected 2 dialed peers protected by per-pool top-N, got %d", dialedProtected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestProtectedByPoolRequestLatencyStale verifies that the freshness gate
|
||||||
|
// excludes peers whose latency EMA is valid (meeting the sample count and
|
||||||
|
// fast value) but whose last sample is older than MaxLatencyStaleness.
|
||||||
|
// A peer cannot serve a burst of fast replies, go silent on announcements,
|
||||||
|
// and keep latency-based protection indefinitely.
|
||||||
|
func TestProtectedByPoolRequestLatencyStale(t *testing.T) {
|
||||||
|
dialed := makePeers(20)
|
||||||
|
stats := make(map[string]peerstats.PeerStats)
|
||||||
|
// Fresh, fast peer — should be protected.
|
||||||
|
stats[dialed[0].ID().String()] = peerstats.PeerStats{
|
||||||
|
RequestLatencyEMA: 50 * time.Millisecond,
|
||||||
|
RequestSuccesses: peerstats.MinLatencySamples,
|
||||||
|
LastLatencySample: time.Now(),
|
||||||
|
}
|
||||||
|
// Stale, fast peer — was fast, but hasn't answered in too long.
|
||||||
|
// Same EMA and sample count as the fresh peer; only staleness differs.
|
||||||
|
stats[dialed[1].ID().String()] = peerstats.PeerStats{
|
||||||
|
RequestLatencyEMA: 50 * time.Millisecond,
|
||||||
|
RequestSuccesses: peerstats.MinLatencySamples,
|
||||||
|
LastLatencySample: time.Now().Add(-2 * peerstats.MaxLatencyStaleness),
|
||||||
|
}
|
||||||
|
|
||||||
|
protected := protectedPeersByPool(nil, dialed, stats)
|
||||||
|
|
||||||
|
if !protected[dialed[0]] {
|
||||||
|
t.Error("fresh fast peer must be protected")
|
||||||
|
}
|
||||||
|
if protected[dialed[1]] {
|
||||||
|
t.Error("stale peer must NOT keep latency protection despite fast EMA")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -184,6 +184,8 @@ type TxFetcher struct {
|
||||||
addTxs func([]*types.Transaction) []error // Insert a batch of transactions into local txpool
|
addTxs func([]*types.Transaction) []error // Insert a batch of transactions into local txpool
|
||||||
fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer
|
fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer
|
||||||
dropPeer func(string) // Drops a peer in case of announcement violation
|
dropPeer func(string) // Drops a peer in case of announcement violation
|
||||||
|
onAccepted func(peer string, hashes []common.Hash) // Optional: notified with accepted tx hashes per peer
|
||||||
|
onRequestResult func(peer string, latency time.Duration, timeout bool) // Optional: notified once per completed/timed-out tx request
|
||||||
|
|
||||||
step chan struct{} // Notification channel when the fetcher loop iterates
|
step chan struct{} // Notification channel when the fetcher loop iterates
|
||||||
clock mclock.Clock // Monotonic clock or simulated clock for tests
|
clock mclock.Clock // Monotonic clock or simulated clock for tests
|
||||||
|
|
@ -194,15 +196,15 @@ type TxFetcher struct {
|
||||||
// NewTxFetcher creates a transaction fetcher to retrieve transaction
|
// NewTxFetcher creates a transaction fetcher to retrieve transaction
|
||||||
// based on hash announcements.
|
// based on hash announcements.
|
||||||
// Chain can be nil to disable on-chain checks.
|
// Chain can be nil to disable on-chain checks.
|
||||||
func NewTxFetcher(chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string)) *TxFetcher {
|
func NewTxFetcher(chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), onAccepted func(string, []common.Hash), onRequestResult func(string, time.Duration, bool)) *TxFetcher {
|
||||||
return NewTxFetcherForTests(chain, validateMeta, addTxs, fetchTxs, dropPeer, mclock.System{}, time.Now, nil)
|
return NewTxFetcherForTests(chain, validateMeta, addTxs, fetchTxs, dropPeer, onAccepted, onRequestResult, mclock.System{}, time.Now, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewTxFetcherForTests is a testing method to mock out the realtime clock with
|
// NewTxFetcherForTests is a testing method to mock out the realtime clock with
|
||||||
// a simulated version and the internal randomness with a deterministic one.
|
// a simulated version and the internal randomness with a deterministic one.
|
||||||
// Chain can be nil to disable on-chain checks.
|
// Chain can be nil to disable on-chain checks.
|
||||||
func NewTxFetcherForTests(
|
func NewTxFetcherForTests(
|
||||||
chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string),
|
chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), onAccepted func(string, []common.Hash), onRequestResult func(string, time.Duration, bool),
|
||||||
clock mclock.Clock, realTime func() time.Time, rand *mrand.Rand) *TxFetcher {
|
clock mclock.Clock, realTime func() time.Time, rand *mrand.Rand) *TxFetcher {
|
||||||
return &TxFetcher{
|
return &TxFetcher{
|
||||||
notify: make(chan *txAnnounce),
|
notify: make(chan *txAnnounce),
|
||||||
|
|
@ -224,6 +226,8 @@ func NewTxFetcherForTests(
|
||||||
addTxs: addTxs,
|
addTxs: addTxs,
|
||||||
fetchTxs: fetchTxs,
|
fetchTxs: fetchTxs,
|
||||||
dropPeer: dropPeer,
|
dropPeer: dropPeer,
|
||||||
|
onAccepted: onAccepted,
|
||||||
|
onRequestResult: onRequestResult,
|
||||||
clock: clock,
|
clock: clock,
|
||||||
realTime: realTime,
|
realTime: realTime,
|
||||||
rand: rand,
|
rand: rand,
|
||||||
|
|
@ -344,6 +348,8 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
|
||||||
)
|
)
|
||||||
batch := txs[i:end]
|
batch := txs[i:end]
|
||||||
|
|
||||||
|
var accepted []common.Hash
|
||||||
|
|
||||||
for j, err := range f.addTxs(batch) {
|
for j, err := range f.addTxs(batch) {
|
||||||
// Track the transaction hash if the price is too low for us.
|
// Track the transaction hash if the price is too low for us.
|
||||||
// Avoid re-request this transaction when we receive another
|
// Avoid re-request this transaction when we receive another
|
||||||
|
|
@ -353,7 +359,8 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
|
||||||
}
|
}
|
||||||
// Track a few interesting failure types
|
// Track a few interesting failure types
|
||||||
switch {
|
switch {
|
||||||
case err == nil: // Noop, but need to handle to not count these
|
case err == nil:
|
||||||
|
accepted = append(accepted, batch[j].Hash())
|
||||||
|
|
||||||
case errors.Is(err, txpool.ErrAlreadyKnown):
|
case errors.Is(err, txpool.ErrAlreadyKnown):
|
||||||
duplicate++
|
duplicate++
|
||||||
|
|
@ -385,6 +392,10 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
|
||||||
underpricedMeter.Mark(underpriced)
|
underpricedMeter.Mark(underpriced)
|
||||||
otherRejectMeter.Mark(otherreject)
|
otherRejectMeter.Mark(otherreject)
|
||||||
|
|
||||||
|
// Notify the tracker which txs from this peer were accepted.
|
||||||
|
if f.onAccepted != nil && len(accepted) > 0 {
|
||||||
|
f.onAccepted(peer, accepted)
|
||||||
|
}
|
||||||
// If 'other reject' is >25% of the deliveries in any batch, sleep a bit.
|
// If 'other reject' is >25% of the deliveries in any batch, sleep a bit.
|
||||||
if otherreject > int64((len(batch)+3)/4) {
|
if otherreject > int64((len(batch)+3)/4) {
|
||||||
log.Debug("Peer delivering stale or invalid transactions", "peer", peer, "rejected", otherreject)
|
log.Debug("Peer delivering stale or invalid transactions", "peer", peer, "rejected", otherreject)
|
||||||
|
|
@ -664,6 +675,14 @@ func (f *TxFetcher) loop() {
|
||||||
// Keep track of the request as dangling, but never expire
|
// Keep track of the request as dangling, but never expire
|
||||||
f.requests[peer].hashes = nil
|
f.requests[peer].hashes = nil
|
||||||
txFetcherSlowPeers.Inc(1)
|
txFetcherSlowPeers.Inc(1)
|
||||||
|
// Record the request as a timeout-latency sample. The slow
|
||||||
|
// EMA in the consumer counts timeouts as the timeout value
|
||||||
|
// itself, so a peer that times out repeatedly drags its
|
||||||
|
// score down without us having to wait for an eventual
|
||||||
|
// (possibly never-arriving) reply.
|
||||||
|
if f.onRequestResult != nil {
|
||||||
|
f.onRequestResult(peer, txFetchTimeout, true)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Schedule a new transaction retrieval
|
// Schedule a new transaction retrieval
|
||||||
|
|
@ -760,6 +779,11 @@ func (f *TxFetcher) loop() {
|
||||||
if req.hashes == nil {
|
if req.hashes == nil {
|
||||||
txFetcherSlowPeers.Dec(1)
|
txFetcherSlowPeers.Dec(1)
|
||||||
txFetcherSlowWait.Update(time.Duration(f.clock.Now() - req.time).Nanoseconds())
|
txFetcherSlowWait.Update(time.Duration(f.clock.Now() - req.time).Nanoseconds())
|
||||||
|
// Already counted as a timeout sample at the timeout site;
|
||||||
|
// don't double-record on eventual delivery.
|
||||||
|
} else if f.onRequestResult != nil {
|
||||||
|
// Normal in-time delivery. Record the actual round-trip.
|
||||||
|
f.onRequestResult(delivery.origin, time.Duration(f.clock.Now()-req.time), false)
|
||||||
}
|
}
|
||||||
delete(f.requests, delivery.origin)
|
delete(f.requests, delivery.origin)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -22,6 +22,7 @@ import (
|
||||||
"math/big"
|
"math/big"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"slices"
|
"slices"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
|
@ -97,7 +98,7 @@ func newTestTxFetcher() *TxFetcher {
|
||||||
return make([]error, len(txs))
|
return make([]error, len(txs))
|
||||||
},
|
},
|
||||||
func(string, []common.Hash) error { return nil },
|
func(string, []common.Hash) error { return nil },
|
||||||
nil,
|
nil, nil, nil,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -2203,6 +2204,8 @@ func TestTransactionForgotten(t *testing.T) {
|
||||||
},
|
},
|
||||||
func(string, []common.Hash) error { return nil },
|
func(string, []common.Hash) error { return nil },
|
||||||
func(string) {},
|
func(string) {},
|
||||||
|
nil,
|
||||||
|
nil,
|
||||||
mockClock,
|
mockClock,
|
||||||
mockTime,
|
mockTime,
|
||||||
rand.New(rand.NewSource(0)), // Use fixed seed for deterministic behavior
|
rand.New(rand.NewSource(0)), // Use fixed seed for deterministic behavior
|
||||||
|
|
@ -2283,3 +2286,103 @@ func TestTransactionForgotten(t *testing.T) {
|
||||||
t.Errorf("wrong final underpriced cache size: got %d, want 1", size)
|
t.Errorf("wrong final underpriced cache size: got %d, want 1", size)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// resultRecorder is a thread-safe recorder for onRequestResult callbacks.
|
||||||
|
type resultRecorder struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
samples []resultSample
|
||||||
|
}
|
||||||
|
|
||||||
|
type resultSample struct {
|
||||||
|
peer string
|
||||||
|
latency time.Duration
|
||||||
|
timeout bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *resultRecorder) record(peer string, latency time.Duration, timeout bool) {
|
||||||
|
r.mu.Lock()
|
||||||
|
defer r.mu.Unlock()
|
||||||
|
r.samples = append(r.samples, resultSample{peer, latency, timeout})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *resultRecorder) snapshot() []resultSample {
|
||||||
|
r.mu.Lock()
|
||||||
|
defer r.mu.Unlock()
|
||||||
|
out := make([]resultSample, len(r.samples))
|
||||||
|
copy(out, r.samples)
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestTransactionFetcherRequestResultOnDelivery asserts that an in-time
|
||||||
|
// direct delivery fires the onRequestResult callback with timeout=false.
|
||||||
|
func TestTransactionFetcherRequestResultOnDelivery(t *testing.T) {
|
||||||
|
rec := &resultRecorder{}
|
||||||
|
testTransactionFetcherParallel(t, txFetcherTest{
|
||||||
|
init: func() *TxFetcher {
|
||||||
|
f := newTestTxFetcher()
|
||||||
|
f.onRequestResult = rec.record
|
||||||
|
return f
|
||||||
|
},
|
||||||
|
steps: []interface{}{
|
||||||
|
doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}},
|
||||||
|
doWait{time: txArriveTimeout, step: true},
|
||||||
|
doWait{time: 200 * time.Millisecond, step: false},
|
||||||
|
doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}, direct: true},
|
||||||
|
doFunc(func() {
|
||||||
|
samples := rec.snapshot()
|
||||||
|
if len(samples) != 1 {
|
||||||
|
t.Fatalf("expected 1 sample, got %d (%v)", len(samples), samples)
|
||||||
|
}
|
||||||
|
if samples[0].peer != "A" {
|
||||||
|
t.Errorf("peer mismatch: got %q, want A", samples[0].peer)
|
||||||
|
}
|
||||||
|
if samples[0].latency != 200*time.Millisecond {
|
||||||
|
t.Errorf("latency mismatch: got %v, want 200ms", samples[0].latency)
|
||||||
|
}
|
||||||
|
if samples[0].timeout {
|
||||||
|
t.Error("expected timeout=false for delivery")
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestTransactionFetcherRequestResultOnTimeout asserts that a timed-out
|
||||||
|
// request fires onRequestResult with timeout=true and the timeout value,
|
||||||
|
// and a subsequent (late) delivery does not fire a duplicate sample.
|
||||||
|
func TestTransactionFetcherRequestResultOnTimeout(t *testing.T) {
|
||||||
|
rec := &resultRecorder{}
|
||||||
|
testTransactionFetcherParallel(t, txFetcherTest{
|
||||||
|
init: func() *TxFetcher {
|
||||||
|
f := newTestTxFetcher()
|
||||||
|
f.onRequestResult = rec.record
|
||||||
|
return f
|
||||||
|
},
|
||||||
|
steps: []interface{}{
|
||||||
|
doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}},
|
||||||
|
doWait{time: txArriveTimeout, step: true},
|
||||||
|
doWait{time: txFetchTimeout, step: true},
|
||||||
|
doFunc(func() {
|
||||||
|
samples := rec.snapshot()
|
||||||
|
if len(samples) != 1 {
|
||||||
|
t.Fatalf("expected 1 timeout sample, got %d (%v)", len(samples), samples)
|
||||||
|
}
|
||||||
|
if samples[0].peer != "A" {
|
||||||
|
t.Errorf("peer mismatch: got %q, want A", samples[0].peer)
|
||||||
|
}
|
||||||
|
if samples[0].latency != txFetchTimeout {
|
||||||
|
t.Errorf("latency mismatch: got %v, want %v", samples[0].latency, txFetchTimeout)
|
||||||
|
}
|
||||||
|
if !samples[0].timeout {
|
||||||
|
t.Error("expected timeout=true for timed-out request")
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
doTxEnqueue{peer: "A", txs: []*types.Transaction{testTxs[0]}, direct: true},
|
||||||
|
doFunc(func() {
|
||||||
|
if len(rec.snapshot()) != 1 {
|
||||||
|
t.Fatalf("late delivery double-counted: got %d samples, want 1", len(rec.snapshot()))
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -36,8 +36,10 @@ import (
|
||||||
"github.com/ethereum/go-ethereum/eth/downloader"
|
"github.com/ethereum/go-ethereum/eth/downloader"
|
||||||
"github.com/ethereum/go-ethereum/eth/ethconfig"
|
"github.com/ethereum/go-ethereum/eth/ethconfig"
|
||||||
"github.com/ethereum/go-ethereum/eth/fetcher"
|
"github.com/ethereum/go-ethereum/eth/fetcher"
|
||||||
|
"github.com/ethereum/go-ethereum/eth/peerstats"
|
||||||
"github.com/ethereum/go-ethereum/eth/protocols/eth"
|
"github.com/ethereum/go-ethereum/eth/protocols/eth"
|
||||||
"github.com/ethereum/go-ethereum/eth/protocols/snap"
|
"github.com/ethereum/go-ethereum/eth/protocols/snap"
|
||||||
|
"github.com/ethereum/go-ethereum/eth/txtracker"
|
||||||
"github.com/ethereum/go-ethereum/ethdb"
|
"github.com/ethereum/go-ethereum/ethdb"
|
||||||
"github.com/ethereum/go-ethereum/event"
|
"github.com/ethereum/go-ethereum/event"
|
||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
|
|
@ -122,6 +124,8 @@ type handler struct {
|
||||||
|
|
||||||
downloader *downloader.Downloader
|
downloader *downloader.Downloader
|
||||||
txFetcher *fetcher.TxFetcher
|
txFetcher *fetcher.TxFetcher
|
||||||
|
txTracker *txtracker.Tracker
|
||||||
|
peerStats *peerstats.Stats
|
||||||
peers *peerSet
|
peers *peerSet
|
||||||
txBroadcastKey [16]byte
|
txBroadcastKey [16]byte
|
||||||
|
|
||||||
|
|
@ -181,7 +185,9 @@ func newHandler(config *handlerConfig) (*handler, error) {
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
h.txFetcher = fetcher.NewTxFetcher(h.chain, validateMeta, addTxs, fetchTx, h.removePeer)
|
h.txTracker = txtracker.New()
|
||||||
|
h.peerStats = peerstats.New()
|
||||||
|
h.txFetcher = fetcher.NewTxFetcher(h.chain, validateMeta, addTxs, fetchTx, h.removePeer, h.txTracker.NotifyAccepted, h.peerStats.NotifyRequestResult)
|
||||||
return h, nil
|
return h, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -396,6 +402,7 @@ func (h *handler) unregisterPeer(id string) {
|
||||||
}
|
}
|
||||||
h.downloader.UnregisterPeer(id)
|
h.downloader.UnregisterPeer(id)
|
||||||
h.txFetcher.Drop(id)
|
h.txFetcher.Drop(id)
|
||||||
|
h.peerStats.NotifyPeerDrop(id)
|
||||||
|
|
||||||
if err := h.peers.unregisterPeer(id); err != nil {
|
if err := h.peers.unregisterPeer(id); err != nil {
|
||||||
logger.Error("Ethereum peer removal failed", "err", err)
|
logger.Error("Ethereum peer removal failed", "err", err)
|
||||||
|
|
|
||||||
193
eth/peerstats/peerstats.go
Normal file
193
eth/peerstats/peerstats.go
Normal file
|
|
@ -0,0 +1,193 @@
|
||||||
|
// Copyright 2026 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
// Package peerstats maintains per-peer quality metrics used by the peer
|
||||||
|
// dropper to protect high-value peers from random disconnection.
|
||||||
|
//
|
||||||
|
// The package is a passive accumulator: it exposes entry points for its
|
||||||
|
// signal producers (txtracker for inclusion/finalization, the tx fetcher
|
||||||
|
// for latency, the handler for peer-drop cleanup) and a read-only
|
||||||
|
// snapshot for its consumer (the dropper). It has no goroutine of its
|
||||||
|
// own — all mutation is serialized by a single mutex.
|
||||||
|
//
|
||||||
|
// Signal sources:
|
||||||
|
// - NotifyBlock(inclusions, finalized) — per-block deltas from txtracker
|
||||||
|
// (computed under txtracker's own lock, then passed in after release)
|
||||||
|
// - NotifyRequestResult(peer, latency, timeout) — per-request outcomes
|
||||||
|
// from the fetcher; timeouts are reported with the timeout value so
|
||||||
|
// slow peers contribute to the EMA, and the timeout flag increments
|
||||||
|
// the per-peer timeout counter
|
||||||
|
// - NotifyPeerDrop(peer) — called from the handler on disconnect
|
||||||
|
package peerstats
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// EMA smoothing factor for per-block inclusion rate.
|
||||||
|
emaAlpha = 0.05
|
||||||
|
// EMA smoothing factor for per-block finalization rate. Very slow on
|
||||||
|
// purpose: finalization is permanent, and the score should reflect
|
||||||
|
// sustained contribution over long windows, not recent bursts.
|
||||||
|
// Half-life ≈ 6930 chain heads (~23 hours on 12s blocks).
|
||||||
|
finalizedEMAAlpha = 0.0001
|
||||||
|
// EMA smoothing factor for per-request latency average. Slow on purpose:
|
||||||
|
// short bursts shouldn't shift the score, sustained behavior should.
|
||||||
|
// Half-life ≈ ln(0.5)/ln(0.99) ≈ 69 samples.
|
||||||
|
latencyEMAAlpha = 0.01
|
||||||
|
// MinLatencySamples is the number of latency samples a peer must accumulate
|
||||||
|
// before its RequestLatencyEMA is considered meaningful for protection.
|
||||||
|
// Prevents a single lucky-fast reply from displacing established peers.
|
||||||
|
MinLatencySamples = 100
|
||||||
|
// MaxLatencyStaleness is the oldest allowed age of a peer's last
|
||||||
|
// latency sample before their RequestLatencyEMA is disregarded for
|
||||||
|
// protection. Prevents a peer from earning a fast score during a
|
||||||
|
// burst of activity and then holding protection indefinitely by
|
||||||
|
// going silent on tx announcements (no further requests → no fresh
|
||||||
|
// samples → EMA frozen at its last value).
|
||||||
|
MaxLatencyStaleness = 10 * time.Minute
|
||||||
|
)
|
||||||
|
|
||||||
|
// PeerStats is the exported per-peer snapshot returned by GetAllPeerStats.
|
||||||
|
type PeerStats struct {
|
||||||
|
RecentFinalized float64 // EMA of per-block finalization credits (slow)
|
||||||
|
RecentIncluded float64 // EMA of per-block inclusions (fast)
|
||||||
|
RequestLatencyEMA time.Duration // Slow EMA of tx-request response latency (timeouts count as the timeout value)
|
||||||
|
RequestSuccesses int64 // Requests answered before timeout
|
||||||
|
RequestTimeouts int64 // Requests that timed out
|
||||||
|
LastLatencySample time.Time // Wall-clock time of the most recent request result (for staleness gate)
|
||||||
|
}
|
||||||
|
|
||||||
|
// peerStats is the internal mutable state per peer.
|
||||||
|
type peerStats struct {
|
||||||
|
recentFinalized float64
|
||||||
|
recentIncluded float64
|
||||||
|
requestLatencyEMA time.Duration
|
||||||
|
requestSuccesses int64
|
||||||
|
requestTimeouts int64
|
||||||
|
lastLatencySample time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stats is the per-peer quality aggregator.
|
||||||
|
type Stats struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
peers map[string]*peerStats
|
||||||
|
}
|
||||||
|
|
||||||
|
// New creates an empty Stats.
|
||||||
|
func New() *Stats {
|
||||||
|
return &Stats{peers: make(map[string]*peerStats)}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NotifyBlock ingests a per-block update. `inclusions` is the count of the head
|
||||||
|
// block's transactions attributed to each peer; peers with a positive
|
||||||
|
// count get a stats entry created if one doesn't exist (this is how
|
||||||
|
// peerstats learns about newly-active peers). Peers not in the map but
|
||||||
|
// already tracked have their EMA decay with a zero sample.
|
||||||
|
//
|
||||||
|
// `finalized` is per-peer credits accumulated since the last NotifyBlock;
|
||||||
|
// credits are only applied to peers already tracked — we don't resurrect
|
||||||
|
// dropped peers from historical finalization data.
|
||||||
|
//
|
||||||
|
// NotifyBlock must NOT be called while the caller holds any other lock that
|
||||||
|
// could be acquired by peerstats callers in reverse order. Current callers
|
||||||
|
// (txtracker.handleChainHead) release their lock before invoking NotifyBlock.
|
||||||
|
func (s *Stats) NotifyBlock(inclusions, finalized map[string]int) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
// Ensure a stats entry exists for any peer that just had an inclusion.
|
||||||
|
// This is the primary path by which peerstats learns about a peer's
|
||||||
|
// inclusion activity.
|
||||||
|
for peer, count := range inclusions {
|
||||||
|
if count > 0 && s.peers[peer] == nil {
|
||||||
|
s.peers[peer] = &peerStats{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Update inclusion and finalization EMAs for every tracked peer. A
|
||||||
|
// peer not present in the respective delta map gets a 0 contribution
|
||||||
|
// — pure decay. Finalization credits for peers no longer tracked are
|
||||||
|
// ignored (don't resurrect dropped peers from historical data).
|
||||||
|
for peer, ps := range s.peers {
|
||||||
|
ps.recentIncluded = (1-emaAlpha)*ps.recentIncluded + emaAlpha*float64(inclusions[peer])
|
||||||
|
ps.recentFinalized = (1-finalizedEMAAlpha)*ps.recentFinalized + finalizedEMAAlpha*float64(finalized[peer])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NotifyRequestResult records a tx-request outcome for the given peer.
|
||||||
|
// latency is the round-trip time (for timeouts, pass the timeout value).
|
||||||
|
// timeout indicates whether the request timed out rather than receiving a
|
||||||
|
// normal delivery. Both cases update the latency EMA; the timeout flag
|
||||||
|
// additionally increments the per-peer timeout counter.
|
||||||
|
// Creates a peer entry if one doesn't exist.
|
||||||
|
func (s *Stats) NotifyRequestResult(peer string, latency time.Duration, timeout bool) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
ps := s.peers[peer]
|
||||||
|
if ps == nil {
|
||||||
|
ps = &peerStats{}
|
||||||
|
s.peers[peer] = ps
|
||||||
|
}
|
||||||
|
if ps.requestSuccesses+ps.requestTimeouts == 0 {
|
||||||
|
// Bootstrap the EMA with the first sample so it doesn't drift up
|
||||||
|
// from zero over many samples before reaching realistic values.
|
||||||
|
ps.requestLatencyEMA = latency
|
||||||
|
} else {
|
||||||
|
ps.requestLatencyEMA = time.Duration(
|
||||||
|
float64(ps.requestLatencyEMA)*(1-latencyEMAAlpha) +
|
||||||
|
float64(latency)*latencyEMAAlpha,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
if timeout {
|
||||||
|
ps.requestTimeouts++
|
||||||
|
} else {
|
||||||
|
ps.requestSuccesses++
|
||||||
|
}
|
||||||
|
ps.lastLatencySample = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
// NotifyPeerDrop removes a peer's stats on disconnect. A rare stale
|
||||||
|
// latency sample racing with the drop may recreate the peer entry with
|
||||||
|
// one sample; that entry can never earn protection (MinLatencySamples
|
||||||
|
// guard) and is harmless.
|
||||||
|
func (s *Stats) NotifyPeerDrop(peer string) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
delete(s.peers, peer)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetAllPeerStats returns a snapshot of per-peer stats. Called by the
|
||||||
|
// dropper every few minutes; allocation cost is negligible at that rate.
|
||||||
|
func (s *Stats) GetAllPeerStats() map[string]PeerStats {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
result := make(map[string]PeerStats, len(s.peers))
|
||||||
|
for id, ps := range s.peers {
|
||||||
|
result[id] = PeerStats{
|
||||||
|
RecentFinalized: ps.recentFinalized,
|
||||||
|
RecentIncluded: ps.recentIncluded,
|
||||||
|
RequestLatencyEMA: ps.requestLatencyEMA,
|
||||||
|
RequestSuccesses: ps.requestSuccesses,
|
||||||
|
RequestTimeouts: ps.requestTimeouts,
|
||||||
|
LastLatencySample: ps.lastLatencySample,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
293
eth/peerstats/peerstats_test.go
Normal file
293
eth/peerstats/peerstats_test.go
Normal file
|
|
@ -0,0 +1,293 @@
|
||||||
|
// Copyright 2026 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package peerstats
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestNotifyBlockBootstrapsFromInclusions verifies that a peer with a positive
|
||||||
|
// inclusion count in the first NotifyBlock gets a stats entry created.
|
||||||
|
func TestNotifyBlockBootstrapsFromInclusions(t *testing.T) {
|
||||||
|
s := New()
|
||||||
|
s.NotifyBlock(map[string]int{"peerA": 3}, nil)
|
||||||
|
|
||||||
|
stats := s.GetAllPeerStats()
|
||||||
|
if len(stats) != 1 {
|
||||||
|
t.Fatalf("expected 1 peer entry, got %d", len(stats))
|
||||||
|
}
|
||||||
|
ps, ok := stats["peerA"]
|
||||||
|
if !ok {
|
||||||
|
t.Fatal("expected peerA entry")
|
||||||
|
}
|
||||||
|
// EMA after first block: (1-0.05)*0 + 0.05*3 = 0.15
|
||||||
|
if ps.RecentIncluded <= 0 {
|
||||||
|
t.Fatalf("expected RecentIncluded > 0 after inclusion, got %f", ps.RecentIncluded)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestNotifyBlockDecaysKnownPeers verifies that peers already tracked get their
|
||||||
|
// RecentIncluded EMA decayed when they have no inclusions in a block.
|
||||||
|
func TestNotifyBlockDecaysKnownPeers(t *testing.T) {
|
||||||
|
s := New()
|
||||||
|
// Seed peerA with an inclusion.
|
||||||
|
s.NotifyBlock(map[string]int{"peerA": 3}, nil)
|
||||||
|
initial := s.GetAllPeerStats()["peerA"].RecentIncluded
|
||||||
|
|
||||||
|
// Empty block — peerA should decay.
|
||||||
|
s.NotifyBlock(nil, nil)
|
||||||
|
after := s.GetAllPeerStats()["peerA"].RecentIncluded
|
||||||
|
|
||||||
|
if after >= initial {
|
||||||
|
t.Fatalf("expected decay, got %f >= %f", after, initial)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestNotifyBlockDoesNotResurrectDroppedPeers verifies that finalization
|
||||||
|
// credits to a peer with no entry don't create one.
|
||||||
|
func TestNotifyBlockDoesNotResurrectFromFinalization(t *testing.T) {
|
||||||
|
s := New()
|
||||||
|
s.NotifyBlock(nil, map[string]int{"peerA": 5})
|
||||||
|
|
||||||
|
if stats := s.GetAllPeerStats(); len(stats) != 0 {
|
||||||
|
t.Fatalf("finalization credits must not create entries, got %d peers", len(stats))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestNotifyBlockDropThenFinalizeNoResurrect verifies the full drop→finalize
|
||||||
|
// sequence: a dropped peer doesn't come back via finalization credits.
|
||||||
|
func TestNotifyBlockDropThenFinalizeNoResurrect(t *testing.T) {
|
||||||
|
s := New()
|
||||||
|
s.NotifyBlock(map[string]int{"peerA": 1}, nil)
|
||||||
|
s.NotifyPeerDrop("peerA")
|
||||||
|
s.NotifyBlock(nil, map[string]int{"peerA": 10})
|
||||||
|
|
||||||
|
if stats := s.GetAllPeerStats(); len(stats) != 0 {
|
||||||
|
t.Fatalf("dropped peer must not be resurrected, got %d peers", len(stats))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestNotifyBlockFinalizationCredits an existing peer.
|
||||||
|
func TestNotifyBlockFinalizationCredits(t *testing.T) {
|
||||||
|
s := New()
|
||||||
|
s.NotifyBlock(map[string]int{"peerA": 1}, nil)
|
||||||
|
s.NotifyBlock(nil, map[string]int{"peerA": 3})
|
||||||
|
|
||||||
|
// RecentFinalized is a slow EMA, not a cumulative count: assert it
|
||||||
|
// moved in the positive direction, not the exact value.
|
||||||
|
if got := s.GetAllPeerStats()["peerA"].RecentFinalized; got <= 0 {
|
||||||
|
t.Fatalf("expected RecentFinalized>0 after credits, got %f", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestNotifyBlockInclusionEMAUpdate verifies the EMA formula (1-α)·old + α·count.
|
||||||
|
func TestNotifyBlockInclusionEMAUpdate(t *testing.T) {
|
||||||
|
s := New()
|
||||||
|
// Three inclusions: EMA = 0.05 * 3 = 0.15
|
||||||
|
s.NotifyBlock(map[string]int{"peerA": 3}, nil)
|
||||||
|
got := s.GetAllPeerStats()["peerA"].RecentIncluded
|
||||||
|
want := 0.15
|
||||||
|
if diff := got - want; diff < -1e-9 || diff > 1e-9 {
|
||||||
|
t.Fatalf("EMA after one sample: got %f, want %f", got, want)
|
||||||
|
}
|
||||||
|
// Next block with 10 inclusions: EMA = 0.95*0.15 + 0.05*10 = 0.6425
|
||||||
|
s.NotifyBlock(map[string]int{"peerA": 10}, nil)
|
||||||
|
got = s.GetAllPeerStats()["peerA"].RecentIncluded
|
||||||
|
want = 0.6425
|
||||||
|
if diff := got - want; diff < -1e-9 || diff > 1e-9 {
|
||||||
|
t.Fatalf("EMA after two samples: got %f, want %f", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestNotifyRequestResultFirstSampleBootstrap asserts that the first
|
||||||
|
// latency sample seeds the EMA directly.
|
||||||
|
func TestNotifyRequestResultFirstSampleBootstrap(t *testing.T) {
|
||||||
|
s := New()
|
||||||
|
s.NotifyRequestResult("peerA", 200*time.Millisecond, false)
|
||||||
|
|
||||||
|
ps := s.GetAllPeerStats()["peerA"]
|
||||||
|
if ps.RequestLatencyEMA != 200*time.Millisecond {
|
||||||
|
t.Fatalf("expected first sample to seed EMA at 200ms, got %v", ps.RequestLatencyEMA)
|
||||||
|
}
|
||||||
|
if ps.RequestSuccesses != 1 {
|
||||||
|
t.Fatalf("expected RequestSuccesses=1, got %d", ps.RequestSuccesses)
|
||||||
|
}
|
||||||
|
if ps.RequestTimeouts != 0 {
|
||||||
|
t.Fatalf("expected RequestTimeouts=0, got %d", ps.RequestTimeouts)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestNotifyRequestResultEMAUpdate verifies the EMA formula for latency.
|
||||||
|
func TestNotifyRequestResultEMAUpdate(t *testing.T) {
|
||||||
|
s := New()
|
||||||
|
s.NotifyRequestResult("peerA", 100*time.Millisecond, false)
|
||||||
|
s.NotifyRequestResult("peerA", 1000*time.Millisecond, false)
|
||||||
|
|
||||||
|
// Expected: 0.99*100ms + 0.01*1000ms = 109ms
|
||||||
|
got := s.GetAllPeerStats()["peerA"].RequestLatencyEMA
|
||||||
|
want := 109 * time.Millisecond
|
||||||
|
delta := got - want
|
||||||
|
if delta < 0 {
|
||||||
|
delta = -delta
|
||||||
|
}
|
||||||
|
if delta > 1*time.Microsecond {
|
||||||
|
t.Fatalf("EMA mismatch: got %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
ps := s.GetAllPeerStats()["peerA"]
|
||||||
|
if ps.RequestSuccesses != 2 {
|
||||||
|
t.Fatalf("expected RequestSuccesses=2, got %d", ps.RequestSuccesses)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestNotifyRequestResultSlowConvergence verifies the slow alpha
|
||||||
|
// damps convergence under sustained timeouts.
|
||||||
|
func TestNotifyRequestResultSlowConvergence(t *testing.T) {
|
||||||
|
s := New()
|
||||||
|
s.NotifyRequestResult("peerA", 100*time.Millisecond, false)
|
||||||
|
for i := 0; i < 50; i++ {
|
||||||
|
s.NotifyRequestResult("peerA", 5*time.Second, false)
|
||||||
|
}
|
||||||
|
got := s.GetAllPeerStats()["peerA"].RequestLatencyEMA
|
||||||
|
if got < 1*time.Second {
|
||||||
|
t.Fatalf("EMA did not move enough under sustained timeouts, got %v", got)
|
||||||
|
}
|
||||||
|
if got > 3*time.Second {
|
||||||
|
t.Fatalf("EMA converged too fast for slow alpha=0.01, got %v", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestNotifyPeerDropClearsStats verifies that a dropped peer disappears
|
||||||
|
// from GetAllPeerStats.
|
||||||
|
func TestNotifyPeerDropClearsStats(t *testing.T) {
|
||||||
|
s := New()
|
||||||
|
s.NotifyRequestResult("peerA", 200*time.Millisecond, false)
|
||||||
|
s.NotifyPeerDrop("peerA")
|
||||||
|
|
||||||
|
if _, ok := s.GetAllPeerStats()["peerA"]; ok {
|
||||||
|
t.Fatal("peerA stats should be removed after NotifyPeerDrop")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestStaleRequestLatencyAfterDrop documents the accepted behavior: a
|
||||||
|
// late sample after NotifyPeerDrop recreates a 1-sample entry. The
|
||||||
|
// dropper's MinLatencySamples=100 guard ensures this is harmless.
|
||||||
|
func TestStaleRequestLatencyAfterDrop(t *testing.T) {
|
||||||
|
s := New()
|
||||||
|
s.NotifyRequestResult("peerA", 200*time.Millisecond, false)
|
||||||
|
s.NotifyPeerDrop("peerA")
|
||||||
|
// Late sample racing with the drop.
|
||||||
|
s.NotifyRequestResult("peerA", 50*time.Millisecond, false)
|
||||||
|
|
||||||
|
ps := s.GetAllPeerStats()["peerA"]
|
||||||
|
if ps.RequestSuccesses != 1 {
|
||||||
|
t.Fatalf("expected fresh RequestSuccesses=1, got %d", ps.RequestSuccesses)
|
||||||
|
}
|
||||||
|
if ps.RequestLatencyEMA != 50*time.Millisecond {
|
||||||
|
t.Fatalf("expected fresh bootstrap at 50ms, got %v", ps.RequestLatencyEMA)
|
||||||
|
}
|
||||||
|
// The dropper's MinLatencySamples guard (in eth/dropper.go) prevents
|
||||||
|
// this 1-sample entry from earning latency-based protection.
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestMultiplePeersIsolated verifies per-peer isolation across signal types.
|
||||||
|
func TestMultiplePeersIsolated(t *testing.T) {
|
||||||
|
s := New()
|
||||||
|
s.NotifyBlock(map[string]int{"peerA": 5, "peerB": 0}, nil)
|
||||||
|
s.NotifyRequestResult("peerA", 100*time.Millisecond, false)
|
||||||
|
s.NotifyRequestResult("peerB", 5*time.Second, false)
|
||||||
|
s.NotifyBlock(nil, map[string]int{"peerA": 2})
|
||||||
|
|
||||||
|
stats := s.GetAllPeerStats()
|
||||||
|
// Only peerA receives finalization credits; peerB's EMA stays at zero
|
||||||
|
// (no credits, pure decay from zero).
|
||||||
|
if stats["peerA"].RecentFinalized <= 0 || stats["peerB"].RecentFinalized != 0 {
|
||||||
|
t.Errorf("finalization leaked: A=%f B=%f", stats["peerA"].RecentFinalized, stats["peerB"].RecentFinalized)
|
||||||
|
}
|
||||||
|
if stats["peerA"].RequestLatencyEMA != 100*time.Millisecond {
|
||||||
|
t.Errorf("peerA latency: got %v, want 100ms", stats["peerA"].RequestLatencyEMA)
|
||||||
|
}
|
||||||
|
if stats["peerB"].RequestLatencyEMA != 5*time.Second {
|
||||||
|
t.Errorf("peerB latency: got %v, want 5s", stats["peerB"].RequestLatencyEMA)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestLatencyTimestampSet verifies that NotifyRequestResult stamps the
|
||||||
|
// peer's LastLatencySample with approximately time.Now().
|
||||||
|
func TestLatencyTimestampSet(t *testing.T) {
|
||||||
|
s := New()
|
||||||
|
before := time.Now()
|
||||||
|
s.NotifyRequestResult("peerA", 100*time.Millisecond, false)
|
||||||
|
after := time.Now()
|
||||||
|
|
||||||
|
got := s.GetAllPeerStats()["peerA"].LastLatencySample
|
||||||
|
if got.Before(before) || got.After(after) {
|
||||||
|
t.Fatalf("LastLatencySample = %v not in [%v, %v]", got, before, after)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestLatencyTimestampUpdatesOnEachSample verifies that a later
|
||||||
|
// NotifyRequestResult call advances LastLatencySample.
|
||||||
|
func TestLatencyTimestampUpdatesOnEachSample(t *testing.T) {
|
||||||
|
s := New()
|
||||||
|
s.NotifyRequestResult("peerA", 100*time.Millisecond, false)
|
||||||
|
first := s.GetAllPeerStats()["peerA"].LastLatencySample
|
||||||
|
|
||||||
|
// Small sleep so the second timestamp is detectably later.
|
||||||
|
time.Sleep(2 * time.Millisecond)
|
||||||
|
s.NotifyRequestResult("peerA", 200*time.Millisecond, false)
|
||||||
|
second := s.GetAllPeerStats()["peerA"].LastLatencySample
|
||||||
|
|
||||||
|
if !second.After(first) {
|
||||||
|
t.Fatalf("expected second sample timestamp > first, got first=%v second=%v", first, second)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestRequestResultTimeoutCounting verifies that timeout=true increments
|
||||||
|
// RequestTimeouts (not RequestSuccesses) and still updates the EMA.
|
||||||
|
func TestRequestResultTimeoutCounting(t *testing.T) {
|
||||||
|
s := New()
|
||||||
|
s.NotifyRequestResult("peerA", 5*time.Second, true)
|
||||||
|
|
||||||
|
ps := s.GetAllPeerStats()["peerA"]
|
||||||
|
if ps.RequestTimeouts != 1 {
|
||||||
|
t.Fatalf("expected RequestTimeouts=1, got %d", ps.RequestTimeouts)
|
||||||
|
}
|
||||||
|
if ps.RequestSuccesses != 0 {
|
||||||
|
t.Fatalf("expected RequestSuccesses=0, got %d", ps.RequestSuccesses)
|
||||||
|
}
|
||||||
|
if ps.RequestLatencyEMA != 5*time.Second {
|
||||||
|
t.Fatalf("EMA should bootstrap to timeout value, got %v", ps.RequestLatencyEMA)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestRequestResultMixedCounting verifies that a mix of successes and
|
||||||
|
// timeouts increments the correct counters independently.
|
||||||
|
func TestRequestResultMixedCounting(t *testing.T) {
|
||||||
|
s := New()
|
||||||
|
s.NotifyRequestResult("peerA", 100*time.Millisecond, false)
|
||||||
|
s.NotifyRequestResult("peerA", 100*time.Millisecond, false)
|
||||||
|
s.NotifyRequestResult("peerA", 5*time.Second, true)
|
||||||
|
|
||||||
|
ps := s.GetAllPeerStats()["peerA"]
|
||||||
|
if ps.RequestSuccesses != 2 {
|
||||||
|
t.Fatalf("expected RequestSuccesses=2, got %d", ps.RequestSuccesses)
|
||||||
|
}
|
||||||
|
if ps.RequestTimeouts != 1 {
|
||||||
|
t.Fatalf("expected RequestTimeouts=1, got %d", ps.RequestTimeouts)
|
||||||
|
}
|
||||||
|
}
|
||||||
228
eth/txtracker/tracker.go
Normal file
228
eth/txtracker/tracker.go
Normal file
|
|
@ -0,0 +1,228 @@
|
||||||
|
// Copyright 2026 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
// Package txtracker maps accepted transactions to their delivering peer
|
||||||
|
// and observes chain-head and finalization events to emit per-block
|
||||||
|
// per-peer signals to a StatsConsumer (typically eth/peerstats).
|
||||||
|
//
|
||||||
|
// The tracker owns the tx-hash → deliverer-peer map with FIFO eviction,
|
||||||
|
// a chain-head subscription goroutine, and the computation of per-block
|
||||||
|
// inclusion counts and finalization credits. It does NOT maintain
|
||||||
|
// per-peer aggregates — that is peerstats' job.
|
||||||
|
package txtracker
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/common"
|
||||||
|
"github.com/ethereum/go-ethereum/core"
|
||||||
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
|
"github.com/ethereum/go-ethereum/event"
|
||||||
|
"github.com/ethereum/go-ethereum/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// Maximum number of tx→deliverer mappings to retain.
|
||||||
|
maxTracked = 262144
|
||||||
|
)
|
||||||
|
|
||||||
|
// Chain is the blockchain interface needed by the tracker.
|
||||||
|
type Chain interface {
|
||||||
|
SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
|
||||||
|
GetBlockByNumber(number uint64) *types.Block
|
||||||
|
GetBlock(hash common.Hash, number uint64) *types.Block
|
||||||
|
CurrentFinalBlock() *types.Header
|
||||||
|
}
|
||||||
|
|
||||||
|
// StatsConsumer receives per-block signals about peer inclusion and
|
||||||
|
// finalization. The tracker invokes NotifyBlock exactly once per handled chain
|
||||||
|
// head, AFTER releasing its own lock, with:
|
||||||
|
//
|
||||||
|
// - inclusions: per-peer count of transactions in the head block
|
||||||
|
// - finalized: per-peer count of transactions in blocks that became
|
||||||
|
// finalized since the previous call (possibly zero-range)
|
||||||
|
//
|
||||||
|
// Either map may be empty but the slice/map itself is never nil when
|
||||||
|
// called. NotifyBlock must not call back into the tracker.
|
||||||
|
type StatsConsumer interface {
|
||||||
|
NotifyBlock(inclusions, finalized map[string]int)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tracker records which peer delivered each transaction and emits
|
||||||
|
// per-block inclusion and finalization signals to a StatsConsumer.
|
||||||
|
type Tracker struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
txs map[common.Hash]string // hash → deliverer peer ID
|
||||||
|
order []common.Hash // insertion order for FIFO eviction
|
||||||
|
|
||||||
|
chain Chain
|
||||||
|
consumer StatsConsumer
|
||||||
|
lastFinalNum uint64 // last finalized block number processed
|
||||||
|
headCh chan core.ChainHeadEvent
|
||||||
|
sub event.Subscription
|
||||||
|
|
||||||
|
quit chan struct{}
|
||||||
|
step chan struct{} // test sync: sent after each event is processed
|
||||||
|
wg sync.WaitGroup
|
||||||
|
}
|
||||||
|
|
||||||
|
// New creates a new tracker.
|
||||||
|
func New() *Tracker {
|
||||||
|
return &Tracker{
|
||||||
|
txs: make(map[common.Hash]string),
|
||||||
|
quit: make(chan struct{}),
|
||||||
|
step: make(chan struct{}, 1),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start begins listening for chain head events. `consumer` receives
|
||||||
|
// per-block signals; if nil, signals are computed but discarded
|
||||||
|
// (useful in tests that exercise only the tx-lifecycle surface).
|
||||||
|
func (t *Tracker) Start(chain Chain, consumer StatsConsumer) {
|
||||||
|
t.chain = chain
|
||||||
|
t.consumer = consumer
|
||||||
|
// Seed lastFinalNum so checkFinalization doesn't backfill from genesis.
|
||||||
|
if fh := chain.CurrentFinalBlock(); fh != nil {
|
||||||
|
t.lastFinalNum = fh.Number.Uint64()
|
||||||
|
}
|
||||||
|
t.headCh = make(chan core.ChainHeadEvent, 128)
|
||||||
|
t.sub = chain.SubscribeChainHeadEvent(t.headCh)
|
||||||
|
t.wg.Add(1)
|
||||||
|
go t.loop()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop shuts down the tracker.
|
||||||
|
func (t *Tracker) Stop() {
|
||||||
|
t.sub.Unsubscribe()
|
||||||
|
close(t.quit)
|
||||||
|
t.wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
// NotifyAccepted records that a peer delivered transactions that were accepted
|
||||||
|
// by the pool. Only accepted (not rejected/duplicate) txs should be recorded
|
||||||
|
// to prevent attribution poisoning from replayed or invalid txs.
|
||||||
|
// Safe to call from any goroutine.
|
||||||
|
func (t *Tracker) NotifyAccepted(peer string, hashes []common.Hash) {
|
||||||
|
t.mu.Lock()
|
||||||
|
defer t.mu.Unlock()
|
||||||
|
|
||||||
|
for _, hash := range hashes {
|
||||||
|
if _, ok := t.txs[hash]; ok {
|
||||||
|
continue // already tracked, keep first deliverer
|
||||||
|
}
|
||||||
|
t.txs[hash] = peer
|
||||||
|
t.order = append(t.order, hash)
|
||||||
|
}
|
||||||
|
// Evict oldest entries if over capacity.
|
||||||
|
for len(t.txs) > maxTracked {
|
||||||
|
oldest := t.order[0]
|
||||||
|
t.order = t.order[1:]
|
||||||
|
delete(t.txs, oldest)
|
||||||
|
}
|
||||||
|
// Compact the backing array when it grows too large. Reslicing
|
||||||
|
// with order[1:] doesn't free earlier slots in the array.
|
||||||
|
if cap(t.order) > 2*maxTracked {
|
||||||
|
t.order = append([]common.Hash(nil), t.order...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *Tracker) loop() {
|
||||||
|
defer t.wg.Done()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case ev := <-t.headCh:
|
||||||
|
t.handleChainHead(ev)
|
||||||
|
select {
|
||||||
|
case t.step <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
case <-t.sub.Err():
|
||||||
|
return
|
||||||
|
case <-t.quit:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleChainHead computes per-peer deltas for the new head block and any
|
||||||
|
// newly-finalized blocks, then hands them to the StatsConsumer AFTER
|
||||||
|
// releasing t.mu. The lock-release-before-consumer pattern avoids any
|
||||||
|
// cross-package lock ordering.
|
||||||
|
func (t *Tracker) handleChainHead(ev core.ChainHeadEvent) {
|
||||||
|
// Fetch the head block by hash (not just number) to avoid using a
|
||||||
|
// reorged block if the tracker goroutine lags behind the chain.
|
||||||
|
block := t.chain.GetBlock(ev.Header.Hash(), ev.Header.Number.Uint64())
|
||||||
|
if block == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
t.mu.Lock()
|
||||||
|
// Count per-peer inclusions in the head block.
|
||||||
|
inclusions := make(map[string]int)
|
||||||
|
for _, tx := range block.Transactions() {
|
||||||
|
if peer := t.txs[tx.Hash()]; peer != "" {
|
||||||
|
inclusions[peer]++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Compute per-peer finalization credits since the last call.
|
||||||
|
finalized := t.collectFinalization()
|
||||||
|
t.mu.Unlock()
|
||||||
|
|
||||||
|
if t.consumer != nil {
|
||||||
|
t.consumer.NotifyBlock(inclusions, finalized)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// collectFinalization accumulates per-peer finalization credits for
|
||||||
|
// blocks newly finalized since lastFinalNum. Returns a (possibly empty)
|
||||||
|
// map; advances lastFinalNum. Must be called with t.mu held.
|
||||||
|
func (t *Tracker) collectFinalization() map[string]int {
|
||||||
|
credits := make(map[string]int)
|
||||||
|
finalHeader := t.chain.CurrentFinalBlock()
|
||||||
|
if finalHeader == nil {
|
||||||
|
return credits
|
||||||
|
}
|
||||||
|
finalNum := finalHeader.Number.Uint64()
|
||||||
|
if finalNum <= t.lastFinalNum {
|
||||||
|
return credits
|
||||||
|
}
|
||||||
|
for num := t.lastFinalNum + 1; num <= finalNum; num++ {
|
||||||
|
block := t.chain.GetBlockByNumber(num)
|
||||||
|
if block == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for _, tx := range block.Transactions() {
|
||||||
|
if peer := t.txs[tx.Hash()]; peer != "" {
|
||||||
|
credits[peer]++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if total := sumCounts(credits); total > 0 {
|
||||||
|
log.Trace("Accumulated finalization credits",
|
||||||
|
"from", t.lastFinalNum+1, "to", finalNum, "txs", total)
|
||||||
|
}
|
||||||
|
t.lastFinalNum = finalNum
|
||||||
|
return credits
|
||||||
|
}
|
||||||
|
|
||||||
|
func sumCounts(m map[string]int) int {
|
||||||
|
var sum int
|
||||||
|
for _, v := range m {
|
||||||
|
sum += v
|
||||||
|
}
|
||||||
|
return sum
|
||||||
|
}
|
||||||
362
eth/txtracker/tracker_test.go
Normal file
362
eth/txtracker/tracker_test.go
Normal file
|
|
@ -0,0 +1,362 @@
|
||||||
|
// Copyright 2026 The go-ethereum Authors
|
||||||
|
// This file is part of the go-ethereum library.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Lesser General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
//
|
||||||
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Lesser General Public License for more details.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the GNU Lesser General Public License
|
||||||
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package txtracker
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math/big"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/common"
|
||||||
|
"github.com/ethereum/go-ethereum/core"
|
||||||
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
|
"github.com/ethereum/go-ethereum/event"
|
||||||
|
"github.com/ethereum/go-ethereum/trie"
|
||||||
|
)
|
||||||
|
|
||||||
|
// mockChain implements the Chain interface for testing.
|
||||||
|
//
|
||||||
|
// Blocks are stored by hash to exercise the reorg-safe lookup path in
|
||||||
|
// tracker.handleChainHead (which calls GetBlock(hash, number)). A separate
|
||||||
|
// canonicalByNum index maps each height to its canonical block hash, used
|
||||||
|
// by GetBlockByNumber (the finalization path).
|
||||||
|
type mockChain struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
headFeed event.Feed
|
||||||
|
blocksByHash map[common.Hash]*types.Block
|
||||||
|
canonicalByNum map[uint64]common.Hash
|
||||||
|
finalNum uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMockChain() *mockChain {
|
||||||
|
return &mockChain{
|
||||||
|
blocksByHash: make(map[common.Hash]*types.Block),
|
||||||
|
canonicalByNum: make(map[uint64]common.Hash),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mockChain) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription {
|
||||||
|
return c.headFeed.Subscribe(ch)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mockChain) GetBlockByNumber(number uint64) *types.Block {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
hash, ok := c.canonicalByNum[number]
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return c.blocksByHash[hash]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mockChain) GetBlock(hash common.Hash, number uint64) *types.Block {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
return c.blocksByHash[hash]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mockChain) CurrentFinalBlock() *types.Header {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
if c.finalNum == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return &types.Header{Number: new(big.Int).SetUint64(c.finalNum)}
|
||||||
|
}
|
||||||
|
|
||||||
|
// addBlock adds a canonical block at the given height.
|
||||||
|
func (c *mockChain) addBlock(num uint64, txs []*types.Transaction) *types.Block {
|
||||||
|
return c.addBlockAtHeight(num, num, txs, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
// addBlockAtHeight adds a block at the given height. The salt parameter
|
||||||
|
// ensures distinct block hashes for two blocks at the same height. If
|
||||||
|
// canonical is true, the block becomes the canonical block for that height.
|
||||||
|
func (c *mockChain) addBlockAtHeight(num, salt uint64, txs []*types.Transaction, canonical bool) *types.Block {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
header := &types.Header{
|
||||||
|
Number: new(big.Int).SetUint64(num),
|
||||||
|
Extra: big.NewInt(int64(salt)).Bytes(),
|
||||||
|
}
|
||||||
|
block := types.NewBlock(header, &types.Body{Transactions: txs}, nil, trie.NewListHasher())
|
||||||
|
c.blocksByHash[block.Hash()] = block
|
||||||
|
if canonical {
|
||||||
|
c.canonicalByNum[num] = block.Hash()
|
||||||
|
}
|
||||||
|
return block
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mockChain) setFinalBlock(num uint64) {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
c.finalNum = num
|
||||||
|
}
|
||||||
|
|
||||||
|
// sendHead emits a chain head event for the canonical block at the given height.
|
||||||
|
func (c *mockChain) sendHead(num uint64) {
|
||||||
|
c.mu.Lock()
|
||||||
|
hash := c.canonicalByNum[num]
|
||||||
|
block := c.blocksByHash[hash]
|
||||||
|
c.mu.Unlock()
|
||||||
|
if block == nil {
|
||||||
|
panic("sendHead: no canonical block at height")
|
||||||
|
}
|
||||||
|
c.headFeed.Send(core.ChainHeadEvent{Header: block.Header()})
|
||||||
|
}
|
||||||
|
|
||||||
|
// sendHeadBlock emits a chain head event for the given block (may be
|
||||||
|
// non-canonical). Used for reorg tests.
|
||||||
|
func (c *mockChain) sendHeadBlock(block *types.Block) {
|
||||||
|
c.headFeed.Send(core.ChainHeadEvent{Header: block.Header()})
|
||||||
|
}
|
||||||
|
|
||||||
|
func hashTxs(txs []*types.Transaction) []common.Hash {
|
||||||
|
hashes := make([]common.Hash, len(txs))
|
||||||
|
for i, tx := range txs {
|
||||||
|
hashes[i] = tx.Hash()
|
||||||
|
}
|
||||||
|
return hashes
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeTx(nonce uint64) *types.Transaction {
|
||||||
|
return types.NewTx(&types.LegacyTx{Nonce: nonce, GasPrice: big.NewInt(1), Gas: 21000})
|
||||||
|
}
|
||||||
|
|
||||||
|
// mockConsumer captures NotifyBlock invocations so tests can assert on the
|
||||||
|
// signals the tracker emits.
|
||||||
|
type mockConsumer struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
signals []signal
|
||||||
|
}
|
||||||
|
|
||||||
|
type signal struct {
|
||||||
|
inclusions, finalized map[string]int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mockConsumer) NotifyBlock(inclusions, finalized map[string]int) {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
// Deep-copy so tests inspecting older signals aren't tripped up by
|
||||||
|
// later iterations mutating the same map (they don't today, but
|
||||||
|
// this keeps the assertion model simple).
|
||||||
|
in := make(map[string]int, len(inclusions))
|
||||||
|
for k, v := range inclusions {
|
||||||
|
in[k] = v
|
||||||
|
}
|
||||||
|
fn := make(map[string]int, len(finalized))
|
||||||
|
for k, v := range finalized {
|
||||||
|
fn[k] = v
|
||||||
|
}
|
||||||
|
c.signals = append(c.signals, signal{in, fn})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *mockConsumer) last() signal {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
if len(c.signals) == 0 {
|
||||||
|
return signal{}
|
||||||
|
}
|
||||||
|
return c.signals[len(c.signals)-1]
|
||||||
|
}
|
||||||
|
|
||||||
|
// waitStep blocks until the tracker has processed one event.
|
||||||
|
func waitStep(t *testing.T, tr *Tracker) {
|
||||||
|
t.Helper()
|
||||||
|
select {
|
||||||
|
case <-tr.step:
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("timeout waiting for tracker step")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestNotifyAcceptedRecordsMapping verifies the tx-lifecycle surface:
|
||||||
|
// NotifyAccepted records tx→peer mappings in insertion order, with
|
||||||
|
// first-deliverer-wins semantics on duplicates.
|
||||||
|
func TestNotifyAcceptedRecordsMapping(t *testing.T) {
|
||||||
|
tr := New()
|
||||||
|
|
||||||
|
txs := []*types.Transaction{makeTx(1), makeTx(2), makeTx(3)}
|
||||||
|
hashes := hashTxs(txs)
|
||||||
|
tr.NotifyAccepted("peerA", hashes)
|
||||||
|
|
||||||
|
tr.mu.Lock()
|
||||||
|
defer tr.mu.Unlock()
|
||||||
|
if len(tr.txs) != 3 {
|
||||||
|
t.Fatalf("expected 3 tracked txs, got %d", len(tr.txs))
|
||||||
|
}
|
||||||
|
if len(tr.order) != 3 {
|
||||||
|
t.Fatalf("expected order length 3, got %d", len(tr.order))
|
||||||
|
}
|
||||||
|
for i, h := range hashes {
|
||||||
|
if got := tr.txs[h]; got != "peerA" {
|
||||||
|
t.Fatalf("tx %d: expected deliverer=peerA, got %q", i, got)
|
||||||
|
}
|
||||||
|
if tr.order[i] != h {
|
||||||
|
t.Fatalf("order[%d] mismatch", i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestNotifyAcceptedFirstDelivererWins verifies duplicate accepts
|
||||||
|
// preserve the original deliverer.
|
||||||
|
func TestNotifyAcceptedFirstDelivererWins(t *testing.T) {
|
||||||
|
tr := New()
|
||||||
|
tx := makeTx(1)
|
||||||
|
tr.NotifyAccepted("peerA", []common.Hash{tx.Hash()})
|
||||||
|
tr.NotifyAccepted("peerB", []common.Hash{tx.Hash()})
|
||||||
|
|
||||||
|
tr.mu.Lock()
|
||||||
|
defer tr.mu.Unlock()
|
||||||
|
if got := tr.txs[tx.Hash()]; got != "peerA" {
|
||||||
|
t.Fatalf("expected first deliverer peerA to win, got %q", got)
|
||||||
|
}
|
||||||
|
if len(tr.order) != 1 {
|
||||||
|
t.Fatalf("expected single order entry, got %d", len(tr.order))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestHandleChainHeadEmitsInclusions verifies the tracker emits a
|
||||||
|
// correct per-peer inclusion map to its consumer when a head block
|
||||||
|
// contains tracked transactions.
|
||||||
|
func TestHandleChainHeadEmitsInclusions(t *testing.T) {
|
||||||
|
tr := New()
|
||||||
|
chain := newMockChain()
|
||||||
|
consumer := &mockConsumer{}
|
||||||
|
tr.Start(chain, consumer)
|
||||||
|
defer tr.Stop()
|
||||||
|
|
||||||
|
tx1, tx2 := makeTx(1), makeTx(2)
|
||||||
|
tr.NotifyAccepted("peerA", []common.Hash{tx1.Hash()})
|
||||||
|
tr.NotifyAccepted("peerB", []common.Hash{tx2.Hash()})
|
||||||
|
|
||||||
|
chain.addBlock(1, []*types.Transaction{tx1, tx2})
|
||||||
|
chain.sendHead(1)
|
||||||
|
waitStep(t, tr)
|
||||||
|
|
||||||
|
sig := consumer.last()
|
||||||
|
if sig.inclusions["peerA"] != 1 {
|
||||||
|
t.Errorf("peerA inclusions: got %d, want 1", sig.inclusions["peerA"])
|
||||||
|
}
|
||||||
|
if sig.inclusions["peerB"] != 1 {
|
||||||
|
t.Errorf("peerB inclusions: got %d, want 1", sig.inclusions["peerB"])
|
||||||
|
}
|
||||||
|
if len(sig.finalized) != 0 {
|
||||||
|
t.Errorf("expected empty finalized map, got %v", sig.finalized)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestHandleChainHeadEmptyBlock verifies an empty head block emits an
|
||||||
|
// empty inclusion map (so peerstats can decay all known peers).
|
||||||
|
func TestHandleChainHeadEmptyBlock(t *testing.T) {
|
||||||
|
tr := New()
|
||||||
|
chain := newMockChain()
|
||||||
|
consumer := &mockConsumer{}
|
||||||
|
tr.Start(chain, consumer)
|
||||||
|
defer tr.Stop()
|
||||||
|
|
||||||
|
chain.addBlock(1, nil)
|
||||||
|
chain.sendHead(1)
|
||||||
|
waitStep(t, tr)
|
||||||
|
|
||||||
|
sig := consumer.last()
|
||||||
|
if len(sig.inclusions) != 0 {
|
||||||
|
t.Errorf("expected empty inclusions, got %v", sig.inclusions)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestHandleChainHeadEmitsFinalization verifies that when finalization
|
||||||
|
// advances, the consumer receives per-peer finalization credits
|
||||||
|
// accumulated over the newly-finalized range.
|
||||||
|
func TestHandleChainHeadEmitsFinalization(t *testing.T) {
|
||||||
|
tr := New()
|
||||||
|
chain := newMockChain()
|
||||||
|
consumer := &mockConsumer{}
|
||||||
|
tr.Start(chain, consumer)
|
||||||
|
defer tr.Stop()
|
||||||
|
|
||||||
|
tx := makeTx(1)
|
||||||
|
tr.NotifyAccepted("peerA", []common.Hash{tx.Hash()})
|
||||||
|
|
||||||
|
// Include in block 1, not yet finalized.
|
||||||
|
chain.addBlock(1, []*types.Transaction{tx})
|
||||||
|
chain.sendHead(1)
|
||||||
|
waitStep(t, tr)
|
||||||
|
|
||||||
|
if credits := consumer.last().finalized["peerA"]; credits != 0 {
|
||||||
|
t.Fatalf("expected no finalization credits before finalization, got %d", credits)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Finalize block 1; next head triggers the finalization scan.
|
||||||
|
chain.setFinalBlock(1)
|
||||||
|
chain.addBlock(2, nil)
|
||||||
|
chain.sendHead(2)
|
||||||
|
waitStep(t, tr)
|
||||||
|
|
||||||
|
if credits := consumer.last().finalized["peerA"]; credits != 1 {
|
||||||
|
t.Fatalf("expected 1 finalization credit, got %d", credits)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestReorgSafety verifies the tracker resolves the head block by HASH
|
||||||
|
// so a head event pointing at a sibling block does not emit inclusions
|
||||||
|
// from the canonical block at the same height.
|
||||||
|
func TestReorgSafety(t *testing.T) {
|
||||||
|
tr := New()
|
||||||
|
chain := newMockChain()
|
||||||
|
consumer := &mockConsumer{}
|
||||||
|
tr.Start(chain, consumer)
|
||||||
|
defer tr.Stop()
|
||||||
|
|
||||||
|
tx := makeTx(1)
|
||||||
|
tr.NotifyAccepted("peerA", []common.Hash{tx.Hash()})
|
||||||
|
|
||||||
|
// Two blocks at height 1: canonical A contains tx; sibling B does not.
|
||||||
|
blockA := chain.addBlockAtHeight(1, 1, []*types.Transaction{tx}, true)
|
||||||
|
blockB := chain.addBlockAtHeight(1, 2, nil, false)
|
||||||
|
if blockA.Hash() == blockB.Hash() {
|
||||||
|
t.Fatal("sibling blocks ended up with the same hash")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Head announces sibling B — emit must contain no peerA inclusions.
|
||||||
|
chain.sendHeadBlock(blockB)
|
||||||
|
waitStep(t, tr)
|
||||||
|
if incl := consumer.last().inclusions["peerA"]; incl != 0 {
|
||||||
|
t.Fatalf("sibling-B head should emit 0 peerA inclusions, got %d", incl)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Head announces canonical A — emit must contain 1 peerA inclusion.
|
||||||
|
chain.sendHeadBlock(blockA)
|
||||||
|
waitStep(t, tr)
|
||||||
|
if incl := consumer.last().inclusions["peerA"]; incl != 1 {
|
||||||
|
t.Fatalf("canonical-A head should emit 1 peerA inclusion, got %d", incl)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestHandleChainHeadNilConsumer verifies the tracker tolerates a nil
|
||||||
|
// consumer (useful for tests that only exercise tx-lifecycle behavior).
|
||||||
|
func TestHandleChainHeadNilConsumer(t *testing.T) {
|
||||||
|
tr := New()
|
||||||
|
chain := newMockChain()
|
||||||
|
tr.Start(chain, nil)
|
||||||
|
defer tr.Stop()
|
||||||
|
|
||||||
|
chain.addBlock(1, nil)
|
||||||
|
chain.sendHead(1)
|
||||||
|
waitStep(t, tr) // should not panic
|
||||||
|
}
|
||||||
|
|
@ -84,7 +84,7 @@ func fuzz(input []byte) int {
|
||||||
return make([]error, len(txs))
|
return make([]error, len(txs))
|
||||||
},
|
},
|
||||||
func(string, []common.Hash) error { return nil },
|
func(string, []common.Hash) error { return nil },
|
||||||
nil,
|
nil, nil, nil,
|
||||||
clock,
|
clock,
|
||||||
func() time.Time {
|
func() time.Time {
|
||||||
nanoTime := int64(clock.Now())
|
nanoTime := int64(clock.Now())
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue