// Copyright 2025 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // The go-ethereum library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see = ethdb.IdealBatchSize { batchSize += batch.ValueSize() if err := batch.Write(); err != nil { return err } batch.Reset() } return nil } ) eg.SetLimit(runtime.NumCPU()) var indexed uint64 if metadata := loadIndexMetadata(b.db, b.typ); metadata != nil { indexed = metadata.Last } for ident, list := range b.index { ext := b.ext[ident] eg.Go(func() error { if !b.delete { iw, err := newIndexWriter(b.db, ident, indexed, ident.bloomSize()) if err != nil { return err } for i, n := range list { if err := iw.append(n, ext[i]); err != nil { return err } } return writeBatch(func(batch ethdb.Batch) { iw.finish(batch) }) } else { id, err := newIndexDeleter(b.db, ident, indexed, ident.bloomSize()) if err != nil { return err } for _, n := range list { if err := id.pop(n); err != nil { return err } } return writeBatch(func(batch ethdb.Batch) { id.finish(batch) }) } }) } if err := eg.Wait(); err != nil { return err } // Update the position of last indexed state history if !b.delete { storeIndexMetadata(batch, b.typ, b.lastID) } else { if b.lastID == 1 { deleteIndexMetadata(batch, b.typ) } else { storeIndexMetadata(batch, b.typ, b.lastID-1) } } batchSize += batch.ValueSize() if err := batch.Write(); err != nil { return err } log.Debug("Committed batch indexer", "type", b.typ, "entries", len(b.index), "records", b.pending, "size", common.StorageSize(batchSize), "elapsed", common.PrettyDuration(time.Since(start))) b.pending = 0 clear(b.index) clear(b.ext) return nil } // indexSingle processes the state history with the specified ID for indexing. func indexSingle(historyID uint64, db ethdb.KeyValueStore, freezer ethdb.AncientReader, typ historyType) error { start := time.Now() defer func() { if typ == typeStateHistory { stateIndexHistoryTimer.UpdateSince(start) } else if typ == typeTrienodeHistory { trienodeIndexHistoryTimer.UpdateSince(start) } }() metadata := loadIndexMetadata(db, typ) if metadata == nil || metadata.Last+1 != historyID { last := "null" if metadata != nil { last = fmt.Sprintf("%v", metadata.Last) } return fmt.Errorf("history indexing is out of order, last: %s, requested: %d", last, historyID) } var ( err error h history b = newBatchIndexer(db, false, typ) ) if typ == typeStateHistory { h, err = readStateHistory(freezer, historyID) } else { h, err = readTrienodeHistory(freezer, historyID) } if err != nil { return err } if err := b.process(h, historyID); err != nil { return err } if err := b.finish(true); err != nil { return err } log.Debug("Indexed history", "type", typ, "id", historyID, "elapsed", common.PrettyDuration(time.Since(start))) return nil } // unindexSingle processes the state history with the specified ID for unindexing. func unindexSingle(historyID uint64, db ethdb.KeyValueStore, freezer ethdb.AncientReader, typ historyType) error { start := time.Now() defer func() { if typ == typeStateHistory { stateUnindexHistoryTimer.UpdateSince(start) } else if typ == typeTrienodeHistory { trienodeUnindexHistoryTimer.UpdateSince(start) } }() metadata := loadIndexMetadata(db, typ) if metadata == nil || metadata.Last != historyID { last := "null" if metadata != nil { last = fmt.Sprintf("%v", metadata.Last) } return fmt.Errorf("history unindexing is out of order, last: %s, requested: %d", last, historyID) } var ( err error h history ) b := newBatchIndexer(db, true, typ) if typ == typeStateHistory { h, err = readStateHistory(freezer, historyID) } else { h, err = readTrienodeHistory(freezer, historyID) } if err != nil { return err } if err := b.process(h, historyID); err != nil { return err } if err := b.finish(true); err != nil { return err } log.Debug("Unindexed history", "type", typ, "id", historyID, "elapsed", common.PrettyDuration(time.Since(start))) return nil } type interruptSignal struct { newLastID uint64 result chan error } // indexIniter is responsible for completing the indexing of remaining state // histories in batch. It runs as a one-time background thread and terminates // once all available state histories are indexed. // // Afterward, new state histories should be indexed synchronously alongside // the state data itself, ensuring both the history and its index are available. // If a state history is removed due to a rollback, the associated indexes should // be unmarked accordingly. type indexIniter struct { state *initerState disk ethdb.Database freezer ethdb.AncientStore interrupt chan *interruptSignal done chan struct{} closed chan struct{} typ historyType log log.Logger // Contextual logger with the history type injected // indexing progress indexed atomic.Uint64 // the id of latest indexed state last atomic.Uint64 // the id of the target state to be indexed wg sync.WaitGroup } func newIndexIniter(disk ethdb.Database, freezer ethdb.AncientStore, typ historyType, lastID uint64, noWait bool) *indexIniter { initer := &indexIniter{ state: newIniterState(disk, noWait), disk: disk, freezer: freezer, interrupt: make(chan *interruptSignal), done: make(chan struct{}), closed: make(chan struct{}), typ: typ, log: log.New("type", typ.String()), } // Load indexing progress var recover bool initer.last.Store(lastID) metadata := loadIndexMetadata(disk, typ) if metadata != nil { initer.indexed.Store(metadata.Last) recover = metadata.Last > lastID } // Launch background indexer initer.wg.Add(1) go initer.run(recover) return initer } func (i *indexIniter) close() { select { case <-i.closed: return default: close(i.closed) i.state.close() i.wg.Wait() } } func (i *indexIniter) inited() bool { select { case <-i.closed: return false case <-i.done: return true default: return false } } func (i *indexIniter) remain() uint64 { select { case <-i.closed: return 0 case <-i.done: return 0 default: last, indexed := i.last.Load(), i.indexed.Load() if last < indexed { i.log.Warn("State indexer is in recovery", "indexed", indexed, "last", last) return indexed - last } return last - indexed } } func (i *indexIniter) run(recover bool) { defer i.wg.Done() // Launch background indexing thread var ( done chan struct{} interrupt *atomic.Int32 // checkDone reports whether indexing has completed for all histories. checkDone = func() bool { metadata := loadIndexMetadata(i.disk, i.typ) return metadata != nil && metadata.Last == i.last.Load() } // canExit reports whether the initial indexing phase has completed. canExit = func() bool { return !i.state.is(stateSyncing) && checkDone() } heartBeat = time.NewTimer(0) ) defer heartBeat.Stop() if recover { if aborted := i.recover(); aborted { return } } for { select { case signal := <-i.interrupt: newLastID := signal.newLastID oldLastID := i.last.Load() // The indexing limit can only be extended or shortened continuously. if newLastID != oldLastID+1 && newLastID != oldLastID-1 { signal.result <- fmt.Errorf("invalid history id, last: %d, got: %d", oldLastID, newLastID) continue } i.last.Store(newLastID) // update indexing range // The index limit is extended by one, update the limit without // interrupting the current background process. if newLastID == oldLastID+1 { signal.result <- nil i.log.Debug("Extended history range", "last", newLastID) continue } // The index limit is shortened, interrupt the current background // process if it's active and update the target. if done != nil { interrupt.Store(1) <-done done, interrupt = nil, nil } // If all state histories, including the one to be reverted, have // been fully indexed, unindex it here and shut down the initializer. if checkDone() { i.log.Info("Truncate the extra history", "id", oldLastID) if err := unindexSingle(oldLastID, i.disk, i.freezer, i.typ); err != nil { signal.result <- err return } close(i.done) signal.result <- nil i.log.Info("Histories have been fully indexed", "last", i.last.Load()) return } // Adjust the indexing target signal.result <- nil i.log.Debug("Shortened history range", "last", newLastID) case <-done: done, interrupt = nil, nil if canExit() { close(i.done) return } case <-heartBeat.C: heartBeat.Reset(time.Second * 15) // Short circuit if the indexer is still busy if done != nil { continue } if canExit() { close(i.done) return } // The local chain is still in the syncing phase. Only start the indexing // when a sufficient amount of histories has accumulated. Batch indexing // is more efficient than processing items individually. if i.state.is(stateSyncing) && i.last.Load()-i.indexed.Load() < indexerProcessBatchInSync { continue } done, interrupt = make(chan struct{}), new(atomic.Int32) go i.index(done, interrupt, i.last.Load()) case <-i.closed: if done != nil { interrupt.Store(1) i.log.Info("Waiting background history index initer to exit") <-done } return } } } // next returns the ID of the next state history to be indexed. func (i *indexIniter) next() (uint64, error) { tail, err := i.freezer.Tail() if err != nil { return 0, err } tailID := tail + 1 // compute the id of the oldest history // Start indexing from scratch if nothing has been indexed metadata := loadIndexMetadata(i.disk, i.typ) if metadata == nil { i.log.Debug("Initialize history indexing from scratch", "id", tailID) return tailID, nil } // Resume indexing from the last interrupted position if metadata.Last+1 >= tailID { i.log.Debug("Resume history indexing", "id", metadata.Last+1, "tail", tailID) return metadata.Last + 1, nil } // History has been shortened without indexing. Discard the gapped segment // in the history and shift to the first available element. // // The missing indexes corresponding to the gapped histories won't be visible. // It's fine to leave them unindexed. i.log.Info("History gap detected, discard old segment", "oldHead", metadata.Last, "newHead", tailID) return tailID, nil } func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID uint64) { defer close(done) beginID, err := i.next() if err != nil { i.log.Error("Failed to find next history for indexing", "err", err) return } // All available state histories have been indexed, and the last indexed one // exceeds the most recent available state history. This situation may occur // when the state is reverted manually (chain.SetHead) or the deep reorg is // encountered. In such cases, no indexing should be scheduled. if beginID > lastID { if lastID == 0 && beginID == 1 { // Initialize the indexing flag if the state history is empty by // using zero as the disk layer ID. This is a common case that // can occur after snap sync. // // This step is essential to avoid spinning up indexing thread // endlessly until a history object is produced. storeIndexMetadata(i.disk, i.typ, 0) i.log.Info("Initialized history indexing flag") } else { i.log.Debug("History is fully indexed", "last", lastID) } return } i.log.Debug("Start history indexing", "beginID", beginID, "lastID", lastID) var ( current = beginID start = time.Now() logged = time.Now() batch = newBatchIndexer(i.disk, false, i.typ) ) for current <= lastID { count := lastID - current + 1 if count > historyReadBatch { count = historyReadBatch } var histories []history if i.typ == typeStateHistory { histories, err = readStateHistories(i.freezer, current, count) if err != nil { // The history read might fall if the history is truncated from // head due to revert operation. i.log.Error("Failed to read history for indexing", "current", current, "count", count, "err", err) return } } else { histories, err = readTrienodeHistories(i.freezer, current, count) if err != nil { // The history read might fall if the history is truncated from // head due to revert operation. i.log.Error("Failed to read history for indexing", "current", current, "count", count, "err", err) return } } for _, h := range histories { if err := batch.process(h, current); err != nil { i.log.Error("Failed to index history", "err", err) return } current += 1 // Occasionally report the indexing progress if time.Since(logged) > time.Second*8 { logged = time.Now() var ( left = lastID - current + 1 done = current - beginID ) eta := common.CalculateETA(done, left, time.Since(start)) i.log.Debug("Indexing history", "processed", done, "left", left, "elapsed", common.PrettyDuration(time.Since(start)), "eta", common.PrettyDuration(eta)) } } i.indexed.Store(current - 1) // update indexing progress // Check interruption signal and abort process if it's fired if interrupt != nil { if signal := interrupt.Load(); signal != 0 { if err := batch.finish(true); err != nil { i.log.Error("Failed to flush index", "err", err) } log.Debug("State indexing interrupted") return } } } if err := batch.finish(true); err != nil { i.log.Error("Failed to flush index", "err", err) } i.log.Debug("Indexed history", "from", beginID, "to", lastID, "elapsed", common.PrettyDuration(time.Since(start))) } // recover handles unclean shutdown recovery. After an unclean shutdown, any // extra histories are typically truncated, while the corresponding history index // entries may still have been written. Ideally, we would unindex these histories // in reverse order, but there is no guarantee that the required histories will // still be available. // // As a workaround, indexIniter waits until the missing histories are regenerated // by chain recovery, under the assumption that the recovered histories will be // identical to the lost ones. Fork-awareness should be added in the future to // correctly handle histories affected by reorgs. func (i *indexIniter) recover() bool { log.Info("History indexer is recovering", "last", i.last.Load(), "indexed", i.indexed.Load()) for { select { case signal := <-i.interrupt: newLastID := signal.newLastID oldLastID := i.last.Load() // The indexing limit can only be extended or shortened continuously. if newLastID != oldLastID+1 && newLastID != oldLastID-1 { signal.result <- fmt.Errorf("invalid history id, last: %d, got: %d", oldLastID, newLastID) continue } // Update the last indexed flag signal.result <- nil i.last.Store(newLastID) i.log.Debug("Updated history index flag", "last", newLastID) // Terminate the recovery routine once the histories are fully aligned // with the index data, indicating that index initialization is complete. metadata := loadIndexMetadata(i.disk, i.typ) if metadata != nil && metadata.Last == newLastID { i.log.Info("History indexer is recovered", "last", newLastID) return false } case <-i.closed: return true } } } // historyIndexer manages the indexing and unindexing of state histories, // providing access to historical states. // // Upon initialization, historyIndexer starts a one-time background process // to complete the indexing of any remaining state histories. Once this // process is finished, all state histories are marked as fully indexed, // enabling handling of requests for historical states. Thereafter, any new // state histories must be indexed or unindexed synchronously, ensuring that // the history index is created or removed along with the corresponding // state history. type historyIndexer struct { initer *indexIniter pruner *indexPruner typ historyType disk ethdb.KeyValueStore freezer ethdb.AncientStore } // checkVersion checks whether the index data in the database matches the version. func checkVersion(disk ethdb.KeyValueStore, typ historyType) { var blob []byte if typ == typeStateHistory { blob = rawdb.ReadStateHistoryIndexMetadata(disk) } else if typ == typeTrienodeHistory { blob = rawdb.ReadTrienodeHistoryIndexMetadata(disk) } else { panic(fmt.Errorf("unknown history type: %v", typ)) } // Short circuit if metadata is not found, re-index is required // from scratch. if len(blob) == 0 { return } // Short circuit if the metadata is found and the version is matched ver := stateHistoryIndexVersion if typ == typeTrienodeHistory { ver = trienodeHistoryIndexVersion } var m indexMetadata err := rlp.DecodeBytes(blob, &m) if err == nil && m.Version == ver { return } // Version is not matched, prune the existing data and re-index from scratch batch := disk.NewBatch() if typ == typeStateHistory { rawdb.DeleteStateHistoryIndexMetadata(batch) rawdb.DeleteStateHistoryIndexes(batch) } else { rawdb.DeleteTrienodeHistoryIndexMetadata(batch) rawdb.DeleteTrienodeHistoryIndexes(batch) } if err := batch.Write(); err != nil { log.Crit("Failed to purge history index", "type", typ, "err", err) } version := "unknown" if err == nil { version = fmt.Sprintf("%d", m.Version) } log.Info("Cleaned up obsolete history index", "type", typ, "version", version, "want", version) } // newHistoryIndexer constructs the history indexer and launches the background // initer to complete the indexing of any remaining state histories. func newHistoryIndexer(disk ethdb.Database, freezer ethdb.AncientStore, lastHistoryID uint64, typ historyType, noWait bool) *historyIndexer { checkVersion(disk, typ) return &historyIndexer{ initer: newIndexIniter(disk, freezer, typ, lastHistoryID, noWait), pruner: newIndexPruner(disk, typ), typ: typ, disk: disk, freezer: freezer, } } func (i *historyIndexer) close() { i.initer.close() i.pruner.close() } // inited returns a flag indicating whether the existing state histories // have been fully indexed, in other words, whether they are available // for external access. func (i *historyIndexer) inited() bool { return i.initer.inited() } // extend sends the notification that new state history with specified ID // has been written into the database and is ready for indexing. func (i *historyIndexer) extend(historyID uint64) error { signal := &interruptSignal{ newLastID: historyID, result: make(chan error, 1), } select { case <-i.initer.closed: return errors.New("indexer is closed") case <-i.initer.done: i.pruner.pause() defer i.pruner.resume() return indexSingle(historyID, i.disk, i.freezer, i.typ) case i.initer.interrupt <- signal: return <-signal.result } } // shorten sends the notification that state history with specified ID // is about to be deleted from the database and should be unindexed. func (i *historyIndexer) shorten(historyID uint64) error { signal := &interruptSignal{ newLastID: historyID - 1, result: make(chan error, 1), } select { case <-i.initer.closed: return errors.New("indexer is closed") case <-i.initer.done: i.pruner.pause() defer i.pruner.resume() return unindexSingle(historyID, i.disk, i.freezer, i.typ) case i.initer.interrupt <- signal: return <-signal.result } } // prune signals the pruner that the history tail has advanced to the given ID, // so that stale index blocks referencing pruned histories can be removed. func (i *historyIndexer) prune(newTail uint64) { select { case <-i.initer.closed: log.Debug("Ignored the pruning signal", "reason", "closed") case <-i.initer.done: i.pruner.prune(newTail) default: log.Debug("Ignored the pruning signal", "reason", "busy") } } // progress returns the indexing progress made so far. It provides the number // of states that remain unindexed. func (i *historyIndexer) progress() (uint64, error) { select { case <-i.initer.closed: return 0, errors.New("indexer is closed") default: return i.initer.remain(), nil } }