triedb/pathdb: enhance history index initer (#33640)

This PR improves the PBSS archive mode. The initial sync of an archive
node (one running with the --gcmode archive flag enabled) will be
significantly sped up.

It achieves that with the following changes:

The indexer now attempts to process histories in batches whenever
possible. Batch indexing is enforced when the node is still syncing and
the local chain head is behind the network chain head.

In this scenario, instead of scheduling indexing frequently alongside
block
insertion, the indexer waits until a sufficient amount of history has
accumulated
and then processes it in a batch, which is significantly more efficient.

---------

Co-authored-by: Sina M <1591639+s1na@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
rjl493456442 2026-03-17 22:29:30 +08:00 committed by GitHub
parent fc1b0c0b83
commit 9b2ce121dc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 279 additions and 70 deletions

View file

@ -105,9 +105,10 @@ type Config struct {
FullValueCheckpoint uint32 // The rate at which trie nodes are encoded in full-value format
// Testing configurations
SnapshotNoBuild bool // Flag Whether the state generation is disabled
NoAsyncFlush bool // Flag whether the background buffer flushing is disabled
NoAsyncGeneration bool // Flag whether the background generation is disabled
SnapshotNoBuild bool // Flag Whether the state generation is disabled
NoAsyncFlush bool // Flag whether the background buffer flushing is disabled
NoAsyncGeneration bool // Flag whether the background generation is disabled
NoHistoryIndexDelay bool // Flag whether the history index delay is disabled
}
// sanitize checks the provided user configurations and changes anything that's

View file

@ -215,14 +215,14 @@ func (db *Database) setHistoryIndexer() {
if db.stateIndexer != nil {
db.stateIndexer.close()
}
db.stateIndexer = newHistoryIndexer(db.diskdb, db.stateFreezer, db.tree.bottom().stateID(), typeStateHistory)
db.stateIndexer = newHistoryIndexer(db.diskdb, db.stateFreezer, db.tree.bottom().stateID(), typeStateHistory, db.config.NoHistoryIndexDelay)
log.Info("Enabled state history indexing")
}
if db.trienodeFreezer != nil {
if db.trienodeIndexer != nil {
db.trienodeIndexer.close()
}
db.trienodeIndexer = newHistoryIndexer(db.diskdb, db.trienodeFreezer, db.tree.bottom().stateID(), typeTrienodeHistory)
db.trienodeIndexer = newHistoryIndexer(db.diskdb, db.trienodeFreezer, db.tree.bottom().stateID(), typeTrienodeHistory, db.config.NoHistoryIndexDelay)
log.Info("Enabled trienode history indexing")
}
}

View file

