From 9b2ce121dc94fc964f9c8ba6cd6e70af5f11e5d2 Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Tue, 17 Mar 2026 22:29:30 +0800 Subject: [PATCH] triedb/pathdb: enhance history index initer (#33640) This PR improves the pbss archive mode. Initial sync of an archive mode which has the --gcmode archive flag enabled will be significantly sped up. It achieves that with the following changes: The indexer now attempts to process histories in batch whenever possible. Batch indexing is enforced when the node is still syncing and the local chain head is behind the network chain head. In this scenario, instead of scheduling indexing frequently alongside block insertion, the indexer waits until a sufficient amount of history has accumulated and then processes it in a batch, which is significantly more efficient. --------- Co-authored-by: Sina M <1591639+s1na@users.noreply.github.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- triedb/pathdb/config.go | 7 +- triedb/pathdb/database.go | 4 +- triedb/pathdb/database_test.go | 1 + triedb/pathdb/history_indexer.go | 150 +++++++++++--------- triedb/pathdb/history_indexer_state.go | 183 +++++++++++++++++++++++++ triedb/pathdb/history_indexer_test.go | 4 +- 6 files changed, 279 insertions(+), 70 deletions(-) create mode 100644 triedb/pathdb/history_indexer_state.go diff --git a/triedb/pathdb/config.go b/triedb/pathdb/config.go index c236b34333..97ee1c2315 100644 --- a/triedb/pathdb/config.go +++ b/triedb/pathdb/config.go @@ -105,9 +105,10 @@ type Config struct { FullValueCheckpoint uint32 // The rate at which trie nodes are encoded in full-value format // Testing configurations - SnapshotNoBuild bool // Flag Whether the state generation is disabled - NoAsyncFlush bool // Flag whether the background buffer flushing is disabled - NoAsyncGeneration bool // Flag whether the background generation is disabled + SnapshotNoBuild bool // Flag Whether the state generation is disabled + NoAsyncFlush bool // Flag whether the background buffer flushing is disabled + NoAsyncGeneration bool // Flag whether the background generation is disabled + NoHistoryIndexDelay bool // Flag whether the history index delay is disabled } // sanitize checks the provided user configurations and changes anything that's diff --git a/triedb/pathdb/database.go b/triedb/pathdb/database.go index 5255602a4e..86a42c69f4 100644 --- a/triedb/pathdb/database.go +++ b/triedb/pathdb/database.go @@ -215,14 +215,14 @@ func (db *Database) setHistoryIndexer() { if db.stateIndexer != nil { db.stateIndexer.close() } - db.stateIndexer = newHistoryIndexer(db.diskdb, db.stateFreezer, db.tree.bottom().stateID(), typeStateHistory) + db.stateIndexer = newHistoryIndexer(db.diskdb, db.stateFreezer, db.tree.bottom().stateID(), typeStateHistory, db.config.NoHistoryIndexDelay) log.Info("Enabled state history indexing") } if db.trienodeFreezer != nil { if db.trienodeIndexer != nil { db.trienodeIndexer.close() } - db.trienodeIndexer = newHistoryIndexer(db.diskdb, db.trienodeFreezer, db.tree.bottom().stateID(), typeTrienodeHistory) + db.trienodeIndexer = newHistoryIndexer(db.diskdb, db.trienodeFreezer, db.tree.bottom().stateID(), typeTrienodeHistory, db.config.NoHistoryIndexDelay) log.Info("Enabled trienode history indexing") } } diff --git a/triedb/pathdb/database_test.go b/triedb/pathdb/database_test.go index 2d1819d08f..8ece83cad7 100644 --- a/triedb/pathdb/database_test.go +++ b/triedb/pathdb/database_test.go @@ -182,6 +182,7 @@ func newTester(t *testing.T, config *testerConfig) *tester { WriteBufferSize: config.writeBufferSize(), NoAsyncFlush: true, JournalDirectory: config.journalDir, + NoHistoryIndexDelay: true, }, config.isVerkle) obj = &tester{ diff --git a/triedb/pathdb/history_indexer.go b/triedb/pathdb/history_indexer.go index c987b380ed..c9bf3e87f1 100644 --- a/triedb/pathdb/history_indexer.go +++ b/triedb/pathdb/history_indexer.go @@ -41,6 +41,8 @@ const ( stateHistoryIndexVersion = stateHistoryIndexV0 // the current state index version trienodeHistoryIndexV0 = uint8(0) // initial version of trienode index structure trienodeHistoryIndexVersion = trienodeHistoryIndexV0 // the current trienode index version + + indexerProcessBatchInSync = 100000 // threshold for history batch indexing when node is in sync stage. ) // indexVersion returns the latest index version for the given history type. @@ -349,7 +351,8 @@ type interruptSignal struct { // If a state history is removed due to a rollback, the associated indexes should // be unmarked accordingly. type indexIniter struct { - disk ethdb.KeyValueStore + state *initerState + disk ethdb.Database freezer ethdb.AncientStore interrupt chan *interruptSignal done chan struct{} @@ -364,8 +367,9 @@ type indexIniter struct { wg sync.WaitGroup } -func newIndexIniter(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, typ historyType, lastID uint64) *indexIniter { +func newIndexIniter(disk ethdb.Database, freezer ethdb.AncientStore, typ historyType, lastID uint64, noWait bool) *indexIniter { initer := &indexIniter{ + state: newIniterState(disk, noWait), disk: disk, freezer: freezer, interrupt: make(chan *interruptSignal), @@ -385,12 +389,7 @@ func newIndexIniter(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, typ hi // Launch background indexer initer.wg.Add(1) - if recover { - log.Info("History indexer is recovering", "history", lastID, "indexed", metadata.Last) - go initer.recover(lastID) - } else { - go initer.run(lastID) - } + go initer.run(recover) return initer } @@ -400,6 +399,7 @@ func (i *indexIniter) close() { return default: close(i.closed) + i.state.close() i.wg.Wait() } } @@ -431,85 +431,109 @@ func (i *indexIniter) remain() uint64 { } } -func (i *indexIniter) run(lastID uint64) { +func (i *indexIniter) run(recover bool) { defer i.wg.Done() // Launch background indexing thread var ( - done = make(chan struct{}) - interrupt = new(atomic.Int32) + done chan struct{} + interrupt *atomic.Int32 - // checkDone indicates whether all requested state histories - // have been fully indexed. + // checkDone reports whether indexing has completed for all histories. checkDone = func() bool { metadata := loadIndexMetadata(i.disk, i.typ) - return metadata != nil && metadata.Last == lastID + return metadata != nil && metadata.Last == i.last.Load() } + // canExit reports whether the initial indexing phase has completed. + canExit = func() bool { + return !i.state.is(stateSyncing) && checkDone() + } + heartBeat = time.NewTimer(0) ) - go i.index(done, interrupt, lastID) + defer heartBeat.Stop() + if recover { + if aborted := i.recover(); aborted { + return + } + } for { select { case signal := <-i.interrupt: - // The indexing limit can only be extended or shortened continuously. newLastID := signal.newLastID - if newLastID != lastID+1 && newLastID != lastID-1 { - signal.result <- fmt.Errorf("invalid history id, last: %d, got: %d", lastID, newLastID) + oldLastID := i.last.Load() + + // The indexing limit can only be extended or shortened continuously. + if newLastID != oldLastID+1 && newLastID != oldLastID-1 { + signal.result <- fmt.Errorf("invalid history id, last: %d, got: %d", oldLastID, newLastID) continue } i.last.Store(newLastID) // update indexing range // The index limit is extended by one, update the limit without // interrupting the current background process. - if newLastID == lastID+1 { - lastID = newLastID + if newLastID == oldLastID+1 { signal.result <- nil - i.log.Debug("Extended history range", "last", lastID) + i.log.Debug("Extended history range", "last", newLastID) continue } - // The index limit is shortened by one, interrupt the current background - // process and relaunch with new target. - interrupt.Store(1) - <-done - + // The index limit is shortened, interrupt the current background + // process if it's active and update the target. + if done != nil { + interrupt.Store(1) + <-done + done, interrupt = nil, nil + } // If all state histories, including the one to be reverted, have // been fully indexed, unindex it here and shut down the initializer. if checkDone() { - i.log.Info("Truncate the extra history", "id", lastID) - if err := unindexSingle(lastID, i.disk, i.freezer, i.typ); err != nil { + i.log.Info("Truncate the extra history", "id", oldLastID) + if err := unindexSingle(oldLastID, i.disk, i.freezer, i.typ); err != nil { signal.result <- err return } close(i.done) signal.result <- nil - i.log.Info("Histories have been fully indexed", "last", lastID-1) + i.log.Info("Histories have been fully indexed", "last", i.last.Load()) return } - // Adjust the indexing target and relaunch the process - lastID = newLastID + // Adjust the indexing target signal.result <- nil - - done, interrupt = make(chan struct{}), new(atomic.Int32) - go i.index(done, interrupt, lastID) - i.log.Debug("Shortened history range", "last", lastID) + i.log.Debug("Shortened history range", "last", newLastID) case <-done: - if checkDone() { + done, interrupt = nil, nil + + if canExit() { close(i.done) - i.log.Info("Histories have been fully indexed", "last", lastID) return } - // Relaunch the background runner if some tasks are left + + case <-heartBeat.C: + heartBeat.Reset(time.Second * 15) + + // Short circuit if the indexer is still busy + if done != nil { + continue + } + if canExit() { + close(i.done) + return + } + // The local chain is still in the syncing phase. Only start the indexing + // when a sufficient amount of histories has accumulated. Batch indexing + // is more efficient than processing items individually. + if i.state.is(stateSyncing) && i.last.Load()-i.indexed.Load() < indexerProcessBatchInSync { + continue + } done, interrupt = make(chan struct{}), new(atomic.Int32) - go i.index(done, interrupt, lastID) + go i.index(done, interrupt, i.last.Load()) case <-i.closed: - interrupt.Store(1) - i.log.Info("Waiting background history index initer to exit") - <-done - - if checkDone() { - close(i.done) + if done != nil { + interrupt.Store(1) + i.log.Info("Waiting background history index initer to exit") + <-done } return } @@ -571,7 +595,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID } return } - i.log.Info("Start history indexing", "beginID", beginID, "lastID", lastID) + i.log.Debug("Start history indexing", "beginID", beginID, "lastID", lastID) var ( current = beginID @@ -618,7 +642,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID done = current - beginID ) eta := common.CalculateETA(done, left, time.Since(start)) - i.log.Info("Indexing history", "processed", done, "left", left, "elapsed", common.PrettyDuration(time.Since(start)), "eta", common.PrettyDuration(eta)) + i.log.Debug("Indexing history", "processed", done, "left", left, "elapsed", common.PrettyDuration(time.Since(start)), "eta", common.PrettyDuration(eta)) } } i.indexed.Store(current - 1) // update indexing progress @@ -629,7 +653,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID if err := batch.finish(true); err != nil { i.log.Error("Failed to flush index", "err", err) } - log.Info("State indexing interrupted") + log.Debug("State indexing interrupted") return } } @@ -637,7 +661,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID if err := batch.finish(true); err != nil { i.log.Error("Failed to flush index", "err", err) } - i.log.Info("Indexed history", "from", beginID, "to", lastID, "elapsed", common.PrettyDuration(time.Since(start))) + i.log.Debug("Indexed history", "from", beginID, "to", lastID, "elapsed", common.PrettyDuration(time.Since(start))) } // recover handles unclean shutdown recovery. After an unclean shutdown, any @@ -650,35 +674,35 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID // by chain recovery, under the assumption that the recovered histories will be // identical to the lost ones. Fork-awareness should be added in the future to // correctly handle histories affected by reorgs. -func (i *indexIniter) recover(lastID uint64) { - defer i.wg.Done() +func (i *indexIniter) recover() bool { + log.Info("History indexer is recovering", "last", i.last.Load(), "indexed", i.indexed.Load()) for { select { case signal := <-i.interrupt: newLastID := signal.newLastID - if newLastID != lastID+1 && newLastID != lastID-1 { - signal.result <- fmt.Errorf("invalid history id, last: %d, got: %d", lastID, newLastID) + oldLastID := i.last.Load() + + // The indexing limit can only be extended or shortened continuously. + if newLastID != oldLastID+1 && newLastID != oldLastID-1 { + signal.result <- fmt.Errorf("invalid history id, last: %d, got: %d", oldLastID, newLastID) continue } - // Update the last indexed flag - lastID = newLastID signal.result <- nil i.last.Store(newLastID) - i.log.Debug("Updated history index flag", "last", lastID) + i.log.Debug("Updated history index flag", "last", newLastID) // Terminate the recovery routine once the histories are fully aligned // with the index data, indicating that index initialization is complete. metadata := loadIndexMetadata(i.disk, i.typ) - if metadata != nil && metadata.Last == lastID { - close(i.done) - i.log.Info("History indexer is recovered", "last", lastID) - return + if metadata != nil && metadata.Last == newLastID { + i.log.Info("History indexer is recovered", "last", newLastID) + return false } case <-i.closed: - return + return true } } } @@ -746,10 +770,10 @@ func checkVersion(disk ethdb.KeyValueStore, typ historyType) { // newHistoryIndexer constructs the history indexer and launches the background // initer to complete the indexing of any remaining state histories. -func newHistoryIndexer(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, lastHistoryID uint64, typ historyType) *historyIndexer { +func newHistoryIndexer(disk ethdb.Database, freezer ethdb.AncientStore, lastHistoryID uint64, typ historyType, noWait bool) *historyIndexer { checkVersion(disk, typ) return &historyIndexer{ - initer: newIndexIniter(disk, freezer, typ, lastHistoryID), + initer: newIndexIniter(disk, freezer, typ, lastHistoryID, noWait), typ: typ, disk: disk, freezer: freezer, diff --git a/triedb/pathdb/history_indexer_state.go b/triedb/pathdb/history_indexer_state.go new file mode 100644 index 0000000000..2746083297 --- /dev/null +++ b/triedb/pathdb/history_indexer_state.go @@ -0,0 +1,183 @@ +// Copyright 2026 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package pathdb + +import ( + "bytes" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" +) + +// state represents the syncing status of the node. +type state int + +const ( + // stateSynced indicates that the local chain head is sufficiently close to the + // network chain head, and the majority of the data has been fully synchronized. + stateSynced state = iota + + // stateSyncing indicates that the sync process is still in progress. Local node + // is actively catching up with the network chain head. + stateSyncing + + // stateStalled indicates that sync progress has stopped for a while + // with no progress. This may be caused by network instability (e.g., no peers), + // manual operation such as syncing the local chain to a specific block. + stateStalled +) + +const ( + // syncStateTimeWindow defines the maximum allowed lag behind the network + // chain head. + // + // If the local chain head falls within this threshold, the node is considered + // close to the tip and will be marked as stateSynced. + syncStateTimeWindow = 6 * time.Hour + + // syncStalledTimeout defines the maximum duration during which no sync + // progress is observed. If this timeout is exceeded, the node's status + // will be considered stalled. + syncStalledTimeout = 5 * time.Minute +) + +type initerState struct { + state state + stateLock sync.Mutex + disk ethdb.Database + term chan struct{} +} + +func newIniterState(disk ethdb.Database, noWait bool) *initerState { + s := &initerState{ + state: stateSyncing, + disk: disk, + term: make(chan struct{}), + } + go s.update(noWait) + return s +} + +func (s *initerState) get() state { + s.stateLock.Lock() + defer s.stateLock.Unlock() + + return s.state +} + +func (s *initerState) is(state state) bool { + return s.get() == state +} + +func (s *initerState) set(state state) { + s.stateLock.Lock() + defer s.stateLock.Unlock() + + s.state = state +} + +func (s *initerState) update(noWait bool) { + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + + headBlock := s.readLastBlock() + if headBlock != nil && time.Since(time.Unix(int64(headBlock.Time), 0)) < syncStateTimeWindow { + s.set(stateSynced) + log.Info("Marked indexing initer as synced") + } else if noWait { + s.set(stateSynced) + log.Info("Marked indexing initer as synced forcibly") + } else { + s.set(stateSyncing) + } + + var ( + hhash = rawdb.ReadHeadHeaderHash(s.disk) + fhash = rawdb.ReadHeadFastBlockHash(s.disk) + bhash = rawdb.ReadHeadBlockHash(s.disk) + skeleton = rawdb.ReadSkeletonSyncStatus(s.disk) + lastProgress = time.Now() + ) + for { + select { + case <-ticker.C: + state := s.get() + if state == stateSynced || state == stateStalled { + continue + } + headBlock := s.readLastBlock() + if headBlock == nil { + continue + } + // State machine: stateSyncing => stateSynced + if time.Since(time.Unix(int64(headBlock.Time), 0)) < syncStateTimeWindow { + s.set(stateSynced) + log.Info("Marked indexing initer as synced") + continue + } + // State machine: stateSyncing => stateStalled + newhhash := rawdb.ReadHeadHeaderHash(s.disk) + newfhash := rawdb.ReadHeadFastBlockHash(s.disk) + newbhash := rawdb.ReadHeadBlockHash(s.disk) + newskeleton := rawdb.ReadSkeletonSyncStatus(s.disk) + hasProgress := newhhash.Cmp(hhash) != 0 || newfhash.Cmp(fhash) != 0 || newbhash.Cmp(bhash) != 0 || !bytes.Equal(newskeleton, skeleton) + + if !hasProgress && time.Since(lastProgress) > syncStalledTimeout { + s.set(stateStalled) + log.Info("Marked indexing initer as stalled") + continue + } + if hasProgress { + hhash = newhhash + fhash = newfhash + bhash = newbhash + skeleton = newskeleton + lastProgress = time.Now() + } + + case <-s.term: + return + } + } +} + +func (s *initerState) close() { + select { + case <-s.term: + default: + close(s.term) + } + return +} + +// readLastBlock returns the local chain head. +func (s *initerState) readLastBlock() *types.Header { + hash := rawdb.ReadHeadBlockHash(s.disk) + if hash == (common.Hash{}) { + return nil + } + number, exists := rawdb.ReadHeaderNumber(s.disk, hash) + if !exists { + return nil + } + return rawdb.ReadHeader(s.disk, hash, number) +} diff --git a/triedb/pathdb/history_indexer_test.go b/triedb/pathdb/history_indexer_test.go index f333d18d8b..8bb1db42da 100644 --- a/triedb/pathdb/history_indexer_test.go +++ b/triedb/pathdb/history_indexer_test.go @@ -27,7 +27,7 @@ import ( // deadlock when the indexer is active. This specifically targets the case where // signal.result must be sent to unblock the caller. func TestHistoryIndexerShortenDeadlock(t *testing.T) { - //log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelInfo, true))) + // log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelDebug, true))) db := rawdb.NewMemoryDatabase() freezer, _ := rawdb.NewStateFreezer(t.TempDir(), false, false) defer freezer.Close() @@ -38,7 +38,7 @@ func TestHistoryIndexerShortenDeadlock(t *testing.T) { rawdb.WriteStateHistory(freezer, uint64(i+1), h.meta.encode(), accountIndex, storageIndex, accountData, storageData) } // As a workaround, assign a future block to keep the initer running indefinitely - indexer := newHistoryIndexer(db, freezer, 200, typeStateHistory) + indexer := newHistoryIndexer(db, freezer, 200, typeStateHistory, true) defer indexer.close() done := make(chan error, 1)