diff --git a/core/state/snapshot/disklayer.go b/core/state/snapshot/disklayer.go index 202e6c70ed..3a5864f117 100644 --- a/core/state/snapshot/disklayer.go +++ b/core/state/snapshot/disklayer.go @@ -38,9 +38,17 @@ type diskLayer struct { root common.Hash // Root hash of the base snapshot stale bool // Signals that the layer became stale (state progressed) - genMarker []byte // Marker for the state that's indexed during initial layer generation - genPending chan struct{} // Notification channel when generation is done (test synchronicity) - genAbort chan chan *generatorStats // Notification channel to abort generating the snapshot in this layer + genMarker []byte // Marker for the state that's indexed during initial layer generation + genPending chan struct{} // Notification channel when generation is done (test synchronicity) + + // Generator lifecycle management: + // - [cancel] is closed to request termination (broadcast). + // - [done] is closed by the generator goroutine on exit. + cancel chan struct{} + done chan struct{} + cancelOnce sync.Once + + genStats *generatorStats // Stats for snapshot generation (generation aborted/finished if non-nil) lock sync.RWMutex } @@ -49,6 +57,10 @@ type diskLayer struct { // Reset() in order to not leak memory. // OBS: It does not invoke Close on the diskdb func (dl *diskLayer) Release() error { + // Stop any ongoing snapshot generation to prevent it from accessing + // the database after it's closed during shutdown + dl.stopGeneration() + if dl.cache != nil { dl.cache.Reset() } @@ -184,17 +196,27 @@ func (dl *diskLayer) Update(blockHash common.Hash, accounts map[common.Hash][]by return newDiffLayer(dl, blockHash, accounts, storage) } -// stopGeneration aborts the state snapshot generation if it is currently running. +// stopGeneration requests cancellation of any running snapshot generation and +// blocks until the generator goroutine (if running) has fully terminated. 
+// +// Concurrency guarantees: +// - Thread-safe: May be called concurrently from multiple goroutines +// - Idempotent: Safe to call multiple times; subsequent calls have no effect +// - Blocking: Returns only after the generator goroutine (if any) has exited +// - Safe to call at any time, including when no generation is running +// +// After return, it is **guaranteed** that: +// - The generator goroutine has terminated +// - It is safe to proceed with cleanup operations (e.g. closing databases) func (dl *diskLayer) stopGeneration() { - dl.lock.RLock() - generating := dl.genMarker != nil - dl.lock.RUnlock() - if !generating { + cancel := dl.cancel + done := dl.done + if cancel == nil || done == nil { return } - if dl.genAbort != nil { - abort := make(chan *generatorStats) - dl.genAbort <- abort - <-abort - } + + dl.cancelOnce.Do(func() { + close(cancel) + }) + <-done } diff --git a/core/state/snapshot/generate.go b/core/state/snapshot/generate.go index 01fb55ea4c..2cb4c7d03c 100644 --- a/core/state/snapshot/generate.go +++ b/core/state/snapshot/generate.go @@ -50,6 +50,9 @@ var ( // errMissingTrie is returned if the target trie is missing while the generation // is running. In this case the generation is aborted and wait the new signal. 
errMissingTrie = errors.New("missing trie") + + // errAborted is returned when snapshot generation was interrupted/aborted + errAborted = errors.New("aborted") ) // generateSnapshot regenerates a brand new snapshot based on an existing state @@ -74,7 +77,8 @@ func generateSnapshot(diskdb ethdb.KeyValueStore, triedb *triedb.Database, cache cache: fastcache.New(cache * 1024 * 1024), genMarker: genMarker, genPending: make(chan struct{}), - genAbort: make(chan chan *generatorStats), + cancel: make(chan struct{}), + done: make(chan struct{}), } go base.generate(stats) log.Debug("Start snapshot generation", "root", root) @@ -467,12 +471,14 @@ func (dl *diskLayer) generateRange(ctx *generatorContext, trieId *trie.ID, prefi // checkAndFlush checks if an interruption signal is received or the // batch size has exceeded the allowance. func (dl *diskLayer) checkAndFlush(ctx *generatorContext, current []byte) error { - var abort chan *generatorStats + aborting := false select { - case abort = <-dl.genAbort: + case <-dl.cancel: + aborting = true default: } - if ctx.batch.ValueSize() > ethdb.IdealBatchSize || abort != nil { + + if ctx.batch.ValueSize() > ethdb.IdealBatchSize || aborting { if bytes.Compare(current, dl.genMarker) < 0 { log.Error("Snapshot generator went backwards", "current", fmt.Sprintf("%x", current), "genMarker", fmt.Sprintf("%x", dl.genMarker)) } @@ -490,9 +496,9 @@ func (dl *diskLayer) checkAndFlush(ctx *generatorContext, current []byte) error dl.genMarker = current dl.lock.Unlock() - if abort != nil { + if aborting { ctx.stats.Log("Aborting state snapshot generation", dl.root, current) - return newAbortErr(abort) // bubble up an error for interruption + return errAborted } // Don't hold the iterators too long, release them to let compactor works ctx.reopenIterator(snapAccount) @@ -648,10 +654,11 @@ func generateAccounts(ctx *generatorContext, dl *diskLayer, accMarker []byte) er // gathering and logging, since the method surfs the blocks as they arrive, often 
// being restarted. func (dl *diskLayer) generate(stats *generatorStats) { - var ( - accMarker []byte - abort chan *generatorStats - ) + if dl.done != nil { + defer close(dl.done) + } + + var accMarker []byte if len(dl.genMarker) > 0 { // []byte{} is the start, use nil for that accMarker = dl.genMarker[:common.HashLength] } @@ -669,15 +676,11 @@ func (dl *diskLayer) generate(stats *generatorStats) { defer ctx.close() if err := generateAccounts(ctx, dl, accMarker); err != nil { - // Extract the received interruption signal if exists - if aerr, ok := err.(*abortErr); ok { - abort = aerr.abort + // Check if the error was due to an abort (errors.Is also matches a wrapped errAborted) + if errors.Is(err, errAborted) { + stats.Log("Aborting state snapshot generation", dl.root, dl.genMarker) } - // Aborted by internal error, wait the signal - if abort == nil { - abort = <-dl.genAbort - } - abort <- stats + dl.genStats = stats return } // Snapshot fully generated, set the marker to nil. @@ -686,9 +689,7 @@ func (dl *diskLayer) generate(stats *generatorStats) { journalProgress(ctx.batch, nil, stats) if err := ctx.batch.Write(); err != nil { log.Error("Failed to flush batch", "err", err) - - abort = <-dl.genAbort - abort <- stats + dl.genStats = stats return } ctx.batch.Reset() @@ -698,12 +699,9 @@ func (dl *diskLayer) generate(stats *generatorStats) { dl.lock.Lock() dl.genMarker = nil + dl.genStats = stats close(dl.genPending) dl.lock.Unlock() - - // Someone will be looking for us, wait it out - abort = <-dl.genAbort - abort <- nil } // increaseKey increase the input key by one bit. Return nil if the entire @@ -717,17 +715,3 @@ func increaseKey(key []byte) []byte { } return nil } - -// abortErr wraps an interruption signal received to represent the -// generation is aborted by external processes.
-type abortErr struct { - abort chan *generatorStats -} - -func newAbortErr(abort chan *generatorStats) error { - return &abortErr{abort: abort} -} - -func (err *abortErr) Error() string { - return "aborted" -} diff --git a/core/state/snapshot/generate_test.go b/core/state/snapshot/generate_test.go index 7fb4c152dc..d5e59b123f 100644 --- a/core/state/snapshot/generate_test.go +++ b/core/state/snapshot/generate_test.go @@ -74,10 +74,10 @@ func testGeneration(t *testing.T, scheme string) { } checkSnapRoot(t, snap, root) - // Signal abortion to the generator and wait for it to tear down - stop := make(chan *generatorStats) - snap.genAbort <- stop - <-stop + // Stop the generator (if still running) and wait for it to exit. + if err := snap.Release(); err != nil { + t.Fatal(err) + } } // Tests that snapshot generation with existent flat state. @@ -115,10 +115,10 @@ func testGenerateExistentState(t *testing.T, scheme string) { } checkSnapRoot(t, snap, root) - // Signal abortion to the generator and wait for it to tear down - stop := make(chan *generatorStats) - snap.genAbort <- stop - <-stop + // Stop the generator (if still running) and wait for it to exit. + if err := snap.Release(); err != nil { + t.Fatal(err) + } } func checkSnapRoot(t *testing.T, snap *diskLayer, trieRoot common.Hash) { @@ -351,10 +351,10 @@ func testGenerateExistentStateWithWrongStorage(t *testing.T, scheme string) { t.Errorf("Snapshot generation failed") } checkSnapRoot(t, snap, root) - // Signal abortion to the generator and wait for it to tear down - stop := make(chan *generatorStats) - snap.genAbort <- stop - <-stop + // Stop the generator (if still running) and wait for it to exit. 
+ if err := snap.Release(); err != nil { + t.Fatal(err) + } } // Tests that snapshot generation with existent flat state, where the flat state @@ -414,10 +414,10 @@ func testGenerateExistentStateWithWrongAccounts(t *testing.T, scheme string) { } checkSnapRoot(t, snap, root) - // Signal abortion to the generator and wait for it to tear down - stop := make(chan *generatorStats) - snap.genAbort <- stop - <-stop + // Stop the generator (if still running) and wait for it to exit. + if err := snap.Release(); err != nil { + t.Fatal(err) + } } // Tests that snapshot generation errors out correctly in case of a missing trie @@ -454,10 +454,10 @@ func testGenerateCorruptAccountTrie(t *testing.T, scheme string) { case <-time.After(time.Second): // Not generated fast enough, hopefully blocked inside on missing trie node fail } - // Signal abortion to the generator and wait for it to tear down - stop := make(chan *generatorStats) - snap.genAbort <- stop - <-stop + // Stop the generator (if still running) and wait for it to exit. + if err := snap.Release(); err != nil { + t.Fatal(err) + } } // Tests that snapshot generation errors out correctly in case of a missing root @@ -498,10 +498,10 @@ func testGenerateMissingStorageTrie(t *testing.T, scheme string) { case <-time.After(time.Second): // Not generated fast enough, hopefully blocked inside on missing trie node fail } - // Signal abortion to the generator and wait for it to tear down - stop := make(chan *generatorStats) - snap.genAbort <- stop - <-stop + // Stop the generator (if still running) and wait for it to exit. 
+ if err := snap.Release(); err != nil { + t.Fatal(err) + } } // Tests that snapshot generation errors out correctly in case of a missing trie @@ -540,10 +540,10 @@ func testGenerateCorruptStorageTrie(t *testing.T, scheme string) { case <-time.After(time.Second): // Not generated fast enough, hopefully blocked inside on missing trie node fail } - // Signal abortion to the generator and wait for it to tear down - stop := make(chan *generatorStats) - snap.genAbort <- stop - <-stop + // Stop the generator (if still running) and wait for it to exit. + if err := snap.Release(); err != nil { + t.Fatal(err) + } } // Tests that snapshot generation when an extra account with storage exists in the snap state. @@ -605,10 +605,10 @@ func testGenerateWithExtraAccounts(t *testing.T, scheme string) { } checkSnapRoot(t, snap, root) - // Signal abortion to the generator and wait for it to tear down - stop := make(chan *generatorStats) - snap.genAbort <- stop - <-stop + // Stop the generator (if still running) and wait for it to exit. + if err := snap.Release(); err != nil { + t.Fatal(err) + } // If we now inspect the snap db, there should exist no extraneous storage items if data := rawdb.ReadStorageSnapshot(helper.diskdb, hashData([]byte("acc-2")), hashData([]byte("b-key-1"))); data != nil { t.Fatalf("expected slot to be removed, got %v", string(data)) @@ -666,10 +666,10 @@ func testGenerateWithManyExtraAccounts(t *testing.T, scheme string) { t.Errorf("Snapshot generation failed") } checkSnapRoot(t, snap, root) - // Signal abortion to the generator and wait for it to tear down - stop := make(chan *generatorStats) - snap.genAbort <- stop - <-stop + // Stop the generator (if still running) and wait for it to exit. 
+ if err := snap.Release(); err != nil { + t.Fatal(err) + } } // Tests this case @@ -715,10 +715,10 @@ func testGenerateWithExtraBeforeAndAfter(t *testing.T, scheme string) { t.Errorf("Snapshot generation failed") } checkSnapRoot(t, snap, root) - // Signal abortion to the generator and wait for it to tear down - stop := make(chan *generatorStats) - snap.genAbort <- stop - <-stop + // Stop the generator (if still running) and wait for it to exit. + if err := snap.Release(); err != nil { + t.Fatal(err) + } } // TestGenerateWithMalformedSnapdata tests what happes if we have some junk @@ -755,10 +755,10 @@ func testGenerateWithMalformedSnapdata(t *testing.T, scheme string) { t.Errorf("Snapshot generation failed") } checkSnapRoot(t, snap, root) - // Signal abortion to the generator and wait for it to tear down - stop := make(chan *generatorStats) - snap.genAbort <- stop - <-stop + // Stop the generator (if still running) and wait for it to exit. + if err := snap.Release(); err != nil { + t.Fatal(err) + } // If we now inspect the snap db, there should exist no extraneous storage items if data := rawdb.ReadStorageSnapshot(helper.diskdb, hashData([]byte("acc-2")), hashData([]byte("b-key-1"))); data != nil { t.Fatalf("expected slot to be removed, got %v", string(data)) @@ -792,10 +792,10 @@ func testGenerateFromEmptySnap(t *testing.T, scheme string) { t.Errorf("Snapshot generation failed") } checkSnapRoot(t, snap, root) - // Signal abortion to the generator and wait for it to tear down - stop := make(chan *generatorStats) - snap.genAbort <- stop - <-stop + // Stop the generator (if still running) and wait for it to exit. 
+ if err := snap.Release(); err != nil { + t.Fatal(err) + } } // Tests that snapshot generation with existent flat state, where the flat state @@ -843,10 +843,10 @@ func testGenerateWithIncompleteStorage(t *testing.T, scheme string) { t.Errorf("Snapshot generation failed") } checkSnapRoot(t, snap, root) - // Signal abortion to the generator and wait for it to tear down - stop := make(chan *generatorStats) - snap.genAbort <- stop - <-stop + // Stop the generator (if still running) and wait for it to exit. + if err := snap.Release(); err != nil { + t.Fatal(err) + } } func incKey(key []byte) []byte { @@ -939,10 +939,10 @@ func testGenerateCompleteSnapshotWithDanglingStorage(t *testing.T, scheme string } checkSnapRoot(t, snap, root) - // Signal abortion to the generator and wait for it to tear down - stop := make(chan *generatorStats) - snap.genAbort <- stop - <-stop + // Stop the generator (if still running) and wait for it to exit. + if err := snap.Release(); err != nil { + t.Fatal(err) + } } // Tests that snapshot generation with dangling storages. Dangling storage means @@ -976,8 +976,53 @@ func testGenerateBrokenSnapshotWithDanglingStorage(t *testing.T, scheme string) } checkSnapRoot(t, snap, root) - // Signal abortion to the generator and wait for it to tear down - stop := make(chan *generatorStats) - snap.genAbort <- stop - <-stop + // Stop the generator (if still running) and wait for it to exit. + if err := snap.Release(); err != nil { + t.Fatal(err) + } +} + +// TestReleaseStopsGeneration verifies that Release() properly stops ongoing +// snapshot generation without hanging. This prevents a race condition during +// shutdown where the generator could access the database after it's closed. +// +// The test waits for generation to finish before calling Release(), so it covers +// the post-completion path: the generator goroutine exits on its own and closes +// its done channel, so Release() (via stopGeneration()) must return promptly.
+func TestReleaseStopsGeneration(t *testing.T) { + testReleaseStopsGeneration(t, rawdb.HashScheme) + testReleaseStopsGeneration(t, rawdb.PathScheme) +} + +func testReleaseStopsGeneration(t *testing.T, scheme string) { + var helper = newHelper(scheme) + stRoot := helper.makeStorageTrie("", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, false) + + helper.addTrieAccount("acc-1", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) + helper.addTrieAccount("acc-2", &types.StateAccount{Balance: uint256.NewInt(2), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()}) + helper.addTrieAccount("acc-3", &types.StateAccount{Balance: uint256.NewInt(3), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) + + helper.makeStorageTrie("acc-1", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true) + helper.makeStorageTrie("acc-3", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true) + + _, snap := helper.CommitAndGenerate() + + select { + case <-snap.genPending: + case <-time.After(3 * time.Second): + t.Fatal("Snapshot generation failed") + } + + // Call Release() - this should stop generation gracefully without hanging + done := make(chan struct{}) + go func() { + snap.Release() + close(done) + }() + + select { + case <-done: + case <-time.After(3 * time.Second): + t.Fatal("Release() hung - stopGeneration() was likely not called") + } } diff --git a/core/state/snapshot/journal.go b/core/state/snapshot/journal.go index 004dd5298a..c56cb97b95 100644 --- a/core/state/snapshot/journal.go +++ b/core/state/snapshot/journal.go @@ -179,7 +179,8 @@ func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *triedb.Database, root comm // if the background generation is allowed if !generator.Done && !noBuild { base.genPending = make(chan struct{}) - base.genAbort = make(chan chan *generatorStats) + base.cancel = make(chan struct{}) + base.done = make(chan 
struct{}) var origin uint64 if len(generator.Marker) >= 8 { @@ -199,16 +200,9 @@ func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *triedb.Database, root comm // Journal terminates any in-progress snapshot generation, also implicitly pushing // the progress into the database. func (dl *diskLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) { - // If the snapshot is currently being generated, abort it - var stats *generatorStats - if dl.genAbort != nil { - abort := make(chan *generatorStats) - dl.genAbort <- abort + // If the snapshot is currently being generated, stop it + dl.stopGeneration() - if stats = <-abort; stats != nil { - stats.Log("Journalling in-progress snapshot", dl.root, dl.genMarker) - } - } // Ensure the layer didn't get stale dl.lock.RLock() defer dl.lock.RUnlock() @@ -216,8 +210,8 @@ func (dl *diskLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) { if dl.stale { return common.Hash{}, ErrSnapshotStale } - // Ensure the generator stats is written even if none was ran this cycle - journalProgress(dl.diskdb, dl.genMarker, stats) + // Ensure the generator marker and stats are written even if no generator ran this cycle + journalProgress(dl.diskdb, dl.genMarker, dl.genStats) log.Debug("Journalled disk layer", "root", dl.root) return dl.root, nil diff --git a/core/state/snapshot/snapshot.go b/core/state/snapshot/snapshot.go index f0f6296433..cd0a55fee6 100644 --- a/core/state/snapshot/snapshot.go +++ b/core/state/snapshot/snapshot.go @@ -492,7 +492,7 @@ func (t *Tree) cap(diff *diffLayer, layers int) *diskLayer { // there's a snapshot being generated currently. In that case, the trie // will move from underneath the generator so we **must** merge all the // partial data down into the snapshot and restart the generation.
- if flattened.parent.(*diskLayer).genAbort == nil { + if flattened.parent.(*diskLayer).cancel == nil { return nil } } @@ -520,14 +520,10 @@ func diffToDisk(bottom *diffLayer) *diskLayer { var ( base = bottom.parent.(*diskLayer) batch = base.diskdb.NewBatch() - stats *generatorStats ) - // If the disk layer is running a snapshot generator, abort it - if base.genAbort != nil { - abort := make(chan *generatorStats) - base.genAbort <- abort - stats = <-abort - } + // Attempt to stop generation (if not already stopped) + base.stopGeneration() + // Put the deletion in the batch writer, flush all updates in the final step. rawdb.DeleteSnapshotRoot(batch) @@ -606,8 +602,8 @@ func diffToDisk(bottom *diffLayer) *diskLayer { // Update the snapshot block marker and write any remainder data rawdb.WriteSnapshotRoot(batch, bottom.root) - // Write out the generator progress marker and report - journalProgress(batch, base.genMarker, stats) + // Write out the generator progress marker + journalProgress(batch, base.genMarker, base.genStats) // Flush all the updates in the single db operation. Ensure the // disk layer transition is atomic. @@ -626,12 +622,13 @@ func diffToDisk(bottom *diffLayer) *diskLayer { // If snapshot generation hasn't finished yet, port over all the starts and // continue where the previous round left off. // - // Note, the `base.genAbort` comparison is not used normally, it's checked + // Note, the `base.genPending` comparison is not used normally, it's checked // to allow the tests to play with the marker without triggering this path. - if base.genMarker != nil && base.genAbort != nil { + if base.genMarker != nil && base.genPending != nil { res.genMarker = base.genMarker - res.genAbort = make(chan chan *generatorStats) - go res.generate(stats) + res.cancel = make(chan struct{}) + res.done = make(chan struct{}) + go res.generate(base.genStats) } return res }