From 56d201b0feb90b3e4a863349d0883c5502bc792f Mon Sep 17 00:00:00 2001 From: Bosul Mun Date: Thu, 11 Dec 2025 13:11:52 +0900 Subject: [PATCH] eth/fetcher: add metadata validation in tx announcement (#33378) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR fixes the bug reported in #33365. The impact of the bug is not catastrophic. After a transaction is ultimately fetched, validation and propagation will be performed based on the fetched body, and any response with a mismatched type is treated as a protocol violation. An attacker could only waste the limited portion of victim’s bandwidth at most. However, the reasons for submitting this PR are as follows 1. Fetching a transaction announced with an arbitrary type is a weird behavior. 2. It aligns with efforts such as EIP-8077 and #33119 to make the fetcher smarter and reduce bandwidth waste. Regarding the `FilterType` function, it could potentially be implemented by modifying the Filter function's parameter itself, but I wasn’t sure whether changing that function is acceptable, so I left it as is. --- core/txpool/blobpool/blobpool.go | 7 +- core/txpool/legacypool/legacypool.go | 7 +- core/txpool/subpool.go | 3 + core/txpool/txpool.go | 11 +++ eth/fetcher/tx_fetcher.go | 83 +++++++++++--------- eth/fetcher/tx_fetcher_test.go | 85 ++++++++++++++------- eth/handler.go | 16 +++- eth/handler_test.go | 9 +++ tests/fuzzers/txfetcher/txfetcher_fuzzer.go | 2 +- 9 files changed, 153 insertions(+), 70 deletions(-) diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index bfaf4d5b8e..e49fe7bb61 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -377,7 +377,12 @@ func New(config Config, chain BlockChain, hasPendingAuth func(common.Address) bo // Filter returns whether the given transaction can be consumed by the blob pool. func (p *BlobPool) Filter(tx *types.Transaction) bool { - return tx.Type() == types.BlobTxType + return p.FilterType(tx.Type()) +} + +// FilterType returns whether the blob pool supports the given transaction type. +func (p *BlobPool) FilterType(kind byte) bool { + return kind == types.BlobTxType } // Init sets the gas price needed to keep a transaction in the pool and the chain diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index ceedc74a53..5f8dd4fac8 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -288,7 +288,12 @@ func New(config Config, chain BlockChain) *LegacyPool { // Filter returns whether the given transaction can be consumed by the legacy // pool, specifically, whether it is a Legacy, AccessList or Dynamic transaction. func (pool *LegacyPool) Filter(tx *types.Transaction) bool { - switch tx.Type() { + return pool.FilterType(tx.Type()) +} + +// FilterType returns whether the legacy pool supports the given transaction type. +func (pool *LegacyPool) FilterType(kind byte) bool { + switch kind { case types.LegacyTxType, types.AccessListTxType, types.DynamicFeeTxType, types.SetCodeTxType: return true default: diff --git a/core/txpool/subpool.go b/core/txpool/subpool.go index 519ae7b989..db099ddf98 100644 --- a/core/txpool/subpool.go +++ b/core/txpool/subpool.go @@ -100,6 +100,9 @@ type SubPool interface { // to this particular subpool. Filter(tx *types.Transaction) bool + // FilterType returns whether the subpool supports the given transaction type. + FilterType(kind byte) bool + // Init sets the base parameters of the subpool, allowing it to load any saved // transactions from disk and also permitting internal maintenance routines to // start up. diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 437861efca..a314a83f1b 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -489,3 +489,14 @@ func (p *TxPool) Clear() { subpool.Clear() } } + +// FilterType returns whether a transaction with the given type is supported +// (can be added) by the pool. +func (p *TxPool) FilterType(kind byte) bool { + for _, subpool := range p.subpools { + if subpool.FilterType(kind) { + return true + } + } + return false +} diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index d919ac8a5f..f024f3aeba 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -170,10 +170,10 @@ type TxFetcher struct { alternates map[common.Hash]map[string]struct{} // In-flight transaction alternate origins if retrieval fails // Callbacks - hasTx func(common.Hash) bool // Retrieves a tx from the local txpool - addTxs func([]*types.Transaction) []error // Insert a batch of transactions into local txpool - fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer - dropPeer func(string) // Drops a peer in case of announcement violation + validateMeta func(common.Hash, byte) error // Validate a tx metadata based on the local txpool + addTxs func([]*types.Transaction) []error // Insert a batch of transactions into local txpool + fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer + dropPeer func(string) // Drops a peer in case of announcement violation step chan struct{} // Notification channel when the fetcher loop iterates clock mclock.Clock // Monotonic clock or simulated clock for tests @@ -183,36 +183,36 @@ type TxFetcher struct { // NewTxFetcher creates a transaction fetcher to retrieve transaction // based on hash announcements. -func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string)) *TxFetcher { - return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, dropPeer, mclock.System{}, time.Now, nil) +func NewTxFetcher(validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string)) *TxFetcher { + return NewTxFetcherForTests(validateMeta, addTxs, fetchTxs, dropPeer, mclock.System{}, time.Now, nil) } // NewTxFetcherForTests is a testing method to mock out the realtime clock with // a simulated version and the internal randomness with a deterministic one. func NewTxFetcherForTests( - hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), + validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string), clock mclock.Clock, realTime func() time.Time, rand *mrand.Rand) *TxFetcher { return &TxFetcher{ - notify: make(chan *txAnnounce), - cleanup: make(chan *txDelivery), - drop: make(chan *txDrop), - quit: make(chan struct{}), - waitlist: make(map[common.Hash]map[string]struct{}), - waittime: make(map[common.Hash]mclock.AbsTime), - waitslots: make(map[string]map[common.Hash]*txMetadataWithSeq), - announces: make(map[string]map[common.Hash]*txMetadataWithSeq), - announced: make(map[common.Hash]map[string]struct{}), - fetching: make(map[common.Hash]string), - requests: make(map[string]*txRequest), - alternates: make(map[common.Hash]map[string]struct{}), - underpriced: lru.NewCache[common.Hash, time.Time](maxTxUnderpricedSetSize), - hasTx: hasTx, - addTxs: addTxs, - fetchTxs: fetchTxs, - dropPeer: dropPeer, - clock: clock, - realTime: realTime, - rand: rand, + notify: make(chan *txAnnounce), + cleanup: make(chan *txDelivery), + drop: make(chan *txDrop), + quit: make(chan struct{}), + waitlist: make(map[common.Hash]map[string]struct{}), + waittime: make(map[common.Hash]mclock.AbsTime), + waitslots: make(map[string]map[common.Hash]*txMetadataWithSeq), + announces: make(map[string]map[common.Hash]*txMetadataWithSeq), + announced: make(map[common.Hash]map[string]struct{}), + fetching: make(map[common.Hash]string), + requests: make(map[string]*txRequest), + alternates: make(map[common.Hash]map[string]struct{}), + underpriced: lru.NewCache[common.Hash, time.Time](maxTxUnderpricedSetSize), + validateMeta: validateMeta, + addTxs: addTxs, + fetchTxs: fetchTxs, + dropPeer: dropPeer, + clock: clock, + realTime: realTime, + rand: rand, } } @@ -235,19 +235,26 @@ func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []c underpriced int64 ) for i, hash := range hashes { - switch { - case f.hasTx(hash): + err := f.validateMeta(hash, types[i]) + if errors.Is(err, txpool.ErrAlreadyKnown) { duplicate++ - case f.isKnownUnderpriced(hash): - underpriced++ - default: - unknownHashes = append(unknownHashes, hash) - - // Transaction metadata has been available since eth68, and all - // legacy eth protocols (prior to eth68) have been deprecated. - // Therefore, metadata is always expected in the announcement. - unknownMetas = append(unknownMetas, txMetadata{kind: types[i], size: sizes[i]}) + continue } + if err != nil { + continue + } + + if f.isKnownUnderpriced(hash) { + underpriced++ + continue + } + + unknownHashes = append(unknownHashes, hash) + + // Transaction metadata has been available since eth68, and all + // legacy eth protocols (prior to eth68) have been deprecated. + // Therefore, metadata is always expected in the announcement. + unknownMetas = append(unknownMetas, txMetadata{kind: types[i], size: sizes[i]}) } txAnnounceKnownMeter.Mark(duplicate) txAnnounceUnderpricedMeter.Mark(underpriced) diff --git a/eth/fetcher/tx_fetcher_test.go b/eth/fetcher/tx_fetcher_test.go index bb41f62932..d6d5a8692e 100644 --- a/eth/fetcher/tx_fetcher_test.go +++ b/eth/fetcher/tx_fetcher_test.go @@ -93,7 +93,7 @@ func TestTransactionFetcherWaiting(t *testing.T) { testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { return NewTxFetcher( - func(common.Hash) bool { return false }, + func(common.Hash, byte) error { return nil }, nil, func(string, []common.Hash) error { return nil }, nil, @@ -295,7 +295,7 @@ func TestTransactionFetcherSkipWaiting(t *testing.T) { testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { return NewTxFetcher( - func(common.Hash) bool { return false }, + func(common.Hash, byte) error { return nil }, nil, func(string, []common.Hash) error { return nil }, nil, @@ -385,7 +385,7 @@ func TestTransactionFetcherSingletonRequesting(t *testing.T) { testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { return NewTxFetcher( - func(common.Hash) bool { return false }, + func(common.Hash, byte) error { return nil }, nil, func(string, []common.Hash) error { return nil }, nil, @@ -490,7 +490,7 @@ func TestTransactionFetcherFailedRescheduling(t *testing.T) { testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { return NewTxFetcher( - func(common.Hash) bool { return false }, + func(common.Hash, byte) error { return nil }, nil, func(origin string, hashes []common.Hash) error { <-proceed @@ -574,7 +574,7 @@ func TestTransactionFetcherCleanup(t *testing.T) { testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { return NewTxFetcher( - func(common.Hash) bool { return false }, + func(common.Hash, byte) error { return nil }, func(txs []*types.Transaction) []error { return make([]error, len(txs)) }, @@ -618,7 +618,7 @@ func TestTransactionFetcherCleanupEmpty(t *testing.T) { testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { return NewTxFetcher( - func(common.Hash) bool { return false }, + func(common.Hash, byte) error { return nil }, func(txs []*types.Transaction) []error { return make([]error, len(txs)) }, @@ -661,7 +661,7 @@ func TestTransactionFetcherMissingRescheduling(t *testing.T) { testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { return NewTxFetcher( - func(common.Hash) bool { return false }, + func(common.Hash, byte) error { return nil }, func(txs []*types.Transaction) []error { return make([]error, len(txs)) }, @@ -722,7 +722,7 @@ func TestTransactionFetcherMissingCleanup(t *testing.T) { testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { return NewTxFetcher( - func(common.Hash) bool { return false }, + func(common.Hash, byte) error { return nil }, func(txs []*types.Transaction) []error { return make([]error, len(txs)) }, @@ -771,7 +771,7 @@ func TestTransactionFetcherBroadcasts(t *testing.T) { testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { return NewTxFetcher( - func(common.Hash) bool { return false }, + func(common.Hash, byte) error { return nil }, func(txs []*types.Transaction) []error { return make([]error, len(txs)) }, @@ -827,7 +827,7 @@ func TestTransactionFetcherWaitTimerResets(t *testing.T) { testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { return NewTxFetcher( - func(common.Hash) bool { return false }, + func(common.Hash, byte) error { return nil }, nil, func(string, []common.Hash) error { return nil }, nil, @@ -897,7 +897,7 @@ func TestTransactionFetcherTimeoutRescheduling(t *testing.T) { testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { return NewTxFetcher( - func(common.Hash) bool { return false }, + func(common.Hash, byte) error { return nil }, func(txs []*types.Transaction) []error { return make([]error, len(txs)) }, @@ -975,7 +975,7 @@ func TestTransactionFetcherTimeoutTimerResets(t *testing.T) { testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { return NewTxFetcher( - func(common.Hash) bool { return false }, + func(common.Hash, byte) error { return nil }, nil, func(string, []common.Hash) error { return nil }, nil, @@ -1053,7 +1053,7 @@ func TestTransactionFetcherRateLimiting(t *testing.T) { testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { return NewTxFetcher( - func(common.Hash) bool { return false }, + func(common.Hash, byte) error { return nil }, nil, func(string, []common.Hash) error { return nil }, nil, @@ -1083,7 +1083,7 @@ func TestTransactionFetcherBandwidthLimiting(t *testing.T) { testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { return NewTxFetcher( - func(common.Hash) bool { return false }, + func(common.Hash, byte) error { return nil }, nil, func(string, []common.Hash) error { return nil }, nil, @@ -1200,7 +1200,7 @@ func TestTransactionFetcherDoSProtection(t *testing.T) { testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { return NewTxFetcher( - func(common.Hash) bool { return false }, + func(common.Hash, byte) error { return nil }, nil, func(string, []common.Hash) error { return nil }, nil, @@ -1267,7 +1267,7 @@ func TestTransactionFetcherUnderpricedDedup(t *testing.T) { testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { return NewTxFetcher( - func(common.Hash) bool { return false }, + func(common.Hash, byte) error { return nil }, func(txs []*types.Transaction) []error { errs := make([]error, len(txs)) for i := 0; i < len(errs); i++ { @@ -1368,7 +1368,7 @@ func TestTransactionFetcherUnderpricedDoSProtection(t *testing.T) { testTransactionFetcher(t, txFetcherTest{ init: func() *TxFetcher { return NewTxFetcher( - func(common.Hash) bool { return false }, + func(common.Hash, byte) error { return nil }, func(txs []*types.Transaction) []error { errs := make([]error, len(txs)) for i := 0; i < len(errs); i++ { @@ -1400,7 +1400,7 @@ func TestTransactionFetcherOutOfBoundDeliveries(t *testing.T) { testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { return NewTxFetcher( - func(common.Hash) bool { return false }, + func(common.Hash, byte) error { return nil }, func(txs []*types.Transaction) []error { return make([]error, len(txs)) }, @@ -1459,7 +1459,7 @@ func TestTransactionFetcherDrop(t *testing.T) { testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { return NewTxFetcher( - func(common.Hash) bool { return false }, + func(common.Hash, byte) error { return nil }, func(txs []*types.Transaction) []error { return make([]error, len(txs)) }, @@ -1533,7 +1533,7 @@ func TestTransactionFetcherDropRescheduling(t *testing.T) { testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { return NewTxFetcher( - func(common.Hash) bool { return false }, + func(common.Hash, byte) error { return nil }, func(txs []*types.Transaction) []error { return make([]error, len(txs)) }, @@ -1579,7 +1579,7 @@ func TestInvalidAnnounceMetadata(t *testing.T) { testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { return NewTxFetcher( - func(common.Hash) bool { return false }, + func(common.Hash, byte) error { return nil }, func(txs []*types.Transaction) []error { return make([]error, len(txs)) }, @@ -1662,7 +1662,7 @@ func TestTransactionFetcherFuzzCrash01(t *testing.T) { testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { return NewTxFetcher( - func(common.Hash) bool { return false }, + func(common.Hash, byte) error { return nil }, func(txs []*types.Transaction) []error { return make([]error, len(txs)) }, @@ -1690,7 +1690,7 @@ func TestTransactionFetcherFuzzCrash02(t *testing.T) { testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { return NewTxFetcher( - func(common.Hash) bool { return false }, + func(common.Hash, byte) error { return nil }, func(txs []*types.Transaction) []error { return make([]error, len(txs)) }, @@ -1720,7 +1720,7 @@ func TestTransactionFetcherFuzzCrash03(t *testing.T) { testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { return NewTxFetcher( - func(common.Hash) bool { return false }, + func(common.Hash, byte) error { return nil }, func(txs []*types.Transaction) []error { return make([]error, len(txs)) }, @@ -1759,7 +1759,7 @@ func TestTransactionFetcherFuzzCrash04(t *testing.T) { testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { return NewTxFetcher( - func(common.Hash) bool { return false }, + func(common.Hash, byte) error { return nil }, func(txs []*types.Transaction) []error { return make([]error, len(txs)) }, @@ -1794,7 +1794,7 @@ func TestBlobTransactionAnnounce(t *testing.T) { testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { return NewTxFetcher( - func(common.Hash) bool { return false }, + func(common.Hash, byte) error { return nil }, nil, func(string, []common.Hash) error { return nil }, nil, @@ -1862,7 +1862,7 @@ func TestTransactionFetcherDropAlternates(t *testing.T) { testTransactionFetcherParallel(t, txFetcherTest{ init: func() *TxFetcher { return NewTxFetcher( - func(common.Hash) bool { return false }, + func(common.Hash, byte) error { return nil }, func(txs []*types.Transaction) []error { return make([]error, len(txs)) }, @@ -1908,6 +1908,35 @@ func TestTransactionFetcherDropAlternates(t *testing.T) { }) } +func TestTransactionFetcherWrongMetadata(t *testing.T) { + testTransactionFetcherParallel(t, txFetcherTest{ + init: func() *TxFetcher { + return NewTxFetcher( + func(_ common.Hash, kind byte) error { + switch kind { + case types.LegacyTxType, types.AccessListTxType, types.DynamicFeeTxType, types.BlobTxType, types.SetCodeTxType: + return nil + } + return types.ErrTxTypeNotSupported + }, + func(txs []*types.Transaction) []error { + return make([]error, len(txs)) + }, + func(string, []common.Hash) error { return nil }, + nil, + ) + }, + steps: []interface{}{ + doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x02}}, types: []byte{0xff, types.LegacyTxType}, sizes: []uint32{111, 222}}, + isWaiting(map[string][]announce{ + "A": { + {common.Hash{0x02}, types.LegacyTxType, 222}, + }, + }), + }, + }) +} + func testTransactionFetcherParallel(t *testing.T, tt txFetcherTest) { t.Parallel() testTransactionFetcher(t, tt) @@ -2245,7 +2274,7 @@ func TestTransactionForgotten(t *testing.T) { } fetcher := NewTxFetcherForTests( - func(common.Hash) bool { return false }, + func(common.Hash, byte) error { return nil }, func(txs []*types.Transaction) []error { errs := make([]error, len(txs)) for i := 0; i < len(errs); i++ { diff --git a/eth/handler.go b/eth/handler.go index 4510dd32f0..0d07e88c7a 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -92,6 +92,9 @@ type txPool interface { // can decide whether to receive notifications only for newly seen transactions // or also for reorged out ones. SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription + + // FilterType returns whether the given tx type is supported by the txPool. + FilterType(kind byte) bool } // handlerConfig is the collection of initialization parameters to create a full @@ -176,7 +179,18 @@ func newHandler(config *handlerConfig) (*handler, error) { addTxs := func(txs []*types.Transaction) []error { return h.txpool.Add(txs, false) } - h.txFetcher = fetcher.NewTxFetcher(h.txpool.Has, addTxs, fetchTx, h.removePeer) + + validateMeta := func(tx common.Hash, kind byte) error { + if h.txpool.Has(tx) { + return txpool.ErrAlreadyKnown + } + if !h.txpool.FilterType(kind) { + return types.ErrTxTypeNotSupported + } + return nil + } + + h.txFetcher = fetcher.NewTxFetcher(validateMeta, addTxs, fetchTx, h.removePeer) return h, nil } diff --git a/eth/handler_test.go b/eth/handler_test.go index 312e5625ba..3470452980 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -163,6 +163,15 @@ func (p *testTxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bo return p.txFeed.Subscribe(ch) } +// FilterType should check whether the pool supports the given type of transactions. +func (p *testTxPool) FilterType(kind byte) bool { + switch kind { + case types.LegacyTxType, types.AccessListTxType, types.DynamicFeeTxType, types.BlobTxType, types.SetCodeTxType: + return true + } + return false +} + // testHandler is a live implementation of the Ethereum protocol handler, just // preinitialized with some sane testing defaults and the transaction pool mocked // out. diff --git a/tests/fuzzers/txfetcher/txfetcher_fuzzer.go b/tests/fuzzers/txfetcher/txfetcher_fuzzer.go index c136253a62..3baff33dcc 100644 --- a/tests/fuzzers/txfetcher/txfetcher_fuzzer.go +++ b/tests/fuzzers/txfetcher/txfetcher_fuzzer.go @@ -78,7 +78,7 @@ func fuzz(input []byte) int { rand := rand.New(rand.NewSource(0x3a29)) // Same used in package tests!!! f := fetcher.NewTxFetcherForTests( - func(common.Hash) bool { return false }, + func(common.Hash, byte) error { return nil }, func(txs []*types.Transaction) []error { return make([]error, len(txs)) },