@ -182,6 +182,7 @@ func newTester(t *testing.T, config *testerConfig) *tester {
WriteBufferSize: config.writeBufferSize(),
NoAsyncFlush: true,
JournalDirectory: config.journalDir,
NoHistoryIndexDelay: true,
}, config.isVerkle)
obj = &tester{

View file

@ -41,6 +41,8 @@ const (
stateHistoryIndexVersion = stateHistoryIndexV0 // the current state index version
trienodeHistoryIndexV0 = uint8(0) // initial version of trienode index structure
trienodeHistoryIndexVersion = trienodeHistoryIndexV0 // the current trienode index version
indexerProcessBatchInSync = 100000 // threshold for history batch indexing when node is in sync stage.
)
// indexVersion returns the latest index version for the given history type.
@ -349,7 +351,8 @@ type interruptSignal struct {
// If a state history is removed due to a rollback, the associated indexes should
// be unmarked accordingly.
type indexIniter struct {
disk ethdb.KeyValueStore
state *initerState
disk ethdb.Database
freezer ethdb.AncientStore
interrupt chan *interruptSignal
done chan struct{}
@ -364,8 +367,9 @@ type indexIniter struct {
wg sync.WaitGroup
}
func newIndexIniter(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, typ historyType, lastID uint64) *indexIniter {
func newIndexIniter(disk ethdb.Database, freezer ethdb.AncientStore, typ historyType, lastID uint64, noWait bool) *indexIniter {
initer := &indexIniter{
state: newIniterState(disk, noWait),
disk: disk,
freezer: freezer,
interrupt: make(chan *interruptSignal),
@ -385,12 +389,7 @@ func newIndexIniter(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, typ hi
// Launch background indexer
initer.wg.Add(1)
if recover {
log.Info("History indexer is recovering", "history", lastID, "indexed", metadata.Last)
go initer.recover(lastID)
} else {
go initer.run(lastID)
}
go initer.run(recover)
return initer
}
@ -400,6 +399,7 @@ func (i *indexIniter) close() {
return
default:
close(i.closed)
i.state.close()
i.wg.Wait()
}
}
@ -431,85 +431,109 @@ func (i *indexIniter) remain() uint64 {
}
}
func (i *indexIniter) run(lastID uint64) {
func (i *indexIniter) run(recover bool) {
defer i.wg.Done()
// Launch background indexing thread
var (
done = make(chan struct{})
interrupt = new(atomic.Int32)
done chan struct{}
interrupt *atomic.Int32
// checkDone indicates whether all requested state histories
// have been fully indexed.
// checkDone reports whether indexing has completed for all histories.
checkDone = func() bool {
metadata := loadIndexMetadata(i.disk, i.typ)
return metadata != nil && metadata.Last == lastID
return metadata != nil && metadata.Last == i.last.Load()
}
// canExit reports whether the initial indexing phase has completed.
canExit = func() bool {
return !i.state.is(stateSyncing) && checkDone()
}
heartBeat = time.NewTimer(0)
)
go i.index(done, interrupt, lastID)
defer heartBeat.Stop()
if recover {
if aborted := i.recover(); aborted {
return
}
}
for {
select {
case signal := <-i.interrupt:
// The indexing limit can only be extended or shortened continuously.
newLastID := signal.newLastID
if newLastID != lastID+1 && newLastID != lastID-1 {
signal.result <- fmt.Errorf("invalid history id, last: %d, got: %d", lastID, newLastID)
oldLastID := i.last.Load()
// The indexing limit can only be extended or shortened continuously.
if newLastID != oldLastID+1 && newLastID != oldLastID-1 {
signal.result <- fmt.Errorf("invalid history id, last: %d, got: %d", oldLastID, newLastID)
continue
}
i.last.Store(newLastID) // update indexing range
// The index limit is extended by one, update the limit without
// interrupting the current background process.
if newLastID == lastID+1 {
lastID = newLastID
if newLastID == oldLastID+1 {
signal.result <- nil
i.log.Debug("Extended history range", "last", lastID)
i.log.Debug("Extended history range", "last", newLastID)
continue
}
// The index limit is shortened by one, interrupt the current background
// process and relaunch with new target.
interrupt.Store(1)
<-done
// The index limit is shortened, interrupt the current background
// process if it's active and update the target.
if done != nil {
interrupt.Store(1)
<-done
done, interrupt = nil, nil
}
// If all state histories, including the one to be reverted, have
// been fully indexed, unindex it here and shut down the initializer.
if checkDone() {
i.log.Info("Truncate the extra history", "id", lastID)
if err := unindexSingle(lastID, i.disk, i.freezer, i.typ); err != nil {
i.log.Info("Truncate the extra history", "id", oldLastID)
if err := unindexSingle(oldLastID, i.disk, i.freezer, i.typ); err != nil {
signal.result <- err
return
}
close(i.done)
signal.result <- nil
i.log.Info("Histories have been fully indexed", "last", lastID-1)
i.log.Info("Histories have been fully indexed", "last", i.last.Load())
return
}
// Adjust the indexing target and relaunch the process
lastID = newLastID
// Adjust the indexing target
signal.result <- nil
done, interrupt = make(chan struct{}), new(atomic.Int32)
go i.index(done, interrupt, lastID)
i.log.Debug("Shortened history range", "last", lastID)
i.log.Debug("Shortened history range", "last", newLastID)
case <-done:
if checkDone() {
done, interrupt = nil, nil
if canExit() {
close(i.done)
i.log.Info("Histories have been fully indexed", "last", lastID)
return
}
// Relaunch the background runner if some tasks are left
case <-heartBeat.C:
heartBeat.Reset(time.Second * 15)
// Short circuit if the indexer is still busy
if done != nil {
continue
}
if canExit() {
close(i.done)
return
}
// The local chain is still in the syncing phase. Only start the indexing
// when a sufficient amount of histories has accumulated. Batch indexing
// is more efficient than processing items individually.
if i.state.is(stateSyncing) && i.last.Load()-i.indexed.Load() < indexerProcessBatchInSync {
continue
}
done, interrupt = make(chan struct{}), new(atomic.Int32)
go i.index(done, interrupt, lastID)
go i.index(done, interrupt, i.last.Load())
case <-i.closed:
interrupt.Store(1)
i.log.Info("Waiting background history index initer to exit")
<-done
if checkDone() {
close(i.done)
if done != nil {
interrupt.Store(1)
i.log.Info("Waiting background history index initer to exit")
<-done
}
return
}
@ -571,7 +595,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID
}
return
}
i.log.Info("Start history indexing", "beginID", beginID, "lastID", lastID)
i.log.Debug("Start history indexing", "beginID", beginID, "lastID", lastID)
var (
current = beginID
@ -618,7 +642,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID
done = current - beginID
)
eta := common.CalculateETA(done, left, time.Since(start))
i.log.Info("Indexing history", "processed", done, "left", left, "elapsed", common.PrettyDuration(time.Since(start)), "eta", common.PrettyDuration(eta))
i.log.Debug("Indexing history", "processed", done, "left", left, "elapsed", common.PrettyDuration(time.Since(start)), "eta", common.PrettyDuration(eta))
}
}
i.indexed.Store(current - 1) // update indexing progress
@ -629,7 +653,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID
if err := batch.finish(true); err != nil {
i.log.Error("Failed to flush index", "err", err)
}
log.Info("State indexing interrupted")
log.Debug("State indexing interrupted")
return
}
}
@ -637,7 +661,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID
if err := batch.finish(true); err != nil {
i.log.Error("Failed to flush index", "err", err)
}
i.log.Info("Indexed history", "from", beginID, "to", lastID, "elapsed", common.PrettyDuration(time.Since(start)))
i.log.Debug("Indexed history", "from", beginID, "to", lastID, "elapsed", common.PrettyDuration(time.Since(start)))
}
// recover handles unclean shutdown recovery. After an unclean shutdown, any
@ -650,35 +674,35 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID
// by chain recovery, under the assumption that the recovered histories will be
// identical to the lost ones. Fork-awareness should be added in the future to
// correctly handle histories affected by reorgs.
func (i *indexIniter) recover(lastID uint64) {
defer i.wg.Done()
func (i *indexIniter) recover() bool {
log.Info("History indexer is recovering", "last", i.last.Load(), "indexed", i.indexed.Load())
for {
select {
case signal := <-i.interrupt:
newLastID := signal.newLastID
if newLastID != lastID+1 && newLastID != lastID-1 {
signal.result <- fmt.Errorf("invalid history id, last: %d, got: %d", lastID, newLastID)
oldLastID := i.last.Load()
// The indexing limit can only be extended or shortened continuously.
if newLastID != oldLastID+1 && newLastID != oldLastID-1 {
signal.result <- fmt.Errorf("invalid history id, last: %d, got: %d", oldLastID, newLastID)
continue
}
// Update the last indexed flag
lastID = newLastID
signal.result <- nil
i.last.Store(newLastID)
i.log.Debug("Updated history index flag", "last", lastID)
i.log.Debug("Updated history index flag", "last", newLastID)
// Terminate the recovery routine once the histories are fully aligned
// with the index data, indicating that index initialization is complete.
metadata := loadIndexMetadata(i.disk, i.typ)
if metadata != nil && metadata.Last == lastID {
close(i.done)
i.log.Info("History indexer is recovered", "last", lastID)
return
if metadata != nil && metadata.Last == newLastID {
i.log.Info("History indexer is recovered", "last", newLastID)
return false
}
case <-i.closed:
return
return true
}
}
}
@ -746,10 +770,10 @@ func checkVersion(disk ethdb.KeyValueStore, typ historyType) {
// newHistoryIndexer constructs the history indexer and launches the background
// initer to complete the indexing of any remaining state histories.
func newHistoryIndexer(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, lastHistoryID uint64, typ historyType) *historyIndexer {
func newHistoryIndexer(disk ethdb.Database, freezer ethdb.AncientStore, lastHistoryID uint64, typ historyType, noWait bool) *historyIndexer {
checkVersion(disk, typ)
return &historyIndexer{
initer: newIndexIniter(disk, freezer, typ, lastHistoryID),
initer: newIndexIniter(disk, freezer, typ, lastHistoryID, noWait),
typ: typ,
disk: disk,
freezer: freezer,

View file

@ -0,0 +1,183 @@
// Copyright 2026 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package pathdb
import (
"bytes"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
)
// state represents the syncing status of the node.
type state int
const (
// stateSynced indicates that the local chain head is sufficiently close to the
// network chain head, and the majority of the data has been fully synchronized.
stateSynced state = iota
// stateSyncing indicates that the sync process is still in progress. Local node
// is actively catching up with the network chain head.
stateSyncing
// stateStalled indicates that sync progress has stopped for a while
// with no progress. This may be caused by network instability (e.g., no peers),
// manual operation such as syncing the local chain to a specific block.
stateStalled
)
const (
// syncStateTimeWindow defines the maximum allowed lag behind the network
// chain head.
//
// If the local chain head falls within this threshold, the node is considered
// close to the tip and will be marked as stateSynced.
syncStateTimeWindow = 6 * time.Hour
// syncStalledTimeout defines the maximum duration during which no sync
// progress is observed. If this timeout is exceeded, the node's status
// will be considered stalled.
syncStalledTimeout = 5 * time.Minute
)
type initerState struct {
state state
stateLock sync.Mutex
disk ethdb.Database
term chan struct{}
}
func newIniterState(disk ethdb.Database, noWait bool) *initerState {
s := &initerState{
state: stateSyncing,
disk: disk,
term: make(chan struct{}),
}
go s.update(noWait)
return s
}
func (s *initerState) get() state {
s.stateLock.Lock()
defer s.stateLock.Unlock()
return s.state
}
func (s *initerState) is(state state) bool {
return s.get() == state
}
func (s *initerState) set(state state) {
s.stateLock.Lock()
defer s.stateLock.Unlock()
s.state = state
}
func (s *initerState) update(noWait bool) {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
headBlock := s.readLastBlock()
if headBlock != nil && time.Since(time.Unix(int64(headBlock.Time), 0)) < syncStateTimeWindow {
s.set(stateSynced)
log.Info("Marked indexing initer as synced")
} else if noWait {
s.set(stateSynced)
log.Info("Marked indexing initer as synced forcibly")
} else {
s.set(stateSyncing)
}
var (
hhash = rawdb.ReadHeadHeaderHash(s.disk)
fhash = rawdb.ReadHeadFastBlockHash(s.disk)
bhash = rawdb.ReadHeadBlockHash(s.disk)
skeleton = rawdb.ReadSkeletonSyncStatus(s.disk)
lastProgress = time.Now()
)
for {
select {
case <-ticker.C:
state := s.get()
if state == stateSynced || state == stateStalled {
continue
}
headBlock := s.readLastBlock()
if headBlock == nil {
continue
}
// State machine: stateSyncing => stateSynced
if time.Since(time.Unix(int64(headBlock.Time), 0)) < syncStateTimeWindow {
s.set(stateSynced)
log.Info("Marked indexing initer as synced")
continue
}
// State machine: stateSyncing => stateStalled
newhhash := rawdb.ReadHeadHeaderHash(s.disk)
newfhash := rawdb.ReadHeadFastBlockHash(s.disk)
newbhash := rawdb.ReadHeadBlockHash(s.disk)
newskeleton := rawdb.ReadSkeletonSyncStatus(s.disk)
hasProgress := newhhash.Cmp(hhash) != 0 || newfhash.Cmp(fhash) != 0 || newbhash.Cmp(bhash) != 0 || !bytes.Equal(newskeleton, skeleton)
if !hasProgress && time.Since(lastProgress) > syncStalledTimeout {
s.set(stateStalled)
log.Info("Marked indexing initer as stalled")
continue
}
if hasProgress {
hhash = newhhash
fhash = newfhash
bhash = newbhash
skeleton = newskeleton
lastProgress = time.Now()
}
case <-s.term:
return
}
}
}
func (s *initerState) close() {
select {
case <-s.term:
default:
close(s.term)
}
return
}
// readLastBlock returns the local chain head.
func (s *initerState) readLastBlock() *types.Header {
hash := rawdb.ReadHeadBlockHash(s.disk)
if hash == (common.Hash{}) {
return nil
}
number, exists := rawdb.ReadHeaderNumber(s.disk, hash)
if !exists {
return nil
}
return rawdb.ReadHeader(s.disk, hash, number)
}

View file

@ -27,7 +27,7 @@ import (
// deadlock when the indexer is active. This specifically targets the case where
// signal.result must be sent to unblock the caller.
func TestHistoryIndexerShortenDeadlock(t *testing.T) {
//log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelInfo, true)))
// log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelDebug, true)))
db := rawdb.NewMemoryDatabase()
freezer, _ := rawdb.NewStateFreezer(t.TempDir(), false, false)
defer freezer.Close()
@ -38,7 +38,7 @@ func TestHistoryIndexerShortenDeadlock(t *testing.T) {
rawdb.WriteStateHistory(freezer, uint64(i+1), h.meta.encode(), accountIndex, storageIndex, accountData, storageData)
}
// As a workaround, assign a future block to keep the initer running indefinitely
indexer := newHistoryIndexer(db, freezer, 200, typeStateHistory)
indexer := newHistoryIndexer(db, freezer, 200, typeStateHistory, true)
defer indexer.close()
done := make(chan error, 1)