From bc4ee71a5d25bbd2a9777c17eab6c9ab2c50f0ef Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Mon, 8 Sep 2025 16:07:00 +0800 Subject: [PATCH] triedb/pathdb: add recovery mechanism in state indexer (#32447) Alternative to #32335, enhancing the history indexer's recovery after an unclean shutdown. --- triedb/pathdb/database_test.go | 152 +++++++++++++++++++++++++-- triedb/pathdb/history_indexer.go | 56 +++++++++- triedb/pathdb/history_reader_test.go | 14 ++- triedb/pathdb/journal.go | 2 +- 4 files changed, 210 insertions(+), 14 deletions(-) diff --git a/triedb/pathdb/database_test.go b/triedb/pathdb/database_test.go index 99de4380bf..8cca7b1b3c 100644 --- a/triedb/pathdb/database_test.go +++ b/triedb/pathdb/database_test.go @@ -121,6 +121,8 @@ func (ctx *genctx) storageOriginSet(rawStorageKey bool, t *tester) map[common.Ad type tester struct { db *Database roots []common.Hash + nodes []*trienode.MergedNodeSet + states []*StateSetWithOrigin preimages map[common.Hash][]byte // current state set @@ -135,12 +137,38 @@ type tester struct { snapNodes map[common.Hash]*trienode.MergedNodeSet } +// testerConfig holds configuration parameters for running a test scenario. 
type testerConfig struct { - stateHistory uint64 - isVerkle bool - layers int - enableIndex bool - journalDir string + stateHistory uint64 // Number of historical states to retain + layers int // Number of state transitions to generate for + enableIndex bool // Enable state history indexing or not + journalDir string // Directory path for persisting journal files + isVerkle bool // Enables Verkle trie mode if true + + writeBuffer *int // Optional, the size of memory allocated for write buffer + trieCache *int // Optional, the size of memory allocated for trie cache + stateCache *int // Optional, the size of memory allocated for state cache +} + +func (c *testerConfig) trieCacheSize() int { + if c.trieCache != nil { + return *c.trieCache + } + return 256 * 1024 +} + +func (c *testerConfig) stateCacheSize() int { + if c.stateCache != nil { + return *c.stateCache + } + return 256 * 1024 +} + +func (c *testerConfig) writeBufferSize() int { + if c.writeBuffer != nil { + return *c.writeBuffer + } + return 256 * 1024 } func newTester(t *testing.T, config *testerConfig) *tester { @@ -149,9 +177,9 @@ func newTester(t *testing.T, config *testerConfig) *tester { db = New(disk, &Config{ StateHistory: config.stateHistory, EnableStateIndexing: config.enableIndex, - TrieCleanSize: 256 * 1024, - StateCleanSize: 256 * 1024, - WriteBufferSize: 256 * 1024, + TrieCleanSize: config.trieCacheSize(), + StateCleanSize: config.stateCacheSize(), + WriteBufferSize: config.writeBufferSize(), NoAsyncFlush: true, JournalDirectory: config.journalDir, }, config.isVerkle) @@ -177,6 +205,8 @@ func newTester(t *testing.T, config *testerConfig) *tester { panic(fmt.Errorf("failed to update state changes, err: %w", err)) } obj.roots = append(obj.roots, root) + obj.nodes = append(obj.nodes, nodes) + obj.states = append(obj.states, states) } return obj } @@ -200,6 +230,8 @@ func (t *tester) extend(layers int) { panic(fmt.Errorf("failed to update state changes, err: %w", err)) } t.roots = append(t.roots, 
root) + t.nodes = append(t.nodes, nodes) + t.states = append(t.states, states) } } @@ -885,3 +917,107 @@ func copyStorages(set map[common.Hash]map[common.Hash][]byte) map[common.Hash]ma } return copied } + +func TestDatabaseIndexRecovery(t *testing.T) { + maxDiffLayers = 4 + defer func() { + maxDiffLayers = 128 + }() + + //log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelDebug, true))) + writeBuffer := 512 * 1024 + config := &testerConfig{ + layers: 64, + enableIndex: true, + writeBuffer: &writeBuffer, + } + env := newTester(t, config) + defer env.release() + + // Ensure the buffer in disk layer is not empty + var ( + bRoot = env.db.tree.bottom().rootHash() + dRoot = crypto.Keccak256Hash(rawdb.ReadAccountTrieNode(env.db.diskdb, nil)) + ) + for dRoot == bRoot { + env.extend(1) + + bRoot = env.db.tree.bottom().rootHash() + dRoot = crypto.Keccak256Hash(rawdb.ReadAccountTrieNode(env.db.diskdb, nil)) + } + waitIndexing(env.db) + + var ( + dIndex int + roots = env.roots + hr = newHistoryReader(env.db.diskdb, env.db.stateFreezer) + ) + for i, root := range roots { + if root == dRoot { + dIndex = i + } + if root == bRoot { + break + } + if err := checkHistoricalState(env, root, uint64(i+1), hr); err != nil { + t.Fatal(err) + } + } + + // Terminate the database and mutate the journal, it's for simulating + // the unclean shutdown + env.db.Journal(env.lastHash()) + env.db.Close() + + // Mutate the journal in disk, it should be regarded as invalid + blob := rawdb.ReadTrieJournal(env.db.diskdb) + blob[0] = 0xa + rawdb.WriteTrieJournal(env.db.diskdb, blob) + + // Reload the database, the extra state histories should be removed + env.db = New(env.db.diskdb, env.db.config, false) + + for i := range roots { + _, err := readStateHistory(env.db.stateFreezer, uint64(i+1)) + if i <= dIndex && err != nil { + t.Fatalf("State history is not found, %d", i) + } + if i > dIndex && err == nil { + t.Fatalf("Unexpected state history found, %d", i) + } + } + 
remain, err := env.db.IndexProgress() + if err != nil { + t.Fatalf("Failed to obtain the progress, %v", err) + } + if remain == 0 { + t.Fatalf("Unexpected progress remain, %d", remain) + } + + // Apply new states on top, ensuring state indexing can respond correctly + for i := dIndex + 1; i < len(roots); i++ { + if err := env.db.Update(roots[i], roots[i-1], uint64(i), env.nodes[i], env.states[i]); err != nil { + panic(fmt.Errorf("failed to update state changes, err: %w", err)) + } + } + remain, err = env.db.IndexProgress() + if err != nil { + t.Fatalf("Failed to obtain the progress, %v", err) + } + if remain != 0 { + t.Fatalf("Unexpected progress remain, %d", remain) + } + waitIndexing(env.db) + + // Ensure the truncated state histories become accessible + bRoot = env.db.tree.bottom().rootHash() + hr = newHistoryReader(env.db.diskdb, env.db.stateFreezer) + for i, root := range roots { + if root == bRoot { + break + } + if err := checkHistoricalState(env, root, uint64(i+1), hr); err != nil { + t.Fatal(err) + } + } +} diff --git a/triedb/pathdb/history_indexer.go b/triedb/pathdb/history_indexer.go index 14b9af5367..b4e89c3f17 100644 --- a/triedb/pathdb/history_indexer.go +++ b/triedb/pathdb/history_indexer.go @@ -322,15 +322,22 @@ func newIndexIniter(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, lastID closed: make(chan struct{}), } // Load indexing progress + var recover bool initer.last.Store(lastID) metadata := loadIndexMetadata(disk) if metadata != nil { initer.indexed.Store(metadata.Last) + recover = metadata.Last > lastID } // Launch background indexer initer.wg.Add(1) - go initer.run(lastID) + if recover { + log.Info("History indexer is recovering", "history", lastID, "indexed", metadata.Last) + go initer.recover(lastID) + } else { + go initer.run(lastID) + } return initer } @@ -364,8 +371,8 @@ func (i *indexIniter) remain() uint64 { default: last, indexed := i.last.Load(), i.indexed.Load() if last < indexed { - log.Error("Invalid state indexing 
range", "last", last, "indexed", indexed) - return 0 + log.Warn("State indexer is in recovery", "indexed", indexed, "last", last) + return indexed - last } return last - indexed } @@ -569,6 +576,49 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID log.Info("Indexed state history", "from", beginID, "to", lastID, "elapsed", common.PrettyDuration(time.Since(start))) } +// recover handles unclean shutdown recovery. After an unclean shutdown, any +// extra histories are typically truncated, while the corresponding history index +// entries may still have been written. Ideally, we would unindex these histories +// in reverse order, but there is no guarantee that the required histories will +// still be available. +// +// As a workaround, indexIniter waits until the missing histories are regenerated +// by chain recovery, under the assumption that the recovered histories will be +// identical to the lost ones. Fork-awareness should be added in the future to +// correctly handle histories affected by reorgs. +func (i *indexIniter) recover(lastID uint64) { + defer i.wg.Done() + + for { + select { + case signal := <-i.interrupt: + newLastID := signal.newLastID + if newLastID != lastID+1 && newLastID != lastID-1 { + signal.result <- fmt.Errorf("invalid history id, last: %d, got: %d", lastID, newLastID) + continue + } + + // Update the last indexed flag + lastID = newLastID + signal.result <- nil + i.last.Store(newLastID) + log.Debug("Updated history index flag", "last", lastID) + + // Terminate the recovery routine once the histories are fully aligned + // with the index data, indicating that index initialization is complete. 
+ metadata := loadIndexMetadata(i.disk) + if metadata != nil && metadata.Last == lastID { + close(i.done) + log.Info("History indexer is recovered", "last", lastID) + return + } + + case <-i.closed: + return + } + } +} + // historyIndexer manages the indexing and unindexing of state histories, // providing access to historical states. // diff --git a/triedb/pathdb/history_reader_test.go b/triedb/pathdb/history_reader_test.go index 2ae1cfdd29..75c5f701f9 100644 --- a/triedb/pathdb/history_reader_test.go +++ b/triedb/pathdb/history_reader_test.go @@ -144,7 +144,13 @@ func testHistoryReader(t *testing.T, historyLimit uint64) { maxDiffLayers = 128 }() - env := newTester(t, &testerConfig{stateHistory: historyLimit, layers: 64, enableIndex: true}) + //log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelDebug, true))) + config := &testerConfig{ + stateHistory: historyLimit, + layers: 64, + enableIndex: true, + } + env := newTester(t, config) defer env.release() waitIndexing(env.db) @@ -183,7 +189,11 @@ func TestHistoricalStateReader(t *testing.T) { }() //log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelDebug, true))) - config := &testerConfig{stateHistory: 0, layers: 64, enableIndex: true} + config := &testerConfig{ + stateHistory: 0, + layers: 64, + enableIndex: true, + } env := newTester(t, config) defer env.release() waitIndexing(env.db) diff --git a/triedb/pathdb/journal.go b/triedb/pathdb/journal.go index bd9081a28f..7a634dc974 100644 --- a/triedb/pathdb/journal.go +++ b/triedb/pathdb/journal.go @@ -267,7 +267,7 @@ func (dl *diskLayer) journal(w io.Writer) error { if err := dl.buffer.states.encode(w); err != nil { return err } - log.Debug("Journaled pathdb disk layer", "root", dl.root) + log.Debug("Journaled pathdb disk layer", "root", dl.root, "id", dl.id) return nil }