eth/txtracker: replace time.Sleep with deterministic step channel

Tests used time.Sleep(50ms) to wait for async chain head processing,
making the suite slow (~2s) and flaky under CI load.

Add a step channel (buffered 1) to the Tracker, sent after each
event in the loop. Tests wait on the channel with a 1s timeout.
Suite now completes in <10ms.
This commit is contained in:
Csaba Kiraly 2026-04-11 16:35:54 +02:00
parent e2b620ab44
commit df517d1f3a
2 changed files with 26 additions and 15 deletions

View file

@ -76,6 +76,7 @@ type Tracker struct {
sub event.Subscription sub event.Subscription
quit chan struct{} quit chan struct{}
step chan struct{} // test sync: sent after each event is processed
wg sync.WaitGroup wg sync.WaitGroup
} }
@ -85,6 +86,7 @@ func New() *Tracker {
txs: make(map[common.Hash]string), txs: make(map[common.Hash]string),
peers: make(map[string]*peerStats), peers: make(map[string]*peerStats),
quit: make(chan struct{}), quit: make(chan struct{}),
step: make(chan struct{}, 1),
} }
} }
@ -171,6 +173,10 @@ func (t *Tracker) loop() {
select { select {
case ev := <-t.headCh: case ev := <-t.headCh:
t.handleChainHead(ev) t.handleChainHead(ev)
select {
case t.step <- struct{}{}:
default:
}
case <-t.sub.Err(): case <-t.sub.Err():
return return
case <-t.quit: case <-t.quit:

View file

@ -96,9 +96,14 @@ func makeTx(nonce uint64) *types.Transaction {
return types.NewTx(&types.LegacyTx{Nonce: nonce, GasPrice: big.NewInt(1), Gas: 21000}) return types.NewTx(&types.LegacyTx{Nonce: nonce, GasPrice: big.NewInt(1), Gas: 21000})
} }
// waitForHead gives the tracker time to process a chain head event. // waitStep blocks until the tracker has processed one event.
func waitForHead() { func waitStep(t *testing.T, tr *Tracker) {
time.Sleep(50 * time.Millisecond) t.Helper()
select {
case <-tr.step:
case <-time.After(time.Second):
t.Fatal("timeout waiting for tracker step")
}
} }
func TestNotifyReceived(t *testing.T) { func TestNotifyReceived(t *testing.T) {
@ -132,7 +137,7 @@ func TestInclusionEMA(t *testing.T) {
// Block 1 includes peerA's tx. // Block 1 includes peerA's tx.
chain.addBlock(1, []*types.Transaction{tx}) chain.addBlock(1, []*types.Transaction{tx})
chain.sendHead(1) chain.sendHead(1)
waitForHead() waitStep(t, tr)
stats := tr.GetAllPeerStats() stats := tr.GetAllPeerStats()
if stats["peerA"].RecentIncluded <= 0 { if stats["peerA"].RecentIncluded <= 0 {
@ -143,7 +148,7 @@ func TestInclusionEMA(t *testing.T) {
// Block 2 has no txs from peerA — EMA should decay. // Block 2 has no txs from peerA — EMA should decay.
chain.addBlock(2, nil) chain.addBlock(2, nil)
chain.sendHead(2) chain.sendHead(2)
waitForHead() waitStep(t, tr)
stats = tr.GetAllPeerStats() stats = tr.GetAllPeerStats()
if stats["peerA"].RecentIncluded >= ema1 { if stats["peerA"].RecentIncluded >= ema1 {
@ -163,7 +168,7 @@ func TestFinalization(t *testing.T) {
// Include in block 1. // Include in block 1.
chain.addBlock(1, []*types.Transaction{tx}) chain.addBlock(1, []*types.Transaction{tx})
chain.sendHead(1) chain.sendHead(1)
waitForHead() waitStep(t, tr)
// Not finalized yet. // Not finalized yet.
stats := tr.GetAllPeerStats() stats := tr.GetAllPeerStats()
@ -175,7 +180,7 @@ func TestFinalization(t *testing.T) {
chain.setFinalBlock(1) chain.setFinalBlock(1)
chain.addBlock(2, nil) chain.addBlock(2, nil)
chain.sendHead(2) chain.sendHead(2)
waitForHead() waitStep(t, tr)
stats = tr.GetAllPeerStats() stats = tr.GetAllPeerStats()
if stats["peerA"].Finalized != 1 { if stats["peerA"].Finalized != 1 {
@ -197,13 +202,13 @@ func TestMultiplePeers(t *testing.T) {
// Both included in block 1. // Both included in block 1.
chain.addBlock(1, []*types.Transaction{tx1, tx2}) chain.addBlock(1, []*types.Transaction{tx1, tx2})
chain.sendHead(1) chain.sendHead(1)
waitForHead() waitStep(t, tr)
// Finalize. // Finalize.
chain.setFinalBlock(1) chain.setFinalBlock(1)
chain.addBlock(2, nil) chain.addBlock(2, nil)
chain.sendHead(2) chain.sendHead(2)
waitForHead() waitStep(t, tr)
stats := tr.GetAllPeerStats() stats := tr.GetAllPeerStats()
if stats["peerA"].Finalized != 1 { if stats["peerA"].Finalized != 1 {
@ -226,12 +231,12 @@ func TestFirstDelivererWins(t *testing.T) {
chain.addBlock(1, []*types.Transaction{tx}) chain.addBlock(1, []*types.Transaction{tx})
chain.sendHead(1) chain.sendHead(1)
waitForHead() waitStep(t, tr)
chain.setFinalBlock(1) chain.setFinalBlock(1)
chain.addBlock(2, nil) chain.addBlock(2, nil)
chain.sendHead(2) chain.sendHead(2)
waitForHead() waitStep(t, tr)
stats := tr.GetAllPeerStats() stats := tr.GetAllPeerStats()
if stats["peerA"].Finalized != 1 { if stats["peerA"].Finalized != 1 {
@ -254,12 +259,12 @@ func TestNoFinalizationCredit(t *testing.T) {
// Include but don't finalize. // Include but don't finalize.
chain.addBlock(1, []*types.Transaction{tx}) chain.addBlock(1, []*types.Transaction{tx})
chain.sendHead(1) chain.sendHead(1)
waitForHead() waitStep(t, tr)
// Send more heads without finalization. // Send more heads without finalization.
chain.addBlock(2, nil) chain.addBlock(2, nil)
chain.sendHead(2) chain.sendHead(2)
waitForHead() waitStep(t, tr)
stats := tr.GetAllPeerStats() stats := tr.GetAllPeerStats()
if stats["peerA"].Finalized != 0 { if stats["peerA"].Finalized != 0 {
@ -279,13 +284,13 @@ func TestEMADecay(t *testing.T) {
// Include in block 1. // Include in block 1.
chain.addBlock(1, []*types.Transaction{tx}) chain.addBlock(1, []*types.Transaction{tx})
chain.sendHead(1) chain.sendHead(1)
waitForHead() waitStep(t, tr)
// Send 30 empty blocks — EMA should decay close to zero. // Send 30 empty blocks — EMA should decay close to zero.
for i := uint64(2); i <= 31; i++ { for i := uint64(2); i <= 31; i++ {
chain.addBlock(i, nil) chain.addBlock(i, nil)
chain.sendHead(i) chain.sendHead(i)
waitForHead() waitStep(t, tr)
} }
stats := tr.GetAllPeerStats() stats := tr.GetAllPeerStats()