triedb/pathdb: improve history indexer

This commit is contained in:
Gary Rong 2026-03-05 11:14:03 +08:00
parent ecd3e2a7f1
commit 120b2ec7af
3 changed files with 228 additions and 157 deletions

View file

@ -41,6 +41,8 @@ const (
stateHistoryIndexVersion = stateHistoryIndexV0 // the current state index version
trienodeHistoryIndexV0 = uint8(0) // initial version of trienode index structure
trienodeHistoryIndexVersion = trienodeHistoryIndexV0 // the current trienode index version
indexerProcessBatchInSync = 100000 // threshold for history batch indexing when node is in sync stage.
)
// indexVersion returns the latest index version for the given history type.
@ -349,6 +351,7 @@ type interruptSignal struct {
// If a state history is removed due to a rollback, the associated indexes should
// be unmarked accordingly.
type indexIniter struct {
state *initerState
disk ethdb.Database
freezer ethdb.AncientStore
interrupt chan *interruptSignal
@ -366,6 +369,7 @@ type indexIniter struct {
func newIndexIniter(disk ethdb.Database, freezer ethdb.AncientStore, typ historyType, lastID uint64) *indexIniter {
initer := &indexIniter{
state: newIniterState(disk),
disk: disk,
freezer: freezer,
interrupt: make(chan *interruptSignal),
@ -385,7 +389,7 @@ func newIndexIniter(disk ethdb.Database, freezer ethdb.AncientStore, typ history
// Launch background indexer
initer.wg.Add(1)
go initer.run(lastID, recover)
go initer.run(recover)
return initer
}
@ -395,6 +399,7 @@ func (i *indexIniter) close() {
return
default:
close(i.closed)
i.state.close()
i.wg.Wait()
}
}
@ -426,121 +431,7 @@ func (i *indexIniter) remain() uint64 {
}
}
// readBlockTime reads the associated block time of the provided history ID.
func (i *indexIniter) readBlockTime(historyID uint64) (uint64, error) {
var blockNumber uint64
switch i.typ {
case typeStateHistory:
m, err := readStateHistoryMeta(i.freezer, historyID)
if err != nil {
return 0, err
}
blockNumber = m.block
case typeTrienodeHistory:
m, err := readTrienodeMetadata(i.freezer, historyID)
if err != nil {
return 0, err
}
blockNumber = m.block
}
hash := rawdb.ReadCanonicalHash(i.disk, blockNumber)
if hash == (common.Hash{}) {
return 0, errors.New("block not found")
}
header := rawdb.ReadHeader(i.disk, hash, blockNumber)
if header == nil {
return 0, errors.New("block not found")
}
return header.Time, nil
}
const (
// syncHistoryThreshold is the time window behind the chain head.
// If the block being indexed is within this window, we consider the node
// to be close enough to the tip to operate in real-time mode.
syncHistoryThreshold = 6 * time.Hour
// indexerIdleTimeoutLive is the strict inactivity limit applied once the
// indexer is synchronized with the chain head.
indexerIdleTimeoutLive = 60 * time.Second
// indexerIdleTimeoutStartup is the period allowed when the indexer has
// little or no history, for Geth's initial sync procedures (e.g., beacon
// headers downloading or something else).
indexerIdleTimeoutStartup = 1 * time.Hour
// indexerIdleDecayWindow defines the progress threshold over which the
// idle timeout linearly scales from the startup period down to the live one.
indexerIdleDecayWindow = 10000
// indexerHeartbeatInterval is the frequency at which the indexer
// checks its status and schedules background indexing operations.
//
// It is a deliberate design choice to introduce this delay instead
// of scheduling tasks immediately; this forces the indexer into
// "batch index mode" which significantly improves database I/O
// efficiency and reduces overhead.
indexerHeartbeatInterval = 15 * time.Second
)
// getTimeout calculates a dynamic inactivity threshold based on indexing
// progress. For massive datasets, this accounts for the preparation phase
// required before active chain synchronization begins (such as beacon
// header downloading).
//
// Note: In private networks with no block, this may result in a long wait.
// In such cases, the initial sync mode is expected to be terminated by
// another condition verifying the chain head has been reached. This timeout
// primarily serves as a heuristic for the network with massive datasets.
func (i *indexIniter) getTimeout() time.Duration {
current := i.indexed.Load()
if current >= indexerIdleDecayWindow {
return indexerIdleTimeoutLive
}
progress := float64(current) / float64(indexerIdleDecayWindow)
diff := float64(indexerIdleTimeoutStartup - indexerIdleTimeoutLive)
decay := time.Duration(diff * progress)
return indexerIdleTimeoutStartup - decay
}
// checkExit reports whether the initial mode has completed. It assumes that
// all local histories have been fully indexed when invoked, and determines
// whether the initial mode should exit and marking historical state access
// as available.
func (i *indexIniter) checkExit(lastEventTime time.Time) bool {
// Verify that the timestamp of the most recently indexed history
// is sufficiently close to the current time, ensuring that the
// majority of histories have been indexed in batch.
timestamp, err := i.readBlockTime(i.indexed.Load())
if err != nil {
i.log.Warn("Failed to read block time", "err", err)
return false
}
blockTime := time.Unix(int64(timestamp), 0)
// By default, 128 layers are piled up, which corresponds to roughly 25 minutes
// with Ethereum's block interval (12s). A threshold of 6 hours is chosen as a
// conservative and safe default for whatever reason.
if time.Since(blockTime) < syncHistoryThreshold {
i.log.Info("Exit the initial mode as close to chain tip", "last", i.indexed.Load(), "age", common.PrettyAge(blockTime))
return true
}
// The most recently indexed block is still at least 6 hours behind. The node
// may be syncing toward a historical block rather than the latest chain head.
// In this case, check whether the indexer has been idle for a while and exit
// the initial mode if so.
if time.Since(lastEventTime) > i.getTimeout() {
i.log.Info("Exit the initial mode due to inactivity", "last", i.indexed.Load(), "age", common.PrettyAge(blockTime))
return true
}
i.log.Debug("Histories fully indexed, waiting next wave", "last", i.indexed.Load(), "age", common.PrettyAge(blockTime))
return false
}
func (i *indexIniter) run(lastID uint64, recover bool) {
func (i *indexIniter) run(recover bool) {
defer i.wg.Done()
// Launch background indexing thread
@ -550,37 +441,37 @@ func (i *indexIniter) run(lastID uint64, recover bool) {
// checkDone reports whether indexing has completed for all histories.
checkDone = func() bool {
return i.indexed.Load() == lastID
return i.indexed.Load() == i.last.Load()
}
lastEventTime = time.Now()
heartBeat = time.NewTicker(indexerHeartbeatInterval)
// canExit reports whether the initial indexing phase has completed.
canExit = func() bool {
return !i.state.is(stateSyncing) && checkDone()
}
heartBeat = time.NewTicker(15 * time.Second)
)
if recover {
var aborted bool
lastID, aborted = i.recover(lastID)
if aborted {
if aborted := i.recover(); aborted {
return
}
}
for {
select {
case signal := <-i.interrupt:
lastEventTime = time.Now()
newLastID := signal.newLastID
oldLastID := i.last.Load()
// The indexing limit can only be extended or shortened continuously.
newLastID := signal.newLastID
if newLastID != lastID+1 && newLastID != lastID-1 {
signal.result <- fmt.Errorf("invalid history id, last: %d, got: %d", lastID, newLastID)
if newLastID != oldLastID+1 && newLastID != oldLastID-1 {
signal.result <- fmt.Errorf("invalid history id, last: %d, got: %d", oldLastID, newLastID)
continue
}
i.last.Store(newLastID) // update indexing range
// The index limit is extended by one, update the limit without
// interrupting the current background process.
if newLastID == lastID+1 {
lastID = newLastID
if newLastID == oldLastID+1 {
signal.result <- nil
i.log.Debug("Extended history range", "last", lastID)
i.log.Debug("Extended history range", "last", newLastID)
continue
}
// The index limit is shortened, interrupt the current background
@ -593,45 +484,45 @@ func (i *indexIniter) run(lastID uint64, recover bool) {
// If all state histories, including the one to be reverted, have
// been fully indexed, unindex it here and shut down the initializer.
if checkDone() {
i.log.Info("Truncate the extra history", "id", lastID)
if err := unindexSingle(lastID, i.disk, i.freezer, i.typ); err != nil {
i.log.Info("Truncate the extra history", "id", oldLastID)
if err := unindexSingle(oldLastID, i.disk, i.freezer, i.typ); err != nil {
signal.result <- err
return
}
close(i.done)
signal.result <- nil
i.log.Info("Histories have been fully indexed", "last", lastID-1)
i.log.Info("Histories have been fully indexed", "last", i.last.Load())
return
}
// Adjust the indexing target and relaunch the process
lastID = newLastID
// Adjust the indexing target
signal.result <- nil
i.log.Debug("Shortened history range", "last", lastID)
i.log.Debug("Shortened history range", "last", newLastID)
case <-done:
done, interrupt = nil, nil
if i.checkExit(lastEventTime) {
if canExit() {
close(i.done)
return
}
case <-heartBeat.C:
// Short circuit if the indexer is still busy
if done != nil {
continue
}
if checkDone() {
if i.checkExit(lastEventTime) {
close(i.done)
return
}
if canExit() {
close(i.done)
return
}
// The local chain is still in the syncing phase. Only start the indexing
// when a sufficient amount of histories has accumulated. Batch indexing
// is more efficient than processing items individually.
if i.state.is(stateSyncing) && i.last.Load()-i.indexed.Load() < indexerProcessBatchInSync {
continue
}
// Any pending tasks are scheduled for background execution, driven by the
// heartbeat ticker. This interval ensures that historical data is processed
// in batches rather than individually.
done, interrupt = make(chan struct{}), new(atomic.Int32)
go i.index(done, interrupt, lastID)
go i.index(done, interrupt, i.last.Load())
case <-i.closed:
if done != nil {
@ -778,34 +669,35 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID
// by chain recovery, under the assumption that the recovered histories will be
// identical to the lost ones. Fork-awareness should be added in the future to
// correctly handle histories affected by reorgs.
func (i *indexIniter) recover(lastID uint64) (uint64, bool) {
log.Info("History indexer is recovering", "history", lastID, "indexed", i.indexed.Load())
func (i *indexIniter) recover() bool {
log.Info("History indexer is recovering", "last", i.last.Load(), "indexed", i.indexed.Load())
for {
select {
case signal := <-i.interrupt:
newLastID := signal.newLastID
if newLastID != lastID+1 && newLastID != lastID-1 {
signal.result <- fmt.Errorf("invalid history id, last: %d, got: %d", lastID, newLastID)
oldLastID := i.last.Load()
// The indexing limit can only be extended or shortened continuously.
if newLastID != oldLastID+1 && newLastID != oldLastID-1 {
signal.result <- fmt.Errorf("invalid history id, last: %d, got: %d", oldLastID, newLastID)
continue
}
// Update the last indexed flag
lastID = newLastID
signal.result <- nil
i.last.Store(newLastID)
i.log.Debug("Updated history index flag", "last", lastID)
i.log.Debug("Updated history index flag", "last", newLastID)
// Terminate the recovery routine once the histories are fully aligned
// with the index data, indicating that index initialization is complete.
metadata := loadIndexMetadata(i.disk, i.typ)
if metadata != nil && metadata.Last == lastID {
i.log.Info("History indexer is recovered", "last", lastID)
return lastID, false
if metadata != nil && metadata.Last == newLastID {
i.log.Info("History indexer is recovered", "last", newLastID)
return false
}
case <-i.closed:
return 0, true
return true
}
}
}

View file

@ -0,0 +1,179 @@
// Copyright 2026 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/
package pathdb
import (
"bytes"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
)
// state represents the syncing status of the node.
type state int
const (
// stateSynced indicates that the local chain head is sufficiently close to the
// network chain head, and the majority of the data has been fully synchronized.
stateSynced state = iota
// stateSyncing indicates that the sync process is still in progress. Local node
// is actively catching up with the network chain head.
stateSyncing
// stateStalled indicates that sync progress has stopped for a while
// with no progress. This may be caused by network instability (e.g., no eers),
// manual operation such as syncing the local chain to a specific block.
stateStalled
)
const (
// syncStateTimeWindow defines the maximum allowed lag behind the network
// chain head.
//
// If the local chain head falls within this threshold, the node is considered
// close to the tip and will be marked as stateSynced.
syncStateTimeWindow = 6 * time.Hour
// syncStalledTimeout defines the maximum duration during which no sync
// progress is observed. If this timeout is exceeded, the node's status
// will be considered stalled.
syncStalledTimeout = 5 * time.Minute
)
type initerState struct {
state state
stateLock sync.Mutex
disk ethdb.Database
term chan struct{}
}
func newIniterState(disk ethdb.Database) *initerState {
s := &initerState{
disk: disk,
term: make(chan struct{}),
}
go s.update()
return s
}
func (s *initerState) get() state {
s.stateLock.Lock()
defer s.stateLock.Unlock()
return s.state
}
func (s *initerState) is(state state) bool {
return s.get() == state
}
func (s *initerState) set(state state) {
s.stateLock.Lock()
defer s.stateLock.Unlock()
s.state = state
}
func (s *initerState) update() {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
head := s.readLastBlock()
if head != nil && time.Since(time.Unix(int64(head.Time), 0)) > syncStateTimeWindow {
s.set(stateSynced)
log.Info("Marked indexing initer as synced")
} else {
s.set(stateSyncing)
}
var (
hhash = rawdb.ReadHeadHeaderHash(s.disk)
fhash = rawdb.ReadHeadFastBlockHash(s.disk)
bhash = rawdb.ReadHeadBlockHash(s.disk)
skeleton = rawdb.ReadSkeletonSyncStatus(s.disk)
lastProgress = time.Now()
)
for {
select {
case <-ticker.C:
state := s.get()
if state == stateSynced || state == stateStalled {
continue
}
headBlock := s.readLastBlock()
if headBlock == nil {
continue
}
// State machine: stateSyncing => stateSynced
if time.Since(time.Unix(int64(headBlock.Time), 0)) < syncStateTimeWindow {
s.set(stateSynced)
log.Info("Marked indexing initer as synced")
continue
}
// State machine: stateSyncing => stateStalled
newhhash := rawdb.ReadHeadHeaderHash(s.disk)
newfhash := rawdb.ReadHeadFastBlockHash(s.disk)
newbhash := rawdb.ReadHeadBlockHash(s.disk)
newskeleton := rawdb.ReadSkeletonSyncStatus(s.disk)
hasProgress := newhhash.Cmp(hhash) != 0 || newfhash.Cmp(fhash) != 0 || newbhash.Cmp(bhash) != 0 || bytes.Equal(newskeleton, skeleton)
if !hasProgress && time.Since(lastProgress) > syncStalledTimeout {
s.set(stateStalled)
log.Info("Marked indexing initer as stalled")
continue
}
if hasProgress {
hhash = newhhash
fhash = newfhash
bhash = newbhash
skeleton = newskeleton
lastProgress = time.Now()
}
case <-s.term:
return
}
}
}
func (s *initerState) close() {
select {
case <-s.term:
default:
close(s.term)
}
return
}
// readLastBlock returns the local chain head.
func (s *initerState) readLastBlock() *types.Header {
hash := rawdb.ReadHeadBlockHash(s.disk)
if hash == (common.Hash{}) {
return nil
}
number, exists := rawdb.ReadHeaderNumber(s.disk, hash)
if !exists {
return nil
}
return rawdb.ReadHeader(s.disk, hash, number)
}

View file

@ -27,7 +27,7 @@ import (
// deadlock when the indexer is active. This specifically targets the case where
// signal.result must be sent to unblock the caller.
func TestHistoryIndexerShortenDeadlock(t *testing.T) {
//log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelInfo, true)))
// log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelDebug, true)))
db := rawdb.NewMemoryDatabase()
freezer, _ := rawdb.NewStateFreezer(t.TempDir(), false, false)
defer freezer.Close()