triedb/pathdb: enhance history index initer

This commit is contained in:
Gary Rong 2026-01-19 16:18:34 +08:00
parent 344d01e2be
commit a117eb3552

View file

@ -349,7 +349,7 @@ type interruptSignal struct {
// If a state history is removed due to a rollback, the associated indexes should
// be unmarked accordingly.
type indexIniter struct {
disk ethdb.KeyValueStore
disk ethdb.Database
freezer ethdb.AncientStore
interrupt chan *interruptSignal
done chan struct{}
@ -364,7 +364,7 @@ type indexIniter struct {
wg sync.WaitGroup
}
func newIndexIniter(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, typ historyType, lastID uint64) *indexIniter {
func newIndexIniter(disk ethdb.Database, freezer ethdb.AncientStore, typ historyType, lastID uint64) *indexIniter {
initer := &indexIniter{
disk: disk,
freezer: freezer,
@ -385,12 +385,7 @@ func newIndexIniter(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, typ hi
// Launch background indexer
initer.wg.Add(1)
if recover {
log.Info("History indexer is recovering", "history", lastID, "indexed", metadata.Last)
go initer.recover(lastID)
} else {
go initer.run(lastID)
}
go initer.run(lastID, recover)
return initer
}
@ -431,24 +426,63 @@ func (i *indexIniter) remain() uint64 {
}
}
func (i *indexIniter) run(lastID uint64) {
// readBlockTime reads the associated block time of the provided history ID.
func (i *indexIniter) readBlockTime(historyID uint64) (uint64, error) {
var blockNumber uint64
switch i.typ {
case typeStateHistory:
m, err := readStateHistoryMeta(i.freezer, historyID)
if err != nil {
return 0, err
}
blockNumber = m.block
case typeTrienodeHistory:
m, err := readTrienodeMetadata(i.freezer, historyID)
if err != nil {
return 0, err
}
blockNumber = m.block
}
hash := rawdb.ReadCanonicalHash(i.disk, blockNumber)
if hash == (common.Hash{}) {
return 0, errors.New("block not found")
}
header := rawdb.ReadHeader(i.disk, hash, blockNumber)
if header == nil {
return 0, errors.New("block not found")
}
return header.Time, nil
}
func (i *indexIniter) run(lastID uint64, recover bool) {
defer i.wg.Done()
// Launch background indexing thread
var (
done = make(chan struct{})
interrupt = new(atomic.Int32)
done chan struct{}
interrupt *atomic.Int32
// checkDone indicates whether all requested state histories
// have been fully indexed.
start = time.Now()
lastCheckTime time.Time
// checkDone reports whether indexing has completed for all histories.
checkDone = func() bool {
metadata := loadIndexMetadata(i.disk, i.typ)
return metadata != nil && metadata.Last == lastID
return i.indexed.Load() == lastID
}
)
go i.index(done, interrupt, lastID)
if recover {
var aborted bool
lastID, aborted = i.recover(lastID)
if aborted {
return
}
}
for {
if done == nil && !checkDone() {
done, interrupt = make(chan struct{}), new(atomic.Int32)
go i.index(done, interrupt, lastID)
}
select {
case signal := <-i.interrupt:
// The indexing limit can only be extended or shortened continuously.
@ -467,11 +501,13 @@ func (i *indexIniter) run(lastID uint64) {
i.log.Debug("Extended history range", "last", lastID)
continue
}
// The index limit is shortened by one, interrupt the current background
// process and relaunch with new target.
interrupt.Store(1)
<-done
// The index limit is shortened, interrupt the current background
// process if it's active and update the target.
if done != nil {
interrupt.Store(1)
<-done
done, interrupt = nil, nil
}
// If all state histories, including the one to be reverted, have
// been fully indexed, unindex it here and shut down the initializer.
if checkDone() {
@ -488,26 +524,42 @@ func (i *indexIniter) run(lastID uint64) {
// Adjust the indexing target and relaunch the process
lastID = newLastID
signal.result <- nil
done, interrupt = make(chan struct{}), new(atomic.Int32)
go i.index(done, interrupt, lastID)
i.log.Debug("Shortened history range", "last", lastID)
case <-done:
if checkDone() {
done, interrupt = nil, nil
if checkDone() && time.Since(lastCheckTime) > time.Second*8 {
lastCheckTime = time.Now()
// Verify that the timestamp of the most recently indexed history
// is sufficiently close to the current time, ensuring that the
// majority of histories have been indexed in batch.
timestamp, err := i.readBlockTime(lastID)
if err != nil {
i.log.Warn("Failed to read block time", "err", err)
continue
}
blockTime := time.Unix(int64(timestamp), 0)
// By default, 128 layers are piled up, which corresponds to roughly 25 minutes
// with Ethereum's block interval (12s). A threshold of 6 hours is chosen as a
// conservative and safe default for whatever reason.
if time.Since(blockTime) > 6*time.Hour {
i.log.Debug("Histories fully indexed, waiting next wave", "age", common.PrettyAge(blockTime))
continue
}
close(i.done)
i.log.Info("Histories have been fully indexed", "last", lastID)
i.log.Info("Histories have been fully indexed", "last", lastID, "age", common.PrettyAge(blockTime), "elapsed", common.PrettyDuration(time.Since(start)))
return
}
// Relaunch the background runner if some tasks are left
done, interrupt = make(chan struct{}), new(atomic.Int32)
go i.index(done, interrupt, lastID)
case <-i.closed:
interrupt.Store(1)
i.log.Info("Waiting background history index initer to exit")
<-done
if done != nil {
interrupt.Store(1)
i.log.Info("Waiting background history index initer to exit")
<-done
}
if checkDone() {
close(i.done)
}
@ -571,7 +623,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID
}
return
}
i.log.Info("Start history indexing", "beginID", beginID, "lastID", lastID)
i.log.Debug("Start history indexing", "beginID", beginID, "lastID", lastID)
var (
current = beginID
@ -618,7 +670,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID
done = current - beginID
)
eta := common.CalculateETA(done, left, time.Since(start))
i.log.Info("Indexing history", "processed", done, "left", left, "elapsed", common.PrettyDuration(time.Since(start)), "eta", common.PrettyDuration(eta))
i.log.Debug("Indexing history", "processed", done, "left", left, "elapsed", common.PrettyDuration(time.Since(start)), "eta", common.PrettyDuration(eta))
}
}
i.indexed.Store(current - 1) // update indexing progress
@ -629,7 +681,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID
if err := batch.finish(true); err != nil {
i.log.Error("Failed to flush index", "err", err)
}
log.Info("State indexing interrupted")
log.Debug("State indexing interrupted")
return
}
}
@ -637,7 +689,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID
if err := batch.finish(true); err != nil {
i.log.Error("Failed to flush index", "err", err)
}
i.log.Info("Indexed history", "from", beginID, "to", lastID, "elapsed", common.PrettyDuration(time.Since(start)))
i.log.Debug("Indexed history", "from", beginID, "to", lastID, "elapsed", common.PrettyDuration(time.Since(start)))
}
// recover handles unclean shutdown recovery. After an unclean shutdown, any
@ -650,8 +702,8 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID
// by chain recovery, under the assumption that the recovered histories will be
// identical to the lost ones. Fork-awareness should be added in the future to
// correctly handle histories affected by reorgs.
func (i *indexIniter) recover(lastID uint64) {
defer i.wg.Done()
func (i *indexIniter) recover(lastID uint64) (uint64, bool) {
log.Info("History indexer is recovering", "history", lastID, "indexed", i.indexed.Load())
for {
select {
@ -672,13 +724,12 @@ func (i *indexIniter) recover(lastID uint64) {
// with the index data, indicating that index initialization is complete.
metadata := loadIndexMetadata(i.disk, i.typ)
if metadata != nil && metadata.Last == lastID {
close(i.done)
i.log.Info("History indexer is recovered", "last", lastID)
return
return lastID, false
}
case <-i.closed:
return
return 0, true
}
}
}
@ -746,7 +797,7 @@ func checkVersion(disk ethdb.KeyValueStore, typ historyType) {
// newHistoryIndexer constructs the history indexer and launches the background
// initer to complete the indexing of any remaining state histories.
func newHistoryIndexer(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, lastHistoryID uint64, typ historyType) *historyIndexer {
func newHistoryIndexer(disk ethdb.Database, freezer ethdb.AncientStore, lastHistoryID uint64, typ historyType) *historyIndexer {
checkVersion(disk, typ)
return &historyIndexer{
initer: newIndexIniter(disk, freezer, typ, lastHistoryID),