// Copyright 2025 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // The go-ethereum library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see = tailID { log.Debug("Resume state history indexing", "id", metadata.Last+1, "tail", tailID) return metadata.Last + 1, nil } // History has been shortened without indexing. Discard the gapped segment // in the history and shift to the first available element. // // The missing indexes corresponding to the gapped histories won't be visible. // It's fine to leave them unindexed. log.Info("History gap detected, discard old segment", "oldHead", metadata.Last, "newHead", tailID) return tailID, nil } func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID uint64) { defer close(done) beginID, err := i.next() if err != nil { log.Error("Failed to find next state history for indexing", "err", err) return } // All available state histories have been indexed, and the last indexed one // exceeds the most recent available state history. This situation may occur // when the state is reverted manually (chain.SetHead) or the deep reorg is // encountered. In such cases, no indexing should be scheduled. if beginID > lastID { if lastID == 0 && beginID == 1 { // Initialize the indexing flag if the state history is empty by // using zero as the disk layer ID. This is a common case that // can occur after snap sync. // // This step is essential to avoid spinning up indexing thread // endlessly until a history object is produced. storeIndexMetadata(i.disk, 0) log.Info("Initialized history indexing flag") } else { log.Debug("State history is fully indexed", "last", lastID) } return } log.Info("Start history indexing", "beginID", beginID, "lastID", lastID) var ( current = beginID start = time.Now() logged = time.Now() batch = newBatchIndexer(i.disk, false) ) for current <= lastID { count := lastID - current + 1 if count > historyReadBatch { count = historyReadBatch } histories, err := readHistories(i.freezer, current, count) if err != nil { // The history read might fall if the history is truncated from // head due to revert operation. log.Error("Failed to read history for indexing", "current", current, "count", count, "err", err) return } for _, h := range histories { if err := batch.process(h, current); err != nil { log.Error("Failed to index history", "err", err) return } current += 1 // Occasionally report the indexing progress if time.Since(logged) > time.Second*8 { logged = time.Now() var ( left = lastID - current + 1 done = current - beginID speed = done/uint64(time.Since(start)/time.Millisecond+1) + 1 // +1s to avoid division by zero ) // Override the ETA if larger than the largest until now eta := time.Duration(left/speed) * time.Millisecond log.Info("Indexing state history", "processed", done, "left", left, "elapsed", common.PrettyDuration(time.Since(start)), "eta", common.PrettyDuration(eta)) } } i.indexed.Store(current - 1) // update indexing progress // Check interruption signal and abort process if it's fired if interrupt != nil { if signal := interrupt.Load(); signal != 0 { if err := batch.finish(true); err != nil { log.Error("Failed to flush index", "err", err) } log.Info("State indexing interrupted") return } } } if err := batch.finish(true); err != nil { log.Error("Failed to flush index", "err", err) } log.Info("Indexed state history", "from", beginID, "to", lastID, "elapsed", common.PrettyDuration(time.Since(start))) } // historyIndexer manages the indexing and unindexing of state histories, // providing access to historical states. // // Upon initialization, historyIndexer starts a one-time background process // to complete the indexing of any remaining state histories. Once this // process is finished, all state histories are marked as fully indexed, // enabling handling of requests for historical states. Thereafter, any new // state histories must be indexed or unindexed synchronously, ensuring that // the history index is created or removed along with the corresponding // state history. type historyIndexer struct { initer *indexIniter disk ethdb.KeyValueStore freezer ethdb.AncientStore } // checkVersion checks whether the index data in the database matches the version. func checkVersion(disk ethdb.KeyValueStore) { blob := rawdb.ReadStateHistoryIndexMetadata(disk) if len(blob) == 0 { return } var m indexMetadata err := rlp.DecodeBytes(blob, &m) if err == nil && m.Version == stateIndexVersion { return } // TODO(rjl493456442) would be better to group them into a batch. rawdb.DeleteStateHistoryIndexMetadata(disk) rawdb.DeleteStateHistoryIndex(disk) version := "unknown" if err == nil { version = fmt.Sprintf("%d", m.Version) } log.Info("Cleaned up obsolete state history index", "version", version, "want", stateIndexVersion) } // newHistoryIndexer constructs the history indexer and launches the background // initer to complete the indexing of any remaining state histories. func newHistoryIndexer(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, lastHistoryID uint64) *historyIndexer { checkVersion(disk) return &historyIndexer{ initer: newIndexIniter(disk, freezer, lastHistoryID), disk: disk, freezer: freezer, } } func (i *historyIndexer) close() { i.initer.close() } // inited returns a flag indicating whether the existing state histories // have been fully indexed, in other words, whether they are available // for external access. func (i *historyIndexer) inited() bool { return i.initer.inited() } // extend sends the notification that new state history with specified ID // has been written into the database and is ready for indexing. func (i *historyIndexer) extend(historyID uint64) error { signal := &interruptSignal{ newLastID: historyID, result: make(chan error, 1), } select { case <-i.initer.closed: return errors.New("indexer is closed") case <-i.initer.done: return indexSingle(historyID, i.disk, i.freezer) case i.initer.interrupt <- signal: return <-signal.result } } // shorten sends the notification that state history with specified ID // is about to be deleted from the database and should be unindexed. func (i *historyIndexer) shorten(historyID uint64) error { signal := &interruptSignal{ newLastID: historyID - 1, result: make(chan error, 1), } select { case <-i.initer.closed: return errors.New("indexer is closed") case <-i.initer.done: return unindexSingle(historyID, i.disk, i.freezer) case i.initer.interrupt <- signal: return <-signal.result } } // progress returns the indexing progress made so far. It provides the number // of states that remain unindexed. func (i *historyIndexer) progress() (uint64, error) { select { case <-i.initer.closed: return 0, errors.New("indexer is closed") default: return i.initer.remain(), nil } }