diff --git a/eth/handler.go b/eth/handler.go index aaea00e037..32d1bb6935 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -17,21 +17,22 @@ package eth import ( + "cmp" + crand "crypto/rand" "errors" "maps" "math" - "math/big" "slices" "sync" "sync/atomic" "time" + "github.com/dchest/siphash" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/ethereum/go-ethereum/eth/fetcher" @@ -119,9 +120,10 @@ type handler struct { chain *core.BlockChain maxPeers int - downloader *downloader.Downloader - txFetcher *fetcher.TxFetcher - peers *peerSet + downloader *downloader.Downloader + txFetcher *fetcher.TxFetcher + peers *peerSet + txBroadcastKey [16]byte eventMux *event.TypeMux txsCh chan core.NewTxsEvent @@ -153,6 +155,7 @@ func newHandler(config *handlerConfig) (*handler, error) { txpool: config.TxPool, chain: config.Chain, peers: newPeerSet(), + txBroadcastKey: newBroadcastChoiceKey(), requiredBlocks: config.RequiredBlocks, quitSync: make(chan struct{}), handlerDoneCh: make(chan struct{}), @@ -480,58 +483,40 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { txset = make(map[*ethPeer][]common.Hash) // Set peer->hash to transfer directly annos = make(map[*ethPeer][]common.Hash) // Set peer->hash to announce - ) - // Broadcast transactions to a batch of peers not knowing about it - direct := big.NewInt(int64(math.Sqrt(float64(h.peers.len())))) // Approximate number of peers to broadcast to - if direct.BitLen() == 0 { - direct = big.NewInt(1) - } - total := new(big.Int).Exp(direct, big.NewInt(2), nil) // Stabilise total peer count a bit based on sqrt peers - var ( - signer = types.LatestSigner(h.chain.Config()) // Don't care about chain status, we just need *a* sender - hasher = crypto.NewKeccakState() - hash = make([]byte, 32) + signer = types.LatestSigner(h.chain.Config()) + choice = newBroadcastChoice(h.nodeID, h.txBroadcastKey) + peers = h.peers.all() ) + for _, tx := range txs { - var maybeDirect bool + var directSet map[*ethPeer]struct{} switch { case tx.Type() == types.BlobTxType: blobTxs++ case tx.Size() > txMaxBroadcastSize: largeTxs++ default: - maybeDirect = true + // Get transaction sender address. Here we can ignore any error + // since we're just interested in any value. + txSender, _ := types.Sender(signer, tx) + directSet = choice.choosePeers(peers, txSender) } - // Send the transaction (if it's small enough) directly to a subset of - // the peers that have not received it yet, ensuring that the flow of - // transactions is grouped by account to (try and) avoid nonce gaps. - // - // To do this, we hash the local enode IW with together with a peer's - // enode ID together with the transaction sender and broadcast if - // `sha(self, peer, sender) mod peers < sqrt(peers)`. - for _, peer := range h.peers.peersWithoutTransaction(tx.Hash()) { - var broadcast bool - if maybeDirect { - hasher.Reset() - hasher.Write(h.nodeID.Bytes()) - hasher.Write(peer.Node().ID().Bytes()) - from, _ := types.Sender(signer, tx) // Ignore error, we only use the addr as a propagation target splitter - hasher.Write(from.Bytes()) - - hasher.Read(hash) - if new(big.Int).Mod(new(big.Int).SetBytes(hash), total).Cmp(direct) < 0 { - broadcast = true - } + for _, peer := range peers { + if peer.KnownTransaction(tx.Hash()) { + continue } - if broadcast { + if _, ok := directSet[peer]; ok { + // Send direct. txset[peer] = append(txset[peer], tx.Hash()) } else { + // Send announcement. annos[peer] = append(annos[peer], tx.Hash()) } } } + for peer, hashes := range txset { directCount += len(hashes) peer.AsyncSendTransactions(hashes) @@ -696,3 +681,62 @@ func (st *blockRangeState) stop() { func (st *blockRangeState) currentRange() eth.BlockRangeUpdatePacket { return *st.next.Load() } + +// broadcastChoice implements a deterministic random choice of peers. This is designed +// specifically for choosing which peer receives a direct broadcast of a transaction. +// +// The choice is made based on the involved p2p node IDs and the transaction sender, +// ensuring that the flow of transactions is grouped by account to (try and) avoid nonce +// gaps. +type broadcastChoice struct { + self enode.ID + key [16]byte + buffer map[*ethPeer]struct{} + tmp []broadcastPeer +} + +type broadcastPeer struct { + p *ethPeer + score uint64 +} + +func newBroadcastChoiceKey() (k [16]byte) { + crand.Read(k[:]) + return k +} + +func newBroadcastChoice(self enode.ID, key [16]byte) *broadcastChoice { + return &broadcastChoice{ + self: self, + key: key, + buffer: make(map[*ethPeer]struct{}), + } +} + +// choosePeers selects the peers that will receive a direct transaction broadcast message. +// Note the return value will only stay valid until the next call to choosePeers. +func (bc *broadcastChoice) choosePeers(peers []*ethPeer, txSender common.Address) map[*ethPeer]struct{} { + // Compute randomized scores. + bc.tmp = slices.Grow(bc.tmp[:0], len(peers))[:len(peers)] + hash := siphash.New(bc.key[:]) + for i, peer := range peers { + hash.Reset() + hash.Write(bc.self[:]) + hash.Write(peer.Peer.Peer.ID().Bytes()) + hash.Write(txSender[:]) + bc.tmp[i] = broadcastPeer{peer, hash.Sum64()} + } + + // Sort by score. + slices.SortFunc(bc.tmp, func(a, b broadcastPeer) int { + return cmp.Compare(a.score, b.score) + }) + + // Take top n. + clear(bc.buffer) + n := int(math.Ceil(math.Sqrt(float64(len(bc.tmp))))) + for i := range n { + bc.buffer[bc.tmp[i].p] = struct{}{} + } + return bc.buffer +} diff --git a/eth/handler_test.go b/eth/handler_test.go index d0da098430..b37e6227f4 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -17,9 +17,12 @@ package eth import ( + "maps" "math/big" + "math/rand" "sort" "sync" + "testing" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/ethash" @@ -29,8 +32,11 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/eth/ethconfig" + "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/holiman/uint256" @@ -212,3 +218,102 @@ func (b *testHandler) close() { b.handler.Stop() b.chain.Stop() } + +func TestBroadcastChoice(t *testing.T) { + self := enode.HexID("1111111111111111111111111111111111111111111111111111111111111111") + choice49 := newBroadcastChoice(self, [16]byte{1}) + choice50 := newBroadcastChoice(self, [16]byte{1}) + + // Create test peers and random tx sender addresses. + rand := rand.New(rand.NewSource(33)) + txsenders := make([]common.Address, 400) + for i := range txsenders { + rand.Read(txsenders[i][:]) + } + peers := createTestPeers(rand, 50) + defer closePeers(peers) + + // Evaluate choice49 first. + expectedCount := 7 // sqrt(49) + var chosen49 = make([]map[*ethPeer]struct{}, len(txsenders)) + for i, txSender := range txsenders { + set := choice49.choosePeers(peers[:49], txSender) + chosen49[i] = maps.Clone(set) + + // Sanity check choices. Here we check that the function selects different peers + // for different transaction senders. + if len(set) != expectedCount { + t.Fatalf("choice49 produced wrong count %d, want %d", len(set), expectedCount) + } + if i > 0 && maps.Equal(set, chosen49[i-1]) { + t.Errorf("choice49 for tx %d is equal to tx %d", i, i-1) + } + } + + // Evaluate choice50 for the same peers and transactions. It should always yield more + // peers than choice49, and the chosen set should be a superset of choice49's. + for i, txSender := range txsenders { + set := choice50.choosePeers(peers[:50], txSender) + if len(set) < len(chosen49[i]) { + t.Errorf("for tx %d, choice50 has less peers than choice49", i) + } + for p := range chosen49[i] { + if _, ok := set[p]; !ok { + t.Errorf("for tx %d, choice50 did not choose peer %v, but choice49 did", i, p.ID()) + } + } + } +} + +func BenchmarkBroadcastChoice(b *testing.B) { + b.Run("50", func(b *testing.B) { + benchmarkBroadcastChoice(b, 50) + }) + b.Run("200", func(b *testing.B) { + benchmarkBroadcastChoice(b, 200) + }) + b.Run("500", func(b *testing.B) { + benchmarkBroadcastChoice(b, 500) + }) +} + +// This measures the overhead of sending one transaction to N peers. +func benchmarkBroadcastChoice(b *testing.B, npeers int) { + rand := rand.New(rand.NewSource(33)) + peers := createTestPeers(rand, npeers) + defer closePeers(peers) + + txsenders := make([]common.Address, b.N) + for i := range txsenders { + rand.Read(txsenders[i][:]) + } + + self := enode.HexID("1111111111111111111111111111111111111111111111111111111111111111") + choice := newBroadcastChoice(self, [16]byte{1}) + + b.ResetTimer() + for i := range b.N { + set := choice.choosePeers(peers, txsenders[i]) + if len(set) == 0 { + b.Fatal("empty result") + } + } +} + +func createTestPeers(rand *rand.Rand, n int) []*ethPeer { + peers := make([]*ethPeer, n) + for i := range peers { + var id enode.ID + rand.Read(id[:]) + p2pPeer := p2p.NewPeer(id, "test", nil) + ep := eth.NewPeer(eth.ETH69, p2pPeer, nil, nil) + peers[i] = ðPeer{Peer: ep} + } + return peers +} + +func closePeers(peers []*ethPeer) { + for _, p := range peers { + p.Close() + } +} diff --git a/eth/peerset.go b/eth/peerset.go index 6b0aff226c..e6f623f90c 100644 --- a/eth/peerset.go +++ b/eth/peerset.go @@ -19,9 +19,10 @@ package eth import ( "errors" "fmt" + "maps" + "slices" "sync" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" "github.com/ethereum/go-ethereum/p2p" @@ -191,19 +192,12 @@ func (ps *peerSet) peer(id string) *ethPeer { return ps.peers[id] } -// peersWithoutTransaction retrieves a list of peers that do not have a given -// transaction in their set of known hashes. -func (ps *peerSet) peersWithoutTransaction(hash common.Hash) []*ethPeer { +// all returns all current peers. +func (ps *peerSet) all() []*ethPeer { ps.lock.RLock() defer ps.lock.RUnlock() - list := make([]*ethPeer, 0, len(ps.peers)) - for _, p := range ps.peers { - if !p.KnownTransaction(hash) { - list = append(list, p) - } - } - return list + return slices.Collect(maps.Values(ps.peers)) } // len returns if the current number of `eth` peers in the set. Since the `snap` diff --git a/go.mod b/go.mod index 363d7d3dfb..d701c08ad5 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/crate-crypto/go-eth-kzg v1.3.0 github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a github.com/davecgh/go-spew v1.1.1 + github.com/dchest/siphash v1.2.3 github.com/deckarep/golang-set/v2 v2.6.0 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 github.com/donovanhide/eventsource v0.0.0-20210830082556-c59027999da0 diff --git a/go.sum b/go.sum index 099d432ba4..53913262ae 100644 --- a/go.sum +++ b/go.sum @@ -87,6 +87,8 @@ github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dchest/siphash v1.2.3 h1:QXwFc8cFOR2dSa/gE6o/HokBMWtLUaNDVd+22aKHeEA= +github.com/dchest/siphash v1.2.3/go.mod h1:0NvQU092bT0ipiFN++/rXm69QG9tVxLAlQHIXMPAkHc= github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80NsVHagjM= github.com/deckarep/golang-set/v2 v2.6.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4= github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK09Y2A4Xv7EE0=