refactor: track whether the generator goroutine is running instead of checking genAbort

This commit is contained in:
Jonathan Oppenheimer 2026-01-07 03:16:50 -05:00
parent 9ca8b44870
commit 6d048460ce
No known key found for this signature in database
GPG key ID: E4CEF9010EB8B740
2 changed files with 13 additions and 6 deletions

View file

@ -19,6 +19,7 @@ package snapshot
import (
"bytes"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/fastcache"
@ -43,6 +44,7 @@ type diskLayer struct {
genMarker []byte // Marker for the state that's indexed during initial layer generation
genPending chan struct{} // Notification channel when generation is done (test synchronicity)
genAbort chan chan *generatorStats // Notification channel to abort generating the snapshot in this layer
genRunning atomic.Bool // Tracks whether the generator goroutine is actually running
lock sync.RWMutex
}
@ -192,10 +194,14 @@ func (dl *diskLayer) Update(blockHash common.Hash, accounts map[common.Hash][]by
// stopGeneration aborts the state snapshot generation if it is currently running.
func (dl *diskLayer) stopGeneration() {
// Check if generation goroutine is running by checking if genAbort channel exists.
// Note: genMarker can be nil even when the generator is still running (waiting
// for abort signal after completing generation), so we check genAbort instead.
// Check if generation goroutine is actually running
//
// Note: genMarker can be nil even when the generator is still running (waiting
// for abort signal after completing generation), so we can't rely on genMarker.
if !dl.genRunning.Load() {
return
}
// Use write lock to ensure only one goroutine can stop generation at a time,
// preventing a race where multiple callers might try to send abort signals.
dl.lock.Lock()
@ -210,14 +216,12 @@ func (dl *diskLayer) stopGeneration() {
dl.lock.Unlock()
// Perform the channel handshake without holding the lock to avoid deadlocks.
// Use a timeout to handle cases where the generator goroutine may have exited
// unexpectedly (e.g., due to panic or other runtime errors).
abort := make(chan *generatorStats)
select {
case genAbort <- abort:
// Generator received the abort signal, wait for it to respond
<-abort
case <-time.After(5 * time.Second):
log.Warn("Snapshot generator did not respond to stop signal, it may have crashed")
log.Error("Snapshot generator did not respond despite being marked as running")
}
}

View file

@ -648,6 +648,9 @@ func generateAccounts(ctx *generatorContext, dl *diskLayer, accMarker []byte) er
// gathering and logging, since the method surfs the blocks as they arrive, often
// being restarted.
func (dl *diskLayer) generate(stats *generatorStats) {
dl.genRunning.Store(true)
defer dl.genRunning.Store(false)
var (
accMarker []byte
abort chan *generatorStats