From a117eb35520c1c977152a54df1662ea5c8e71117 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Mon, 19 Jan 2026 16:18:34 +0800 Subject: [PATCH 1/3] triedb/pathdb: enhance history index initer --- triedb/pathdb/history_indexer.go | 139 +++++++++++++++++++++---------- 1 file changed, 95 insertions(+), 44 deletions(-) diff --git a/triedb/pathdb/history_indexer.go b/triedb/pathdb/history_indexer.go index c987b380ed..0a5740d54c 100644 --- a/triedb/pathdb/history_indexer.go +++ b/triedb/pathdb/history_indexer.go @@ -349,7 +349,7 @@ type interruptSignal struct { // If a state history is removed due to a rollback, the associated indexes should // be unmarked accordingly. type indexIniter struct { - disk ethdb.KeyValueStore + disk ethdb.Database freezer ethdb.AncientStore interrupt chan *interruptSignal done chan struct{} @@ -364,7 +364,7 @@ type indexIniter struct { wg sync.WaitGroup } -func newIndexIniter(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, typ historyType, lastID uint64) *indexIniter { +func newIndexIniter(disk ethdb.Database, freezer ethdb.AncientStore, typ historyType, lastID uint64) *indexIniter { initer := &indexIniter{ disk: disk, freezer: freezer, @@ -385,12 +385,7 @@ func newIndexIniter(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, typ hi // Launch background indexer initer.wg.Add(1) - if recover { - log.Info("History indexer is recovering", "history", lastID, "indexed", metadata.Last) - go initer.recover(lastID) - } else { - go initer.run(lastID) - } + go initer.run(lastID, recover) return initer } @@ -431,24 +426,63 @@ func (i *indexIniter) remain() uint64 { } } -func (i *indexIniter) run(lastID uint64) { +// readBlockTime reads the associated block time of the provided history ID. 
+func (i *indexIniter) readBlockTime(historyID uint64) (uint64, error) { + var blockNumber uint64 + switch i.typ { + case typeStateHistory: + m, err := readStateHistoryMeta(i.freezer, historyID) + if err != nil { + return 0, err + } + blockNumber = m.block + + case typeTrienodeHistory: + m, err := readTrienodeMetadata(i.freezer, historyID) + if err != nil { + return 0, err + } + blockNumber = m.block + } + hash := rawdb.ReadCanonicalHash(i.disk, blockNumber) + if hash == (common.Hash{}) { + return 0, errors.New("block not found") + } + header := rawdb.ReadHeader(i.disk, hash, blockNumber) + if header == nil { + return 0, errors.New("block not found") + } + return header.Time, nil +} + +func (i *indexIniter) run(lastID uint64, recover bool) { defer i.wg.Done() // Launch background indexing thread var ( - done = make(chan struct{}) - interrupt = new(atomic.Int32) + done chan struct{} + interrupt *atomic.Int32 - // checkDone indicates whether all requested state histories - // have been fully indexed. + start = time.Now() + lastCheckTime time.Time + + // checkDone reports whether indexing has completed for all histories. checkDone = func() bool { - metadata := loadIndexMetadata(i.disk, i.typ) - return metadata != nil && metadata.Last == lastID + return i.indexed.Load() == lastID } ) - go i.index(done, interrupt, lastID) - + if recover { + var aborted bool + lastID, aborted = i.recover(lastID) + if aborted { + return + } + } for { + if done == nil && !checkDone() { + done, interrupt = make(chan struct{}), new(atomic.Int32) + go i.index(done, interrupt, lastID) + } select { case signal := <-i.interrupt: // The indexing limit can only be extended or shortened continuously. @@ -467,11 +501,13 @@ func (i *indexIniter) run(lastID uint64) { i.log.Debug("Extended history range", "last", lastID) continue } - // The index limit is shortened by one, interrupt the current background - // process and relaunch with new target. 
- interrupt.Store(1) - <-done - + // The index limit is shortened, interrupt the current background + // process if it's active and update the target. + if done != nil { + interrupt.Store(1) + <-done + done, interrupt = nil, nil + } // If all state histories, including the one to be reverted, have // been fully indexed, unindex it here and shut down the initializer. if checkDone() { @@ -488,26 +524,42 @@ func (i *indexIniter) run(lastID uint64) { // Adjust the indexing target and relaunch the process lastID = newLastID signal.result <- nil - - done, interrupt = make(chan struct{}), new(atomic.Int32) - go i.index(done, interrupt, lastID) i.log.Debug("Shortened history range", "last", lastID) case <-done: - if checkDone() { + done, interrupt = nil, nil + + if checkDone() && time.Since(lastCheckTime) > time.Second*8 { + lastCheckTime = time.Now() + + // Verify that the timestamp of the most recently indexed history + // is sufficiently close to the current time, ensuring that the + // majority of histories have been indexed in batch. + timestamp, err := i.readBlockTime(lastID) + if err != nil { + i.log.Warn("Failed to read block time", "err", err) + continue + } + blockTime := time.Unix(int64(timestamp), 0) + + // By default, 128 layers are piled up, which corresponds to roughly 25 minutes + // with Ethereum's block interval (12s). A threshold of 6 hours is chosen as a + // conservative and safe default for whatever reason. 
+ if time.Since(blockTime) > 6*time.Hour { + i.log.Debug("Histories fully indexed, waiting next wave", "age", common.PrettyAge(blockTime)) + continue + } close(i.done) - i.log.Info("Histories have been fully indexed", "last", lastID) + i.log.Info("Histories have been fully indexed", "last", lastID, "age", common.PrettyAge(blockTime), "elapsed", common.PrettyDuration(time.Since(start))) return } - // Relaunch the background runner if some tasks are left - done, interrupt = make(chan struct{}), new(atomic.Int32) - go i.index(done, interrupt, lastID) case <-i.closed: - interrupt.Store(1) - i.log.Info("Waiting background history index initer to exit") - <-done - + if done != nil { + interrupt.Store(1) + i.log.Info("Waiting background history index initer to exit") + <-done + } if checkDone() { close(i.done) } @@ -571,7 +623,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID } return } - i.log.Info("Start history indexing", "beginID", beginID, "lastID", lastID) + i.log.Debug("Start history indexing", "beginID", beginID, "lastID", lastID) var ( current = beginID @@ -618,7 +670,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID done = current - beginID ) eta := common.CalculateETA(done, left, time.Since(start)) - i.log.Info("Indexing history", "processed", done, "left", left, "elapsed", common.PrettyDuration(time.Since(start)), "eta", common.PrettyDuration(eta)) + i.log.Debug("Indexing history", "processed", done, "left", left, "elapsed", common.PrettyDuration(time.Since(start)), "eta", common.PrettyDuration(eta)) } } i.indexed.Store(current - 1) // update indexing progress @@ -629,7 +681,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID if err := batch.finish(true); err != nil { i.log.Error("Failed to flush index", "err", err) } - log.Info("State indexing interrupted") + log.Debug("State indexing interrupted") return } } @@ -637,7 +689,7 @@ func (i *indexIniter) 
index(done chan struct{}, interrupt *atomic.Int32, lastID if err := batch.finish(true); err != nil { i.log.Error("Failed to flush index", "err", err) } - i.log.Info("Indexed history", "from", beginID, "to", lastID, "elapsed", common.PrettyDuration(time.Since(start))) + i.log.Debug("Indexed history", "from", beginID, "to", lastID, "elapsed", common.PrettyDuration(time.Since(start))) } // recover handles unclean shutdown recovery. After an unclean shutdown, any @@ -650,8 +702,8 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID // by chain recovery, under the assumption that the recovered histories will be // identical to the lost ones. Fork-awareness should be added in the future to // correctly handle histories affected by reorgs. -func (i *indexIniter) recover(lastID uint64) { - defer i.wg.Done() +func (i *indexIniter) recover(lastID uint64) (uint64, bool) { + log.Info("History indexer is recovering", "history", lastID, "indexed", i.indexed.Load()) for { select { @@ -672,13 +724,12 @@ func (i *indexIniter) recover(lastID uint64) { // with the index data, indicating that index initialization is complete. metadata := loadIndexMetadata(i.disk, i.typ) if metadata != nil && metadata.Last == lastID { - close(i.done) i.log.Info("History indexer is recovered", "last", lastID) - return + return lastID, false } case <-i.closed: - return + return 0, true } } } @@ -746,7 +797,7 @@ func checkVersion(disk ethdb.KeyValueStore, typ historyType) { // newHistoryIndexer constructs the history indexer and launches the background // initer to complete the indexing of any remaining state histories. 
-func newHistoryIndexer(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, lastHistoryID uint64, typ historyType) *historyIndexer { +func newHistoryIndexer(disk ethdb.Database, freezer ethdb.AncientStore, lastHistoryID uint64, typ historyType) *historyIndexer { checkVersion(disk, typ) return &historyIndexer{ initer: newIndexIniter(disk, freezer, typ, lastHistoryID), From 85070ad96cd3008322480ba5768eba1f4c198c36 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Wed, 28 Jan 2026 11:14:52 +0800 Subject: [PATCH 2/3] triedb/pathdb: improve history indexer --- triedb/pathdb/history_indexer.go | 108 ++++++++++++++++++++++--------- 1 file changed, 77 insertions(+), 31 deletions(-) diff --git a/triedb/pathdb/history_indexer.go b/triedb/pathdb/history_indexer.go index 0a5740d54c..4a29bfbc89 100644 --- a/triedb/pathdb/history_indexer.go +++ b/triedb/pathdb/history_indexer.go @@ -455,6 +455,61 @@ func (i *indexIniter) readBlockTime(historyID uint64) (uint64, error) { return header.Time, nil } +const ( + // syncHistoryThreshold is the time window behind the chain head. + // If the block being indexed is within this window, we consider the node + // to be close enough to the tip to operate in real-time mode. + syncHistoryThreshold = 6 * time.Hour + + // indexerIdleTimeout is the maximum duration of inactivity allowed + // before the indexer decides it has finished its current catch-up + // and exits the initial sync mode. + indexerIdleTimeout = 30 * time.Second + + // indexerHeartbeatInterval is the frequency at which the indexer + // checks its status and schedules background indexing operations. + // + // It is a deliberate design choice to introduce this delay instead + // of scheduling tasks immediately; this forces the indexer into + // "batch index mode" which significantly improves database I/O + // efficiency and reduces overhead. + indexerHeartbeatInterval = 15 * time.Second +) + +// checkExit reports whether the initial mode has completed. 
It assumes that +// all local histories have been fully indexed when invoked, and determines +// whether the initial mode should exit and marking historical state access +// as available. +func (i *indexIniter) checkExit(lastEventTime time.Time) bool { + // Verify that the timestamp of the most recently indexed history + // is sufficiently close to the current time, ensuring that the + // majority of histories have been indexed in batch. + timestamp, err := i.readBlockTime(i.indexed.Load()) + if err != nil { + i.log.Warn("Failed to read block time", "err", err) + return false + } + blockTime := time.Unix(int64(timestamp), 0) + + // By default, 128 layers are piled up, which corresponds to roughly 25 minutes + // with Ethereum's block interval (12s). A threshold of 6 hours is chosen as a + // conservative and safe default for whatever reason. + if time.Since(blockTime) < syncHistoryThreshold { + i.log.Info("Exit the initial mode as close to chain tip", "last", i.indexed.Load(), "age", common.PrettyAge(blockTime)) + return true + } + // The most recently indexed block is still at least 6 hours behind. The node + // may be syncing toward a historical block rather than the latest chain head. + // In this case, check whether the indexer has been idle for a while and exit + // the initial mode if so. + if time.Since(lastEventTime) > indexerIdleTimeout { + i.log.Info("Exit the initial mode due to inactivity", "last", i.indexed.Load(), "age", common.PrettyAge(blockTime)) + return true + } + i.log.Debug("Histories fully indexed, waiting next wave", "last", i.indexed.Load(), "age", common.PrettyAge(blockTime)) + return false +} + func (i *indexIniter) run(lastID uint64, recover bool) { defer i.wg.Done() @@ -463,13 +518,12 @@ func (i *indexIniter) run(lastID uint64, recover bool) { done chan struct{} interrupt *atomic.Int32 - start = time.Now() - lastCheckTime time.Time - // checkDone reports whether indexing has completed for all histories. 
checkDone = func() bool { return i.indexed.Load() == lastID } + lastEventTime = time.Now() + heartBeat = time.NewTicker(indexerHeartbeatInterval) ) if recover { var aborted bool @@ -479,12 +533,10 @@ func (i *indexIniter) run(lastID uint64, recover bool) { } } for { - if done == nil && !checkDone() { - done, interrupt = make(chan struct{}), new(atomic.Int32) - go i.index(done, interrupt, lastID) - } select { case signal := <-i.interrupt: + lastEventTime = time.Now() + // The indexing limit can only be extended or shortened continuously. newLastID := signal.newLastID if newLastID != lastID+1 && newLastID != lastID-1 { @@ -529,40 +581,34 @@ func (i *indexIniter) run(lastID uint64, recover bool) { case <-done: done, interrupt = nil, nil - if checkDone() && time.Since(lastCheckTime) > time.Second*8 { - lastCheckTime = time.Now() - - // Verify that the timestamp of the most recently indexed history - // is sufficiently close to the current time, ensuring that the - // majority of histories have been indexed in batch. - timestamp, err := i.readBlockTime(lastID) - if err != nil { - i.log.Warn("Failed to read block time", "err", err) - continue - } - blockTime := time.Unix(int64(timestamp), 0) - - // By default, 128 layers are piled up, which corresponds to roughly 25 minutes - // with Ethereum's block interval (12s). A threshold of 6 hours is chosen as a - // conservative and safe default for whatever reason. 
- if time.Since(blockTime) > 6*time.Hour { - i.log.Debug("Histories fully indexed, waiting next wave", "age", common.PrettyAge(blockTime)) - continue - } + if i.checkExit(lastEventTime) { close(i.done) - i.log.Info("Histories have been fully indexed", "last", lastID, "age", common.PrettyAge(blockTime), "elapsed", common.PrettyDuration(time.Since(start))) return } + case <-heartBeat.C: + if done != nil { + continue + } + if checkDone() { + if i.checkExit(lastEventTime) { + close(i.done) + return + } + continue + } + // Any pending tasks are scheduled for background execution, driven by the + // heartbeat ticker. This interval ensures that historical data is processed + // in batches rather than individually. + done, interrupt = make(chan struct{}), new(atomic.Int32) + go i.index(done, interrupt, lastID) + case <-i.closed: if done != nil { interrupt.Store(1) i.log.Info("Waiting background history index initer to exit") <-done } - if checkDone() { - close(i.done) - } return } } From 3b14f1ea02ddcdbeef3553ac02f6d4568dbec105 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Wed, 28 Jan 2026 12:06:47 +0800 Subject: [PATCH 3/3] triedb/pathdb: improve history indexer --- triedb/pathdb/history_indexer.go | 40 ++++++++++++++++++++++++++++---- 1 file changed, 35 insertions(+), 5 deletions(-) diff --git a/triedb/pathdb/history_indexer.go b/triedb/pathdb/history_indexer.go index 4a29bfbc89..17a8ca2b60 100644 --- a/triedb/pathdb/history_indexer.go +++ b/triedb/pathdb/history_indexer.go @@ -461,10 +461,18 @@ const ( // to be close enough to the tip to operate in real-time mode. syncHistoryThreshold = 6 * time.Hour - // indexerIdleTimeout is the maximum duration of inactivity allowed - // before the indexer decides it has finished its current catch-up - // and exits the initial sync mode. - indexerIdleTimeout = 30 * time.Second + // indexerIdleTimeoutLive is the strict inactivity limit applied once the + // indexer is synchronized with the chain head. 
+ indexerIdleTimeoutLive = 60 * time.Second + + // indexerIdleTimeoutStartup is the idle timeout applied when the indexer has + // little or no history, leaving room for Geth's initial sync procedures + // (e.g., beacon header downloading). + indexerIdleTimeoutStartup = 1 * time.Hour + + // indexerIdleDecayWindow defines the progress threshold over which the + // idle timeout linearly scales from the startup period down to the live one. + indexerIdleDecayWindow = 10000 // indexerHeartbeatInterval is the frequency at which the indexer // checks its status and schedules background indexing operations. @@ -476,6 +484,28 @@ const ( indexerHeartbeatInterval = 15 * time.Second ) +// getTimeout calculates a dynamic inactivity threshold based on indexing +// progress. For massive datasets, this accounts for the preparation phase +// required before active chain synchronization begins (such as beacon +// header downloading). +// +// Note: In private networks with no blocks, this may result in a long wait. +// In such cases, the initial sync mode is expected to be terminated by +// another condition verifying the chain head has been reached. This timeout +// primarily serves as a heuristic for networks with massive datasets. +func (i *indexIniter) getTimeout() time.Duration { + current := i.indexed.Load() + if current >= indexerIdleDecayWindow { + return indexerIdleTimeoutLive + } + progress := float64(current) / float64(indexerIdleDecayWindow) + + diff := float64(indexerIdleTimeoutStartup - indexerIdleTimeoutLive) + decay := time.Duration(diff * progress) + + return indexerIdleTimeoutStartup - decay +} + // checkExit reports whether the initial mode has completed. 
It assumes that // all local histories have been fully indexed when invoked, and determines // whether the initial mode should exit and marking historical state access @@ -502,7 +532,7 @@ func (i *indexIniter) checkExit(lastEventTime time.Time) bool { // may be syncing toward a historical block rather than the latest chain head. // In this case, check whether the indexer has been idle for a while and exit // the initial mode if so. - if time.Since(lastEventTime) > indexerIdleTimeout { + if time.Since(lastEventTime) > i.getTimeout() { i.log.Info("Exit the initial mode due to inactivity", "last", i.indexed.Load(), "age", common.PrettyAge(blockTime)) return true }