eth: check for tx on chain as well (#33607)

The fetcher should not fetch transactions that are already on chain.
Until now we were only checking in the txpool, but that does not have
the old transaction. This was leading to extra fetches of transactions
that were announced by a peer but are already on chain.

Here we extend the check to the chain as well.
This commit is contained in:
Csaba Kiraly 2026-02-24 04:21:03 -06:00 committed by GitHub
parent c2e1785a48
commit 59ad40e562
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 75 additions and 26 deletions

View file

@ -24,6 +24,7 @@ var (
txAnnounceInMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/in", nil) txAnnounceInMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/in", nil)
txAnnounceKnownMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/known", nil) txAnnounceKnownMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/known", nil)
txAnnounceUnderpricedMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/underpriced", nil) txAnnounceUnderpricedMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/underpriced", nil)
txAnnounceOnchainMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/onchain", nil)
txAnnounceDOSMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/dos", nil) txAnnounceDOSMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/dos", nil)
txBroadcastInMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/broadcasts/in", nil) txBroadcastInMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/broadcasts/in", nil)

View file

@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/lru" "github.com/ethereum/go-ethereum/common/lru"
"github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
@ -71,6 +72,11 @@ const (
// addTxsBatchSize it the max number of transactions to add in a single batch from a peer. // addTxsBatchSize it the max number of transactions to add in a single batch from a peer.
addTxsBatchSize = 128 addTxsBatchSize = 128
// txOnChainCacheLimit is number of on-chain transactions to keep in a cache to avoid
// re-fetching them soon after they are mined.
// Approx 1MB for 30 minutes of transactions at 18 tps
txOnChainCacheLimit = 32768
) )
var ( var (
@ -152,6 +158,9 @@ type TxFetcher struct {
txSeq uint64 // Unique transaction sequence number txSeq uint64 // Unique transaction sequence number
underpriced *lru.Cache[common.Hash, time.Time] // Transactions discarded as too cheap (don't re-fetch) underpriced *lru.Cache[common.Hash, time.Time] // Transactions discarded as too cheap (don't re-fetch)
chain *core.BlockChain // Blockchain interface for on-chain checks
txOnChainCache *lru.Cache[common.Hash, struct{}] // Cache to avoid fetching once the tx gets on chain
// Stage 1: Waiting lists for newly discovered transactions that might be // Stage 1: Waiting lists for newly discovered transactions that might be
// broadcast without needing explicit request/reply round trips. // broadcast without needing explicit request/reply round trips.
waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast waitlist map[common.Hash]map[string]struct{} // Transactions waiting for an potential broadcast
@ -184,14 +193,16 @@ type TxFetcher struct {
// NewTxFetcher creates a transaction fetcher to retrieve transaction // NewTxFetcher creates a transaction fetcher to retrieve transaction
// based on hash announcements. // based on hash announcements.
func NewTxFetcher(validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string)) *TxFetcher { // Chain can be nil to disable on-chain checks.
return NewTxFetcherForTests(validateMeta, addTxs, fetchTxs, dropPeer, mclock.System{}, time.Now, nil) func NewTxFetcher(chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string)) *TxFetcher {
return NewTxFetcherForTests(chain, validateMeta, addTxs, fetchTxs, dropPeer, mclock.System{}, time.Now, nil)
} }
// NewTxFetcherForTests is a testing method to mock out the realtime clock with // NewTxFetcherForTests is a testing method to mock out the realtime clock with
// a simulated version and the internal randomness with a deterministic one. // a simulated version and the internal randomness with a deterministic one.
// Chain can be nil to disable on-chain checks.
func NewTxFetcherForTests( func NewTxFetcherForTests(
validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), chain *core.BlockChain, validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string),
clock mclock.Clock, realTime func() time.Time, rand *mrand.Rand) *TxFetcher { clock mclock.Clock, realTime func() time.Time, rand *mrand.Rand) *TxFetcher {
return &TxFetcher{ return &TxFetcher{
notify: make(chan *txAnnounce), notify: make(chan *txAnnounce),
@ -207,6 +218,8 @@ func NewTxFetcherForTests(
requests: make(map[string]*txRequest), requests: make(map[string]*txRequest),
alternates: make(map[common.Hash]map[string]struct{}), alternates: make(map[common.Hash]map[string]struct{}),
underpriced: lru.NewCache[common.Hash, time.Time](maxTxUnderpricedSetSize), underpriced: lru.NewCache[common.Hash, time.Time](maxTxUnderpricedSetSize),
txOnChainCache: lru.NewCache[common.Hash, struct{}](txOnChainCacheLimit),
chain: chain,
validateMeta: validateMeta, validateMeta: validateMeta,
addTxs: addTxs, addTxs: addTxs,
fetchTxs: fetchTxs, fetchTxs: fetchTxs,
@ -233,6 +246,7 @@ func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []c
unknownMetas = make([]txMetadata, 0, len(hashes)) unknownMetas = make([]txMetadata, 0, len(hashes))
duplicate int64 duplicate int64
onchain int64
underpriced int64 underpriced int64
) )
for i, hash := range hashes { for i, hash := range hashes {
@ -245,6 +259,12 @@ func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []c
continue continue
} }
// check on chain as well (no need to check limbo separately, as chain checks limbo too)
if _, exist := f.txOnChainCache.Get(hash); exist {
onchain++
continue
}
if f.isKnownUnderpriced(hash) { if f.isKnownUnderpriced(hash) {
underpriced++ underpriced++
continue continue
@ -259,6 +279,7 @@ func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []c
} }
txAnnounceKnownMeter.Mark(duplicate) txAnnounceKnownMeter.Mark(duplicate)
txAnnounceUnderpricedMeter.Mark(underpriced) txAnnounceUnderpricedMeter.Mark(underpriced)
txAnnounceOnchainMeter.Mark(onchain)
// If anything's left to announce, push it into the internal loop // If anything's left to announce, push it into the internal loop
if len(unknownHashes) == 0 { if len(unknownHashes) == 0 {
@ -412,7 +433,18 @@ func (f *TxFetcher) loop() {
waitTrigger = make(chan struct{}, 1) waitTrigger = make(chan struct{}, 1)
timeoutTrigger = make(chan struct{}, 1) timeoutTrigger = make(chan struct{}, 1)
oldHead *types.Header
) )
// Subscribe to chain events to know when transactions are added to chain
var headEventCh chan core.ChainEvent
if f.chain != nil {
headEventCh = make(chan core.ChainEvent, 10)
sub := f.chain.SubscribeChainEvent(headEventCh)
defer sub.Unsubscribe()
}
for { for {
select { select {
case ann := <-f.notify: case ann := <-f.notify:
@ -837,6 +869,21 @@ func (f *TxFetcher) loop() {
f.rescheduleTimeout(timeoutTimer, timeoutTrigger) f.rescheduleTimeout(timeoutTimer, timeoutTrigger)
} }
case ev := <-headEventCh:
// New head(s) added
newHead := ev.Header
if oldHead != nil && newHead.ParentHash != oldHead.Hash() {
// Reorg or setHead detected, clear the cache. We could be smarter here and
// only remove/add the diff, but this is simpler and not being exact here
// only results in a few more fetches.
f.txOnChainCache.Purge()
}
oldHead = newHead
// Add all transactions from the new block to the on-chain cache
for _, tx := range ev.Transactions {
f.txOnChainCache.Add(tx.Hash(), struct{}{})
}
case <-f.quit: case <-f.quit:
return return
} }

View file

@ -91,6 +91,7 @@ type txFetcherTest struct {
// and deterministic randomness. // and deterministic randomness.
func newTestTxFetcher() *TxFetcher { func newTestTxFetcher() *TxFetcher {
return NewTxFetcher( return NewTxFetcher(
nil,
func(common.Hash, byte) error { return nil }, func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error { func(txs []*types.Transaction) []error {
return make([]error, len(txs)) return make([]error, len(txs))
@ -2191,6 +2192,7 @@ func TestTransactionForgotten(t *testing.T) {
} }
fetcher := NewTxFetcherForTests( fetcher := NewTxFetcherForTests(
nil,
func(common.Hash, byte) error { return nil }, func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error { func(txs []*types.Transaction) []error {
errs := make([]error, len(txs)) errs := make([]error, len(txs))

View file

@ -179,7 +179,6 @@ func newHandler(config *handlerConfig) (*handler, error) {
addTxs := func(txs []*types.Transaction) []error { addTxs := func(txs []*types.Transaction) []error {
return h.txpool.Add(txs, false) return h.txpool.Add(txs, false)
} }
validateMeta := func(tx common.Hash, kind byte) error { validateMeta := func(tx common.Hash, kind byte) error {
if h.txpool.Has(tx) { if h.txpool.Has(tx) {
return txpool.ErrAlreadyKnown return txpool.ErrAlreadyKnown
@ -189,8 +188,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
} }
return nil return nil
} }
h.txFetcher = fetcher.NewTxFetcher(h.chain, validateMeta, addTxs, fetchTx, h.removePeer)
h.txFetcher = fetcher.NewTxFetcher(validateMeta, addTxs, fetchTx, h.removePeer)
return h, nil return h, nil
} }

View file

@ -78,6 +78,7 @@ func fuzz(input []byte) int {
rand := rand.New(rand.NewSource(0x3a29)) // Same used in package tests!!! rand := rand.New(rand.NewSource(0x3a29)) // Same used in package tests!!!
f := fetcher.NewTxFetcherForTests( f := fetcher.NewTxFetcherForTests(
nil,
func(common.Hash, byte) error { return nil }, func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error { func(txs []*types.Transaction) []error {
return make([]error, len(txs)) return make([]error, len(txs))