From 59ad40e562ffd235a735d447514da8ccfddebc84 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 24 Feb 2026 04:21:03 -0600 Subject: [PATCH] eth: check for tx on chain as well (#33607) The fetcher should not fetch transactions that are already on chain. Until now we were only checking in the txpool, but that does not have the old transaction. This was leading to extra fetches of transactions that were announced by a peer but are already on chain. Here we extend the check to the chain as well. --- eth/fetcher/metrics.go | 1 + eth/fetcher/tx_fetcher.go | 93 ++++++++++++++++----- eth/fetcher/tx_fetcher_test.go | 2 + eth/handler.go | 4 +- tests/fuzzers/txfetcher/txfetcher_fuzzer.go | 1 + 5 files changed, 75 insertions(+), 26 deletions(-) diff --git a/eth/fetcher/metrics.go b/eth/fetcher/metrics.go index fd1678dd30..3c0d6a8fd8 100644 --- a/eth/fetcher/metrics.go +++ b/eth/fetcher/metrics.go @@ -24,6 +24,7 @@ var ( txAnnounceInMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/in", nil) txAnnounceKnownMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/known", nil) txAnnounceUnderpricedMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/underpriced", nil) + txAnnounceOnchainMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/onchain", nil) txAnnounceDOSMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/dos", nil) txBroadcastInMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/broadcasts/in", nil) diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index 50d6f2f7ad..bc422e6abe 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -27,6 +27,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/lru" "github.com/ethereum/go-ethereum/common/mclock" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" @@ -71,6 +72,11 @@ const ( // addTxsBatchSize it the max number of transactions to add in a single batch from a peer. addTxsBatchSize = 128 + + // txOnChainCacheLimit is number of on-chain transactions to keep in a cache to avoid + // re-fetching them soon after they are mined. + // Approx 1MB for 30 minutes of transactions at 18 tps + txOnChainCacheLimit = 32768 ) var ( @@ -152,6 +158,9 @@ type TxFetcher struct { txSeq uint64 // Unique transaction sequence number underpriced *lru.Cache[common.Hash, time.Time] // Transactions discarded as too cheap (don't re-fetch) + chain *core.BlockChain // Blockchain interface for on-chain checks + txOnChainCache *lru.Cache[common.Hash, struct{}] // Cache to avoid fetching once the tx gets on chain + // Stage 1: Waiting lists for newly discovered transactions that might be // broadcast without needing explicit request/reply round trips. waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast @@ -184,36 +193,40 @@ type TxFetcher struct { // NewTxFetcher creates a transaction fetcher to retrieve transaction // based on hash announcements. -func NewTxFetcher(validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string)) *TxFetcher { - return NewTxFetcherForTests(validateMeta, addTxs, fetchTxs, dropPeer, mclock.System{}, time.Now, nil) +// Chain can be nil to disable on-chain checks. +func NewTxFetcher(chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string)) *TxFetcher { + return NewTxFetcherForTests(chain, validateMeta, addTxs, fetchTxs, dropPeer, mclock.System{}, time.Now, nil) } // NewTxFetcherForTests is a testing method to mock out the realtime clock with // a simulated version and the internal randomness with a deterministic one. +// Chain can be nil to disable on-chain checks. func NewTxFetcherForTests( - validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), + chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), clock mclock.Clock, realTime func() time.Time, rand *mrand.Rand) *TxFetcher { return &TxFetcher{ - notify: make(chan *txAnnounce), - cleanup: make(chan *txDelivery), - drop: make(chan *txDrop), - quit: make(chan struct{}), - waitlist: make(map[common.Hash]map[string]struct{}), - waittime: make(map[common.Hash]mclock.AbsTime), - waitslots: make(map[string]map[common.Hash]*txMetadataWithSeq), - announces: make(map[string]map[common.Hash]*txMetadataWithSeq), - announced: make(map[common.Hash]map[string]struct{}), - fetching: make(map[common.Hash]string), - requests: make(map[string]*txRequest), - alternates: make(map[common.Hash]map[string]struct{}), - underpriced: lru.NewCache[common.Hash, time.Time](maxTxUnderpricedSetSize), - validateMeta: validateMeta, - addTxs: addTxs, - fetchTxs: fetchTxs, - dropPeer: dropPeer, - clock: clock, - realTime: realTime, - rand: rand, + notify: make(chan *txAnnounce), + cleanup: make(chan *txDelivery), + drop: make(chan *txDrop), + quit: make(chan struct{}), + waitlist: make(map[common.Hash]map[string]struct{}), + waittime: make(map[common.Hash]mclock.AbsTime), + waitslots: make(map[string]map[common.Hash]*txMetadataWithSeq), + announces: make(map[string]map[common.Hash]*txMetadataWithSeq), + announced: make(map[common.Hash]map[string]struct{}), + fetching: make(map[common.Hash]string), + requests: make(map[string]*txRequest), + alternates: make(map[common.Hash]map[string]struct{}), + underpriced: lru.NewCache[common.Hash, time.Time](maxTxUnderpricedSetSize), + txOnChainCache: lru.NewCache[common.Hash, struct{}](txOnChainCacheLimit), + chain: chain, + validateMeta: validateMeta, + addTxs: addTxs, + fetchTxs: fetchTxs, + dropPeer: dropPeer, + clock: clock, + realTime: realTime, + rand: rand, } } @@ -233,6 +246,7 @@ func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []c unknownMetas = make([]txMetadata, 0, len(hashes)) duplicate int64 + onchain int64 underpriced int64 ) for i, hash := range hashes { @@ -245,6 +259,12 @@ func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []c continue } + // check on chain as well (no need to check limbo separately, as chain checks limbo too) + if _, exist := f.txOnChainCache.Get(hash); exist { + onchain++ + continue + } + if f.isKnownUnderpriced(hash) { underpriced++ continue @@ -259,6 +279,7 @@ func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []c } txAnnounceKnownMeter.Mark(duplicate) txAnnounceUnderpricedMeter.Mark(underpriced) + txAnnounceOnchainMeter.Mark(onchain) // If anything's left to announce, push it into the internal loop if len(unknownHashes) == 0 { @@ -412,7 +433,18 @@ func (f *TxFetcher) loop() { waitTrigger = make(chan struct{}, 1) timeoutTrigger = make(chan struct{}, 1) + + oldHead *types.Header ) + + // Subscribe to chain events to know when transactions are added to chain + var headEventCh chan core.ChainEvent + if f.chain != nil { + headEventCh = make(chan core.ChainEvent, 10) + sub := f.chain.SubscribeChainEvent(headEventCh) + defer sub.Unsubscribe() + } + for { select { case ann := <-f.notify: @@ -837,6 +869,21 @@ func (f *TxFetcher) loop() { f.rescheduleTimeout(timeoutTimer, timeoutTrigger) } + case ev := <-headEventCh: + // New head(s) added + newHead := ev.Header + if oldHead != nil && newHead.ParentHash != oldHead.Hash() { + // Reorg or setHead detected, clear the cache. We could be smarter here and + // only remove/add the diff, but this is simpler and not being exact here + // only results in a few more fetches. + f.txOnChainCache.Purge() + } + oldHead = newHead + // Add all transactions from the new block to the on-chain cache + for _, tx := range ev.Transactions { + f.txOnChainCache.Add(tx.Hash(), struct{}{}) + } + case <-f.quit: return } diff --git a/eth/fetcher/tx_fetcher_test.go b/eth/fetcher/tx_fetcher_test.go index 87fbe9f38c..6c2719631e 100644 --- a/eth/fetcher/tx_fetcher_test.go +++ b/eth/fetcher/tx_fetcher_test.go @@ -91,6 +91,7 @@ type txFetcherTest struct { // and deterministic randomness. func newTestTxFetcher() *TxFetcher { return NewTxFetcher( + nil, func(common.Hash, byte) error { return nil }, func(txs []*types.Transaction) []error { return make([]error, len(txs)) @@ -2191,6 +2192,7 @@ func TestTransactionForgotten(t *testing.T) { } fetcher := NewTxFetcherForTests( + nil, func(common.Hash, byte) error { return nil }, func(txs []*types.Transaction) []error { errs := make([]error, len(txs)) diff --git a/eth/handler.go b/eth/handler.go index 46634cae88..bb2cd5f88b 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -179,7 +179,6 @@ func newHandler(config *handlerConfig) (*handler, error) { addTxs := func(txs []*types.Transaction) []error { return h.txpool.Add(txs, false) } - validateMeta := func(tx common.Hash, kind byte) error { if h.txpool.Has(tx) { return txpool.ErrAlreadyKnown @@ -189,8 +188,7 @@ func newHandler(config *handlerConfig) (*handler, error) { } return nil } - - h.txFetcher = fetcher.NewTxFetcher(validateMeta, addTxs, fetchTx, h.removePeer) + h.txFetcher = fetcher.NewTxFetcher(h.chain, validateMeta, addTxs, fetchTx, h.removePeer) return h, nil } diff --git a/tests/fuzzers/txfetcher/txfetcher_fuzzer.go b/tests/fuzzers/txfetcher/txfetcher_fuzzer.go index 3baff33dcc..bcceaff383 100644 --- a/tests/fuzzers/txfetcher/txfetcher_fuzzer.go +++ b/tests/fuzzers/txfetcher/txfetcher_fuzzer.go @@ -78,6 +78,7 @@ func fuzz(input []byte) int { rand := rand.New(rand.NewSource(0x3a29)) // Same used in package tests!!! f := fetcher.NewTxFetcherForTests( + nil, func(common.Hash, byte) error { return nil }, func(txs []*types.Transaction) []error { return make([]error, len(txs))