eth/fetcher: refactor test code (#33610)
Some checks are pending
/ Linux Build (push) Waiting to run
/ Linux Build (arm) (push) Waiting to run
/ Keeper Build (push) Waiting to run
/ Windows Build (push) Waiting to run
/ Docker Image (push) Waiting to run

Remove a large amount of duplicate code from the tx_fetcher tests.

---------

Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>
Co-authored-by: lightclient <lightclient@protonmail.com>
This commit is contained in:
Csaba Kiraly 2026-01-15 19:37:34 +01:00 committed by GitHub
parent 494908a852
commit 9ba13b6097
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -87,6 +87,19 @@ type txFetcherTest struct {
steps []interface{}
}
// newTestTxFetcher creates a tx fetcher with noop callbacks, simulated clock,
// and deterministic randomness.
func newTestTxFetcher() *TxFetcher {
return NewTxFetcher(
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
}
// Tests that transaction announcements with associated metadata are added to a
// waitlist, and none of them are scheduled for retrieval until the wait expires.
//
@ -95,14 +108,7 @@ type txFetcherTest struct {
// with all the useless extra fields.
func TestTransactionFetcherWaiting(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash, byte) error { return nil },
nil,
func(string, []common.Hash) error { return nil },
nil,
)
},
init: newTestTxFetcher,
steps: []interface{}{
// Initial announcement to get something into the waitlist
doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x02}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{111, 222}},
@ -297,14 +303,7 @@ func TestTransactionFetcherWaiting(t *testing.T) {
// already scheduled.
func TestTransactionFetcherSkipWaiting(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash, byte) error { return nil },
nil,
func(string, []common.Hash) error { return nil },
nil,
)
},
init: newTestTxFetcher,
steps: []interface{}{
// Push an initial announcement through to the scheduled stage
doTxNotify{
@ -387,14 +386,7 @@ func TestTransactionFetcherSkipWaiting(t *testing.T) {
// and subsequent announces block or get allotted to someone else.
func TestTransactionFetcherSingletonRequesting(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash, byte) error { return nil },
nil,
func(string, []common.Hash) error { return nil },
nil,
)
},
init: newTestTxFetcher,
steps: []interface{}{
// Push an initial announcement through to the scheduled stage
doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x02}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{111, 222}},
@ -493,15 +485,12 @@ func TestTransactionFetcherFailedRescheduling(t *testing.T) {
proceed := make(chan struct{})
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash, byte) error { return nil },
nil,
func(origin string, hashes []common.Hash) error {
<-proceed
return errors.New("peer disconnected")
},
nil,
)
f := newTestTxFetcher()
f.fetchTxs = func(origin string, hashes []common.Hash) error {
<-proceed
return errors.New("peer disconnected")
}
return f
},
steps: []interface{}{
// Push an initial announcement through to the scheduled stage
@ -576,16 +565,7 @@ func TestTransactionFetcherFailedRescheduling(t *testing.T) {
// are cleaned up.
func TestTransactionFetcherCleanup(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
init: newTestTxFetcher,
steps: []interface{}{
// Push an initial announcement through to the scheduled stage
doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}},
@ -620,16 +600,7 @@ func TestTransactionFetcherCleanup(t *testing.T) {
// this was a bug)).
func TestTransactionFetcherCleanupEmpty(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
init: newTestTxFetcher,
steps: []interface{}{
// Push an initial announcement through to the scheduled stage
doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}},
@ -663,16 +634,7 @@ func TestTransactionFetcherCleanupEmpty(t *testing.T) {
// different peer, or self if they are after the cutoff point.
func TestTransactionFetcherMissingRescheduling(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
init: newTestTxFetcher,
steps: []interface{}{
// Push an initial announcement through to the scheduled stage
doTxNotify{peer: "A",
@ -724,16 +686,7 @@ func TestTransactionFetcherMissingRescheduling(t *testing.T) {
// delivered, the peer gets properly cleaned out from the internal state.
func TestTransactionFetcherMissingCleanup(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
init: newTestTxFetcher,
steps: []interface{}{
// Push an initial announcement through to the scheduled stage
doTxNotify{peer: "A",
@ -773,16 +726,7 @@ func TestTransactionFetcherMissingCleanup(t *testing.T) {
// Tests that transaction broadcasts properly clean up announcements.
func TestTransactionFetcherBroadcasts(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
init: newTestTxFetcher,
steps: []interface{}{
// Set up three transactions to be in different stats, waiting, queued and fetching
doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}},
@ -829,14 +773,7 @@ func TestTransactionFetcherBroadcasts(t *testing.T) {
// Tests that the waiting list timers properly reset and reschedule.
func TestTransactionFetcherWaitTimerResets(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash, byte) error { return nil },
nil,
func(string, []common.Hash) error { return nil },
nil,
)
},
init: newTestTxFetcher,
steps: []interface{}{
doTxNotify{peer: "A", hashes: []common.Hash{{0x01}}, types: []byte{types.LegacyTxType}, sizes: []uint32{111}},
isWaiting(map[string][]announce{
@ -899,16 +836,7 @@ func TestTransactionFetcherWaitTimerResets(t *testing.T) {
// out and be re-scheduled for someone else.
func TestTransactionFetcherTimeoutRescheduling(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
init: newTestTxFetcher,
steps: []interface{}{
// Push an initial announcement through to the scheduled stage
doTxNotify{
@ -977,14 +905,7 @@ func TestTransactionFetcherTimeoutRescheduling(t *testing.T) {
// Tests that the fetching timeout timers properly reset and reschedule.
func TestTransactionFetcherTimeoutTimerResets(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash, byte) error { return nil },
nil,
func(string, []common.Hash) error { return nil },
nil,
)
},
init: newTestTxFetcher,
steps: []interface{}{
doTxNotify{peer: "A", hashes: []common.Hash{{0x01}}, types: []byte{types.LegacyTxType}, sizes: []uint32{111}},
doWait{time: txArriveTimeout, step: true},
@ -1055,14 +976,7 @@ func TestTransactionFetcherRateLimiting(t *testing.T) {
})
}
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash, byte) error { return nil },
nil,
func(string, []common.Hash) error { return nil },
nil,
)
},
init: newTestTxFetcher,
steps: []interface{}{
// Announce all the transactions, wait a bit and ensure only a small
// percentage gets requested
@ -1085,14 +999,7 @@ func TestTransactionFetcherRateLimiting(t *testing.T) {
// be requested at a time, to keep the responses below a reasonable level.
func TestTransactionFetcherBandwidthLimiting(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash, byte) error { return nil },
nil,
func(string, []common.Hash) error { return nil },
nil,
)
},
init: newTestTxFetcher,
steps: []interface{}{
// Announce mid size transactions from A to verify that multiple
// ones can be piled into a single request.
@ -1202,14 +1109,7 @@ func TestTransactionFetcherDoSProtection(t *testing.T) {
})
}
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash, byte) error { return nil },
nil,
func(string, []common.Hash) error { return nil },
nil,
)
},
init: newTestTxFetcher,
steps: []interface{}{
// Announce half of the transaction and wait for them to be scheduled
doTxNotify{peer: "A", hashes: hashesA[:maxTxAnnounces/2], types: typesA[:maxTxAnnounces/2], sizes: sizesA[:maxTxAnnounces/2]},
@ -1270,24 +1170,21 @@ func TestTransactionFetcherDoSProtection(t *testing.T) {
func TestTransactionFetcherUnderpricedDedup(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
errs := make([]error, len(txs))
for i := 0; i < len(errs); i++ {
if i%3 == 0 {
errs[i] = txpool.ErrUnderpriced
} else if i%3 == 1 {
errs[i] = txpool.ErrReplaceUnderpriced
} else {
errs[i] = txpool.ErrTxGasPriceTooLow
}
f := newTestTxFetcher()
f.addTxs = func(txs []*types.Transaction) []error {
errs := make([]error, len(txs))
for i := 0; i < len(errs); i++ {
if i%3 == 0 {
errs[i] = txpool.ErrUnderpriced
} else if i%3 == 1 {
errs[i] = txpool.ErrReplaceUnderpriced
} else {
errs[i] = txpool.ErrTxGasPriceTooLow
}
return errs
},
func(string, []common.Hash) error { return nil },
nil,
)
}
return errs
}
return f
},
steps: []interface{}{
// Deliver a transaction through the fetcher, but reject as underpriced
@ -1371,18 +1268,15 @@ func TestTransactionFetcherUnderpricedDoSProtection(t *testing.T) {
}
testTransactionFetcher(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
errs := make([]error, len(txs))
for i := 0; i < len(errs); i++ {
errs[i] = txpool.ErrUnderpriced
}
return errs
},
func(string, []common.Hash) error { return nil },
nil,
)
f := newTestTxFetcher()
f.addTxs = func(txs []*types.Transaction) []error {
errs := make([]error, len(txs))
for i := 0; i < len(errs); i++ {
errs[i] = txpool.ErrUnderpriced
}
return errs
}
return f
},
steps: append(steps, []interface{}{
// The preparation of the test has already been done in `steps`, add the last check
@ -1402,16 +1296,7 @@ func TestTransactionFetcherUnderpricedDoSProtection(t *testing.T) {
// Tests that unexpected deliveries don't corrupt the internal state.
func TestTransactionFetcherOutOfBoundDeliveries(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
init: newTestTxFetcher,
steps: []interface{}{
// Deliver something out of the blue
isWaiting(nil),
@ -1461,16 +1346,7 @@ func TestTransactionFetcherOutOfBoundDeliveries(t *testing.T) {
// live or dangling stages.
func TestTransactionFetcherDrop(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
init: newTestTxFetcher,
steps: []interface{}{
// Set up a few hashes into various stages
doTxNotify{peer: "A", hashes: []common.Hash{{0x01}}, types: []byte{types.LegacyTxType}, sizes: []uint32{111}},
@ -1535,16 +1411,7 @@ func TestTransactionFetcherDrop(t *testing.T) {
// available peer.
func TestTransactionFetcherDropRescheduling(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
init: newTestTxFetcher,
steps: []interface{}{
// Set up a few hashes into various stages
doTxNotify{peer: "A", hashes: []common.Hash{{0x01}}, types: []byte{types.LegacyTxType}, sizes: []uint32{111}},
@ -1582,14 +1449,9 @@ func TestInvalidAnnounceMetadata(t *testing.T) {
drop := make(chan string, 2)
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
func(peer string) { drop <- peer },
)
f := newTestTxFetcher()
f.dropPeer = func(peer string) { drop <- peer }
return f
},
steps: []interface{}{
// Initial announcement to get something into the waitlist
@ -1664,16 +1526,7 @@ func TestInvalidAnnounceMetadata(t *testing.T) {
// announced one.
func TestTransactionFetcherFuzzCrash01(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
init: newTestTxFetcher,
steps: []interface{}{
// Get a transaction into fetching mode and make it dangling with a broadcast
doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}},
@ -1692,16 +1545,7 @@ func TestTransactionFetcherFuzzCrash01(t *testing.T) {
// concurrently announced one.
func TestTransactionFetcherFuzzCrash02(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
init: newTestTxFetcher,
steps: []interface{}{
// Get a transaction into fetching mode and make it dangling with a broadcast
doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}},
@ -1722,16 +1566,7 @@ func TestTransactionFetcherFuzzCrash02(t *testing.T) {
// with a concurrent notify.
func TestTransactionFetcherFuzzCrash03(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
init: newTestTxFetcher,
steps: []interface{}{
// Get a transaction into fetching mode and make it dangling with a broadcast
doTxNotify{
@ -1762,17 +1597,12 @@ func TestTransactionFetcherFuzzCrash04(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error {
<-proceed
return errors.New("peer disconnected")
},
nil,
)
f := newTestTxFetcher()
f.fetchTxs = func(string, []common.Hash) error {
<-proceed
return errors.New("peer disconnected")
}
return f
},
steps: []interface{}{
// Get a transaction into fetching mode and make it dangling with a broadcast
@ -1796,14 +1626,7 @@ func TestTransactionFetcherFuzzCrash04(t *testing.T) {
// once they are announced in the network.
func TestBlobTransactionAnnounce(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash, byte) error { return nil },
nil,
func(string, []common.Hash) error { return nil },
nil,
)
},
init: newTestTxFetcher,
steps: []interface{}{
// Initial announcement to get something into the waitlist
doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x02}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{111, 222}},
@ -1864,16 +1687,7 @@ func TestBlobTransactionAnnounce(t *testing.T) {
func TestTransactionFetcherDropAlternates(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
},
init: newTestTxFetcher,
steps: []interface{}{
doTxNotify{peer: "A", hashes: []common.Hash{testTxsHashes[0]}, types: []byte{testTxs[0].Type()}, sizes: []uint32{uint32(testTxs[0].Size())}},
doWait{time: txArriveTimeout, step: true},
@ -1915,20 +1729,15 @@ func TestTransactionFetcherDropAlternates(t *testing.T) {
func TestTransactionFetcherWrongMetadata(t *testing.T) {
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(_ common.Hash, kind byte) error {
switch kind {
case types.LegacyTxType, types.AccessListTxType, types.DynamicFeeTxType, types.BlobTxType, types.SetCodeTxType:
return nil
}
return types.ErrTxTypeNotSupported
},
func(txs []*types.Transaction) []error {
return make([]error, len(txs))
},
func(string, []common.Hash) error { return nil },
nil,
)
f := newTestTxFetcher()
f.validateMeta = func(name common.Hash, kind byte) error {
switch kind {
case types.LegacyTxType, types.AccessListTxType, types.DynamicFeeTxType, types.BlobTxType, types.SetCodeTxType:
return nil
}
return types.ErrTxTypeNotSupported
}
return f
},
steps: []interface{}{
doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x02}}, types: []byte{0xff, types.LegacyTxType}, sizes: []uint32{111, 222}},
@ -1976,20 +1785,16 @@ func TestTransactionProtocolViolation(t *testing.T) {
)
testTransactionFetcherParallel(t, txFetcherTest{
init: func() *TxFetcher {
return NewTxFetcher(
func(common.Hash, byte) error { return nil },
func(txs []*types.Transaction) []error {
var errs []error
for range txs {
errs = append(errs, txpool.ErrKZGVerificationError)
}
return errs
},
func(a string, b []common.Hash) error {
return nil
},
func(peer string) { drop <- struct{}{} },
)
f := newTestTxFetcher()
f.addTxs = func(txs []*types.Transaction) []error {
var errs []error
for range txs {
errs = append(errs, txpool.ErrKZGVerificationError)
}
return errs
}
f.dropPeer = func(string) { drop <- struct{}{} }
return f
},
steps: []interface{}{
// Initial announcement to get something into the waitlist