diff --git a/triedb/pathdb/config.go b/triedb/pathdb/config.go index c236b34333..97ee1c2315 100644 --- a/triedb/pathdb/config.go +++ b/triedb/pathdb/config.go @@ -105,9 +105,10 @@ type Config struct { FullValueCheckpoint uint32 // The rate at which trie nodes are encoded in full-value format // Testing configurations - SnapshotNoBuild bool // Flag Whether the state generation is disabled - NoAsyncFlush bool // Flag whether the background buffer flushing is disabled - NoAsyncGeneration bool // Flag whether the background generation is disabled + SnapshotNoBuild bool // Flag Whether the state generation is disabled + NoAsyncFlush bool // Flag whether the background buffer flushing is disabled + NoAsyncGeneration bool // Flag whether the background generation is disabled + NoHistoryIndexDelay bool // Flag whether the history index delay is disabled } // sanitize checks the provided user configurations and changes anything that's diff --git a/triedb/pathdb/database.go b/triedb/pathdb/database.go index 5255602a4e..86a42c69f4 100644 --- a/triedb/pathdb/database.go +++ b/triedb/pathdb/database.go @@ -215,14 +215,14 @@ func (db *Database) setHistoryIndexer() { if db.stateIndexer != nil { db.stateIndexer.close() } - db.stateIndexer = newHistoryIndexer(db.diskdb, db.stateFreezer, db.tree.bottom().stateID(), typeStateHistory) + db.stateIndexer = newHistoryIndexer(db.diskdb, db.stateFreezer, db.tree.bottom().stateID(), typeStateHistory, db.config.NoHistoryIndexDelay) log.Info("Enabled state history indexing") } if db.trienodeFreezer != nil { if db.trienodeIndexer != nil { db.trienodeIndexer.close() } - db.trienodeIndexer = newHistoryIndexer(db.diskdb, db.trienodeFreezer, db.tree.bottom().stateID(), typeTrienodeHistory) + db.trienodeIndexer = newHistoryIndexer(db.diskdb, db.trienodeFreezer, db.tree.bottom().stateID(), typeTrienodeHistory, db.config.NoHistoryIndexDelay) log.Info("Enabled trienode history indexing") } } diff --git a/triedb/pathdb/database_test.go b/triedb/pathdb/database_test.go index 2d1819d08f..8ece83cad7 100644 --- a/triedb/pathdb/database_test.go +++ b/triedb/pathdb/database_test.go @@ -182,6 +182,7 @@ func newTester(t *testing.T, config *testerConfig) *tester { WriteBufferSize: config.writeBufferSize(), NoAsyncFlush: true, JournalDirectory: config.journalDir, + NoHistoryIndexDelay: true, }, config.isVerkle) obj = &tester{ diff --git a/triedb/pathdb/history_indexer.go b/triedb/pathdb/history_indexer.go index c987b380ed..c9bf3e87f1 100644 --- a/triedb/pathdb/history_indexer.go +++ b/triedb/pathdb/history_indexer.go @@ -41,6 +41,8 @@ const ( stateHistoryIndexVersion = stateHistoryIndexV0 // the current state index version trienodeHistoryIndexV0 = uint8(0) // initial version of trienode index structure trienodeHistoryIndexVersion = trienodeHistoryIndexV0 // the current trienode index version + + indexerProcessBatchInSync = 100000 // threshold for history batch indexing when node is in sync stage. ) // indexVersion returns the latest index version for the given history type. @@ -349,7 +351,8 @@ type interruptSignal struct { // If a state history is removed due to a rollback, the associated indexes should // be unmarked accordingly. type indexIniter struct { - disk ethdb.KeyValueStore + state *initerState + disk ethdb.Database freezer ethdb.AncientStore interrupt chan *interruptSignal done chan struct{} @@ -364,8 +367,9 @@ type indexIniter struct { wg sync.WaitGroup } -func newIndexIniter(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, typ historyType, lastID uint64) *indexIniter { +func newIndexIniter(disk ethdb.Database, freezer ethdb.AncientStore, typ historyType, lastID uint64, noWait bool) *indexIniter { initer := &indexIniter{ + state: newIniterState(disk, noWait), disk: disk, freezer: freezer, interrupt: make(chan *interruptSignal), @@ -385,12 +389,7 @@ func newIndexIniter(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, typ hi // Launch background indexer initer.wg.Add(1) - if recover { - log.Info("History indexer is recovering", "history", lastID, "indexed", metadata.Last) - go initer.recover(lastID) - } else { - go initer.run(lastID) - } + go initer.run(recover) return initer } @@ -400,6 +399,7 @@ func (i *indexIniter) close() { return default: close(i.closed) + i.state.close() i.wg.Wait() } } @@ -431,85 +431,109 @@ func (i *indexIniter) remain() uint64 { } } -func (i *indexIniter) run(lastID uint64) { +func (i *indexIniter) run(recover bool) { defer i.wg.Done() // Launch background indexing thread var ( - done = make(chan struct{}) - interrupt = new(atomic.Int32) + done chan struct{} + interrupt *atomic.Int32 - // checkDone indicates whether all requested state histories - // have been fully indexed. + // checkDone reports whether indexing has completed for all histories. checkDone = func() bool { metadata := loadIndexMetadata(i.disk, i.typ) - return metadata != nil && metadata.Last == lastID + return metadata != nil && metadata.Last == i.last.Load() } + // canExit reports whether the initial indexing phase has completed. + canExit = func() bool { + return !i.state.is(stateSyncing) && checkDone() + } + heartBeat = time.NewTimer(0) ) - go i.index(done, interrupt, lastID) + defer heartBeat.Stop() + if recover { + if aborted := i.recover(); aborted { + return + } + } for { select { case signal := <-i.interrupt: - // The indexing limit can only be extended or shortened continuously. newLastID := signal.newLastID - if newLastID != lastID+1 && newLastID != lastID-1 { - signal.result <- fmt.Errorf("invalid history id, last: %d, got: %d", lastID, newLastID) + oldLastID := i.last.Load() + + // The indexing limit can only be extended or shortened continuously. + if newLastID != oldLastID+1 && newLastID != oldLastID-1 { + signal.result <- fmt.Errorf("invalid history id, last: %d, got: %d", oldLastID, newLastID) continue } i.last.Store(newLastID) // update indexing range // The index limit is extended by one, update the limit without // interrupting the current background process. - if newLastID == lastID+1 { - lastID = newLastID + if newLastID == oldLastID+1 { signal.result <- nil - i.log.Debug("Extended history range", "last", lastID) + i.log.Debug("Extended history range", "last", newLastID) continue } - // The index limit is shortened by one, interrupt the current background - // process and relaunch with new target. - interrupt.Store(1) - <-done - + // The index limit is shortened, interrupt the current background + // process if it's active and update the target. + if done != nil { + interrupt.Store(1) + <-done + done, interrupt = nil, nil + } // If all state histories, including the one to be reverted, have // been fully indexed, unindex it here and shut down the initializer. if checkDone() { - i.log.Info("Truncate the extra history", "id", lastID) - if err := unindexSingle(lastID, i.disk, i.freezer, i.typ); err != nil { + i.log.Info("Truncate the extra history", "id", oldLastID) + if err := unindexSingle(oldLastID, i.disk, i.freezer, i.typ); err != nil { signal.result <- err return } close(i.done) signal.result <- nil - i.log.Info("Histories have been fully indexed", "last", lastID-1) + i.log.Info("Histories have been fully indexed", "last", i.last.Load()) return } - // Adjust the indexing target and relaunch the process - lastID = newLastID + // Adjust the indexing target signal.result <- nil - - done, interrupt = make(chan struct{}), new(atomic.Int32) - go i.index(done, interrupt, lastID) - i.log.Debug("Shortened history range", "last", lastID) + i.log.Debug("Shortened history range", "last", newLastID) case <-done: - if checkDone() { + done, interrupt = nil, nil + + if canExit() { close(i.done) - i.log.Info("Histories have been fully indexed", "last", lastID) return } - // Relaunch the background runner if some tasks are left + + case <-heartBeat.C: + heartBeat.Reset(time.Second * 15) + + // Short circuit if the indexer is still busy + if done != nil { + continue + } + if canExit() { + close(i.done) + return + } + // The local chain is still in the syncing phase. Only start the indexing + // when a sufficient amount of histories has accumulated. Batch indexing + // is more efficient than processing items individually. + if i.state.is(stateSyncing) && i.last.Load()-i.indexed.Load() < indexerProcessBatchInSync { + continue + } done, interrupt = make(chan struct{}), new(atomic.Int32) - go i.index(done, interrupt, lastID) + go i.index(done, interrupt, i.last.Load()) case <-i.closed: - interrupt.Store(1) - i.log.Info("Waiting background history index initer to exit") - <-done - - if checkDone() { - close(i.done) + if done != nil { + interrupt.Store(1) + i.log.Info("Waiting background history index initer to exit") + <-done } return } @@ -571,7 +595,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID } return } - i.log.Info("Start history indexing", "beginID", beginID, "lastID", lastID) + i.log.Debug("Start history indexing", "beginID", beginID, "lastID", lastID) var ( current = beginID @@ -618,7 +642,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID done = current - beginID ) eta := common.CalculateETA(done, left, time.Since(start)) - i.log.Info("Indexing history", "processed", done, "left", left, "elapsed", common.PrettyDuration(time.Since(start)), "eta", common.PrettyDuration(eta)) + i.log.Debug("Indexing history", "processed", done, "left", left, "elapsed", common.PrettyDuration(time.Since(start)), "eta", common.PrettyDuration(eta)) } } i.indexed.Store(current - 1) // update indexing progress @@ -629,7 +653,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID if err := batch.finish(true); err != nil { i.log.Error("Failed to flush index", "err", err) } - log.Info("State indexing interrupted") + log.Debug("State indexing interrupted") return } } @@ -637,7 +661,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID if err := batch.finish(true); err != nil { i.log.Error("Failed to flush index", "err", err) } - i.log.Info("Indexed history", "from", beginID, "to", lastID, "elapsed", common.PrettyDuration(time.Since(start))) + i.log.Debug("Indexed history", "from", beginID, "to", lastID, "elapsed", common.PrettyDuration(time.Since(start))) } // recover handles unclean shutdown recovery. After an unclean shutdown, any @@ -650,35 +674,35 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID // by chain recovery, under the assumption that the recovered histories will be // identical to the lost ones. Fork-awareness should be added in the future to // correctly handle histories affected by reorgs. -func (i *indexIniter) recover(lastID uint64) { - defer i.wg.Done() +func (i *indexIniter) recover() bool { + log.Info("History indexer is recovering", "last", i.last.Load(), "indexed", i.indexed.Load()) for { select { case signal := <-i.interrupt: newLastID := signal.newLastID - if newLastID != lastID+1 && newLastID != lastID-1 { - signal.result <- fmt.Errorf("invalid history id, last: %d, got: %d", lastID, newLastID) + oldLastID := i.last.Load() + + // The indexing limit can only be extended or shortened continuously. + if newLastID != oldLastID+1 && newLastID != oldLastID-1 { + signal.result <- fmt.Errorf("invalid history id, last: %d, got: %d", oldLastID, newLastID) continue } - // Update the last indexed flag - lastID = newLastID signal.result <- nil i.last.Store(newLastID) - i.log.Debug("Updated history index flag", "last", lastID) + i.log.Debug("Updated history index flag", "last", newLastID) // Terminate the recovery routine once the histories are fully aligned // with the index data, indicating that index initialization is complete. metadata := loadIndexMetadata(i.disk, i.typ) - if metadata != nil && metadata.Last == lastID { - close(i.done) - i.log.Info("History indexer is recovered", "last", lastID) - return + if metadata != nil && metadata.Last == newLastID { + i.log.Info("History indexer is recovered", "last", newLastID) + return false } case <-i.closed: - return + return true } } } @@ -746,10 +770,10 @@ func checkVersion(disk ethdb.KeyValueStore, typ historyType) { // newHistoryIndexer constructs the history indexer and launches the background // initer to complete the indexing of any remaining state histories. -func newHistoryIndexer(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, lastHistoryID uint64, typ historyType) *historyIndexer { +func newHistoryIndexer(disk ethdb.Database, freezer ethdb.AncientStore, lastHistoryID uint64, typ historyType, noWait bool) *historyIndexer { checkVersion(disk, typ) return &historyIndexer{ - initer: newIndexIniter(disk, freezer, typ, lastHistoryID), + initer: newIndexIniter(disk, freezer, typ, lastHistoryID, noWait), typ: typ, disk: disk, freezer: freezer, diff --git a/triedb/pathdb/history_indexer_state.go b/triedb/pathdb/history_indexer_state.go new file mode 100644 index 0000000000..2746083297 --- /dev/null +++ b/triedb/pathdb/history_indexer_state.go @@ -0,0 +1,183 @@ +// Copyright 2026 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package pathdb + +import ( + "bytes" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" +) + +// state represents the syncing status of the node. +type state int + +const ( + // stateSynced indicates that the local chain head is sufficiently close to the + // network chain head, and the majority of the data has been fully synchronized. + stateSynced state = iota + + // stateSyncing indicates that the sync process is still in progress. Local node + // is actively catching up with the network chain head. + stateSyncing + + // stateStalled indicates that sync progress has stopped for a while + // with no progress. This may be caused by network instability (e.g., no peers), + // manual operation such as syncing the local chain to a specific block. + stateStalled +) + +const ( + // syncStateTimeWindow defines the maximum allowed lag behind the network + // chain head. + // + // If the local chain head falls within this threshold, the node is considered + // close to the tip and will be marked as stateSynced. + syncStateTimeWindow = 6 * time.Hour + + // syncStalledTimeout defines the maximum duration during which no sync + // progress is observed. If this timeout is exceeded, the node's status + // will be considered stalled. + syncStalledTimeout = 5 * time.Minute +) + +type initerState struct { + state state + stateLock sync.Mutex + disk ethdb.Database + term chan struct{} +} + +func newIniterState(disk ethdb.Database, noWait bool) *initerState { + s := &initerState{ + state: stateSyncing, + disk: disk, + term: make(chan struct{}), + } + go s.update(noWait) + return s +} + +func (s *initerState) get() state { + s.stateLock.Lock() + defer s.stateLock.Unlock() + + return s.state +} + +func (s *initerState) is(state state) bool { + return s.get() == state +} + +func (s *initerState) set(state state) { + s.stateLock.Lock() + defer s.stateLock.Unlock() + + s.state = state +} + +func (s *initerState) update(noWait bool) { + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + + headBlock := s.readLastBlock() + if headBlock != nil && time.Since(time.Unix(int64(headBlock.Time), 0)) < syncStateTimeWindow { + s.set(stateSynced) + log.Info("Marked indexing initer as synced") + } else if noWait { + s.set(stateSynced) + log.Info("Marked indexing initer as synced forcibly") + } else { + s.set(stateSyncing) + } + + var ( + hhash = rawdb.ReadHeadHeaderHash(s.disk) + fhash = rawdb.ReadHeadFastBlockHash(s.disk) + bhash = rawdb.ReadHeadBlockHash(s.disk) + skeleton = rawdb.ReadSkeletonSyncStatus(s.disk) + lastProgress = time.Now() + ) + for { + select { + case <-ticker.C: + state := s.get() + if state == stateSynced || state == stateStalled { + continue + } + headBlock := s.readLastBlock() + if headBlock == nil { + continue + } + // State machine: stateSyncing => stateSynced + if time.Since(time.Unix(int64(headBlock.Time), 0)) < syncStateTimeWindow { + s.set(stateSynced) + log.Info("Marked indexing initer as synced") + continue + } + // State machine: stateSyncing => stateStalled + newhhash := rawdb.ReadHeadHeaderHash(s.disk) + newfhash := rawdb.ReadHeadFastBlockHash(s.disk) + newbhash := rawdb.ReadHeadBlockHash(s.disk) + newskeleton := rawdb.ReadSkeletonSyncStatus(s.disk) + hasProgress := newhhash.Cmp(hhash) != 0 || newfhash.Cmp(fhash) != 0 || newbhash.Cmp(bhash) != 0 || !bytes.Equal(newskeleton, skeleton) + + if !hasProgress && time.Since(lastProgress) > syncStalledTimeout { + s.set(stateStalled) + log.Info("Marked indexing initer as stalled") + continue + } + if hasProgress { + hhash = newhhash + fhash = newfhash + bhash = newbhash + skeleton = newskeleton + lastProgress = time.Now() + } + + case <-s.term: + return + } + } +} + +func (s *initerState) close() { + select { + case <-s.term: + default: + close(s.term) + } + return +} + +// readLastBlock returns the local chain head. +func (s *initerState) readLastBlock() *types.Header { + hash := rawdb.ReadHeadBlockHash(s.disk) + if hash == (common.Hash{}) { + return nil + } + number, exists := rawdb.ReadHeaderNumber(s.disk, hash) + if !exists { + return nil + } + return rawdb.ReadHeader(s.disk, hash, number) +} diff --git a/triedb/pathdb/history_indexer_test.go b/triedb/pathdb/history_indexer_test.go index f333d18d8b..8bb1db42da 100644 --- a/triedb/pathdb/history_indexer_test.go +++ b/triedb/pathdb/history_indexer_test.go @@ -27,7 +27,7 @@ import ( // deadlock when the indexer is active. This specifically targets the case where // signal.result must be sent to unblock the caller. func TestHistoryIndexerShortenDeadlock(t *testing.T) { - //log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelInfo, true))) + // log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelDebug, true))) db := rawdb.NewMemoryDatabase() freezer, _ := rawdb.NewStateFreezer(t.TempDir(), false, false) defer freezer.Close() @@ -38,7 +38,7 @@ func TestHistoryIndexerShortenDeadlock(t *testing.T) { rawdb.WriteStateHistory(freezer, uint64(i+1), h.meta.encode(), accountIndex, storageIndex, accountData, storageData) } // As a workaround, assign a future block to keep the initer running indefinitely - indexer := newHistoryIndexer(db, freezer, 200, typeStateHistory) + indexer := newHistoryIndexer(db, freezer, 200, typeStateHistory, true) defer indexer.close() done := make(chan error, 1)