triedb/pathdb: add recovery mechanism in state indexer (#32447)

Alternative to #32335, enhancing the history indexer recovery after an
unclean shutdown.
This commit is contained in:
rjl493456442 2025-09-08 16:07:00 +08:00 committed by GitHub
parent c4ec4504bb
commit bc4ee71a5d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 210 additions and 14 deletions

View file

@ -121,6 +121,8 @@ func (ctx *genctx) storageOriginSet(rawStorageKey bool, t *tester) map[common.Ad
type tester struct {
db *Database
roots []common.Hash
nodes []*trienode.MergedNodeSet
states []*StateSetWithOrigin
preimages map[common.Hash][]byte
// current state set
@ -135,12 +137,38 @@ type tester struct {
snapNodes map[common.Hash]*trienode.MergedNodeSet
}
// testerConfig holds configuration parameters for running a test scenario.
type testerConfig struct {
stateHistory uint64
isVerkle bool
layers int
enableIndex bool
journalDir string
stateHistory uint64 // Number of historical states to retain
layers int // Number of state transitions to generate for
enableIndex bool // Enable state history indexing or not
journalDir string // Directory path for persisting journal files
isVerkle bool // Enables Verkle trie mode if true
writeBuffer *int // Optional, the size of memory allocated for write buffer
trieCache *int // Optional, the size of memory allocated for trie cache
stateCache *int // Optional, the size of memory allocated for state cache
}
// trieCacheSize resolves the memory budget for the trie cache: the
// explicitly configured value when set, otherwise a 256KiB default.
func (c *testerConfig) trieCacheSize() int {
	if c.trieCache == nil {
		return 256 * 1024
	}
	return *c.trieCache
}
// stateCacheSize resolves the memory budget for the state cache: the
// explicitly configured value when set, otherwise a 256KiB default.
func (c *testerConfig) stateCacheSize() int {
	if c.stateCache == nil {
		return 256 * 1024
	}
	return *c.stateCache
}
// writeBufferSize resolves the memory budget for the write buffer: the
// explicitly configured value when set, otherwise a 256KiB default.
func (c *testerConfig) writeBufferSize() int {
	if c.writeBuffer == nil {
		return 256 * 1024
	}
	return *c.writeBuffer
}
func newTester(t *testing.T, config *testerConfig) *tester {
@ -149,9 +177,9 @@ func newTester(t *testing.T, config *testerConfig) *tester {
db = New(disk, &Config{
StateHistory: config.stateHistory,
EnableStateIndexing: config.enableIndex,
TrieCleanSize: 256 * 1024,
StateCleanSize: 256 * 1024,
WriteBufferSize: 256 * 1024,
TrieCleanSize: config.trieCacheSize(),
StateCleanSize: config.stateCacheSize(),
WriteBufferSize: config.writeBufferSize(),
NoAsyncFlush: true,
JournalDirectory: config.journalDir,
}, config.isVerkle)
@ -177,6 +205,8 @@ func newTester(t *testing.T, config *testerConfig) *tester {
panic(fmt.Errorf("failed to update state changes, err: %w", err))
}
obj.roots = append(obj.roots, root)
obj.nodes = append(obj.nodes, nodes)
obj.states = append(obj.states, states)
}
return obj
}
@ -200,6 +230,8 @@ func (t *tester) extend(layers int) {
panic(fmt.Errorf("failed to update state changes, err: %w", err))
}
t.roots = append(t.roots, root)
t.nodes = append(t.nodes, nodes)
t.states = append(t.states, states)
}
}
@ -885,3 +917,107 @@ func copyStorages(set map[common.Hash]map[common.Hash][]byte) map[common.Hash]ma
}
return copied
}
// TestDatabaseIndexRecovery simulates an unclean shutdown — by corrupting the
// persisted journal so it is rejected on reload — and verifies the indexer
// recovery path: state histories beyond the persisted disk layer must be
// truncated on reload, IndexProgress must report the resulting gap, and
// re-applying the lost states must bring the indexer back to completion.
func TestDatabaseIndexRecovery(t *testing.T) {
	// Cap the diff layers so states flush to disk after a few transitions.
	maxDiffLayers = 4
	defer func() {
		maxDiffLayers = 128
	}()
	//log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelDebug, true)))
	writeBuffer := 512 * 1024
	config := &testerConfig{
		layers:      64,
		enableIndex: true,
		writeBuffer: &writeBuffer,
	}
	env := newTester(t, config)
	defer env.release()

	// Ensure the buffer in disk layer is not empty: keep extending the chain
	// until the bottom-layer root diverges from the root persisted on disk.
	var (
		bRoot = env.db.tree.bottom().rootHash()
		dRoot = crypto.Keccak256Hash(rawdb.ReadAccountTrieNode(env.db.diskdb, nil))
	)
	for dRoot == bRoot {
		env.extend(1)
		bRoot = env.db.tree.bottom().rootHash()
		dRoot = crypto.Keccak256Hash(rawdb.ReadAccountTrieNode(env.db.diskdb, nil))
	}
	waitIndexing(env.db)

	// Verify every historical state up to the bottom layer is readable,
	// remembering the position of the state persisted on disk (dIndex);
	// it marks the boundary of histories expected to survive truncation.
	var (
		dIndex int
		roots  = env.roots
		hr     = newHistoryReader(env.db.diskdb, env.db.stateFreezer)
	)
	for i, root := range roots {
		if root == dRoot {
			dIndex = i
		}
		if root == bRoot {
			break
		}
		if err := checkHistoricalState(env, root, uint64(i+1), hr); err != nil {
			t.Fatal(err)
		}
	}
	// Terminate the database and mutate the journal, it's for simulating
	// the unclean shutdown
	env.db.Journal(env.lastHash())
	env.db.Close()

	// Mutate the journal in disk, it should be regarded as invalid
	blob := rawdb.ReadTrieJournal(env.db.diskdb)
	blob[0] = 0xa
	rawdb.WriteTrieJournal(env.db.diskdb, blob)

	// Reload the database, the extra state histories should be removed
	env.db = New(env.db.diskdb, env.db.config, false)
	for i := range roots {
		_, err := readStateHistory(env.db.stateFreezer, uint64(i+1))
		// Histories up to the disk state must survive the truncation...
		if i <= dIndex && err != nil {
			t.Fatalf("State history is not found, %d", i)
		}
		// ...while anything beyond it must have been dropped.
		if i > dIndex && err == nil {
			t.Fatalf("Unexpected state history found, %d", i)
		}
	}
	// The indexer should report outstanding work for the truncated range.
	remain, err := env.db.IndexProgress()
	if err != nil {
		t.Fatalf("Failed to obtain the progress, %v", err)
	}
	if remain == 0 {
		t.Fatalf("Unexpected progress remain, %d", remain)
	}
	// Apply new states on top, ensuring state indexing can respond correctly
	for i := dIndex + 1; i < len(roots); i++ {
		if err := env.db.Update(roots[i], roots[i-1], uint64(i), env.nodes[i], env.states[i]); err != nil {
			panic(fmt.Errorf("failed to update state changes, err: %w", err))
		}
	}
	// Once the lost histories are regenerated, no indexing gap should remain.
	remain, err = env.db.IndexProgress()
	if err != nil {
		t.Fatalf("Failed to obtain the progress, %v", err)
	}
	if remain != 0 {
		t.Fatalf("Unexpected progress remain, %d", remain)
	}
	waitIndexing(env.db)

	// Ensure the truncated state histories become accessible
	bRoot = env.db.tree.bottom().rootHash()
	hr = newHistoryReader(env.db.diskdb, env.db.stateFreezer)
	for i, root := range roots {
		if root == bRoot {
			break
		}
		if err := checkHistoricalState(env, root, uint64(i+1), hr); err != nil {
			t.Fatal(err)
		}
	}
}

