diff --git a/triedb/pathdb/history_indexer.go b/triedb/pathdb/history_indexer.go
index c987b380ed..17a8ca2b60 100644
--- a/triedb/pathdb/history_indexer.go
+++ b/triedb/pathdb/history_indexer.go
@@ -349,7 +349,7 @@ type interruptSignal struct {
 // If a state history is removed due to a rollback, the associated indexes should
 // be unmarked accordingly.
 type indexIniter struct {
-	disk      ethdb.KeyValueStore
+	disk      ethdb.Database
 	freezer   ethdb.AncientStore
 	interrupt chan *interruptSignal
 	done      chan struct{}
@@ -364,7 +364,7 @@ type indexIniter struct {
 	wg sync.WaitGroup
 }
 
-func newIndexIniter(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, typ historyType, lastID uint64) *indexIniter {
+func newIndexIniter(disk ethdb.Database, freezer ethdb.AncientStore, typ historyType, lastID uint64) *indexIniter {
 	initer := &indexIniter{
 		disk:      disk,
 		freezer:   freezer,
@@ -385,12 +385,7 @@ func newIndexIniter(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, typ hi
 
 	// Launch background indexer
 	initer.wg.Add(1)
-	if recover {
-		log.Info("History indexer is recovering", "history", lastID, "indexed", metadata.Last)
-		go initer.recover(lastID)
-	} else {
-		go initer.run(lastID)
-	}
+	go initer.run(lastID, recover)
 	return initer
 }
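A note on the interface change in the hunks above: the disk handle widens from ethdb.KeyValueStore to ethdb.Database because the initer now resolves history IDs to canonical block timestamps, and those header reads may be served from the ancient store. A minimal standalone sketch of that lookup, mirroring the readBlockTime helper introduced in the next hunk (the name canonicalBlockTime is illustrative, not part of the patch):

package example

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/ethdb"
)

// canonicalBlockTime resolves a block number to the timestamp of its
// canonical header. It needs an ethdb.Database rather than a bare
// KeyValueStore, since old headers may live in the ancient store.
func canonicalBlockTime(db ethdb.Database, number uint64) (uint64, bool) {
	hash := rawdb.ReadCanonicalHash(db, number)
	if hash == (common.Hash{}) {
		return 0, false // no canonical block at this height
	}
	header := rawdb.ReadHeader(db, hash, number)
	if header == nil {
		return 0, false // header is missing
	}
	return header.Time, true
}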
@@ -431,26 +426,147 @@ func (i *indexIniter) remain() uint64 {
 	}
 }
 
-func (i *indexIniter) run(lastID uint64) {
+// readBlockTime reads the associated block time of the provided history ID.
+func (i *indexIniter) readBlockTime(historyID uint64) (uint64, error) {
+	var blockNumber uint64
+	switch i.typ {
+	case typeStateHistory:
+		m, err := readStateHistoryMeta(i.freezer, historyID)
+		if err != nil {
+			return 0, err
+		}
+		blockNumber = m.block
+
+	case typeTrienodeHistory:
+		m, err := readTrienodeMetadata(i.freezer, historyID)
+		if err != nil {
+			return 0, err
+		}
+		blockNumber = m.block
+	}
+	hash := rawdb.ReadCanonicalHash(i.disk, blockNumber)
+	if hash == (common.Hash{}) {
+		return 0, errors.New("block not found")
+	}
+	header := rawdb.ReadHeader(i.disk, hash, blockNumber)
+	if header == nil {
+		return 0, errors.New("block not found")
+	}
+	return header.Time, nil
+}
+
+const (
+	// syncHistoryThreshold is the time window behind the chain head.
+	// If the block being indexed is within this window, we consider the node
+	// to be close enough to the tip to operate in real-time mode.
+	syncHistoryThreshold = 6 * time.Hour
+
+	// indexerIdleTimeoutLive is the strict inactivity limit applied once the
+	// indexer is synchronized with the chain head.
+	indexerIdleTimeoutLive = 60 * time.Second
+
+	// indexerIdleTimeoutStartup is the period allowed when the indexer has
+	// little or no history yet, covering Geth's initial sync procedures
+	// (e.g., beacon header downloading).
+	indexerIdleTimeoutStartup = 1 * time.Hour
+
+	// indexerIdleDecayWindow defines the progress threshold over which the
+	// idle timeout linearly scales from the startup period down to the live one.
+	indexerIdleDecayWindow = 10000
+
+	// indexerHeartbeatInterval is the frequency at which the indexer
+	// checks its status and schedules background indexing operations.
+	//
+	// It is a deliberate design choice to introduce this delay instead
+	// of scheduling tasks immediately; it forces the indexer into
+	// "batch index mode", which significantly improves database I/O
+	// efficiency and reduces overhead.
+	indexerHeartbeatInterval = 15 * time.Second
+)
+
+// getTimeout calculates a dynamic inactivity threshold based on indexing
+// progress. For massive datasets, this accounts for the preparation phase
+// required before active chain synchronization begins (such as beacon
+// header downloading).
+//
+// Note: in private networks that produce no blocks, this may result in a
+// long wait. In such cases, the initial sync mode is expected to be
+// terminated by another condition verifying that the chain head has been
+// reached. This timeout primarily serves as a heuristic for networks with
+// massive datasets.
+func (i *indexIniter) getTimeout() time.Duration {
+	current := i.indexed.Load()
+	if current >= indexerIdleDecayWindow {
+		return indexerIdleTimeoutLive
+	}
+	progress := float64(current) / float64(indexerIdleDecayWindow)
+
+	diff := float64(indexerIdleTimeoutStartup - indexerIdleTimeoutLive)
+	decay := time.Duration(diff * progress)
+
+	return indexerIdleTimeoutStartup - decay
+}
+
+// checkExit reports whether the initial mode has completed. It assumes that
+// all local histories have been fully indexed when invoked, and determines
+// whether the initial mode should exit, marking historical state access
+// as available.
+func (i *indexIniter) checkExit(lastEventTime time.Time) bool {
+	// Verify that the timestamp of the most recently indexed history
+	// is sufficiently close to the current time, ensuring that the
+	// majority of histories have been indexed in batch.
+	timestamp, err := i.readBlockTime(i.indexed.Load())
+	if err != nil {
+		i.log.Warn("Failed to read block time", "err", err)
+		return false
+	}
+	blockTime := time.Unix(int64(timestamp), 0)
+
+	// By default, 128 layers are piled up, which corresponds to roughly
+	// 25 minutes with Ethereum's block interval (12s). A threshold of
+	// 6 hours is chosen as a conservative and safe default.
+	if time.Since(blockTime) < syncHistoryThreshold {
+		i.log.Info("Exit the initial mode as close to chain tip", "last", i.indexed.Load(), "age", common.PrettyAge(blockTime))
+		return true
+	}
+	// The most recently indexed block is still at least 6 hours behind. The node
+	// may be syncing toward a historical block rather than the latest chain head.
+	// In this case, check whether the indexer has been idle for a while and exit
+	// the initial mode if so.
+	if time.Since(lastEventTime) > i.getTimeout() {
+		i.log.Info("Exit the initial mode due to inactivity", "last", i.indexed.Load(), "age", common.PrettyAge(blockTime))
+		return true
+	}
+	i.log.Debug("Histories fully indexed, waiting next wave", "last", i.indexed.Load(), "age", common.PrettyAge(blockTime))
+	return false
+}
+
+func (i *indexIniter) run(lastID uint64, recover bool) {
 	defer i.wg.Done()
 
 	// Launch background indexing thread
 	var (
-		done      = make(chan struct{})
-		interrupt = new(atomic.Int32)
+		done      chan struct{}
+		interrupt *atomic.Int32
 
-		// checkDone indicates whether all requested state histories
-		// have been fully indexed.
+		// checkDone reports whether indexing has completed for all histories.
 		checkDone = func() bool {
-			metadata := loadIndexMetadata(i.disk, i.typ)
-			return metadata != nil && metadata.Last == lastID
+			return i.indexed.Load() == lastID
 		}
+		lastEventTime = time.Now()
+		heartBeat     = time.NewTicker(indexerHeartbeatInterval)
 	)
-	go i.index(done, interrupt, lastID)
-
+	if recover {
+		var aborted bool
+		lastID, aborted = i.recover(lastID)
+		if aborted {
+			return
+		}
+	}
 	for {
 		select {
 		case signal := <-i.interrupt:
+			lastEventTime = time.Now()
+			// The indexing limit can only be extended or shortened by one at a time.
 			newLastID := signal.newLastID
 			if newLastID != lastID+1 && newLastID != lastID-1 {
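To make the decay above concrete: the idle timeout starts at indexerIdleTimeoutStartup (1h) and shrinks linearly to indexerIdleTimeoutLive (60s) over the first 10,000 indexed histories. A self-contained sketch reproducing getTimeout's arithmetic (standalone names and a main for illustration, not the patch's code):

package main

import (
	"fmt"
	"time"
)

const (
	idleTimeoutLive    = 60 * time.Second
	idleTimeoutStartup = 1 * time.Hour
	idleDecayWindow    = 10000
)

// idleTimeout mirrors getTimeout: it begins at the startup period and
// decays linearly toward the live limit as indexing progresses.
func idleTimeout(indexed uint64) time.Duration {
	if indexed >= idleDecayWindow {
		return idleTimeoutLive
	}
	progress := float64(indexed) / float64(idleDecayWindow)
	diff := float64(idleTimeoutStartup - idleTimeoutLive)
	return idleTimeoutStartup - time.Duration(diff*progress)
}

func main() {
	for _, n := range []uint64{0, 2500, 5000, 10000} {
		fmt.Printf("indexed=%5d timeout=%v\n", n, idleTimeout(n))
	}
	// indexed=    0 timeout=1h0m0s
	// indexed= 2500 timeout=45m15s
	// indexed= 5000 timeout=30m30s
	// indexed=10000 timeout=1m0s
}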
@@ -467,11 +583,13 @@ func (i *indexIniter) run(lastID uint64) {
 				i.log.Debug("Extended history range", "last", lastID)
 				continue
 			}
-			// The index limit is shortened by one, interrupt the current background
-			// process and relaunch with new target.
-			interrupt.Store(1)
-			<-done
-
+			// The index limit is shortened; interrupt the current background
+			// process if it's active and update the target.
+			if done != nil {
+				interrupt.Store(1)
+				<-done
+				done, interrupt = nil, nil
+			}
 			// If all state histories, including the one to be reverted, have
 			// been fully indexed, unindex it here and shut down the initializer.
 			if checkDone() {
@@ -488,28 +606,38 @@ func (i *indexIniter) run(lastID uint64) {
 			// Adjust the indexing target and relaunch the process
 			lastID = newLastID
 			signal.result <- nil
-
-			done, interrupt = make(chan struct{}), new(atomic.Int32)
-			go i.index(done, interrupt, lastID)
 			i.log.Debug("Shortened history range", "last", lastID)
 
 		case <-done:
-			if checkDone() {
+			done, interrupt = nil, nil
+
+			if i.checkExit(lastEventTime) {
 				close(i.done)
-				i.log.Info("Histories have been fully indexed", "last", lastID)
 				return
 			}
-			// Relaunch the background runner if some tasks are left
+
+		case <-heartBeat.C:
+			if done != nil {
+				continue
+			}
+			if checkDone() {
+				if i.checkExit(lastEventTime) {
+					close(i.done)
+					return
+				}
+				continue
+			}
+			// Any pending tasks are scheduled for background execution, driven by the
+			// heartbeat ticker. This interval ensures that historical data is processed
+			// in batches rather than individually.
 			done, interrupt = make(chan struct{}), new(atomic.Int32)
 			go i.index(done, interrupt, lastID)
 
 		case <-i.closed:
-			interrupt.Store(1)
-			i.log.Info("Waiting background history index initer to exit")
-			<-done
-
-			if checkDone() {
-				close(i.done)
+			if done != nil {
+				interrupt.Store(1)
+				i.log.Info("Waiting background history index initer to exit")
+				<-done
 			}
 			return
 		}
@@ -571,7 +699,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID
 		}
 		return
 	}
-	i.log.Info("Start history indexing", "beginID", beginID, "lastID", lastID)
+	i.log.Debug("Start history indexing", "beginID", beginID, "lastID", lastID)
 
 	var (
 		current = beginID
@@ -618,7 +746,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID
 				done = current - beginID
 			)
 			eta := common.CalculateETA(done, left, time.Since(start))
-			i.log.Info("Indexing history", "processed", done, "left", left, "elapsed", common.PrettyDuration(time.Since(start)), "eta", common.PrettyDuration(eta))
+			i.log.Debug("Indexing history", "processed", done, "left", left, "elapsed", common.PrettyDuration(time.Since(start)), "eta", common.PrettyDuration(eta))
 		}
 	}
 	i.indexed.Store(current - 1) // update indexing progress
@@ -629,7 +757,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID
 		if err := batch.finish(true); err != nil {
 			i.log.Error("Failed to flush index", "err", err)
 		}
-		log.Info("State indexing interrupted")
+		log.Debug("State indexing interrupted")
 		return
 	}
 }
@@ -637,7 +765,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID
 	if err := batch.finish(true); err != nil {
 		i.log.Error("Failed to flush index", "err", err)
 	}
-	i.log.Info("Indexed history", "from", beginID, "to", lastID, "elapsed", common.PrettyDuration(time.Since(start)))
+	i.log.Debug("Indexed history", "from", beginID, "to", lastID, "elapsed", common.PrettyDuration(time.Since(start)))
 }
 
 // recover handles unclean shutdown recovery. After an unclean shutdown, any
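A note on the run-loop restructuring above: the loop now owns the worker lifecycle through a nil-able done channel, launches batches only on heartbeat ticks, and stops an in-flight batch via an atomic flag. A stripped-down, self-contained sketch of that scheduling pattern (worker, the intervals, and the print are illustrative placeholders):

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// worker stands in for indexIniter.index: it processes a batch and
// checks the interrupt flag at safe points.
func worker(done chan struct{}, interrupt *atomic.Int32) {
	defer close(done)
	for n := 0; n < 5; n++ {
		if interrupt.Load() != 0 {
			return // asked to stop; flush state and bail out
		}
		time.Sleep(100 * time.Millisecond) // ... index one history ...
	}
}

func main() {
	var (
		done      chan struct{} // nil whenever no batch is in flight
		interrupt *atomic.Int32
		beat      = time.NewTicker(250 * time.Millisecond)
		closed    = make(chan struct{})
	)
	go func() { time.Sleep(2 * time.Second); close(closed) }()
	for {
		select {
		case <-done: // a nil channel never fires, so this only triggers mid-batch
			done, interrupt = nil, nil
			fmt.Println("batch finished")

		case <-beat.C: // schedule work on the heartbeat, never immediately
			if done != nil {
				continue // previous batch still running
			}
			done, interrupt = make(chan struct{}), new(atomic.Int32)
			go worker(done, interrupt)

		case <-closed: // shutdown: interrupt any in-flight batch and wait
			if done != nil {
				interrupt.Store(1)
				<-done
			}
			return
		}
	}
}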
@@ -650,8 +778,8 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID
 // by chain recovery, under the assumption that the recovered histories will be
 // identical to the lost ones. Fork-awareness should be added in the future to
 // correctly handle histories affected by reorgs.
-func (i *indexIniter) recover(lastID uint64) {
-	defer i.wg.Done()
+func (i *indexIniter) recover(lastID uint64) (uint64, bool) {
+	log.Info("History indexer is recovering", "history", lastID, "indexed", i.indexed.Load())
 
 	for {
 		select {
@@ -672,13 +800,12 @@ func (i *indexIniter) recover(lastID uint64) {
 			// with the index data, indicating that index initialization is complete.
 			metadata := loadIndexMetadata(i.disk, i.typ)
 			if metadata != nil && metadata.Last == lastID {
-				close(i.done)
 				i.log.Info("History indexer is recovered", "last", lastID)
-				return
+				return lastID, false
 			}
 
 		case <-i.closed:
-			return
+			return 0, true
 		}
 	}
 }
@@ -746,7 +873,7 @@ func checkVersion(disk ethdb.KeyValueStore, typ historyType) {
 
 // newHistoryIndexer constructs the history indexer and launches the background
 // initer to complete the indexing of any remaining state histories.
-func newHistoryIndexer(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, lastHistoryID uint64, typ historyType) *historyIndexer {
+func newHistoryIndexer(disk ethdb.Database, freezer ethdb.AncientStore, lastHistoryID uint64, typ historyType) *historyIndexer {
 	checkVersion(disk, typ)
 	return &historyIndexer{
 		initer: newIndexIniter(disk, freezer, typ, lastHistoryID),
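One behavioral detail worth calling out in the last hunks: recover no longer closes i.done itself; it returns the resolved last history ID plus an aborted flag, and run consumes that result before entering its regular event loop. The exit heuristic that loop ultimately applies boils down to two independent conditions, freshness of the last indexed block and inactivity of the indexer. A pure-function restatement of that decision (shouldExitInitialMode is a hypothetical helper, not in the patch):

package example

import "time"

// shouldExitInitialMode isolates checkExit's two conditions.
func shouldExitInitialMode(blockTime, lastEvent time.Time, idleTimeout time.Duration) bool {
	// Condition 1: the last indexed block is within the 6h sync window,
	// i.e. the batch phase has caught up with the chain head.
	if time.Since(blockTime) < 6*time.Hour {
		return true
	}
	// Condition 2: the head is still old, but no indexing events have
	// arrived within the progress-scaled idle timeout.
	return time.Since(lastEvent) > idleTimeout
}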