mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-06-05 22:48:36 +00:00
## Overview
This PR fixes a race condition during blockchain shutdown where snapshot
generation could continue accessing the trie database after it has been
closed, leading to iterator errors. We noticed this in one of our nodes
on https://github.com/ava-labs/avalanchego, which relies on an older
version of geth with the same issue (so this behavior does happen!).
During node shutdown, the following sequence occurs:
1. `BlockChain.Stop()` calls `snaps.Release()` to clean up snapshot
resources
2. `Release()` only resets the cache but doesn't stop the generator
goroutine
3. The trie database is then closed via `triedb.Close()`
4. The still-running generator attempts to iterate storage tries
5. The iterator fails because the database is closed (`"Generator failed to
iterate storage trie"`)
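The ordering problem in the sequence above can be reproduced in a small, self-contained model. This is only a sketch: `store`, `worker`, `closeFirst`, and `stopFirst` are hypothetical names invented for illustration and are not part of go-ethereum. It contrasts closing the store while a background reader is still running with stopping the reader first.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// store is a hypothetical stand-in for the trie database.
type store struct{ closed atomic.Bool }

func (s *store) read() error {
	if s.closed.Load() {
		return errors.New("database closed")
	}
	return nil
}

func (s *store) close() { s.closed.Store(true) }

// worker is a hypothetical stand-in for the generator goroutine.
type worker struct {
	db     *store
	cancel chan struct{} // closed to ask the worker to exit
	done   chan struct{} // closed by the worker once it has exited
	err    error         // first read error; only read after done is closed
}

func startWorker(db *store) *worker {
	w := &worker{db: db, cancel: make(chan struct{}), done: make(chan struct{})}
	go w.loop()
	return w
}

// loop keeps reading from the store until cancelled or until a read fails.
func (w *worker) loop() {
	defer close(w.done)
	for {
		select {
		case <-w.cancel:
			return
		default:
		}
		if err := w.db.read(); err != nil {
			w.err = err
			return
		}
	}
}

// closeFirst models the problematic order: the store is closed while the
// worker is still reading from it.
func closeFirst() error {
	db := &store{}
	w := startWorker(db)
	db.close()
	<-w.done
	return w.err
}

// stopFirst stops the worker and waits for it to exit before closing the store.
func stopFirst() error {
	db := &store{}
	w := startWorker(db)
	close(w.cancel)
	<-w.done
	db.close()
	return w.err
}

func main() {
	fmt.Println("close first:", closeFirst())
	fmt.Println("stop first: ", stopFirst())
}
```

In this model the worker only ever sees a closed store when the store is closed before the worker has been stopped and waited for.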
## Problem
There are three related bugs:
1. `Release()` doesn't stop generation: The `diskLayer.Release()` method
only resets the cache without stopping ongoing snapshot generation,
leaving the generator goroutine running after database closure.
2. `stopGeneration()` has an incorrect completion check: The
`stopGeneration()` method checks `genMarker != nil` to determine if
generation is running. However, `genMarker` is set to nil when
generation completes successfully, even though the generator goroutine
is still waiting for the abort signal at the end of `generate()`. See
line 705 in `generate.go`:
eaaa5b716d/core/state/snapshot/generate.go (L699-L707)
This means `stopGeneration()` returns early without sending the abort
signal.
3. Node shutdown doesn't stop generation: During shutdown, no code path
calls `stopGeneration()` or sends the abort signal to the generator,
causing the generator to access a closed database and error.
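The second bug is easier to see in a reduced model. The sketch below is an assumption-laden simplification, not the actual `diskLayer` code: the `layer` type, its fields, and the helpers `stopByMarker` and `signalAbort` are hypothetical. It shows a worker that clears its progress marker when the work completes but then parks waiting for an abort request, so a stop routine that only inspects the marker never sends that request.

```go
package main

import (
	"fmt"
	"sync"
)

// layer is a hypothetical, simplified model of a layer with a background generator.
type layer struct {
	mu       sync.RWMutex
	marker   []byte             // non-nil while generating, nil once complete
	abort    chan chan struct{} // abort requests; the worker acknowledges each one
	complete chan struct{}      // closed when the work is finished (illustration only)
	exited   chan struct{}      // closed when the goroutine has returned
}

func newLayer() *layer {
	l := &layer{
		marker:   []byte{},
		abort:    make(chan chan struct{}),
		complete: make(chan struct{}),
		exited:   make(chan struct{}),
	}
	go l.generate()
	return l
}

// generate finishes immediately, clears the marker, then waits for an abort request.
func (l *layer) generate() {
	defer close(l.exited)
	l.mu.Lock()
	l.marker = nil
	l.mu.Unlock()
	close(l.complete)

	ack := <-l.abort // parked here until someone asks the worker to stop
	close(ack)
}

// stopByMarker models the faulty check: it reports whether an abort was sent.
func (l *layer) stopByMarker() bool {
	l.mu.RLock()
	generating := l.marker != nil
	l.mu.RUnlock()
	if !generating {
		return false // returns early although the goroutine is still parked
	}
	l.signalAbort()
	return true
}

// signalAbort sends the abort request unconditionally and waits for the ack.
func (l *layer) signalAbort() {
	ack := make(chan struct{})
	l.abort <- ack
	<-ack
}

// running reports whether the worker goroutine has not yet returned.
func (l *layer) running() bool {
	select {
	case <-l.exited:
		return false
	default:
		return true
	}
}

func main() {
	l := newLayer()
	<-l.complete
	fmt.Println("abort sent by marker check:", l.stopByMarker())
	fmt.Println("worker still running:", l.running())
	l.signalAbort()
	<-l.exited
	fmt.Println("worker still running:", l.running())
}
```

The point of the model is that "the marker is nil" and "the goroutine has exited" are different conditions, so the stop path needs a signal tied to the goroutine's lifetime rather than to its progress.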
## Fix
- Modified `diskLayer.Release()` to call `stopGeneration()` before
releasing resources
- Added explicit cancellation (`cancel` and `done` channels on the disk
layer), so stopping the generator no longer relies on a caller waiting on it
- Fixed `stopGeneration()` to properly and safely stop snapshot
generation
- Added `TestGenerateGoroutineLeak` to verify the fix and prevent
regression. The test fails without the fix and passes with it.
- The test creates a snapshot with active generation, waits for
completion, then calls `Release()`, and uses `go.uber.org/goleak` to
assert no generator goroutine survives.
- Without the fix, the test fails: `Release()` returns without stopping
the generator, which stays parked at `generate.go:705` waiting for an
abort signal that never comes:
```
--- FAIL: TestGenerateGoroutineLeak (0.88s)
generate_test.go: found unexpected goroutines:
[Goroutine 6 in state chan receive, with
core/state/snapshot.(*diskLayer).generate on top of the stack:
core/state/snapshot.(*diskLayer).generate(...)
core/state/snapshot/generate.go:705
created by core/state/snapshot.generateSnapshot
core/state/snapshot/generate.go:79 ]
```
- With the fix, the test passes: `Release()` -> `stopGeneration()`
blocks until the generator goroutine has fully exited, so nothing leaks.
Note that this fix follows the same pattern used in `Tree.Disable()` in
https://github.com/ethereum/go-ethereum/pull/30040, which introduced
`stopGeneration()` for use in `Disable()` and `Rebuild()` but didn't
address the shutdown path.
The test follows the same pattern used in
`TestCheckSimBackendGoroutineLeak`.
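The cancellation approach described in the Fix section can be sketched with a pair of channels. This is a minimal illustration rather than the code in this PR: the `generator` type, `startGenerator`, `stop`, and `exited` are hypothetical names, and the use of `sync.Once` to make `stop` safe to call repeatedly is an assumption about one reasonable way to do it.

```go
package main

import (
	"fmt"
	"sync"
)

// generator is a hypothetical background task with explicit cancellation.
type generator struct {
	cancel chan struct{} // closed to request shutdown
	done   chan struct{} // closed by the goroutine when it returns
	once   sync.Once     // guards close(cancel) against repeated calls
}

func startGenerator() *generator {
	g := &generator{cancel: make(chan struct{}), done: make(chan struct{})}
	go func() {
		defer close(g.done)
		// Real work would run here, checking g.cancel between steps.
		// Whether or not the work has completed, the goroutine exits
		// as soon as cancel is closed.
		<-g.cancel
	}()
	return g
}

// stop requests cancellation and blocks until the goroutine has exited.
// It is safe to call more than once.
func (g *generator) stop() {
	g.once.Do(func() { close(g.cancel) })
	<-g.done
}

// exited reports whether the goroutine has returned.
func (g *generator) exited() bool {
	select {
	case <-g.done:
		return true
	default:
		return false
	}
}

func main() {
	g := startGenerator()
	fmt.Println("exited before stop:", g.exited())
	g.stop()
	g.stop() // a second call is a no-op
	fmt.Println("exited after stop:", g.exited())
}
```

Because `stop` waits on `done` rather than on any progress indicator, it behaves the same whether the work is still in flight or already finished, which is the property the shutdown path needs.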
379 lines
14 KiB
Go
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package snapshot

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"time"

	"github.com/VictoriaMetrics/fastcache"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/rlp"
	"github.com/ethereum/go-ethereum/triedb"
)

const (
	journalV0             uint64 = 0 // initial version
	journalV1             uint64 = 1 // current version, with destruct flag (in diff layers) removed
	journalCurrentVersion        = journalV1
)

// journalGenerator is a disk layer entry containing the generator progress marker.
type journalGenerator struct {
	// Indicator that whether the database was in progress of being wiped.
	// It's deprecated but keep it here for background compatibility.
	Wiping bool

	Done     bool // Whether the generator finished creating the snapshot
	Marker   []byte
	Accounts uint64
	Slots    uint64
	Storage  uint64
}

// journalDestruct is an account deletion entry in a diffLayer's disk journal.
type journalDestruct struct {
	Hash common.Hash
}

// journalAccount is an account entry in a diffLayer's disk journal.
type journalAccount struct {
	Hash common.Hash
	Blob []byte
}

// journalStorage is an account's storage map in a diffLayer's disk journal.
type journalStorage struct {
	Hash common.Hash
	Keys []common.Hash
	Vals [][]byte
}

func ParseGeneratorStatus(generatorBlob []byte) string {
	if len(generatorBlob) == 0 {
		return ""
	}
	var generator journalGenerator
	if err := rlp.DecodeBytes(generatorBlob, &generator); err != nil {
		log.Warn("failed to decode snapshot generator", "err", err)
		return ""
	}
	// Figure out whether we're after or within an account
	var m string
	switch marker := generator.Marker; len(marker) {
	case common.HashLength:
		m = fmt.Sprintf("at %#x", marker)
	case 2 * common.HashLength:
		m = fmt.Sprintf("in %#x at %#x", marker[:common.HashLength], marker[common.HashLength:])
	default:
		m = fmt.Sprintf("%#x", marker)
	}
	return fmt.Sprintf(`Done: %v, Accounts: %d, Slots: %d, Storage: %d, Marker: %s`,
		generator.Done, generator.Accounts, generator.Slots, generator.Storage, m)
}

// loadAndParseJournal tries to parse the snapshot journal in latest format.
func loadAndParseJournal(db ethdb.KeyValueStore, base *diskLayer) (snapshot, journalGenerator, error) {
	// Retrieve the disk layer generator. It must exist, no matter the
	// snapshot is fully generated or not. Otherwise the entire disk
	// layer is invalid.
	generatorBlob := rawdb.ReadSnapshotGenerator(db)
	if len(generatorBlob) == 0 {
		return nil, journalGenerator{}, errors.New("missing snapshot generator")
	}
	var generator journalGenerator
	if err := rlp.DecodeBytes(generatorBlob, &generator); err != nil {
		return nil, journalGenerator{}, fmt.Errorf("failed to decode snapshot generator: %v", err)
	}
	// Retrieve the diff layer journal. It's possible that the journal is
	// not existent, e.g. the disk layer is generating while that the Geth
	// crashes without persisting the diff journal.
	// So if there is no journal, or the journal is invalid(e.g. the journal
	// is not matched with disk layer; or the it's the legacy-format journal,
	// etc.), we just discard all diffs and try to recover them later.
	var current snapshot = base
	err := iterateJournal(db, func(parent common.Hash, root common.Hash, accountData map[common.Hash][]byte, storageData map[common.Hash]map[common.Hash][]byte) error {
		current = newDiffLayer(current, root, accountData, storageData)
		return nil
	})
	if err != nil {
		return base, generator, nil
	}
	return current, generator, nil
}

// loadSnapshot loads a pre-existing state snapshot backed by a key-value store.
func loadSnapshot(diskdb ethdb.KeyValueStore, triedb *triedb.Database, root common.Hash, cache int, recovery bool, noBuild bool) (snapshot, bool, error) {
	// If snapshotting is disabled (initial sync in progress), don't do anything,
	// wait for the chain to permit us to do something meaningful
	if rawdb.ReadSnapshotDisabled(diskdb) {
		return nil, true, nil
	}
	// Retrieve the block number and hash of the snapshot, failing if no snapshot
	// is present in the database (or crashed mid-update).
	baseRoot := rawdb.ReadSnapshotRoot(diskdb)
	if baseRoot == (common.Hash{}) {
		return nil, false, errors.New("missing or corrupted snapshot")
	}
	base := &diskLayer{
		diskdb: diskdb,
		triedb: triedb,
		cache:  fastcache.New(cache * 1024 * 1024),
		root:   baseRoot,
	}
	snapshot, generator, err := loadAndParseJournal(diskdb, base)
	if err != nil {
		log.Warn("Failed to load journal", "error", err)
		return nil, false, err
	}
	// Entire snapshot journal loaded, sanity check the head. If the loaded
	// snapshot is not matched with current state root, print a warning log
	// or discard the entire snapshot it's legacy snapshot.
	//
	// Possible scenario: Geth was crashed without persisting journal and then
	// restart, the head is rewound to the point with available state(trie)
	// which is below the snapshot. In this case the snapshot can be recovered
	// by re-executing blocks but right now it's unavailable.
	if head := snapshot.Root(); head != root {
		// If it's legacy snapshot, or it's new-format snapshot but
		// it's not in recovery mode, returns the error here for
		// rebuilding the entire snapshot forcibly.
		if !recovery {
			return nil, false, fmt.Errorf("head doesn't match snapshot: have %#x, want %#x", head, root)
		}
		// It's in snapshot recovery, the assumption is held that
		// the disk layer is always higher than chain head. It can
		// be eventually recovered when the chain head beyonds the
		// disk layer.
		log.Warn("Snapshot is not continuous with chain", "snaproot", head, "chainroot", root)
	}
	// Load the disk layer status from the generator if it's not complete
	if !generator.Done {
		base.genMarker = generator.Marker
		if base.genMarker == nil {
			base.genMarker = []byte{}
		}
	}
	// Everything loaded correctly, resume any suspended operations
	// if the background generation is allowed
	if !generator.Done && !noBuild {
		base.genPending = make(chan struct{})
		base.cancel = make(chan struct{})
		base.done = make(chan struct{})

		var origin uint64
		if len(generator.Marker) >= 8 {
			origin = binary.BigEndian.Uint64(generator.Marker)
		}
		go base.generate(&generatorStats{
			origin:   origin,
			start:    time.Now(),
			accounts: generator.Accounts,
			slots:    generator.Slots,
			storage:  common.StorageSize(generator.Storage),
		})
	}
	return snapshot, false, nil
}

// Journal terminates any in-progress snapshot generation, also implicitly pushing
// the progress into the database.
func (dl *diskLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) {
	// If the snapshot is currently being generated, stop it
	dl.stopGeneration()

	// Ensure the layer didn't get stale
	dl.lock.RLock()
	defer dl.lock.RUnlock()

	if dl.stale {
		return common.Hash{}, ErrSnapshotStale
	}
	// Ensure the generator marker is written even if none was ran this cycle
	journalProgress(dl.diskdb, dl.genMarker, dl.genStats)

	log.Debug("Journalled disk layer", "root", dl.root)
	return dl.root, nil
}

// Journal writes the memory layer contents into a buffer to be stored in the
// database as the snapshot journal.
func (dl *diffLayer) Journal(buffer *bytes.Buffer) (common.Hash, error) {
	// Journal the parent first
	base, err := dl.parent.Journal(buffer)
	if err != nil {
		return common.Hash{}, err
	}
	// Ensure the layer didn't get stale
	dl.lock.RLock()
	defer dl.lock.RUnlock()

	if dl.Stale() {
		return common.Hash{}, ErrSnapshotStale
	}
	// Everything below was journalled, persist this layer too
	if err := rlp.Encode(buffer, dl.root); err != nil {
		return common.Hash{}, err
	}
	accounts := make([]journalAccount, 0, len(dl.accountData))
	for hash, blob := range dl.accountData {
		accounts = append(accounts, journalAccount{
			Hash: hash,
			Blob: blob,
		})
	}
	if err := rlp.Encode(buffer, accounts); err != nil {
		return common.Hash{}, err
	}
	storage := make([]journalStorage, 0, len(dl.storageData))
	for hash, slots := range dl.storageData {
		keys := make([]common.Hash, 0, len(slots))
		vals := make([][]byte, 0, len(slots))
		for key, val := range slots {
			keys = append(keys, key)
			vals = append(vals, val)
		}
		storage = append(storage, journalStorage{Hash: hash, Keys: keys, Vals: vals})
	}
	if err := rlp.Encode(buffer, storage); err != nil {
		return common.Hash{}, err
	}
	log.Debug("Journalled diff layer", "root", dl.root, "parent", dl.parent.Root())
	return base, nil
}

// journalCallback is a function which is invoked by iterateJournal, every
// time a difflayer is loaded from disk.
type journalCallback = func(parent common.Hash, root common.Hash, accounts map[common.Hash][]byte, storage map[common.Hash]map[common.Hash][]byte) error

// iterateJournal iterates through the journalled difflayers, loading them from
// the database, and invoking the callback for each loaded layer.
// The order is incremental; starting with the bottom-most difflayer, going towards
// the most recent layer.
// This method returns error either if there was some error reading from disk,
// OR if the callback returns an error when invoked.
func iterateJournal(db ethdb.KeyValueReader, callback journalCallback) error {
	journal := rawdb.ReadSnapshotJournal(db)
	if len(journal) == 0 {
		log.Warn("Loaded snapshot journal", "diffs", "missing")
		return nil
	}
	r := rlp.NewStream(bytes.NewReader(journal), 0)
	// Firstly, resolve the first element as the journal version
	version, err := r.Uint64()
	if err != nil {
		log.Warn("Failed to resolve the journal version", "error", err)
		return errors.New("failed to resolve journal version")
	}
	if version != journalV0 && version != journalCurrentVersion {
		log.Warn("Discarded journal with wrong version", "required", journalCurrentVersion, "got", version)
		return errors.New("wrong journal version")
	}
	// Secondly, resolve the disk layer root, ensure it's continuous
	// with disk layer. Note now we can ensure it's the snapshot journal
	// correct version, so we expect everything can be resolved properly.
	var parent common.Hash
	if err := r.Decode(&parent); err != nil {
		return errors.New("missing disk layer root")
	}
	if baseRoot := rawdb.ReadSnapshotRoot(db); baseRoot != parent {
		log.Warn("Loaded snapshot journal", "diskroot", baseRoot, "diffs", "unmatched")
		return errors.New("mismatched disk and diff layers")
	}
	for {
		var (
			root        common.Hash
			accounts    []journalAccount
			storage     []journalStorage
			accountData = make(map[common.Hash][]byte)
			storageData = make(map[common.Hash]map[common.Hash][]byte)
		)
		// Read the next diff journal entry
		if err := r.Decode(&root); err != nil {
			// The first read may fail with EOF, marking the end of the journal
			if errors.Is(err, io.EOF) {
				return nil
			}
			return fmt.Errorf("load diff root: %v", err)
		}
		// If a legacy journal is detected, decode the destruct set from the stream.
		// The destruct set has been deprecated. If the journal contains non-empty
		// destruct set, then it is deemed incompatible.
		//
		// Since self-destruction has been deprecated following the cancun fork,
		// the destruct set is expected to be nil for layers above the fork block.
		// However, an exception occurs during contract deployment: pre-funded accounts
		// may self-destruct, causing accounts with non-zero balances to be removed
		// from the state. For example,
		// https://etherscan.io/tx/0xa087333d83f0cd63b96bdafb686462e1622ce25f40bd499e03efb1051f31fe49).
		//
		// For nodes with a fully synced state, the legacy journal is likely compatible
		// with the updated definition, eliminating the need for regeneration. Unfortunately,
		// nodes performing a full sync of historical chain segments or encountering
		// pre-funded account deletions may face incompatibilities, leading to automatic
		// snapshot regeneration.
		//
		// This approach minimizes snapshot regeneration for Geth nodes upgrading from a
		// legacy version that are already synced. The workaround can be safely removed
		// after the next hard fork.
		if version == journalV0 {
			var destructs []journalDestruct
			if err := r.Decode(&destructs); err != nil {
				return fmt.Errorf("load diff destructs: %v", err)
			}
			if len(destructs) > 0 {
				log.Warn("Incompatible legacy journal detected", "version", journalV0)
				return errors.New("incompatible legacy journal detected")
			}
		}
		if err := r.Decode(&accounts); err != nil {
			return fmt.Errorf("load diff accounts: %v", err)
		}
		if err := r.Decode(&storage); err != nil {
			return fmt.Errorf("load diff storage: %v", err)
		}
		for _, entry := range accounts {
			if len(entry.Blob) > 0 { // RLP loses nil-ness, but `[]byte{}` is not a valid item, so reinterpret that
				accountData[entry.Hash] = entry.Blob
			} else {
				accountData[entry.Hash] = nil
			}
		}
		for _, entry := range storage {
			slots := make(map[common.Hash][]byte)
			for i, key := range entry.Keys {
				if len(entry.Vals[i]) > 0 { // RLP loses nil-ness, but `[]byte{}` is not a valid item, so reinterpret that
					slots[key] = entry.Vals[i]
				} else {
					slots[key] = nil
				}
			}
			storageData[entry.Hash] = slots
		}
		if err := callback(parent, root, accountData, storageData); err != nil {
			return err
		}
		parent = root
	}
}