core/txpool/blobpool: auto-start next conversion batch after completion (#33301)

This change fixes a stall in the legacy blob sidecar conversion pipeline
where tasks that arrived during an active batch could remain unprocessed
indefinitely after that batch completed, unless a new external event
arrived. 

The root cause was that the loop did not restart processing in the
`case <-done:` branch even when `txTasks` had accumulated work, relying
instead on a future event to retrigger the scheduler. This behavior is
inconsistent with the billy task pipeline, which immediately chains to
the next task via `runNextBillyTask()` without requiring an external trigger.

The fix adds a symmetric restart path in the `case <-done:` branch that checks
`len(txTasks) > 0`, clones the accumulated tasks, clears the queue, and
launches a new run with a fresh done channel and interrupt counter.

This preserves batching semantics, prevents indefinite blocking of callers 
of convert(), and remains safe during shutdown since the quit path 
still interrupts and awaits the active batch. No public interfaces or logging 
were changed.
This commit is contained in:
radik878 2025-11-27 13:43:37 +02:00 committed by GitHub
parent 6426257c0f
commit 8d1b1c20d0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 76 additions and 0 deletions

View file

@ -161,6 +161,12 @@ func (q *conversionQueue) loop() {
case <-done:
done, interrupt = nil, nil
if len(txTasks) > 0 {
done, interrupt = make(chan struct{}), new(atomic.Int32)
tasks := slices.Clone(txTasks)
txTasks = txTasks[:0]
go q.run(tasks, done, interrupt)
}
case fn := <-q.startBilly:
q.billyQueue = append(q.billyQueue, fn)

View file

@ -19,7 +19,9 @@ package blobpool
import (
"crypto/ecdsa"
"crypto/sha256"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
@ -99,3 +101,71 @@ func TestConversionQueueDoubleClose(t *testing.T) {
queue.close()
queue.close() // Should not panic
}
// TestConversionQueueAutoRestartBatch verifies that tasks enqueued while a
// conversion batch is running are picked up automatically once that batch
// completes, without requiring a further external event to retrigger the
// scheduler.
func TestConversionQueueAutoRestartBatch(t *testing.T) {
	queue := newConversionQueue()
	defer queue.close()

	key, _ := crypto.GenerateKey()

	// Create a heavy transaction to ensure the first batch runs long enough
	// for subsequent tasks to be queued while it is active.
	heavy := makeMultiBlobTx(0, 1, 1, 1, int(params.BlobTxMaxBlobs), 0, key, types.BlobSidecarVersion0)

	var wg sync.WaitGroup

	// waitConversion blocks until a result arrives on ch, failing the test on
	// a conversion error or when the deadline expires.
	waitConversion := func(ch <-chan error, what string) {
		t.Helper()
		select {
		case err := <-ch:
			if err != nil {
				t.Fatalf("%s conversion error: %v", what, err)
			}
		case <-time.After(30 * time.Second):
			t.Fatalf("timeout waiting for %s conversion", what)
		}
	}
	// Kick off the heavy conversion first; it forms the initial batch.
	wg.Add(1)
	heavyDone := make(chan error, 1)
	go func() {
		defer wg.Done()
		heavyDone <- queue.convert(heavy)
	}()

	// Give the conversion worker a head start so that the following tasks are
	// enqueued while the first batch is running.
	time.Sleep(200 * time.Millisecond)

	tx1 := makeTx(1, 1, 1, 1, key)
	tx2 := makeTx(2, 1, 1, 1, key)

	wg.Add(2)
	done1 := make(chan error, 1)
	done2 := make(chan error, 1)
	go func() { defer wg.Done(); done1 <- queue.convert(tx1) }()
	go func() { defer wg.Done(); done2 <- queue.convert(tx2) }()

	waitConversion(done1, "tx1")
	waitConversion(done2, "tx2")
	waitConversion(heavyDone, "heavy")
	wg.Wait()

	// The queued transactions must have been upgraded by the auto-restarted
	// batch, not left at their original sidecar version.
	for i, tx := range []*types.Transaction{tx1, tx2} {
		if v := tx.BlobTxSidecar().Version; v != types.BlobSidecarVersion1 {
			t.Fatalf("tx%d sidecar version mismatch: have %d, want %d", i+1, v, types.BlobSidecarVersion1)
		}
	}
}