eth/fetcher: add metadata validation in tx announcement (#33378)

This PR fixes the bug reported in #33365.

The impact of the bug is not catastrophic. After a transaction is
ultimately fetched, validation and propagation will be performed based
on the fetched body, and any response with a mismatched type is treated
as a protocol violation. An attacker could only waste the limited
portion of victim’s bandwidth at most.

However, the reasons for submitting this PR are as follows

1. Fetching a transaction announced with an arbitrary type is a weird
behavior.

2. It aligns with efforts such as EIP-8077 and #33119 to make the
fetcher smarter and reduce bandwidth waste.

Regarding the `FilterType` function, it could potentially be implemented
by modifying the Filter function's parameter itself, but I wasn’t sure
whether changing that function is acceptable, so I left it as is.
This commit is contained in:
Bosul Mun 2025-12-11 13:11:52 +09:00 committed by GitHub
parent 1b702f71d9
commit 56d201b0fe
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 153 additions and 70 deletions

View file

@ -377,7 +377,12 @@ func New(config Config, chain BlockChain, hasPendingAuth func(common.Address) bo
// Filter returns whether the given transaction can be consumed by the blob pool.
func (p *BlobPool) Filter(tx *types.Transaction) bool {
return tx.Type() == types.BlobTxType
return p.FilterType(tx.Type())
}
// FilterType returns whether the blob pool supports the given transaction type.
func (p *BlobPool) FilterType(kind byte) bool {
return kind == types.BlobTxType
}
// Init sets the gas price needed to keep a transaction in the pool and the chain

View file

@ -288,7 +288,12 @@ func New(config Config, chain BlockChain) *LegacyPool {
// Filter returns whether the given transaction can be consumed by the legacy
// pool, specifically, whether it is a Legacy, AccessList or Dynamic transaction.
func (pool *LegacyPool) Filter(tx *types.Transaction) bool {
switch tx.Type() {
return pool.FilterType(tx.Type())
}
// FilterType returns whether the legacy pool supports the given transaction type.
func (pool *LegacyPool) FilterType(kind byte) bool {
switch kind {
case types.LegacyTxType, types.AccessListTxType, types.DynamicFeeTxType, types.SetCodeTxType:
return true
default:

View file

@ -100,6 +100,9 @@ type SubPool interface {
// to this particular subpool.
Filter(tx *types.Transaction) bool
// FilterType returns whether the subpool supports the given transaction type.
FilterType(kind byte) bool
// Init sets the base parameters of the subpool, allowing it to load any saved
// transactions from disk and also permitting internal maintenance routines to
// start up.

View file

@ -489,3 +489,14 @@ func (p *TxPool) Clear() {
subpool.Clear()
}
}
// FilterType returns whether a transaction with the given type is supported
// (can be added) by the pool.
func (p *TxPool) FilterType(kind byte) bool {
for _, subpool := range p.subpools {
if subpool.FilterType(kind) {
return true
}
}
return false
}

View file

@ -170,10 +170,10 @@ type TxFetcher struct {
alternates map[common.Hash]map[string]struct{} // In-flight transaction alternate origins if retrieval fails
// Callbacks
hasTx func(common.Hash) bool // Retrieves a tx from the local txpool
addTxs func([]*types.Transaction) []error // Insert a batch of transactions into local txpool
fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer
dropPeer func(string) // Drops a peer in case of announcement violation
validateMeta func(common.Hash, byte) error // Validate a tx metadata based on the local txpool
addTxs func([]*types.Transaction) []error // Insert a batch of transactions into local txpool
fetchTxs func(string, []common.Hash) error // Retrieves a set of txs from a remote peer
dropPeer func(string) // Drops a peer in case of announcement violation
step chan struct{} // Notification channel when the fetcher loop iterates
clock mclock.Clock // Monotonic clock or simulated clock for tests
@ -183,36 +183,36 @@ type TxFetcher struct {
// NewTxFetcher creates a transaction fetcher to retrieve transaction
// based on hash announcements.
func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string)) *TxFetcher {
return NewTxFetcherForTests(hasTx, addTxs, fetchTxs, dropPeer, mclock.System{}, time.Now, nil)
func NewTxFetcher(validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string)) *TxFetcher {
return NewTxFetcherForTests(validateMeta, addTxs, fetchTxs, dropPeer, mclock.System{}, time.Now, nil)
}
// NewTxFetcherForTests is a testing method to mock out the realtime clock with
// a simulated version and the internal randomness with a deterministic one.
func NewTxFetcherForTests(
hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string),
validateMeta func(common.Hash, byte) error, addTxs func([]*types.Transaction) []error, fetchTxs func(string, []common.Hash) error, dropPeer func(string),
clock mclock.Clock, realTime func() time.Time, rand *mrand.Rand) *TxFetcher {
return &TxFetcher{
notify: make(chan *txAnnounce),
cleanup: make(chan *txDelivery),
drop: make(chan *txDrop),
quit: make(chan struct{}),
waitlist: make(map[common.Hash]map[string]struct{}),
waittime: make(map[common.Hash]mclock.AbsTime),
waitslots: make(map[string]map[common.Hash]*txMetadataWithSeq),
announces: make(map[string]map[common.Hash]*txMetadataWithSeq),
announced: make(map[common.Hash]map[string]struct{}),
fetching: make(map[common.Hash]string),
requests: make(map[string]*txRequest),
alternates: make(map[common.Hash]map[string]struct{}),
underpriced: lru.NewCache[common.Hash, time.Time](maxTxUnderpricedSetSize),
hasTx: hasTx,
addTxs: addTxs,
fetchTxs: fetchTxs,
dropPeer: dropPeer,
clock: clock,
realTime: realTime,
rand: rand,
notify: make(chan *txAnnounce),
cleanup: make(chan *txDelivery),
drop: make(chan *txDrop),
quit: make(chan struct{}),
waitlist: make(map[common.Hash]map[string]struct{}),
waittime: make(map[common.Hash]mclock.AbsTime),
waitslots: make(map[string]map[common.Hash]*txMetadataWithSeq),
announces: make(map[string]map[common.Hash]*txMetadataWithSeq),
announced: make(map[common.Hash]map[string]struct{}),
fetching: make(map[common.Hash]string),
requests: make(map[string]*txRequest),
alternates: make(map[common.Hash]map[string]struct{}),
underpriced: lru.NewCache[common.Hash, time.Time](maxTxUnderpricedSetSize),
validateMeta: validateMeta,
addTxs: addTxs,
fetchTxs: fetchTxs,
dropPeer: dropPeer,
clock: clock,
realTime: realTime,
rand: rand,
}
}
@ -235,19 +235,26 @@ func (f *TxFetcher) Notify(peer string, types []byte, sizes []uint32, hashes []c
underpriced int64
)
for i, hash := range hashes {
switch {
case f.hasTx(hash):
err := f.validateMeta(hash, types[i])
if errors.Is(err, txpool.ErrAlreadyKnown) {
duplicate++
case f.isKnownUnderpriced(hash):
underpriced++
default:
unknownHashes = append(unknownHashes, hash)
// Transaction metadata has been available since eth68, and all
// legacy eth protocols (prior to eth68) have been deprecated.
// Therefore, metadata is always expected in the announcement.
unknownMetas = append(unknownMetas, txMetadata{kind: types[i], size: sizes[i]})
continue
}
if err != nil {
continue
}
if f.isKnownUnderpriced(hash) {
underpriced++
continue
}
unknownHashes = append(unknownHashes, hash)
// Transaction metadata has been available since eth68, and all
// legacy eth protocols (prior to eth68) have been deprecated.
// Therefore, metadata is always expected in the announcement.
unknownMetas = append(unknownMetas, txMetadata{kind: types[i], size: sizes[i]})
}
txAnnounceKnownMeter.Mark(duplicate)
txAnnounceUnderpricedMeter.Mark(underpriced)

View file

@ -93,7 +93,7 @@ func TestTransactionFetcherWaiting(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(common.Hash, byte) error { return nil },
nil,
func(string, []common.Hash) error { return nil },
nil,
@ -295,7 +295,7 @@ func TestTransactionFetcherSkipWaiting(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(common.Hash, byte) error { return nil },
nil,
func(string, []common.Hash) error { return nil },
nil,
@ -385,7 +385,7 @@ func TestTransactionFetcherSingletonRequesting(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(common.Hash, byte) error { return nil },
nil,
func(string, []common.Hash) error { return nil },
nil,
@ -490,7 +490,7 @@ func TestTransactionFetcherFailedRescheduling(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(common.Hash, byte) error { return nil },
nil,
func(origin string, hashes []common.Hash) error {
<-proceed
@ -574,7 +574,7 @@ func TestTransactionFetcherCleanup(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
@ -618,7 +618,7 @@ func TestTransactionFetcherCleanupEmpty(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
@ -661,7 +661,7 @@ func TestTransactionFetcherMissingRescheduling(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
@ -722,7 +722,7 @@ func TestTransactionFetcherMissingCleanup(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
@ -771,7 +771,7 @@ func TestTransactionFetcherBroadcasts(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
@ -827,7 +827,7 @@ func TestTransactionFetcherWaitTimerResets(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(common.Hash, byte) error { return nil },
nil,
func(string, []common.Hash) error { return nil },
nil,
@ -897,7 +897,7 @@ func TestTransactionFetcherTimeoutRescheduling(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
@ -975,7 +975,7 @@ func TestTransactionFetcherTimeoutTimerResets(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(common.Hash, byte) error { return nil },
nil,
func(string, []common.Hash) error { return nil },
nil,
@ -1053,7 +1053,7 @@ func TestTransactionFetcherRateLimiting(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(common.Hash, byte) error { return nil },
nil,
func(string, []common.Hash) error { return nil },
nil,
@ -1083,7 +1083,7 @@ func TestTransactionFetcherBandwidthLimiting(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(common.Hash, byte) error { return nil },
nil,
func(string, []common.Hash) error { return nil },
nil,
@ -1200,7 +1200,7 @@ func TestTransactionFetcherDoSProtection(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(common.Hash, byte) error { return nil },
nil,
func(string, []common.Hash) error { return nil },
nil,
@ -1267,7 +1267,7 @@ func TestTransactionFetcherUnderpricedDedup(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
errs := make([]error, len(txs))
for i := 0; i < len(errs); i++ {
@ -1368,7 +1368,7 @@ func TestTransactionFetcherUnderpricedDoSProtection(t *testing.T) {
testTransactionFetcher(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
errs := make([]error, len(txs))
for i := 0; i < len(errs); i++ {
@ -1400,7 +1400,7 @@ func TestTransactionFetcherOutOfBoundDeliveries(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
@ -1459,7 +1459,7 @@ func TestTransactionFetcherDrop(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
@ -1533,7 +1533,7 @@ func TestTransactionFetcherDropRescheduling(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
@ -1579,7 +1579,7 @@ func TestInvalidAnnounceMetadata(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
@ -1662,7 +1662,7 @@ func TestTransactionFetcherFuzzCrash01(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
@ -1690,7 +1690,7 @@ func TestTransactionFetcherFuzzCrash02(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
@ -1720,7 +1720,7 @@ func TestTransactionFetcherFuzzCrash03(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
@ -1759,7 +1759,7 @@ func TestTransactionFetcherFuzzCrash04(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
@ -1794,7 +1794,7 @@ func TestBlobTransactionAnnounce(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(common.Hash, byte) error { return nil },
nil,
func(string, []common.Hash) error { return nil },
nil,
@ -1862,7 +1862,7 @@ func TestTransactionFetcherDropAlternates(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash) bool { return false },
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
@ -1908,6 +1908,35 @@ func TestTransactionFetcherDropAlternates(t *testing.T) {
})
}
func TestTransactionFetcherWrongMetadata(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(_ common.Hash, kind byte) error {
switch kind {
case types.LegacyTxType, types.AccessListTxType, types.DynamicFeeTxType, types.BlobTxType, types.SetCodeTxType:
return nil
}
return types.ErrTxTypeNotSupported
},
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
steps: []interface{}{
doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x02}}, types: []byte{0xff, types.LegacyTxType}, sizes: []uint32{111, 222}},
isWaiting(map[string][]announce{
"A": {
{common.Hash{0x02}, types.LegacyTxType, 222},
},
}),
},
})
}
func testTransactionFetcherParallel(t *testing.T, tt txFetcherTest) {
t.Parallel()
testTransactionFetcher(t, tt)
@ -2245,7 +2274,7 @@ func TestTransactionForgotten(t *testing.T) {
}
fetcher := NewTxFetcherForTests(
func(common.Hash) bool { return false },
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
errs := make([]error, len(txs))
for i := 0; i < len(errs); i++ {

View file

@ -92,6 +92,9 @@ type txPool interface {
// can decide whether to receive notifications only for newly seen transactions
// or also for reorged out ones.
SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription
// FilterType returns whether the given tx type is supported by the txPool.
FilterType(kind byte) bool
}
// handlerConfig is the collection of initialization parameters to create a full
@ -176,7 +179,18 @@ func newHandler(config *handlerConfig) (*handler, error) {
addTxs := func(txs []*types.Transaction) []error {
return h.txpool.Add(txs, false)
}
h.txFetcher = fetcher.NewTxFetcher(h.txpool.Has, addTxs, fetchTx, h.removePeer)
validateMeta := func(tx common.Hash, kind byte) error {
if h.txpool.Has(tx) {
return txpool.ErrAlreadyKnown
}
if !h.txpool.FilterType(kind) {
return types.ErrTxTypeNotSupported
}
return nil
}
h.txFetcher = fetcher.NewTxFetcher(validateMeta, addTxs, fetchTx, h.removePeer)
return h, nil
}

View file

@ -163,6 +163,15 @@ func (p *testTxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bo
return p.txFeed.Subscribe(ch)
}
// FilterType should check whether the pool supports the given type of transactions.
func (p *testTxPool) FilterType(kind byte) bool {
switch kind {
case types.LegacyTxType, types.AccessListTxType, types.DynamicFeeTxType, types.BlobTxType, types.SetCodeTxType:
return true
}
return false
}
// testHandler is a live implementation of the Ethereum protocol handler, just
// preinitialized with some sane testing defaults and the transaction pool mocked
// out.

View file

@ -78,7 +78,7 @@ func fuzz(input []byte) int {
rand := rand.New(rand.NewSource(0x3a29)) // Same used in package tests!!!
f := fetcher.NewTxFetcherForTests(
func(common.Hash) bool { return false },
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},