diff --git a/triedb/pathdb/history_indexer.go b/triedb/pathdb/history_indexer.go index 17a8ca2b60..ec7233110c 100644 --- a/triedb/pathdb/history_indexer.go +++ b/triedb/pathdb/history_indexer.go @@ -41,6 +41,8 @@ const ( stateHistoryIndexVersion = stateHistoryIndexV0 // the current state index version trienodeHistoryIndexV0 = uint8(0) // initial version of trienode index structure trienodeHistoryIndexVersion = trienodeHistoryIndexV0 // the current trienode index version + + indexerProcessBatchInSync = 100000 // threshold for history batch indexing when node is in sync stage. ) // indexVersion returns the latest index version for the given history type. @@ -349,6 +351,7 @@ type interruptSignal struct { // If a state history is removed due to a rollback, the associated indexes should // be unmarked accordingly. type indexIniter struct { + state *initerState disk ethdb.Database freezer ethdb.AncientStore interrupt chan *interruptSignal @@ -366,6 +369,7 @@ type indexIniter struct { func newIndexIniter(disk ethdb.Database, freezer ethdb.AncientStore, typ historyType, lastID uint64) *indexIniter { initer := &indexIniter{ + state: newIniterState(disk), disk: disk, freezer: freezer, interrupt: make(chan *interruptSignal), @@ -385,7 +389,7 @@ func newIndexIniter(disk ethdb.Database, freezer ethdb.AncientStore, typ history // Launch background indexer initer.wg.Add(1) - go initer.run(lastID, recover) + go initer.run(recover) return initer } @@ -395,6 +399,7 @@ func (i *indexIniter) close() { return default: close(i.closed) + i.state.close() i.wg.Wait() } } @@ -426,121 +431,7 @@ func (i *indexIniter) remain() uint64 { } } -// readBlockTime reads the associated block time of the provided history ID. -func (i *indexIniter) readBlockTime(historyID uint64) (uint64, error) { - var blockNumber uint64 - switch i.typ { - case typeStateHistory: - m, err := readStateHistoryMeta(i.freezer, historyID) - if err != nil { - return 0, err - } - blockNumber = m.block - - case typeTrienodeHistory: - m, err := readTrienodeMetadata(i.freezer, historyID) - if err != nil { - return 0, err - } - blockNumber = m.block - } - hash := rawdb.ReadCanonicalHash(i.disk, blockNumber) - if hash == (common.Hash{}) { - return 0, errors.New("block not found") - } - header := rawdb.ReadHeader(i.disk, hash, blockNumber) - if header == nil { - return 0, errors.New("block not found") - } - return header.Time, nil -} - -const ( - // syncHistoryThreshold is the time window behind the chain head. - // If the block being indexed is within this window, we consider the node - // to be close enough to the tip to operate in real-time mode. - syncHistoryThreshold = 6 * time.Hour - - // indexerIdleTimeoutLive is the strict inactivity limit applied once the - // indexer is synchronized with the chain head. - indexerIdleTimeoutLive = 60 * time.Second - - // indexerIdleTimeoutStartup is the period allowed when the indexer has - // little or no history, for Geth's initial sync procedures (e.g., beacon - // headers downloading or something else). - indexerIdleTimeoutStartup = 1 * time.Hour - - // indexerIdleDecayWindow defines the progress threshold over which the - // idle timeout linearly scales from the startup period down to the live one. - indexerIdleDecayWindow = 10000 - - // indexerHeartbeatInterval is the frequency at which the indexer - // checks its status and schedules background indexing operations. - // - // It is a deliberate design choice to introduce this delay instead - // of scheduling tasks immediately; this forces the indexer into - // "batch index mode" which significantly improves database I/O - // efficiency and reduces overhead. - indexerHeartbeatInterval = 15 * time.Second -) - -// getTimeout calculates a dynamic inactivity threshold based on indexing -// progress. For massive datasets, this accounts for the preparation phase -// required before active chain synchronization begins (such as beacon -// header downloading). -// -// Note: In private networks with no block, this may result in a long wait. -// In such cases, the initial sync mode is expected to be terminated by -// another condition verifying the chain head has been reached. This timeout -// primarily serves as a heuristic for the network with massive datasets. -func (i *indexIniter) getTimeout() time.Duration { - current := i.indexed.Load() - if current >= indexerIdleDecayWindow { - return indexerIdleTimeoutLive - } - progress := float64(current) / float64(indexerIdleDecayWindow) - - diff := float64(indexerIdleTimeoutStartup - indexerIdleTimeoutLive) - decay := time.Duration(diff * progress) - - return indexerIdleTimeoutStartup - decay -} - -// checkExit reports whether the initial mode has completed. It assumes that -// all local histories have been fully indexed when invoked, and determines -// whether the initial mode should exit and marking historical state access -// as available. -func (i *indexIniter) checkExit(lastEventTime time.Time) bool { - // Verify that the timestamp of the most recently indexed history - // is sufficiently close to the current time, ensuring that the - // majority of histories have been indexed in batch. - timestamp, err := i.readBlockTime(i.indexed.Load()) - if err != nil { - i.log.Warn("Failed to read block time", "err", err) - return false - } - blockTime := time.Unix(int64(timestamp), 0) - - // By default, 128 layers are piled up, which corresponds to roughly 25 minutes - // with Ethereum's block interval (12s). A threshold of 6 hours is chosen as a - // conservative and safe default for whatever reason. - if time.Since(blockTime) < syncHistoryThreshold { - i.log.Info("Exit the initial mode as close to chain tip", "last", i.indexed.Load(), "age", common.PrettyAge(blockTime)) - return true - } - // The most recently indexed block is still at least 6 hours behind. The node - // may be syncing toward a historical block rather than the latest chain head. - // In this case, check whether the indexer has been idle for a while and exit - // the initial mode if so. - if time.Since(lastEventTime) > i.getTimeout() { - i.log.Info("Exit the initial mode due to inactivity", "last", i.indexed.Load(), "age", common.PrettyAge(blockTime)) - return true - } - i.log.Debug("Histories fully indexed, waiting next wave", "last", i.indexed.Load(), "age", common.PrettyAge(blockTime)) - return false -} - -func (i *indexIniter) run(lastID uint64, recover bool) { +func (i *indexIniter) run(recover bool) { defer i.wg.Done() // Launch background indexing thread @@ -550,37 +441,37 @@ func (i *indexIniter) run(lastID uint64, recover bool) { // checkDone reports whether indexing has completed for all histories. checkDone = func() bool { - return i.indexed.Load() == lastID + return i.indexed.Load() == i.last.Load() } - lastEventTime = time.Now() - heartBeat = time.NewTicker(indexerHeartbeatInterval) + // canExit reports whether the initial indexing phase has completed. + canExit = func() bool { + return !i.state.is(stateSyncing) && checkDone() + } + heartBeat = time.NewTicker(15 * time.Second) ) if recover { - var aborted bool - lastID, aborted = i.recover(lastID) - if aborted { + if aborted := i.recover(); aborted { return } } for { select { case signal := <-i.interrupt: - lastEventTime = time.Now() + newLastID := signal.newLastID + oldLastID := i.last.Load() // The indexing limit can only be extended or shortened continuously. - newLastID := signal.newLastID - if newLastID != lastID+1 && newLastID != lastID-1 { - signal.result <- fmt.Errorf("invalid history id, last: %d, got: %d", lastID, newLastID) + if newLastID != oldLastID+1 && newLastID != oldLastID-1 { + signal.result <- fmt.Errorf("invalid history id, last: %d, got: %d", oldLastID, newLastID) continue } i.last.Store(newLastID) // update indexing range // The index limit is extended by one, update the limit without // interrupting the current background process. - if newLastID == lastID+1 { - lastID = newLastID + if newLastID == oldLastID+1 { signal.result <- nil - i.log.Debug("Extended history range", "last", lastID) + i.log.Debug("Extended history range", "last", newLastID) continue } // The index limit is shortened, interrupt the current background @@ -593,45 +484,45 @@ func (i *indexIniter) run(lastID uint64, recover bool) { // If all state histories, including the one to be reverted, have // been fully indexed, unindex it here and shut down the initializer. if checkDone() { - i.log.Info("Truncate the extra history", "id", lastID) - if err := unindexSingle(lastID, i.disk, i.freezer, i.typ); err != nil { + i.log.Info("Truncate the extra history", "id", oldLastID) + if err := unindexSingle(oldLastID, i.disk, i.freezer, i.typ); err != nil { signal.result <- err return } close(i.done) signal.result <- nil - i.log.Info("Histories have been fully indexed", "last", lastID-1) + i.log.Info("Histories have been fully indexed", "last", i.last.Load()) return } - // Adjust the indexing target and relaunch the process - lastID = newLastID + // Adjust the indexing target signal.result <- nil - i.log.Debug("Shortened history range", "last", lastID) + i.log.Debug("Shortened history range", "last", newLastID) case <-done: done, interrupt = nil, nil - if i.checkExit(lastEventTime) { + if canExit() { close(i.done) return } case <-heartBeat.C: + // Short circuit if the indexer is still busy if done != nil { continue } - if checkDone() { - if i.checkExit(lastEventTime) { - close(i.done) - return - } + if canExit() { + close(i.done) + return + } + // The local chain is still in the syncing phase. Only start the indexing + // when a sufficient amount of histories has accumulated. Batch indexing + // is more efficient than processing items individually. + if i.state.is(stateSyncing) && i.last.Load()-i.indexed.Load() < indexerProcessBatchInSync { continue } - // Any pending tasks are scheduled for background execution, driven by the - // heartbeat ticker. This interval ensures that historical data is processed - // in batches rather than individually. done, interrupt = make(chan struct{}), new(atomic.Int32) - go i.index(done, interrupt, lastID) + go i.index(done, interrupt, i.last.Load()) case <-i.closed: if done != nil { @@ -778,34 +669,35 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID // by chain recovery, under the assumption that the recovered histories will be // identical to the lost ones. Fork-awareness should be added in the future to // correctly handle histories affected by reorgs. -func (i *indexIniter) recover(lastID uint64) (uint64, bool) { - log.Info("History indexer is recovering", "history", lastID, "indexed", i.indexed.Load()) +func (i *indexIniter) recover() bool { + log.Info("History indexer is recovering", "last", i.last.Load(), "indexed", i.indexed.Load()) for { select { case signal := <-i.interrupt: newLastID := signal.newLastID - if newLastID != lastID+1 && newLastID != lastID-1 { - signal.result <- fmt.Errorf("invalid history id, last: %d, got: %d", lastID, newLastID) + oldLastID := i.last.Load() + + // The indexing limit can only be extended or shortened continuously. + if newLastID != oldLastID+1 && newLastID != oldLastID-1 { + signal.result <- fmt.Errorf("invalid history id, last: %d, got: %d", oldLastID, newLastID) continue } - // Update the last indexed flag - lastID = newLastID signal.result <- nil i.last.Store(newLastID) - i.log.Debug("Updated history index flag", "last", lastID) + i.log.Debug("Updated history index flag", "last", newLastID) // Terminate the recovery routine once the histories are fully aligned // with the index data, indicating that index initialization is complete. metadata := loadIndexMetadata(i.disk, i.typ) - if metadata != nil && metadata.Last == lastID { - i.log.Info("History indexer is recovered", "last", lastID) - return lastID, false + if metadata != nil && metadata.Last == newLastID { + i.log.Info("History indexer is recovered", "last", newLastID) + return false } case <-i.closed: - return 0, true + return true } } } diff --git a/triedb/pathdb/history_indexer_state.go b/triedb/pathdb/history_indexer_state.go new file mode 100644 index 0000000000..c8d0826c07 --- /dev/null +++ b/triedb/pathdb/history_indexer_state.go @@ -0,0 +1,179 @@ +// Copyright 2026 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see syncStateTimeWindow { + s.set(stateSynced) + log.Info("Marked indexing initer as synced") + } else { + s.set(stateSyncing) + } + + var ( + hhash = rawdb.ReadHeadHeaderHash(s.disk) + fhash = rawdb.ReadHeadFastBlockHash(s.disk) + bhash = rawdb.ReadHeadBlockHash(s.disk) + skeleton = rawdb.ReadSkeletonSyncStatus(s.disk) + lastProgress = time.Now() + ) + for { + select { + case <-ticker.C: + state := s.get() + if state == stateSynced || state == stateStalled { + continue + } + headBlock := s.readLastBlock() + if headBlock == nil { + continue + } + // State machine: stateSyncing => stateSynced + if time.Since(time.Unix(int64(headBlock.Time), 0)) < syncStateTimeWindow { + s.set(stateSynced) + log.Info("Marked indexing initer as synced") + continue + } + // State machine: stateSyncing => stateStalled + newhhash := rawdb.ReadHeadHeaderHash(s.disk) + newfhash := rawdb.ReadHeadFastBlockHash(s.disk) + newbhash := rawdb.ReadHeadBlockHash(s.disk) + newskeleton := rawdb.ReadSkeletonSyncStatus(s.disk) + hasProgress := newhhash.Cmp(hhash) != 0 || newfhash.Cmp(fhash) != 0 || newbhash.Cmp(bhash) != 0 || bytes.Equal(newskeleton, skeleton) + + if !hasProgress && time.Since(lastProgress) > syncStalledTimeout { + s.set(stateStalled) + log.Info("Marked indexing initer as stalled") + continue + } + if hasProgress { + hhash = newhhash + fhash = newfhash + bhash = newbhash + skeleton = newskeleton + lastProgress = time.Now() + } + + case <-s.term: + return + } + } +} + +func (s *initerState) close() { + select { + case <-s.term: + default: + close(s.term) + } + return +} + +// readLastBlock returns the local chain head. +func (s *initerState) readLastBlock() *types.Header { + hash := rawdb.ReadHeadBlockHash(s.disk) + if hash == (common.Hash{}) { + return nil + } + number, exists := rawdb.ReadHeaderNumber(s.disk, hash) + if !exists { + return nil + } + return rawdb.ReadHeader(s.disk, hash, number) +} diff --git a/triedb/pathdb/history_indexer_test.go b/triedb/pathdb/history_indexer_test.go index f333d18d8b..71296de3c7 100644 --- a/triedb/pathdb/history_indexer_test.go +++ b/triedb/pathdb/history_indexer_test.go @@ -27,7 +27,7 @@ import ( // deadlock when the indexer is active. This specifically targets the case where // signal.result must be sent to unblock the caller. func TestHistoryIndexerShortenDeadlock(t *testing.T) { - //log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelInfo, true))) + // log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelDebug, true))) db := rawdb.NewMemoryDatabase() freezer, _ := rawdb.NewStateFreezer(t.TempDir(), false, false) defer freezer.Close()