eth: stabilize tx relay peer selection (#31714)
Some checks are pending
/ Linux Build (push) Waiting to run
/ Linux Build (arm) (push) Waiting to run
/ Windows Build (push) Waiting to run
/ Docker Image (push) Waiting to run

When maxPeers was just above some perfect square, and a few peers
dropped for some reason, we changed the peer selection function.
When new peers were acquired, we changed again.

This PR improves the selection function, in two ways. First, it will always select
sqrt(peers) to broadcast to. Second, the selection now uses siphash with a secret
key, to guard against information leaks about tx source.

---------

Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>
Co-authored-by: Felix Lange <fjl@twurst.com>
This commit is contained in:
Csaba Kiraly 2025-08-28 16:05:54 +02:00 committed by GitHub
parent 3a89051d86
commit 9af1f71e78
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 196 additions and 50 deletions

View file

@ -17,21 +17,22 @@
package eth
import (
"cmp"
crand "crypto/rand"
"errors"
"maps"
"math"
"math/big"
"slices"
"sync"
"sync/atomic"
"time"
"github.com/dchest/siphash"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/eth/fetcher"
@ -119,9 +120,10 @@ type handler struct {
chain *core.BlockChain
maxPeers int
downloader *downloader.Downloader
txFetcher *fetcher.TxFetcher
peers *peerSet
downloader *downloader.Downloader
txFetcher *fetcher.TxFetcher
peers *peerSet
txBroadcastKey [16]byte
eventMux *event.TypeMux
txsCh chan core.NewTxsEvent
@ -153,6 +155,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
txpool: config.TxPool,
chain: config.Chain,
peers: newPeerSet(),
txBroadcastKey: newBroadcastChoiceKey(),
requiredBlocks: config.RequiredBlocks,
quitSync: make(chan struct{}),
handlerDoneCh: make(chan struct{}),
@ -480,58 +483,40 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) {
txset = make(map[*ethPeer][]common.Hash) // Set peer->hash to transfer directly
annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce
)
// Broadcast transactions to a batch of peers not knowing about it
direct := big.NewInt(int64(math.Sqrt(float64(h.peers.len())))) // Approximate number of peers to broadcast to
if direct.BitLen() == 0 {
direct = big.NewInt(1)
}
total := new(big.Int).Exp(direct, big.NewInt(2), nil) // Stabilise total peer count a bit based on sqrt peers
var (
signer = types.LatestSigner(h.chain.Config()) // Don't care about chain status, we just need *a* sender
hasher = crypto.NewKeccakState()
hash = make([]byte, 32)
signer = types.LatestSigner(h.chain.Config())
choice = newBroadcastChoice(h.nodeID, h.txBroadcastKey)
peers = h.peers.all()
)
for _, tx := range txs {
var maybeDirect bool
var directSet map[*ethPeer]struct{}
switch {
case tx.Type() == types.BlobTxType:
blobTxs++
case tx.Size() > txMaxBroadcastSize:
largeTxs++
default:
maybeDirect = true
// Get transaction sender address. Here we can ignore any error
// since we're just interested in any value.
txSender, _ := types.Sender(signer, tx)
directSet = choice.choosePeers(peers, txSender)
}
// Send the transaction (if it's small enough) directly to a subset of
// the peers that have not received it yet, ensuring that the flow of
// transactions is grouped by account to (try and) avoid nonce gaps.
//
// To do this, we hash the local enode IW with together with a peer's
// enode ID together with the transaction sender and broadcast if
// `sha(self, peer, sender) mod peers < sqrt(peers)`.
for _, peer := range h.peers.peersWithoutTransaction(tx.Hash()) {
var broadcast bool
if maybeDirect {
hasher.Reset()
hasher.Write(h.nodeID.Bytes())
hasher.Write(peer.Node().ID().Bytes())
from, _ := types.Sender(signer, tx) // Ignore error, we only use the addr as a propagation target splitter
hasher.Write(from.Bytes())
hasher.Read(hash)
if new(big.Int).Mod(new(big.Int).SetBytes(hash), total).Cmp(direct) < 0 {
broadcast = true
}
for _, peer := range peers {
if peer.KnownTransaction(tx.Hash()) {
continue
}
if broadcast {
if _, ok := directSet[peer]; ok {
// Send direct.
txset[peer] = append(txset[peer], tx.Hash())
} else {
// Send announcement.
annos[peer] = append(annos[peer], tx.Hash())
}
}
}
for peer, hashes := range txset {
directCount += len(hashes)
peer.AsyncSendTransactions(hashes)
@ -696,3 +681,62 @@ func (st *blockRangeState) stop() {
func (st *blockRangeState) currentRange() eth.BlockRangeUpdatePacket {
return *st.next.Load()
}
// broadcastChoice implements a deterministic random choice of peers. This is designed
// specifically for choosing which peer receives a direct broadcast of a transaction.
//
// The choice is made based on the involved p2p node IDs and the transaction sender,
// ensuring that the flow of transactions is grouped by account to (try and) avoid nonce
// gaps.
type broadcastChoice struct {
self enode.ID
key [16]byte
buffer map[*ethPeer]struct{}
tmp []broadcastPeer
}
type broadcastPeer struct {
p *ethPeer
score uint64
}
func newBroadcastChoiceKey() (k [16]byte) {
crand.Read(k[:])
return k
}
func newBroadcastChoice(self enode.ID, key [16]byte) *broadcastChoice {
return &broadcastChoice{
self: self,
key: key,
buffer: make(map[*ethPeer]struct{}),
}
}
// choosePeers selects the peers that will receive a direct transaction broadcast message.
// Note the return value will only stay valid until the next call to choosePeers.
func (bc *broadcastChoice) choosePeers(peers []*ethPeer, txSender common.Address) map[*ethPeer]struct{} {
// Compute randomized scores.
bc.tmp = slices.Grow(bc.tmp[:0], len(peers))[:len(peers)]
hash := siphash.New(bc.key[:])
for i, peer := range peers {
hash.Reset()
hash.Write(bc.self[:])
hash.Write(peer.Peer.Peer.ID().Bytes())
hash.Write(txSender[:])
bc.tmp[i] = broadcastPeer{peer, hash.Sum64()}
}
// Sort by score.
slices.SortFunc(bc.tmp, func(a, b broadcastPeer) int {
return cmp.Compare(a.score, b.score)
})
// Take top n.
clear(bc.buffer)
n := int(math.Ceil(math.Sqrt(float64(len(bc.tmp)))))
for i := range n {
bc.buffer[bc.tmp[i].p] = struct{}{}
}
return bc.buffer
}

View file

@ -17,9 +17,12 @@
package eth
import (
"maps"
"math/big"
"math/rand"
"sort"
"sync"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/ethash"
@ -29,8 +32,11 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/holiman/uint256"
@ -212,3 +218,102 @@ func (b *testHandler) close() {
b.handler.Stop()
b.chain.Stop()
}
func TestBroadcastChoice(t *testing.T) {
self := enode.HexID("1111111111111111111111111111111111111111111111111111111111111111")
choice49 := newBroadcastChoice(self, [16]byte{1})
choice50 := newBroadcastChoice(self, [16]byte{1})
// Create test peers and random tx sender addresses.
rand := rand.New(rand.NewSource(33))
txsenders := make([]common.Address, 400)
for i := range txsenders {
rand.Read(txsenders[i][:])
}
peers := createTestPeers(rand, 50)
defer closePeers(peers)
// Evaluate choice49 first.
expectedCount := 7 // sqrt(49)
var chosen49 = make([]map[*ethPeer]struct{}, len(txsenders))
for i, txSender := range txsenders {
set := choice49.choosePeers(peers[:49], txSender)
chosen49[i] = maps.Clone(set)
// Sanity check choices. Here we check that the function selects different peers
// for different transaction senders.
if len(set) != expectedCount {
t.Fatalf("choice49 produced wrong count %d, want %d", len(set), expectedCount)
}
if i > 0 && maps.Equal(set, chosen49[i-1]) {
t.Errorf("choice49 for tx %d is equal to tx %d", i, i-1)
}
}
// Evaluate choice50 for the same peers and transactions. It should always yield more
// peers than choice49, and the chosen set should be a superset of choice49's.
for i, txSender := range txsenders {
set := choice50.choosePeers(peers[:50], txSender)
if len(set) < len(chosen49[i]) {
t.Errorf("for tx %d, choice50 has less peers than choice49", i)
}
for p := range chosen49[i] {
if _, ok := set[p]; !ok {
t.Errorf("for tx %d, choice50 did not choose peer %v, but choice49 did", i, p.ID())
}
}
}
}
func BenchmarkBroadcastChoice(b *testing.B) {
b.Run("50", func(b *testing.B) {
benchmarkBroadcastChoice(b, 50)
})
b.Run("200", func(b *testing.B) {
benchmarkBroadcastChoice(b, 200)
})
b.Run("500", func(b *testing.B) {
benchmarkBroadcastChoice(b, 500)
})
}
// This measures the overhead of sending one transaction to N peers.
func benchmarkBroadcastChoice(b *testing.B, npeers int) {
rand := rand.New(rand.NewSource(33))
peers := createTestPeers(rand, npeers)
defer closePeers(peers)
txsenders := make([]common.Address, b.N)
for i := range txsenders {
rand.Read(txsenders[i][:])
}
self := enode.HexID("1111111111111111111111111111111111111111111111111111111111111111")
choice := newBroadcastChoice(self, [16]byte{1})
b.ResetTimer()
for i := range b.N {
set := choice.choosePeers(peers, txsenders[i])
if len(set) == 0 {
b.Fatal("empty result")
}
}
}
func createTestPeers(rand *rand.Rand, n int) []*ethPeer {
peers := make([]*ethPeer, n)
for i := range peers {
var id enode.ID
rand.Read(id[:])
p2pPeer := p2p.NewPeer(id, "test", nil)
ep := eth.NewPeer(eth.ETH69, p2pPeer, nil, nil)
peers[i] = &ethPeer{Peer: ep}
}
return peers
}
func closePeers(peers []*ethPeer) {
for _, p := range peers {
p.Close()
}
}

View file

@ -19,9 +19,10 @@ package eth
import (
"errors"
"fmt"
"maps"
"slices"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/eth/protocols/snap"
"github.com/ethereum/go-ethereum/p2p"
@ -191,19 +192,12 @@ func (ps *peerSet) peer(id string) *ethPeer {
return ps.peers[id]
}
// peersWithoutTransaction retrieves a list of peers that do not have a given
// transaction in their set of known hashes.
func (ps *peerSet) peersWithoutTransaction(hash common.Hash) []*ethPeer {
// all returns all current peers.
func (ps *peerSet) all() []*ethPeer {
ps.lock.RLock()
defer ps.lock.RUnlock()
list := make([]*ethPeer, 0, len(ps.peers))
for _, p := range ps.peers {
if !p.KnownTransaction(hash) {
list = append(list, p)
}
}
return list
return slices.Collect(maps.Values(ps.peers))
}
// len returns if the current number of `eth` peers in the set. Since the `snap`

1
go.mod
View file

@ -17,6 +17,7 @@ require (
github.com/crate-crypto/go-eth-kzg v1.3.0
github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a
github.com/davecgh/go-spew v1.1.1
github.com/dchest/siphash v1.2.3
github.com/deckarep/golang-set/v2 v2.6.0
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1
github.com/donovanhide/eventsource v0.0.0-20210830082556-c59027999da0

2
go.sum
View file

@ -87,6 +87,8 @@ github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dchest/siphash v1.2.3 h1:QXwFc8cFOR2dSa/gE6o/HokBMWtLUaNDVd+22aKHeEA=
github.com/dchest/siphash v1.2.3/go.mod h1:0NvQU092bT0ipiFN++/rXm69QG9tVxLAlQHIXMPAkHc=
github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80NsVHagjM=
github.com/deckarep/golang-set/v2 v2.6.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4=
github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK09Y2A4Xv7EE0=