From 8a6e026fcbfeff0d4c67dceacb8df36c38351e27 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Wed, 28 Jan 2026 11:14:52 +0800 Subject: [PATCH] triedb/pathdb: improve history indexer --- triedb/pathdb/history_indexer.go | 108 ++++++++++++++++++++++--------- 1 file changed, 77 insertions(+), 31 deletions(-) diff --git a/triedb/pathdb/history_indexer.go b/triedb/pathdb/history_indexer.go index 0a5740d54c..4a29bfbc89 100644 --- a/triedb/pathdb/history_indexer.go +++ b/triedb/pathdb/history_indexer.go @@ -455,6 +455,61 @@ func (i *indexIniter) readBlockTime(historyID uint64) (uint64, error) { return header.Time, nil } +const ( + // syncHistoryThreshold is the time window behind the chain head. + // If the block being indexed is within this window, we consider the node + // to be close enough to the tip to operate in real-time mode. + syncHistoryThreshold = 6 * time.Hour + + // indexerIdleTimeout is the maximum duration of inactivity allowed + // before the indexer decides it has finished its current catch-up + // and exits the initial sync mode. + indexerIdleTimeout = 30 * time.Second + + // indexerHeartbeatInterval is the frequency at which the indexer + // checks its status and schedules background indexing operations. + // + // It is a deliberate design choice to introduce this delay instead + // of scheduling tasks immediately; this forces the indexer into + // "batch index mode" which significantly improves database I/O + // efficiency and reduces overhead. + indexerHeartbeatInterval = 15 * time.Second +) + +// checkExit reports whether the initial mode has completed. It assumes that +// all local histories have been fully indexed when invoked, and determines +// whether the initial mode should exit and mark historical state access +// as available. 
+func (i *indexIniter) checkExit(lastEventTime time.Time) bool { + // Verify that the timestamp of the most recently indexed history + // is sufficiently close to the current time, ensuring that the + // majority of histories have been indexed in batch. + timestamp, err := i.readBlockTime(i.indexed.Load()) + if err != nil { + i.log.Warn("Failed to read block time", "err", err) + return false + } + blockTime := time.Unix(int64(timestamp), 0) + + // By default, 128 layers are piled up, which corresponds to roughly 25 minutes + // with Ethereum's block interval (12s). A threshold of 6 hours is chosen as a + // conservative and safe default. + if time.Since(blockTime) < syncHistoryThreshold { + i.log.Info("Exit the initial mode as close to chain tip", "last", i.indexed.Load(), "age", common.PrettyAge(blockTime)) + return true + } + // The most recently indexed block is still at least 6 hours behind. The node + // may be syncing toward a historical block rather than the latest chain head. + // In this case, check whether the indexer has been idle for a while and exit + // the initial mode if so. + if time.Since(lastEventTime) > indexerIdleTimeout { + i.log.Info("Exit the initial mode due to inactivity", "last", i.indexed.Load(), "age", common.PrettyAge(blockTime)) + return true + } + i.log.Debug("Histories fully indexed, waiting next wave", "last", i.indexed.Load(), "age", common.PrettyAge(blockTime)) + return false +} + func (i *indexIniter) run(lastID uint64, recover bool) { defer i.wg.Done() @@ -463,13 +518,12 @@ func (i *indexIniter) run(lastID uint64, recover bool) { done chan struct{} interrupt *atomic.Int32 - start = time.Now() - lastCheckTime time.Time - // checkDone reports whether indexing has completed for all histories. 
checkDone = func() bool { return i.indexed.Load() == lastID } + lastEventTime = time.Now() + heartBeat = time.NewTicker(indexerHeartbeatInterval) ) if recover { var aborted bool @@ -479,12 +533,10 @@ func (i *indexIniter) run(lastID uint64, recover bool) { } } for { - if done == nil && !checkDone() { - done, interrupt = make(chan struct{}), new(atomic.Int32) - go i.index(done, interrupt, lastID) - } select { case signal := <-i.interrupt: + lastEventTime = time.Now() + // The indexing limit can only be extended or shortened continuously. newLastID := signal.newLastID if newLastID != lastID+1 && newLastID != lastID-1 { @@ -529,40 +581,34 @@ func (i *indexIniter) run(lastID uint64, recover bool) { case <-done: done, interrupt = nil, nil - if checkDone() && time.Since(lastCheckTime) > time.Second*8 { - lastCheckTime = time.Now() - - // Verify that the timestamp of the most recently indexed history - // is sufficiently close to the current time, ensuring that the - // majority of histories have been indexed in batch. - timestamp, err := i.readBlockTime(lastID) - if err != nil { - i.log.Warn("Failed to read block time", "err", err) - continue - } - blockTime := time.Unix(int64(timestamp), 0) - - // By default, 128 layers are piled up, which corresponds to roughly 25 minutes - // with Ethereum's block interval (12s). A threshold of 6 hours is chosen as a - // conservative and safe default for whatever reason. 
- if time.Since(blockTime) > 6*time.Hour { - i.log.Debug("Histories fully indexed, waiting next wave", "age", common.PrettyAge(blockTime)) - continue - } + if i.checkExit(lastEventTime) { close(i.done) - i.log.Info("Histories have been fully indexed", "last", lastID, "age", common.PrettyAge(blockTime), "elapsed", common.PrettyDuration(time.Since(start))) return } + case <-heartBeat.C: + if done != nil { + continue + } + if checkDone() { + if i.checkExit(lastEventTime) { + close(i.done) + return + } + continue + } + // Any pending tasks are scheduled for background execution, driven by the + // heartbeat ticker. This interval ensures that historical data is processed + // in batches rather than individually. + done, interrupt = make(chan struct{}), new(atomic.Int32) + go i.index(done, interrupt, lastID) + case <-i.closed: if done != nil { interrupt.Store(1) i.log.Info("Waiting background history index initer to exit") <-done } - if checkDone() { - close(i.done) - } return } }