From 9af1f71e78ad65a07b5704a145f502643abc2f45 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Thu, 28 Aug 2025 16:05:54 +0200 Subject: [PATCH] eth: stabilize tx relay peer selection (#31714) When maxPeers was just above some perfect square, and a few peers dropped for some reason, we changed the peer selection function. When new peers were acquired, we changed again. This PR improves the selection function, in two ways. First, it will always select sqrt(peers) to broadcast to. Second, the selection now uses siphash with a secret key, to guard against information leaks about tx source. --------- Signed-off-by: Csaba Kiraly Co-authored-by: Felix Lange --- eth/handler.go | 122 ++++++++++++++++++++++++++++++-------------- eth/handler_test.go | 105 ++++++++++++++++++++++++++++++++++++++ eth/peerset.go | 16 ++---- go.mod | 1 + go.sum | 2 + 5 files changed, 196 insertions(+), 50 deletions(-) diff --git a/eth/handler.go b/eth/handler.go index aaea00e037..32d1bb6935 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -17,21 +17,22 @@ package eth import ( + "cmp" + crand "crypto/rand" "errors" "maps" "math" - "math/big" "slices" "sync" "sync/atomic" "time" + "github.com/dchest/siphash" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/ethereum/go-ethereum/eth/fetcher" @@ -119,9 +120,10 @@ type handler struct { chain *core.BlockChain maxPeers int - downloader *downloader.Downloader - txFetcher *fetcher.TxFetcher - peers *peerSet + downloader *downloader.Downloader + txFetcher *fetcher.TxFetcher + peers *peerSet + txBroadcastKey [16]byte eventMux *event.TypeMux txsCh chan core.NewTxsEvent @@ -153,6 +155,7 @@ func newHandler(config *handlerConfig) (*handler, error) { txpool: config.TxPool, chain: config.Chain, peers: newPeerSet(), + txBroadcastKey: newBroadcastChoiceKey(), requiredBlocks: config.RequiredBlocks, quitSync: make(chan struct{}), handlerDoneCh: make(chan struct{}), @@ -480,58 +483,40 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { txset = make(map[*ethPeer][]common.Hash) // Set peer->hash to transfer directly annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce - ) - // Broadcast transactions to a batch of peers not knowing about it - direct := big.NewInt(int64(math.Sqrt(float64(h.peers.len())))) // Approximate number of peers to broadcast to - if direct.BitLen() == 0 { - direct = big.NewInt(1) - } - total := new(big.Int).Exp(direct, big.NewInt(2), nil) // Stabilise total peer count a bit based on sqrt peers - var ( - signer = types.LatestSigner(h.chain.Config()) // Don't care about chain status, we just need *a* sender - hasher = crypto.NewKeccakState() - hash = make([]byte, 32) + signer = types.LatestSigner(h.chain.Config()) + choice = newBroadcastChoice(h.nodeID, h.txBroadcastKey) + peers = h.peers.all() ) + for _, tx := range txs { - var maybeDirect bool + var directSet map[*ethPeer]struct{} switch { case tx.Type() == types.BlobTxType: blobTxs++ case tx.Size() > txMaxBroadcastSize: largeTxs++ default: - maybeDirect = true + // Get transaction sender address. Here we can ignore any error + // since we're just interested in any value. + txSender, _ := types.Sender(signer, tx) + directSet = choice.choosePeers(peers, txSender) } - // Send the transaction (if it's small enough) directly to a subset of - // the peers that have not received it yet, ensuring that the flow of - // transactions is grouped by account to (try and) avoid nonce gaps. - // - // To do this, we hash the local enode IW with together with a peer's - // enode ID together with the transaction sender and broadcast if - // `sha(self, peer, sender) mod peers < sqrt(peers)`. - for _, peer := range h.peers.peersWithoutTransaction(tx.Hash()) { - var broadcast bool - if maybeDirect { - hasher.Reset() - hasher.Write(h.nodeID.Bytes()) - hasher.Write(peer.Node().ID().Bytes()) - from, _ := types.Sender(signer, tx) // Ignore error, we only use the addr as a propagation target splitter - hasher.Write(from.Bytes()) - - hasher.Read(hash) - if new(big.Int).Mod(new(big.Int).SetBytes(hash), total).Cmp(direct) < 0 { - broadcast = true - } + for _, peer := range peers { + if peer.KnownTransaction(tx.Hash()) { + continue } - if broadcast { + if _, ok := directSet[peer]; ok { + // Send direct. txset[peer] = append(txset[peer], tx.Hash()) } else { + // Send announcement. annos[peer] = append(annos[peer], tx.Hash()) } } } + for peer, hashes := range txset { directCount += len(hashes) peer.AsyncSendTransactions(hashes) @@ -696,3 +681,62 @@ func (st *blockRangeState) stop() { func (st *blockRangeState) currentRange() eth.BlockRangeUpdatePacket { return *st.next.Load() } + +// broadcastChoice implements a deterministic random choice of peers. This is designed +// specifically for choosing which peer receives a direct broadcast of a transaction. +// +// The choice is made based on the involved p2p node IDs and the transaction sender, +// ensuring that the flow of transactions is grouped by account to (try and) avoid nonce +// gaps. +type broadcastChoice struct { + self enode.ID + key [16]byte + buffer map[*ethPeer]struct{} + tmp []broadcastPeer +} + +type broadcastPeer struct { + p *ethPeer + score uint64 +} + +func newBroadcastChoiceKey() (k [16]byte) { + crand.Read(k[:]) + return k +} + +func newBroadcastChoice(self enode.ID, key [16]byte) *broadcastChoice { + return &broadcastChoice{ + self: self, + key: key, + buffer: make(map[*ethPeer]struct{}), + } +} + +// choosePeers selects the peers that will receive a direct transaction broadcast message. +// Note the return value will only stay valid until the next call to choosePeers. +func (bc *broadcastChoice) choosePeers(peers []*ethPeer, txSender common.Address) map[*ethPeer]struct{} { + // Compute randomized scores. + bc.tmp = slices.Grow(bc.tmp[:0], len(peers))[:len(peers)] + hash := siphash.New(bc.key[:]) + for i, peer := range peers { + hash.Reset() + hash.Write(bc.self[:]) + hash.Write(peer.Peer.Peer.ID().Bytes()) + hash.Write(txSender[:]) + bc.tmp[i] = broadcastPeer{peer, hash.Sum64()} + } + + // Sort by score. + slices.SortFunc(bc.tmp, func(a, b broadcastPeer) int { + return cmp.Compare(a.score, b.score) + }) + + // Take top n. + clear(bc.buffer) + n := int(math.Ceil(math.Sqrt(float64(len(bc.tmp))))) + for i := range n { + bc.buffer[bc.tmp[i].p] = struct{}{} + } + return bc.buffer +} diff --git a/eth/handler_test.go b/eth/handler_test.go index d0da098430..b37e6227f4 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -17,9 +17,12 @@ package eth import ( + "maps" "math/big" + "math/rand" "sort" "sync" + "testing" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/ethash" @@ -29,8 +32,11 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/eth/ethconfig" + "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/holiman/uint256" @@ -212,3 +218,102 @@ func (b *testHandler) close() { b.handler.Stop() b.chain.Stop() } + +func TestBroadcastChoice(t *testing.T) { + self := enode.HexID("1111111111111111111111111111111111111111111111111111111111111111") + choice49 := newBroadcastChoice(self, [16]byte{1}) + choice50 := newBroadcastChoice(self, [16]byte{1}) + + // Create test peers and random tx sender addresses. + rand := rand.New(rand.NewSource(33)) + txsenders := make([]common.Address, 400) + for i := range txsenders { + rand.Read(txsenders[i][:]) + } + peers := createTestPeers(rand, 50) + defer closePeers(peers) + + // Evaluate choice49 first. + expectedCount := 7 // sqrt(49) + var chosen49 = make([]map[*ethPeer]struct{}, len(txsenders)) + for i, txSender := range txsenders { + set := choice49.choosePeers(peers[:49], txSender) + chosen49[i] = maps.Clone(set) + + // Sanity check choices. Here we check that the function selects different peers + // for different transaction senders. + if len(set) != expectedCount { + t.Fatalf("choice49 produced wrong count %d, want %d", len(set), expectedCount) + } + if i > 0 && maps.Equal(set, chosen49[i-1]) { + t.Errorf("choice49 for tx %d is equal to tx %d", i, i-1) + } + } + + // Evaluate choice50 for the same peers and transactions. It should always yield more + // peers than choice49, and the chosen set should be a superset of choice49's. + for i, txSender := range txsenders { + set := choice50.choosePeers(peers[:50], txSender) + if len(set) < len(chosen49[i]) { + t.Errorf("for tx %d, choice50 has less peers than choice49", i) + } + for p := range chosen49[i] { + if _, ok := set[p]; !ok { + t.Errorf("for tx %d, choice50 did not choose peer %v, but choice49 did", i, p.ID()) + } + } + } +} + +func BenchmarkBroadcastChoice(b *testing.B) { + b.Run("50", func(b *testing.B) { + benchmarkBroadcastChoice(b, 50) + }) + b.Run("200", func(b *testing.B) { + benchmarkBroadcastChoice(b, 200) + }) + b.Run("500", func(b *testing.B) { + benchmarkBroadcastChoice(b, 500) + }) +} + +// This measures the overhead of sending one transaction to N peers. +func benchmarkBroadcastChoice(b *testing.B, npeers int) { + rand := rand.New(rand.NewSource(33)) + peers := createTestPeers(rand, npeers) + defer closePeers(peers) + + txsenders := make([]common.Address, b.N) + for i := range txsenders { + rand.Read(txsenders[i][:]) + } + + self := enode.HexID("1111111111111111111111111111111111111111111111111111111111111111") + choice := newBroadcastChoice(self, [16]byte{1}) + + b.ResetTimer() + for i := range b.N { + set := choice.choosePeers(peers, txsenders[i]) + if len(set) == 0 { + b.Fatal("empty result") + } + } +} + +func createTestPeers(rand *rand.Rand, n int) []*ethPeer { + peers := make([]*ethPeer, n) + for i := range peers { + var id enode.ID + rand.Read(id[:]) + p2pPeer := p2p.NewPeer(id, "test", nil) + ep := eth.NewPeer(eth.ETH69, p2pPeer, nil, nil) + peers[i] = ðPeer{Peer: ep} + } + return peers +} + +func closePeers(peers []*ethPeer) { + for _, p := range peers { + p.Close() + } +} diff --git a/eth/peerset.go b/eth/peerset.go index 6b0aff226c..e6f623f90c 100644 --- a/eth/peerset.go +++ b/eth/peerset.go @@ -19,9 +19,10 @@ package eth import ( "errors" "fmt" + "maps" + "slices" "sync" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" "github.com/ethereum/go-ethereum/p2p" @@ -191,19 +192,12 @@ func (ps *peerSet) peer(id string) *ethPeer { return ps.peers[id] } -// peersWithoutTransaction retrieves a list of peers that do not have a given -// transaction in their set of known hashes. -func (ps *peerSet) peersWithoutTransaction(hash common.Hash) []*ethPeer { +// all returns all current peers. +func (ps *peerSet) all() []*ethPeer { ps.lock.RLock() defer ps.lock.RUnlock() - list := make([]*ethPeer, 0, len(ps.peers)) - for _, p := range ps.peers { - if !p.KnownTransaction(hash) { - list = append(list, p) - } - } - return list + return slices.Collect(maps.Values(ps.peers)) } // len returns if the current number of `eth` peers in the set. Since the `snap` diff --git a/go.mod b/go.mod index 363d7d3dfb..d701c08ad5 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/crate-crypto/go-eth-kzg v1.3.0 github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a github.com/davecgh/go-spew v1.1.1 + github.com/dchest/siphash v1.2.3 github.com/deckarep/golang-set/v2 v2.6.0 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 github.com/donovanhide/eventsource v0.0.0-20210830082556-c59027999da0 diff --git a/go.sum b/go.sum index 099d432ba4..53913262ae 100644 --- a/go.sum +++ b/go.sum @@ -87,6 +87,8 @@ github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dchest/siphash v1.2.3 h1:QXwFc8cFOR2dSa/gE6o/HokBMWtLUaNDVd+22aKHeEA= +github.com/dchest/siphash v1.2.3/go.mod h1:0NvQU092bT0ipiFN++/rXm69QG9tVxLAlQHIXMPAkHc= github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80NsVHagjM= github.com/deckarep/golang-set/v2 v2.6.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4= github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK09Y2A4Xv7EE0=