From a117eb35520c1c977152a54df1662ea5c8e71117 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Mon, 19 Jan 2026 16:18:34 +0800 Subject: [PATCH] triedb/pathdb: enhance history index initer --- triedb/pathdb/history_indexer.go | 139 +++++++++++++++++++++---------- 1 file changed, 95 insertions(+), 44 deletions(-) diff --git a/triedb/pathdb/history_indexer.go b/triedb/pathdb/history_indexer.go index c987b380ed..0a5740d54c 100644 --- a/triedb/pathdb/history_indexer.go +++ b/triedb/pathdb/history_indexer.go @@ -349,7 +349,7 @@ type interruptSignal struct { // If a state history is removed due to a rollback, the associated indexes should // be unmarked accordingly. type indexIniter struct { - disk ethdb.KeyValueStore + disk ethdb.Database freezer ethdb.AncientStore interrupt chan *interruptSignal done chan struct{} @@ -364,7 +364,7 @@ type indexIniter struct { wg sync.WaitGroup } -func newIndexIniter(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, typ historyType, lastID uint64) *indexIniter { +func newIndexIniter(disk ethdb.Database, freezer ethdb.AncientStore, typ historyType, lastID uint64) *indexIniter { initer := &indexIniter{ disk: disk, freezer: freezer, @@ -385,12 +385,7 @@ func newIndexIniter(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, typ hi // Launch background indexer initer.wg.Add(1) - if recover { - log.Info("History indexer is recovering", "history", lastID, "indexed", metadata.Last) - go initer.recover(lastID) - } else { - go initer.run(lastID) - } + go initer.run(lastID, recover) return initer } @@ -431,24 +426,63 @@ func (i *indexIniter) remain() uint64 { } } -func (i *indexIniter) run(lastID uint64) { +// readBlockTime reads the associated block time of the provided history ID. 
+func (i *indexIniter) readBlockTime(historyID uint64) (uint64, error) { + var blockNumber uint64 + switch i.typ { + case typeStateHistory: + m, err := readStateHistoryMeta(i.freezer, historyID) + if err != nil { + return 0, err + } + blockNumber = m.block + + case typeTrienodeHistory: + m, err := readTrienodeMetadata(i.freezer, historyID) + if err != nil { + return 0, err + } + blockNumber = m.block + } + hash := rawdb.ReadCanonicalHash(i.disk, blockNumber) + if hash == (common.Hash{}) { + return 0, errors.New("block not found") + } + header := rawdb.ReadHeader(i.disk, hash, blockNumber) + if header == nil { + return 0, errors.New("block not found") + } + return header.Time, nil +} + +func (i *indexIniter) run(lastID uint64, recover bool) { defer i.wg.Done() // Launch background indexing thread var ( - done = make(chan struct{}) - interrupt = new(atomic.Int32) + done chan struct{} + interrupt *atomic.Int32 - // checkDone indicates whether all requested state histories - // have been fully indexed. + start = time.Now() + lastCheckTime time.Time + + // checkDone reports whether indexing has completed for all histories. checkDone = func() bool { - metadata := loadIndexMetadata(i.disk, i.typ) - return metadata != nil && metadata.Last == lastID + return i.indexed.Load() == lastID } ) - go i.index(done, interrupt, lastID) - + if recover { + var aborted bool + lastID, aborted = i.recover(lastID) + if aborted { + return + } + } for { + if done == nil && !checkDone() { + done, interrupt = make(chan struct{}), new(atomic.Int32) + go i.index(done, interrupt, lastID) + } select { case signal := <-i.interrupt: // The indexing limit can only be extended or shortened continuously. @@ -467,11 +501,13 @@ func (i *indexIniter) run(lastID uint64) { i.log.Debug("Extended history range", "last", lastID) continue } - // The index limit is shortened by one, interrupt the current background - // process and relaunch with new target. 
- interrupt.Store(1) - <-done - + // The index limit is shortened, interrupt the current background + // process if it's active and update the target. + if done != nil { + interrupt.Store(1) + <-done + done, interrupt = nil, nil + } // If all state histories, including the one to be reverted, have // been fully indexed, unindex it here and shut down the initializer. if checkDone() { @@ -488,26 +524,42 @@ func (i *indexIniter) run(lastID uint64) { // Adjust the indexing target and relaunch the process lastID = newLastID signal.result <- nil - - done, interrupt = make(chan struct{}), new(atomic.Int32) - go i.index(done, interrupt, lastID) i.log.Debug("Shortened history range", "last", lastID) case <-done: - if checkDone() { + done, interrupt = nil, nil + + if checkDone() && time.Since(lastCheckTime) > time.Second*8 { + lastCheckTime = time.Now() + + // Verify that the timestamp of the most recently indexed history + // is sufficiently close to the current time, ensuring that the + // majority of histories have been indexed in batch. + timestamp, err := i.readBlockTime(lastID) + if err != nil { + i.log.Warn("Failed to read block time", "err", err) + continue + } + blockTime := time.Unix(int64(timestamp), 0) + + // By default, 128 layers are piled up, which corresponds to roughly 25 minutes + // with Ethereum's block interval (12s). A threshold of 6 hours is chosen as a + // conservative default that leaves a wide safety margin above that window. 
+ if time.Since(blockTime) > 6*time.Hour { + i.log.Debug("Histories fully indexed, waiting next wave", "age", common.PrettyAge(blockTime)) + continue + } close(i.done) - i.log.Info("Histories have been fully indexed", "last", lastID) + i.log.Info("Histories have been fully indexed", "last", lastID, "age", common.PrettyAge(blockTime), "elapsed", common.PrettyDuration(time.Since(start))) return } - // Relaunch the background runner if some tasks are left - done, interrupt = make(chan struct{}), new(atomic.Int32) - go i.index(done, interrupt, lastID) case <-i.closed: - interrupt.Store(1) - i.log.Info("Waiting background history index initer to exit") - <-done - + if done != nil { + interrupt.Store(1) + i.log.Info("Waiting background history index initer to exit") + <-done + } if checkDone() { close(i.done) } @@ -571,7 +623,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID } return } - i.log.Info("Start history indexing", "beginID", beginID, "lastID", lastID) + i.log.Debug("Start history indexing", "beginID", beginID, "lastID", lastID) var ( current = beginID @@ -618,7 +670,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID done = current - beginID ) eta := common.CalculateETA(done, left, time.Since(start)) - i.log.Info("Indexing history", "processed", done, "left", left, "elapsed", common.PrettyDuration(time.Since(start)), "eta", common.PrettyDuration(eta)) + i.log.Debug("Indexing history", "processed", done, "left", left, "elapsed", common.PrettyDuration(time.Since(start)), "eta", common.PrettyDuration(eta)) } } i.indexed.Store(current - 1) // update indexing progress @@ -629,7 +681,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID if err := batch.finish(true); err != nil { i.log.Error("Failed to flush index", "err", err) } - log.Info("State indexing interrupted") + log.Debug("State indexing interrupted") return } } @@ -637,7 +689,7 @@ func (i *indexIniter) 
index(done chan struct{}, interrupt *atomic.Int32, lastID if err := batch.finish(true); err != nil { i.log.Error("Failed to flush index", "err", err) } - i.log.Info("Indexed history", "from", beginID, "to", lastID, "elapsed", common.PrettyDuration(time.Since(start))) + i.log.Debug("Indexed history", "from", beginID, "to", lastID, "elapsed", common.PrettyDuration(time.Since(start))) } // recover handles unclean shutdown recovery. After an unclean shutdown, any @@ -650,8 +702,8 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID // by chain recovery, under the assumption that the recovered histories will be // identical to the lost ones. Fork-awareness should be added in the future to // correctly handle histories affected by reorgs. -func (i *indexIniter) recover(lastID uint64) { - defer i.wg.Done() +func (i *indexIniter) recover(lastID uint64) (uint64, bool) { + log.Info("History indexer is recovering", "history", lastID, "indexed", i.indexed.Load()) for { select { @@ -672,13 +724,12 @@ func (i *indexIniter) recover(lastID uint64) { // with the index data, indicating that index initialization is complete. metadata := loadIndexMetadata(i.disk, i.typ) if metadata != nil && metadata.Last == lastID { - close(i.done) i.log.Info("History indexer is recovered", "last", lastID) - return + return lastID, false } case <-i.closed: - return + return 0, true } } } @@ -746,7 +797,7 @@ func checkVersion(disk ethdb.KeyValueStore, typ historyType) { // newHistoryIndexer constructs the history indexer and launches the background // initer to complete the indexing of any remaining state histories. 
-func newHistoryIndexer(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, lastHistoryID uint64, typ historyType) *historyIndexer { +func newHistoryIndexer(disk ethdb.Database, freezer ethdb.AncientStore, lastHistoryID uint64, typ historyType) *historyIndexer { checkVersion(disk, typ) return &historyIndexer{ initer: newIndexIniter(disk, freezer, typ, lastHistoryID),