View file

@ -322,15 +322,22 @@ func newIndexIniter(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, lastID
closed: make(chan struct{}),
}
// Load indexing progress
var recover bool
initer.last.Store(lastID)
metadata := loadIndexMetadata(disk)
if metadata != nil {
initer.indexed.Store(metadata.Last)
recover = metadata.Last > lastID
}
// Launch background indexer
initer.wg.Add(1)
go initer.run(lastID)
if recover {
log.Info("History indexer is recovering", "history", lastID, "indexed", metadata.Last)
go initer.recover(lastID)
} else {
go initer.run(lastID)
}
return initer
}
@ -364,8 +371,8 @@ func (i *indexIniter) remain() uint64 {
default:
last, indexed := i.last.Load(), i.indexed.Load()
if last < indexed {
log.Error("Invalid state indexing range", "last", last, "indexed", indexed)
return 0
log.Warn("State indexer is in recovery", "indexed", indexed, "last", last)
return indexed - last
}
return last - indexed
}
@ -569,6 +576,49 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID
log.Info("Indexed state history", "from", beginID, "to", lastID, "elapsed", common.PrettyDuration(time.Since(start)))
}
// recover handles unclean shutdown recovery. After an unclean shutdown, any
// extra histories are typically truncated, while the corresponding history index
// entries may still have been written. Ideally, we would unindex these histories
// in reverse order, but there is no guarantee that the required histories will
// still be available.
//
// As a workaround, indexIniter waits until the missing histories are regenerated
// by chain recovery, under the assumption that the recovered histories will be
// identical to the lost ones. Fork-awareness should be added in the future to
// correctly handle histories affected by reorgs.
func (i *indexIniter) recover(lastID uint64) {
	defer i.wg.Done()

	for {
		select {
		case signal := <-i.interrupt:
			// Only a single-step move relative to the current head is valid:
			// one history appended (lastID+1) or the latest one removed
			// (lastID-1); anything else is rejected back to the signaller.
			newLastID := signal.newLastID
			if newLastID != lastID+1 && newLastID != lastID-1 {
				signal.result <- fmt.Errorf("invalid history id, last: %d, got: %d", lastID, newLastID)
				continue
			}
			// Advance the head of the available histories and acknowledge
			// the signal before publishing the new position.
			lastID = newLastID
			signal.result <- nil
			i.last.Store(newLastID)
			log.Debug("Updated history index flag", "last", lastID)

			// Terminate the recovery routine once the histories are fully aligned
			// with the index data, indicating that index initialization is complete.
			metadata := loadIndexMetadata(i.disk)
			if metadata != nil && metadata.Last == lastID {
				close(i.done)
				log.Info("History indexer is recovered", "last", lastID)
				return
			}
		case <-i.closed:
			// Initer shut down before recovery completed.
			return
		}
	}
}
// historyIndexer manages the indexing and unindexing of state histories,
// providing access to historical states.
//

View file

@ -144,7 +144,13 @@ func testHistoryReader(t *testing.T, historyLimit uint64) {
maxDiffLayers = 128
}()
env := newTester(t, &testerConfig{stateHistory: historyLimit, layers: 64, enableIndex: true})
//log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelDebug, true)))
config := &testerConfig{
stateHistory: historyLimit,
layers: 64,
enableIndex: true,
}
env := newTester(t, config)
defer env.release()
waitIndexing(env.db)
@ -183,7 +189,11 @@ func TestHistoricalStateReader(t *testing.T) {
}()
//log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelDebug, true)))
config := &testerConfig{stateHistory: 0, layers: 64, enableIndex: true}
config := &testerConfig{
stateHistory: 0,
layers: 64,
enableIndex: true,
}
env := newTester(t, config)
defer env.release()
waitIndexing(env.db)

View file

@ -267,7 +267,7 @@ func (dl *diskLayer) journal(w io.Writer) error {
if err := dl.buffer.states.encode(w); err != nil {
return err
}
log.Debug("Journaled pathdb disk layer", "root", dl.root)
log.Debug("Journaled pathdb disk layer", "root", dl.root, "id", dl.id)
return nil
}