mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-06-20 13:44:31 +00:00
fix: parallel.wrapper.beforeWork channel race (#266)
## Why this should be merged Fixes a potential race in parallel handler where the goroutine that closes `whenProcessed` could observe a re-assigned channel for the next block, leading to an incorrect close and possible double-close panic. ## How this works `finishBlock()` waits for the channel to be closed by the goroutine, instead of for the `WaitGroup` that closes _just_ before it, which is what allowed the race. ## How this was tested It's not possible to include a specific test. This PR therefore also includes thorough documentation of every per-block goroutine and when it is guaranteed to complete. --------- Co-authored-by: Arran Schlosberg <me@arranschlosberg.com>
This commit is contained in:
parent
98a792673a
commit
312fa38051
1 changed files with 20 additions and 10 deletions
|
|
@ -206,6 +206,8 @@ func (w *wrapper[CD, D, R, A]) beforeWork(jobs int) {
|
|||
w.txOrder = make(chan TxResult[R], jobs)
|
||||
go func() {
|
||||
w.txsBeingProcessed.Wait()
|
||||
// [wrapper.finishBlock] blocks until this is closed, guaranteeing
|
||||
// cleanup of this goroutine.
|
||||
close(w.whenProcessed)
|
||||
}()
|
||||
}
|
||||
|
|
@ -252,6 +254,8 @@ func (w *wrapper[CD, D, R, A]) result(i int) (TxResult[R], bool) {
|
|||
|
||||
func (w *wrapper[CD, D, R, A]) postProcess() {
|
||||
go func() {
|
||||
// [wrapper.finishBlock] blocks until this is closed, guaranteeing
|
||||
// cleanup of this goroutine.
|
||||
defer close(w.txOrder)
|
||||
for i := range w.totalTxsInBlock {
|
||||
r, ok := w.result(i)
|
||||
|
|
@ -276,16 +280,22 @@ func (w *wrapper[CD, D, R, A]) finishBlock(sdb vm.StateDB, b *types.Block, rs ty
|
|||
// [wrapper.postProcess] is guaranteed to have finished because it sets
|
||||
// [wrapper.aggregated], from which we have just read. However
|
||||
// [Handler.PostProcess] is under no obligation to block on anything, and
|
||||
// the goroutine filling [wrapper.txOrder] might still be reading results.
|
||||
// We therefore guarantee its completion before "getting and keeping" all of
|
||||
// [wrapper.results] otherwise said goroutine can leak.
|
||||
for range w.txOrder {
|
||||
// Nobody needs these anymore, but we need to know that the channel has
|
||||
// been closed.
|
||||
}
|
||||
// Although we know this will unblock effectively immediately, it's safer to
|
||||
// verify the intuition than to rely on complex reasoning.
|
||||
w.txsBeingProcessed.Wait()
|
||||
// the goroutines filling [wrapper.txOrder] and [wrapper.whenProcessed]
|
||||
// might still be reading results. We therefore guarantee their completion
|
||||
// before "taking" all of [wrapper.results].
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2) // TODO(arr4n) update to Go 1.25 and use `wg.Go`
|
||||
go func() {
|
||||
for range w.txOrder {
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
go func() {
|
||||
for range w.whenProcessed {
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
wg.Wait()
|
||||
|
||||
w.common.take()
|
||||
for _, v := range w.results[:w.totalTxsInBlock] {
|
||||
|
|
|
|||
Loading…
Reference in a new issue