mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-06-05 22:48:36 +00:00
core/state/snapshot: snapshot generation shutdown race condition (#33540)
## Overview
This PR fixes a race condition during blockchain shutdown where snapshot
generation could continue accessing the trie database after it has been
closed, leading to iterator errors. We noticed this in one of our nodes
on https://github.com/ava-labs/avalanchego, which relies on an older
version of geth with the same issue (so this behavior does happen!).
During node shutdown, the following sequence occurs:
1. `BlockChain.Stop()` calls `snaps.Release()` to clean up snapshot
resources
2. `Release()` only resets the cache but doesn't stop the generator
goroutine
3. The trie database is then closed via `triedb.Close()`
4. The still-running generator attempts to iterate storage tries
5. Iterator fails because the database is closed (`"Generator failed to
iterate storage trie"`)
## Problem
There are three related bugs:
1. `Release()` doesn't stop generation: The `diskLayer.Release()` method
only resets the cache without stopping ongoing snapshot generation,
leaving the generator goroutine running after database closure.
2. `stopGeneration()` has an incorrect completion check: The
`stopGeneration()` method checks `genMarker != nil` to determine if
generation is running. However, `genMarker` is set to nil when
generation completes successfully, even though the generator goroutine
is still waiting for the abort signal at the end of `generate()` (see
line 705 of `core/state/snapshot/generate.go` at commit `eaaa5b716d`,
lines 699-707).
This means `stopGeneration()` returns early without sending the abort
signal.
3. Node shutdown doesn't stop generation: During shutdown, no code path
calls `stopGeneration()` or sends the abort signal to the generator,
causing the generator to access a closed database and error.
## Fix
- Modified `diskLayer.Release()` to call `stopGeneration()` before
releasing resources
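- Added a cancellation architecture that replaces the `genAbort`
request/response channel: a `cancel` channel is closed to request
termination and a `done` channel is closed by the generator goroutine on
exit, so the generator no longer has to park waiting for someone to send
it an abort signal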
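- Fixed `stopGeneration()` so it no longer relies on `genMarker != nil`:
it closes `cancel` exactly once (via `sync.Once`) and then blocks on
`done` until the generator goroutine has exited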
- Added `TestGenerateGoroutineLeak` to verify the fix and prevent
regression. The test fails without the fix and passes with it.
- The test creates a snapshot with active generation, waits for
completion, then calls `Release()`, and uses `go.uber.org/goleak` to
assert no generator goroutine survives.
- Without the fix, the test fails: `Release()` returns without stopping
the generator, which stays parked at `generate.go:705` waiting for an
abort signal that never comes:
```
--- FAIL: TestGenerateGoroutineLeak (0.88s)
generate_test.go: found unexpected goroutines:
[Goroutine 6 in state chan receive, with
core/state/snapshot.(*diskLayer).generate on top of the stack:
core/state/snapshot.(*diskLayer).generate(...)
core/state/snapshot/generate.go:705
created by core/state/snapshot.generateSnapshot
core/state/snapshot/generate.go:79 ]
```
- With the fix, the test passes: `Release()` -> `stopGeneration()`
blocks until the generator goroutine has fully exited, so nothing leaks.
Note that this fix follows the same pattern used in `Tree.Disable()` in
https://github.com/ethereum/go-ethereum/pull/30040, which introduced
`stopGeneration()` for use in `Disable()` and `Rebuild()` but didn't
address the shutdown path.
The test follows the same pattern used in
`TestCheckSimBackendGoroutineLeak`.
This commit is contained in:
parent
f5c62d0552
commit
bc1967f088
6 changed files with 179 additions and 138 deletions
|
|
@ -127,6 +127,8 @@ go.opentelemetry.io/otel/sdk v1.41.0 h1:YPIEXKmiAwkGl3Gu1huk1aYWwtpRLeskpV+wPisx
|
||||||
go.opentelemetry.io/otel/sdk v1.41.0/go.mod h1:ahFdU0G5y8IxglBf0QBJXgSe7agzjE4GiTJ6HT9ud90=
|
go.opentelemetry.io/otel/sdk v1.41.0/go.mod h1:ahFdU0G5y8IxglBf0QBJXgSe7agzjE4GiTJ6HT9ud90=
|
||||||
go.opentelemetry.io/otel/trace v1.41.0 h1:Vbk2co6bhj8L59ZJ6/xFTskY+tGAbOnCtQGVVa9TIN0=
|
go.opentelemetry.io/otel/trace v1.41.0 h1:Vbk2co6bhj8L59ZJ6/xFTskY+tGAbOnCtQGVVa9TIN0=
|
||||||
go.opentelemetry.io/otel/trace v1.41.0/go.mod h1:U1NU4ULCoxeDKc09yCWdWe+3QoyweJcISEVa1RBzOis=
|
go.opentelemetry.io/otel/trace v1.41.0/go.mod h1:U1NU4ULCoxeDKc09yCWdWe+3QoyweJcISEVa1RBzOis=
|
||||||
|
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||||
|
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||||
golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts=
|
golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts=
|
||||||
golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos=
|
golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos=
|
||||||
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df h1:UA2aFVmmsIlefxMk29Dp2juaUSth8Pyn3Tq5Y5mJGME=
|
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df h1:UA2aFVmmsIlefxMk29Dp2juaUSth8Pyn3Tq5Y5mJGME=
|
||||||
|
|
|
||||||
|
|
@ -38,9 +38,17 @@ type diskLayer struct {
|
||||||
root common.Hash // Root hash of the base snapshot
|
root common.Hash // Root hash of the base snapshot
|
||||||
stale bool // Signals that the layer became stale (state progressed)
|
stale bool // Signals that the layer became stale (state progressed)
|
||||||
|
|
||||||
genMarker []byte // Marker for the state that's indexed during initial layer generation
|
genMarker []byte // Marker for the state that's indexed during initial layer generation
|
||||||
genPending chan struct{} // Notification channel when generation is done (test synchronicity)
|
genPending chan struct{} // Notification channel when generation is done (test synchronicity)
|
||||||
genAbort chan chan *generatorStats // Notification channel to abort generating the snapshot in this layer
|
|
||||||
|
// Generator lifecycle management:
|
||||||
|
// - [cancel] is closed to request termination (broadcast).
|
||||||
|
// - [done] is closed by the generator goroutine on exit.
|
||||||
|
cancel chan struct{}
|
||||||
|
done chan struct{}
|
||||||
|
cancelOnce sync.Once
|
||||||
|
|
||||||
|
genStats *generatorStats // Stats for snapshot generation (generation aborted/finished if non-nil)
|
||||||
|
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
@ -49,6 +57,10 @@ type diskLayer struct {
|
||||||
// Reset() in order to not leak memory.
|
// Reset() in order to not leak memory.
|
||||||
// OBS: It does not invoke Close on the diskdb
|
// OBS: It does not invoke Close on the diskdb
|
||||||
func (dl *diskLayer) Release() error {
|
func (dl *diskLayer) Release() error {
|
||||||
|
// Stop any ongoing snapshot generation to prevent it from accessing
|
||||||
|
// the database after it's closed during shutdown
|
||||||
|
dl.stopGeneration()
|
||||||
|
|
||||||
if dl.cache != nil {
|
if dl.cache != nil {
|
||||||
dl.cache.Reset()
|
dl.cache.Reset()
|
||||||
}
|
}
|
||||||
|
|
@ -184,17 +196,27 @@ func (dl *diskLayer) Update(blockHash common.Hash, accounts map[common.Hash][]by
|
||||||
return newDiffLayer(dl, blockHash, accounts, storage)
|
return newDiffLayer(dl, blockHash, accounts, storage)
|
||||||
}
|
}
|
||||||
|
|
||||||
// stopGeneration aborts the state snapshot generation if it is currently running.
|
// stopGeneration requests cancellation of any running snapshot generation and
|
||||||
|
// blocks until the generator goroutine (if running) has fully terminated.
|
||||||
|
//
|
||||||
|
// Concurrency guarantees:
|
||||||
|
// - Thread-safe: May be called concurrently from multiple goroutines
|
||||||
|
// - Idempotent: Safe to call multiple times; subsequent calls have no effect
|
||||||
|
// - Blocking: Returns only after the generator goroutine (if any) has exited
|
||||||
|
// - Safe to call at any time, including when no generation is running
|
||||||
|
//
|
||||||
|
// After return, it is **guaranteed** that:
|
||||||
|
// - The generator goroutine has terminated
|
||||||
|
// - It is safe to proceed with cleanup operations (e.g. closing databases)
|
||||||
func (dl *diskLayer) stopGeneration() {
|
func (dl *diskLayer) stopGeneration() {
|
||||||
dl.lock.RLock()
|
cancel := dl.cancel
|
||||||
generating := dl.genMarker != nil
|
done := dl.done
|
||||||
dl.lock.RUnlock()
|
if cancel == nil || done == nil {
|
||||||
if !generating {
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if dl.genAbort != nil {
|
|
||||||
abort := make(chan *generatorStats)
|
dl.cancelOnce.Do(func() {
|
||||||
dl.genAbort <- abort
|
close(cancel)
|
||||||
<-abort
|
})
|
||||||
}
|
<-done
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -50,6 +50,9 @@ var (
|
||||||
// errMissingTrie is returned if the target trie is missing while the generation
|
// errMissingTrie is returned if the target trie is missing while the generation
|
||||||
// is running. In this case the generation is aborted and wait the new signal.
|
// is running. In this case the generation is aborted and wait the new signal.
|
||||||
errMissingTrie = errors.New("missing trie")
|
errMissingTrie = errors.New("missing trie")
|
||||||
|
|
||||||
|
// errAborted is returned when snapshot generation was interrupted/aborted
|
||||||
|
errAborted = errors.New("aborted")
|
||||||
)
|
)
|
||||||
|
|
||||||
// generateSnapshot regenerates a brand new snapshot based on an existing state
|
// generateSnapshot regenerates a brand new snapshot based on an existing state
|
||||||
|
|
@ -74,7 +77,8 @@ func generateSnapshot(diskdb ethdb.KeyValueStore, triedb *triedb.Database, cache
|
||||||
cache: fastcache.New(cache * 1024 * 1024),
|
cache: fastcache.New(cache * 1024 * 1024),
|
||||||
genMarker: genMarker,
|
genMarker: genMarker,
|
||||||
genPending: make(chan struct{}),
|
genPending: make(chan struct{}),
|
||||||
genAbort: make(chan chan *generatorStats),
|
cancel: make(chan struct{}),
|
||||||
|
done: make(chan struct{}),
|
||||||
}
|
}
|
||||||
go base.generate(stats)
|
go base.generate(stats)
|
||||||
log.Debug("Start snapshot generation", "root", root)
|
log.Debug("Start snapshot generation", "root", root)
|
||||||
|
|
@ -467,12 +471,14 @@ func (dl *diskLayer) generateRange(ctx *generatorContext, trieId *trie.ID, prefi
|
||||||
// checkAndFlush checks if an interruption signal is received or the
|
// checkAndFlush checks if an interruption signal is received or the
|
||||||
// batch size has exceeded the allowance.
|
// batch size has exceeded the allowance.
|
||||||
func (dl *diskLayer) checkAndFlush(ctx *generatorContext, current []byte) error {
|
func (dl *diskLayer) checkAndFlush(ctx *generatorContext, current []byte) error {
|
||||||
var abort chan *generatorStats
|
aborting := false
|
||||||
select {
|
select {
|
||||||
case abort = <-dl.genAbort:
|
case <-dl.cancel:
|
||||||
|
aborting = true
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
if ctx.batch.ValueSize() > ethdb.IdealBatchSize || abort != nil {
|
|
||||||
|
if ctx.batch.ValueSize() > ethdb.IdealBatchSize || aborting {
|
||||||
if bytes.Compare(current, dl.genMarker) < 0 {
|
if bytes.Compare(current, dl.genMarker) < 0 {
|
||||||
log.Error("Snapshot generator went backwards", "current", fmt.Sprintf("%x", current), "genMarker", fmt.Sprintf("%x", dl.genMarker))
|
log.Error("Snapshot generator went backwards", "current", fmt.Sprintf("%x", current), "genMarker", fmt.Sprintf("%x", dl.genMarker))
|
||||||
}
|
}
|
||||||
|
|
@ -490,9 +496,9 @@ func (dl *diskLayer) checkAndFlush(ctx *generatorContext, current []byte) error
|
||||||
dl.genMarker = current
|
dl.genMarker = current
|
||||||
dl.lock.Unlock()
|
dl.lock.Unlock()
|
||||||
|
|
||||||
if abort != nil {
|
if aborting {
|
||||||
ctx.stats.Log("Aborting state snapshot generation", dl.root, current)
|
ctx.stats.Log("Aborting state snapshot generation", dl.root, current)
|
||||||
return newAbortErr(abort) // bubble up an error for interruption
|
return errAborted
|
||||||
}
|
}
|
||||||
// Don't hold the iterators too long, release them to let compactor works
|
// Don't hold the iterators too long, release them to let compactor works
|
||||||
ctx.reopenIterator(snapAccount)
|
ctx.reopenIterator(snapAccount)
|
||||||
|
|
@ -648,10 +654,11 @@ func generateAccounts(ctx *generatorContext, dl *diskLayer, accMarker []byte) er
|
||||||
// gathering and logging, since the method surfs the blocks as they arrive, often
|
// gathering and logging, since the method surfs the blocks as they arrive, often
|
||||||
// being restarted.
|
// being restarted.
|
||||||
func (dl *diskLayer) generate(stats *generatorStats) {
|
func (dl *diskLayer) generate(stats *generatorStats) {
|
||||||
var (
|
if dl.done != nil {
|
||||||
accMarker []byte
|
defer close(dl.done)
|
||||||
abort chan *generatorStats
|
}
|
||||||
)
|
|
||||||
|
var accMarker []byte
|
||||||
if len(dl.genMarker) > 0 { // []byte{} is the start, use nil for that
|
if len(dl.genMarker) > 0 { // []byte{} is the start, use nil for that
|
||||||
accMarker = dl.genMarker[:common.HashLength]
|
accMarker = dl.genMarker[:common.HashLength]
|
||||||
}
|
}
|
||||||
|
|
@ -669,15 +676,11 @@ func (dl *diskLayer) generate(stats *generatorStats) {
|
||||||
defer ctx.close()
|
defer ctx.close()
|
||||||
|
|
||||||
if err := generateAccounts(ctx, dl, accMarker); err != nil {
|
if err := generateAccounts(ctx, dl, accMarker); err != nil {
|
||||||
// Extract the received interruption signal if exists
|
// Check if error was due to abort
|
||||||
if aerr, ok := err.(*abortErr); ok {
|
if err == errAborted {
|
||||||
abort = aerr.abort
|
stats.Log("Aborting state snapshot generation", dl.root, dl.genMarker)
|
||||||
}
|
}
|
||||||
// Aborted by internal error, wait the signal
|
dl.genStats = stats
|
||||||
if abort == nil {
|
|
||||||
abort = <-dl.genAbort
|
|
||||||
}
|
|
||||||
abort <- stats
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Snapshot fully generated, set the marker to nil.
|
// Snapshot fully generated, set the marker to nil.
|
||||||
|
|
@ -686,9 +689,7 @@ func (dl *diskLayer) generate(stats *generatorStats) {
|
||||||
journalProgress(ctx.batch, nil, stats)
|
journalProgress(ctx.batch, nil, stats)
|
||||||
if err := ctx.batch.Write(); err != nil {
|
if err := ctx.batch.Write(); err != nil {
|
||||||
log.Error("Failed to flush batch", "err", err)
|
log.Error("Failed to flush batch", "err", err)
|
||||||
|
dl.genStats = stats
|
||||||
abort = <-dl.genAbort
|
|
||||||
abort <- stats
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
ctx.batch.Reset()
|
ctx.batch.Reset()
|
||||||
|
|
@ -698,12 +699,9 @@ func (dl *diskLayer) generate(stats *generatorStats) {
|
||||||
|
|
||||||
dl.lock.Lock()
|
dl.lock.Lock()
|
||||||
dl.genMarker = nil
|
dl.genMarker = nil
|
||||||
|
dl.genStats = stats
|
||||||
close(dl.genPending)
|
close(dl.genPending)
|
||||||
dl.lock.Unlock()
|
dl.lock.Unlock()
|
||||||
|
|
||||||
// Someone will be looking for us, wait it out
|
|
||||||
abort = <-dl.genAbort
|
|
||||||
abort <- nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// increaseKey increase the input key by one bit. Return nil if the entire
|
// increaseKey increase the input key by one bit. Return nil if the entire
|
||||||
|
|
@ -717,17 +715,3 @@ func increaseKey(key []byte) []byte {
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// abortErr wraps an interruption signal received to represent the
|
|
||||||
// generation is aborted by external processes.
|
|
||||||
type abortErr struct {
|
|
||||||
abort chan *generatorStats
|
|
||||||
}
|
|
||||||
|
|
||||||
func newAbortErr(abort chan *generatorStats) error {
|
|
||||||
return &abortErr{abort: abort}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (err *abortErr) Error() string {
|
|
||||||
return "aborted"
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -35,6 +35,7 @@ import (
|
||||||
"github.com/ethereum/go-ethereum/triedb/hashdb"
|
"github.com/ethereum/go-ethereum/triedb/hashdb"
|
||||||
"github.com/ethereum/go-ethereum/triedb/pathdb"
|
"github.com/ethereum/go-ethereum/triedb/pathdb"
|
||||||
"github.com/holiman/uint256"
|
"github.com/holiman/uint256"
|
||||||
|
"go.uber.org/goleak"
|
||||||
)
|
)
|
||||||
|
|
||||||
func hashData(input []byte) common.Hash {
|
func hashData(input []byte) common.Hash {
|
||||||
|
|
@ -74,10 +75,10 @@ func testGeneration(t *testing.T, scheme string) {
|
||||||
}
|
}
|
||||||
checkSnapRoot(t, snap, root)
|
checkSnapRoot(t, snap, root)
|
||||||
|
|
||||||
// Signal abortion to the generator and wait for it to tear down
|
// Stop the generator (if still running) and wait for it to exit.
|
||||||
stop := make(chan *generatorStats)
|
if err := snap.Release(); err != nil {
|
||||||
snap.genAbort <- stop
|
t.Fatal(err)
|
||||||
<-stop
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tests that snapshot generation with existent flat state.
|
// Tests that snapshot generation with existent flat state.
|
||||||
|
|
@ -115,10 +116,10 @@ func testGenerateExistentState(t *testing.T, scheme string) {
|
||||||
}
|
}
|
||||||
checkSnapRoot(t, snap, root)
|
checkSnapRoot(t, snap, root)
|
||||||
|
|
||||||
// Signal abortion to the generator and wait for it to tear down
|
// Stop the generator (if still running) and wait for it to exit.
|
||||||
stop := make(chan *generatorStats)
|
if err := snap.Release(); err != nil {
|
||||||
snap.genAbort <- stop
|
t.Fatal(err)
|
||||||
<-stop
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkSnapRoot(t *testing.T, snap *diskLayer, trieRoot common.Hash) {
|
func checkSnapRoot(t *testing.T, snap *diskLayer, trieRoot common.Hash) {
|
||||||
|
|
@ -351,10 +352,10 @@ func testGenerateExistentStateWithWrongStorage(t *testing.T, scheme string) {
|
||||||
t.Errorf("Snapshot generation failed")
|
t.Errorf("Snapshot generation failed")
|
||||||
}
|
}
|
||||||
checkSnapRoot(t, snap, root)
|
checkSnapRoot(t, snap, root)
|
||||||
// Signal abortion to the generator and wait for it to tear down
|
// Stop the generator (if still running) and wait for it to exit.
|
||||||
stop := make(chan *generatorStats)
|
if err := snap.Release(); err != nil {
|
||||||
snap.genAbort <- stop
|
t.Fatal(err)
|
||||||
<-stop
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tests that snapshot generation with existent flat state, where the flat state
|
// Tests that snapshot generation with existent flat state, where the flat state
|
||||||
|
|
@ -414,10 +415,10 @@ func testGenerateExistentStateWithWrongAccounts(t *testing.T, scheme string) {
|
||||||
}
|
}
|
||||||
checkSnapRoot(t, snap, root)
|
checkSnapRoot(t, snap, root)
|
||||||
|
|
||||||
// Signal abortion to the generator and wait for it to tear down
|
// Stop the generator (if still running) and wait for it to exit.
|
||||||
stop := make(chan *generatorStats)
|
if err := snap.Release(); err != nil {
|
||||||
snap.genAbort <- stop
|
t.Fatal(err)
|
||||||
<-stop
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tests that snapshot generation errors out correctly in case of a missing trie
|
// Tests that snapshot generation errors out correctly in case of a missing trie
|
||||||
|
|
@ -454,10 +455,10 @@ func testGenerateCorruptAccountTrie(t *testing.T, scheme string) {
|
||||||
case <-time.After(time.Second):
|
case <-time.After(time.Second):
|
||||||
// Not generated fast enough, hopefully blocked inside on missing trie node fail
|
// Not generated fast enough, hopefully blocked inside on missing trie node fail
|
||||||
}
|
}
|
||||||
// Signal abortion to the generator and wait for it to tear down
|
// Stop the generator (if still running) and wait for it to exit.
|
||||||
stop := make(chan *generatorStats)
|
if err := snap.Release(); err != nil {
|
||||||
snap.genAbort <- stop
|
t.Fatal(err)
|
||||||
<-stop
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tests that snapshot generation errors out correctly in case of a missing root
|
// Tests that snapshot generation errors out correctly in case of a missing root
|
||||||
|
|
@ -498,10 +499,10 @@ func testGenerateMissingStorageTrie(t *testing.T, scheme string) {
|
||||||
case <-time.After(time.Second):
|
case <-time.After(time.Second):
|
||||||
// Not generated fast enough, hopefully blocked inside on missing trie node fail
|
// Not generated fast enough, hopefully blocked inside on missing trie node fail
|
||||||
}
|
}
|
||||||
// Signal abortion to the generator and wait for it to tear down
|
// Stop the generator (if still running) and wait for it to exit.
|
||||||
stop := make(chan *generatorStats)
|
if err := snap.Release(); err != nil {
|
||||||
snap.genAbort <- stop
|
t.Fatal(err)
|
||||||
<-stop
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tests that snapshot generation errors out correctly in case of a missing trie
|
// Tests that snapshot generation errors out correctly in case of a missing trie
|
||||||
|
|
@ -540,10 +541,10 @@ func testGenerateCorruptStorageTrie(t *testing.T, scheme string) {
|
||||||
case <-time.After(time.Second):
|
case <-time.After(time.Second):
|
||||||
// Not generated fast enough, hopefully blocked inside on missing trie node fail
|
// Not generated fast enough, hopefully blocked inside on missing trie node fail
|
||||||
}
|
}
|
||||||
// Signal abortion to the generator and wait for it to tear down
|
// Stop the generator (if still running) and wait for it to exit.
|
||||||
stop := make(chan *generatorStats)
|
if err := snap.Release(); err != nil {
|
||||||
snap.genAbort <- stop
|
t.Fatal(err)
|
||||||
<-stop
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tests that snapshot generation when an extra account with storage exists in the snap state.
|
// Tests that snapshot generation when an extra account with storage exists in the snap state.
|
||||||
|
|
@ -605,10 +606,10 @@ func testGenerateWithExtraAccounts(t *testing.T, scheme string) {
|
||||||
}
|
}
|
||||||
checkSnapRoot(t, snap, root)
|
checkSnapRoot(t, snap, root)
|
||||||
|
|
||||||
// Signal abortion to the generator and wait for it to tear down
|
// Stop the generator (if still running) and wait for it to exit.
|
||||||
stop := make(chan *generatorStats)
|
if err := snap.Release(); err != nil {
|
||||||
snap.genAbort <- stop
|
t.Fatal(err)
|
||||||
<-stop
|
}
|
||||||
// If we now inspect the snap db, there should exist no extraneous storage items
|
// If we now inspect the snap db, there should exist no extraneous storage items
|
||||||
if data := rawdb.ReadStorageSnapshot(helper.diskdb, hashData([]byte("acc-2")), hashData([]byte("b-key-1"))); data != nil {
|
if data := rawdb.ReadStorageSnapshot(helper.diskdb, hashData([]byte("acc-2")), hashData([]byte("b-key-1"))); data != nil {
|
||||||
t.Fatalf("expected slot to be removed, got %v", string(data))
|
t.Fatalf("expected slot to be removed, got %v", string(data))
|
||||||
|
|
@ -666,10 +667,10 @@ func testGenerateWithManyExtraAccounts(t *testing.T, scheme string) {
|
||||||
t.Errorf("Snapshot generation failed")
|
t.Errorf("Snapshot generation failed")
|
||||||
}
|
}
|
||||||
checkSnapRoot(t, snap, root)
|
checkSnapRoot(t, snap, root)
|
||||||
// Signal abortion to the generator and wait for it to tear down
|
// Stop the generator (if still running) and wait for it to exit.
|
||||||
stop := make(chan *generatorStats)
|
if err := snap.Release(); err != nil {
|
||||||
snap.genAbort <- stop
|
t.Fatal(err)
|
||||||
<-stop
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tests this case
|
// Tests this case
|
||||||
|
|
@ -715,10 +716,10 @@ func testGenerateWithExtraBeforeAndAfter(t *testing.T, scheme string) {
|
||||||
t.Errorf("Snapshot generation failed")
|
t.Errorf("Snapshot generation failed")
|
||||||
}
|
}
|
||||||
checkSnapRoot(t, snap, root)
|
checkSnapRoot(t, snap, root)
|
||||||
// Signal abortion to the generator and wait for it to tear down
|
// Stop the generator (if still running) and wait for it to exit.
|
||||||
stop := make(chan *generatorStats)
|
if err := snap.Release(); err != nil {
|
||||||
snap.genAbort <- stop
|
t.Fatal(err)
|
||||||
<-stop
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestGenerateWithMalformedSnapdata tests what happens if we have some junk
|
// TestGenerateWithMalformedSnapdata tests what happens if we have some junk
|
||||||
|
|
@ -755,10 +756,10 @@ func testGenerateWithMalformedSnapdata(t *testing.T, scheme string) {
|
||||||
t.Errorf("Snapshot generation failed")
|
t.Errorf("Snapshot generation failed")
|
||||||
}
|
}
|
||||||
checkSnapRoot(t, snap, root)
|
checkSnapRoot(t, snap, root)
|
||||||
// Signal abortion to the generator and wait for it to tear down
|
// Stop the generator (if still running) and wait for it to exit.
|
||||||
stop := make(chan *generatorStats)
|
if err := snap.Release(); err != nil {
|
||||||
snap.genAbort <- stop
|
t.Fatal(err)
|
||||||
<-stop
|
}
|
||||||
// If we now inspect the snap db, there should exist no extraneous storage items
|
// If we now inspect the snap db, there should exist no extraneous storage items
|
||||||
if data := rawdb.ReadStorageSnapshot(helper.diskdb, hashData([]byte("acc-2")), hashData([]byte("b-key-1"))); data != nil {
|
if data := rawdb.ReadStorageSnapshot(helper.diskdb, hashData([]byte("acc-2")), hashData([]byte("b-key-1"))); data != nil {
|
||||||
t.Fatalf("expected slot to be removed, got %v", string(data))
|
t.Fatalf("expected slot to be removed, got %v", string(data))
|
||||||
|
|
@ -792,10 +793,10 @@ func testGenerateFromEmptySnap(t *testing.T, scheme string) {
|
||||||
t.Errorf("Snapshot generation failed")
|
t.Errorf("Snapshot generation failed")
|
||||||
}
|
}
|
||||||
checkSnapRoot(t, snap, root)
|
checkSnapRoot(t, snap, root)
|
||||||
// Signal abortion to the generator and wait for it to tear down
|
// Stop the generator (if still running) and wait for it to exit.
|
||||||
stop := make(chan *generatorStats)
|
if err := snap.Release(); err != nil {
|
||||||
snap.genAbort <- stop
|
t.Fatal(err)
|
||||||
<-stop
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tests that snapshot generation with existent flat state, where the flat state
|
// Tests that snapshot generation with existent flat state, where the flat state
|
||||||
|
|
@ -843,10 +844,10 @@ func testGenerateWithIncompleteStorage(t *testing.T, scheme string) {
|
||||||
t.Errorf("Snapshot generation failed")
|
t.Errorf("Snapshot generation failed")
|
||||||
}
|
}
|
||||||
checkSnapRoot(t, snap, root)
|
checkSnapRoot(t, snap, root)
|
||||||
// Signal abortion to the generator and wait for it to tear down
|
// Stop the generator (if still running) and wait for it to exit.
|
||||||
stop := make(chan *generatorStats)
|
if err := snap.Release(); err != nil {
|
||||||
snap.genAbort <- stop
|
t.Fatal(err)
|
||||||
<-stop
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func incKey(key []byte) []byte {
|
func incKey(key []byte) []byte {
|
||||||
|
|
@ -939,10 +940,10 @@ func testGenerateCompleteSnapshotWithDanglingStorage(t *testing.T, scheme string
|
||||||
}
|
}
|
||||||
checkSnapRoot(t, snap, root)
|
checkSnapRoot(t, snap, root)
|
||||||
|
|
||||||
// Signal abortion to the generator and wait for it to tear down
|
// Stop the generator (if still running) and wait for it to exit.
|
||||||
stop := make(chan *generatorStats)
|
if err := snap.Release(); err != nil {
|
||||||
snap.genAbort <- stop
|
t.Fatal(err)
|
||||||
<-stop
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tests that snapshot generation with dangling storages. Dangling storage means
|
// Tests that snapshot generation with dangling storages. Dangling storage means
|
||||||
|
|
@ -976,8 +977,49 @@ func testGenerateBrokenSnapshotWithDanglingStorage(t *testing.T, scheme string)
|
||||||
}
|
}
|
||||||
checkSnapRoot(t, snap, root)
|
checkSnapRoot(t, snap, root)
|
||||||
|
|
||||||
// Signal abortion to the generator and wait for it to tear down
|
// Stop the generator (if still running) and wait for it to exit.
|
||||||
stop := make(chan *generatorStats)
|
if err := snap.Release(); err != nil {
|
||||||
snap.genAbort <- stop
|
t.Fatal(err)
|
||||||
<-stop
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestGenerateGoroutineLeak verifies that Release() tears down the generator
|
||||||
|
// goroutine. Even after generation completes, the goroutine parks waiting for
|
||||||
|
// an abort signal. If Release() does not stop it, it lingers and can touch the
|
||||||
|
// database after it has been closed during shutdown.
|
||||||
|
func TestGenerateGoroutineLeak(t *testing.T) {
|
||||||
|
testGenerateGoroutineLeak(t, rawdb.HashScheme)
|
||||||
|
testGenerateGoroutineLeak(t, rawdb.PathScheme)
|
||||||
|
}
|
||||||
|
|
||||||
|
// generateAndRelease builds a minimal state, runs snapshot generation to
|
||||||
|
// completion, and releases the resulting disk layer.
|
||||||
|
func generateAndRelease(t *testing.T, scheme string) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
helper := newHelper(scheme)
|
||||||
|
helper.addTrieAccount("acc-1", &types.StateAccount{Balance: uint256.NewInt(1), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()})
|
||||||
|
|
||||||
|
_, snap := helper.CommitAndGenerate()
|
||||||
|
|
||||||
|
// Wait for generation to run to completion.
|
||||||
|
select {
|
||||||
|
case <-snap.genPending:
|
||||||
|
case <-time.After(3 * time.Second):
|
||||||
|
t.Fatal("snapshot generation did not complete in time")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := snap.Release(); err != nil {
|
||||||
|
t.Fatalf("Release returned error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testGenerateGoroutineLeak(t *testing.T, scheme string) {
|
||||||
|
generateAndRelease(t, scheme)
|
||||||
|
|
||||||
|
// Snapshot the current goroutines now before verifying the run
|
||||||
|
// below leaks none of its own.
|
||||||
|
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
|
||||||
|
|
||||||
|
generateAndRelease(t, scheme)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -179,7 +179,8 @@ func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *triedb.Database, root comm
|
||||||
// if the background generation is allowed
|
// if the background generation is allowed
|
||||||
if !generator.Done && !noBuild {
|
if !generator.Done && !noBuild {
|
||||||
base.genPending = make(chan struct{})
|
base.genPending = make(chan struct{})
|
||||||
base.genAbort = make(chan chan *generatorStats)
|
base.cancel = make(chan struct{})
|
||||||
|
base.done = make(chan struct{})
|
||||||
|
|
||||||
var origin uint64
|
var origin uint64
|
||||||
if len(generator.Marker) >= 8 {
|
if len(generator.Marker) >= 8 {
|
||||||
|
|
@ -199,16 +200,9 @@ func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *triedb.Database, root comm
|
||||||
// Journal terminates any in-progress snapshot generation, also implicitly pushing
|
// Journal terminates any in-progress snapshot generation, also implicitly pushing
|
||||||
// the progress into the database.
|
// the progress into the database.
|
||||||
func (dl *diskLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) {
|
func (dl *diskLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) {
|
||||||
// If the snapshot is currently being generated, abort it
|
// If the snapshot is currently being generated, stop it
|
||||||
var stats *generatorStats
|
dl.stopGeneration()
|
||||||
if dl.genAbort != nil {
|
|
||||||
abort := make(chan *generatorStats)
|
|
||||||
dl.genAbort <- abort
|
|
||||||
|
|
||||||
if stats = <-abort; stats != nil {
|
|
||||||
stats.Log("Journalling in-progress snapshot", dl.root, dl.genMarker)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Ensure the layer didn't get stale
|
// Ensure the layer didn't get stale
|
||||||
dl.lock.RLock()
|
dl.lock.RLock()
|
||||||
defer dl.lock.RUnlock()
|
defer dl.lock.RUnlock()
|
||||||
|
|
@ -216,8 +210,8 @@ func (dl *diskLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) {
|
||||||
if dl.stale {
|
if dl.stale {
|
||||||
return common.Hash{}, ErrSnapshotStale
|
return common.Hash{}, ErrSnapshotStale
|
||||||
}
|
}
|
||||||
// Ensure the generator stats is written even if none was ran this cycle
|
// Ensure the generator marker is written even if none was ran this cycle
|
||||||
journalProgress(dl.diskdb, dl.genMarker, stats)
|
journalProgress(dl.diskdb, dl.genMarker, dl.genStats)
|
||||||
|
|
||||||
log.Debug("Journalled disk layer", "root", dl.root)
|
log.Debug("Journalled disk layer", "root", dl.root)
|
||||||
return dl.root, nil
|
return dl.root, nil
|
||||||
|
|
|
||||||
|
|
@ -492,7 +492,7 @@ func (t *Tree) cap(diff *diffLayer, layers int) *diskLayer {
|
||||||
// there's a snapshot being generated currently. In that case, the trie
|
// there's a snapshot being generated currently. In that case, the trie
|
||||||
// will move from underneath the generator so we **must** merge all the
|
// will move from underneath the generator so we **must** merge all the
|
||||||
// partial data down into the snapshot and restart the generation.
|
// partial data down into the snapshot and restart the generation.
|
||||||
if flattened.parent.(*diskLayer).genAbort == nil {
|
if flattened.parent.(*diskLayer).cancel == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -520,14 +520,10 @@ func diffToDisk(bottom *diffLayer) *diskLayer {
|
||||||
var (
|
var (
|
||||||
base = bottom.parent.(*diskLayer)
|
base = bottom.parent.(*diskLayer)
|
||||||
batch = base.diskdb.NewBatch()
|
batch = base.diskdb.NewBatch()
|
||||||
stats *generatorStats
|
|
||||||
)
|
)
|
||||||
// If the disk layer is running a snapshot generator, abort it
|
// Attempt to stop generation (if not already stopped)
|
||||||
if base.genAbort != nil {
|
base.stopGeneration()
|
||||||
abort := make(chan *generatorStats)
|
|
||||||
base.genAbort <- abort
|
|
||||||
stats = <-abort
|
|
||||||
}
|
|
||||||
// Put the deletion in the batch writer, flush all updates in the final step.
|
// Put the deletion in the batch writer, flush all updates in the final step.
|
||||||
rawdb.DeleteSnapshotRoot(batch)
|
rawdb.DeleteSnapshotRoot(batch)
|
||||||
|
|
||||||
|
|
@ -606,8 +602,8 @@ func diffToDisk(bottom *diffLayer) *diskLayer {
|
||||||
// Update the snapshot block marker and write any remainder data
|
// Update the snapshot block marker and write any remainder data
|
||||||
rawdb.WriteSnapshotRoot(batch, bottom.root)
|
rawdb.WriteSnapshotRoot(batch, bottom.root)
|
||||||
|
|
||||||
// Write out the generator progress marker and report
|
// Write out the generator progress marker
|
||||||
journalProgress(batch, base.genMarker, stats)
|
journalProgress(batch, base.genMarker, base.genStats)
|
||||||
|
|
||||||
// Flush all the updates in the single db operation. Ensure the
|
// Flush all the updates in the single db operation. Ensure the
|
||||||
// disk layer transition is atomic.
|
// disk layer transition is atomic.
|
||||||
|
|
@ -626,12 +622,13 @@ func diffToDisk(bottom *diffLayer) *diskLayer {
|
||||||
// If snapshot generation hasn't finished yet, port over all the starts and
|
// If snapshot generation hasn't finished yet, port over all the starts and
|
||||||
// continue where the previous round left off.
|
// continue where the previous round left off.
|
||||||
//
|
//
|
||||||
// Note, the `base.genAbort` comparison is not used normally, it's checked
|
// Note, the `base.genPending` comparison is not used normally, it's checked
|
||||||
// to allow the tests to play with the marker without triggering this path.
|
// to allow the tests to play with the marker without triggering this path.
|
||||||
if base.genMarker != nil && base.genAbort != nil {
|
if base.genMarker != nil && base.genPending != nil {
|
||||||
res.genMarker = base.genMarker
|
res.genMarker = base.genMarker
|
||||||
res.genAbort = make(chan chan *generatorStats)
|
res.cancel = make(chan struct{})
|
||||||
go res.generate(stats)
|
res.done = make(chan struct{})
|
||||||
|
go res.generate(base.genStats)
|
||||||
}
|
}
|
||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue