triedb/pathdb: improve history indexer

This commit is contained in:
Gary Rong 2026-01-28 11:14:52 +08:00
parent 1fbdea6a99
commit 8a6e026fcb

View file

@ -455,6 +455,61 @@ func (i *indexIniter) readBlockTime(historyID uint64) (uint64, error) {
return header.Time, nil
}
const (
// syncHistoryThreshold is the time window behind the chain head.
// If the block being indexed is within this window, we consider the node
// to be close enough to the tip to operate in real-time mode.
syncHistoryThreshold = 6 * time.Hour
// indexerIdleTimeout is the maximum duration of inactivity allowed
// before the indexer decides it has finished its current catch-up
// and exits the initial sync mode.
indexerIdleTimeout = 30 * time.Second
// indexerHeartbeatInterval is the frequency at which the indexer
// checks its status and schedules background indexing operations.
//
// It is a deliberate design choice to introduce this delay instead
// of scheduling tasks immediately; this forces the indexer into
// "batch index mode" which significantly improves database I/O
// efficiency and reduces overhead.
indexerHeartbeatInterval = 15 * time.Second
)
// checkExit reports whether the initial mode has completed. It assumes that
// all local histories have been fully indexed when invoked, and determines
// whether the initial mode should exit and marking historical state access
// as available.
func (i *indexIniter) checkExit(lastEventTime time.Time) bool {
// Verify that the timestamp of the most recently indexed history
// is sufficiently close to the current time, ensuring that the
// majority of histories have been indexed in batch.
timestamp, err := i.readBlockTime(i.indexed.Load())
if err != nil {
i.log.Warn("Failed to read block time", "err", err)
return false
}
blockTime := time.Unix(int64(timestamp), 0)
// By default, 128 layers are piled up, which corresponds to roughly 25 minutes
// with Ethereum's block interval (12s). A threshold of 6 hours is chosen as a
// conservative and safe default for whatever reason.
if time.Since(blockTime) < syncHistoryThreshold {
i.log.Info("Exit the initial mode as close to chain tip", "last", i.indexed.Load(), "age", common.PrettyAge(blockTime))
return true
}
// The most recently indexed block is still at least 6 hours behind. The node
// may be syncing toward a historical block rather than the latest chain head.
// In this case, check whether the indexer has been idle for a while and exit
// the initial mode if so.
if time.Since(lastEventTime) > indexerIdleTimeout {
i.log.Info("Exit the initial mode due to inactivity", "last", i.indexed.Load(), "age", common.PrettyAge(blockTime))
return true
}
i.log.Debug("Histories fully indexed, waiting next wave", "last", i.indexed.Load(), "age", common.PrettyAge(blockTime))
return false
}
func (i *indexIniter) run(lastID uint64, recover bool) {
defer i.wg.Done()
@ -463,13 +518,12 @@ func (i *indexIniter) run(lastID uint64, recover bool) {
done chan struct{}
interrupt *atomic.Int32
start = time.Now()
lastCheckTime time.Time
// checkDone reports whether indexing has completed for all histories.
checkDone = func() bool {
return i.indexed.Load() == lastID
}
lastEventTime = time.Now()
heartBeat = time.NewTicker(indexerHeartbeatInterval)
)
if recover {
var aborted bool
@ -479,12 +533,10 @@ func (i *indexIniter) run(lastID uint64, recover bool) {
}
}
for {
if done == nil && !checkDone() {
done, interrupt = make(chan struct{}), new(atomic.Int32)
go i.index(done, interrupt, lastID)
}
select {
case signal := <-i.interrupt:
lastEventTime = time.Now()
// The indexing limit can only be extended or shortened continuously.
newLastID := signal.newLastID
if newLastID != lastID+1 && newLastID != lastID-1 {
@ -529,40 +581,34 @@ func (i *indexIniter) run(lastID uint64, recover bool) {
case <-done:
done, interrupt = nil, nil
if checkDone() && time.Since(lastCheckTime) > time.Second*8 {
lastCheckTime = time.Now()
// Verify that the timestamp of the most recently indexed history
// is sufficiently close to the current time, ensuring that the
// majority of histories have been indexed in batch.
timestamp, err := i.readBlockTime(lastID)
if err != nil {
i.log.Warn("Failed to read block time", "err", err)
continue
}
blockTime := time.Unix(int64(timestamp), 0)
// By default, 128 layers are piled up, which corresponds to roughly 25 minutes
// with Ethereum's block interval (12s). A threshold of 6 hours is chosen as a
// conservative and safe default for whatever reason.
if time.Since(blockTime) > 6*time.Hour {
i.log.Debug("Histories fully indexed, waiting next wave", "age", common.PrettyAge(blockTime))
continue
}
if i.checkExit(lastEventTime) {
close(i.done)
i.log.Info("Histories have been fully indexed", "last", lastID, "age", common.PrettyAge(blockTime), "elapsed", common.PrettyDuration(time.Since(start)))
return
}
case <-heartBeat.C:
if done != nil {
continue
}
if checkDone() {
if i.checkExit(lastEventTime) {
close(i.done)
return
}
continue
}
// Any pending tasks are scheduled for background execution, driven by the
// heartbeat ticker. This interval ensures that historical data is processed
// in batches rather than individually.
done, interrupt = make(chan struct{}), new(atomic.Int32)
go i.index(done, interrupt, lastID)
case <-i.closed:
if done != nil {
interrupt.Store(1)
i.log.Info("Waiting background history index initer to exit")
<-done
}
if checkDone() {
close(i.done)
}
return
}
}