From 88576c52e246c52438687a3679424a1cfddfbb8e Mon Sep 17 00:00:00 2001 From: Bosul Mun Date: Mon, 20 Oct 2025 11:26:55 +0900 Subject: [PATCH] eth/fetcher: remove dangling peers from alternates (#32947) This PR removes dangling peers in `alternates` map In the current code, a dropped peer is removed from alternates for only the specific transaction hash it was requesting. If that peer is listed as an alternate for other transaction hashes, those entries still stick around in alternates/announced even though that peer already got dropped. --- eth/fetcher/tx_fetcher.go | 6 +++- eth/fetcher/tx_fetcher_test.go | 50 ++++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index 5ba72e6b01..d919ac8a5f 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -795,6 +795,10 @@ func (f *TxFetcher) loop() { if len(f.announced[hash]) == 0 { delete(f.announced, hash) } + delete(f.alternates[hash], drop.peer) + if len(f.alternates[hash]) == 0 { + delete(f.alternates, hash) + } } delete(f.announces, drop.peer) } @@ -858,7 +862,7 @@ func (f *TxFetcher) rescheduleWait(timer *mclock.Timer, trigger chan struct{}) { // This method is a bit "flaky" "by design". In theory the timeout timer only ever // should be rescheduled if some request is pending. In practice, a timeout will // cause the timer to be rescheduled every 5 secs (until the peer comes through or -// disconnects). This is a limitation of the fetcher code because we don't trac +// disconnects). This is a limitation of the fetcher code because we don't track // pending requests and timed out requests separately. Without double tracking, if // we simply didn't reschedule the timer on all-timeout then the timer would never // be set again since len(request) > 0 => something's running. diff --git a/eth/fetcher/tx_fetcher_test.go b/eth/fetcher/tx_fetcher_test.go index 0f05a1c995..bb41f62932 100644 --- a/eth/fetcher/tx_fetcher_test.go +++ b/eth/fetcher/tx_fetcher_test.go @@ -1858,6 +1858,56 @@ func TestBlobTransactionAnnounce(t *testing.T) { }) } +func TestTransactionFetcherDropAlternates(t *testing.T) { + testTransactionFetcherParallel(t, txFetcherTest{ + init: func() *TxFetcher { + return NewTxFetcher( + func(common.Hash) bool { return false }, + func(txs []*types.Transaction) []error { + return make([]error, len(txs)) + }, + func(string, []common.Hash) error { return nil }, + nil, + ) + }, + steps: []interface{}{ + doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}}, + doWait{time: txArriveTimeout, step: true}, + doTxNotify{peer: "B", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}}, + + isScheduled{ + tracking: map[string][]announce{ + "A": { + {testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())}, + }, + "B": { + {testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())}, + }, + }, + fetching: map[string][]common.Hash{ + "A": {testTxsHashes[0]}, + }, + }, + doDrop("B"), + + isScheduled{ + tracking: map[string][]announce{ + "A": { + {testTxsHashes[0], testTxs[0].Type(), uint32(testTxs[0].Size())}, + }, + }, + fetching: map[string][]common.Hash{ + "A": {testTxsHashes[0]}, + }, + }, + doDrop("A"), + isScheduled{ + tracking: nil, fetching: nil, + }, + }, + }) +} + func testTransactionFetcherParallel(t *testing.T, tt txFetcherTest) { t.Parallel() testTransactionFetcher(t, tt)