This commit is contained in:
rjl493456442 2026-02-25 21:54:44 -08:00 committed by GitHub
commit e8056e8cc4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -349,7 +349,7 @@ type interruptSignal struct {
// If a state history is removed due to a rollback, the associated indexes should
// be unmarked accordingly.
type indexIniter struct {
disk ethdb.KeyValueStore
disk ethdb.Database
freezer ethdb.AncientStore
interrupt chan *interruptSignal
done chan struct{}
@ -364,7 +364,7 @@ type indexIniter struct {
wg sync.WaitGroup
}
func newIndexIniter(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, typ historyType, lastID uint64) *indexIniter {
func newIndexIniter(disk ethdb.Database, freezer ethdb.AncientStore, typ historyType, lastID uint64) *indexIniter {
initer := &indexIniter{
disk: disk,
freezer: freezer,
@ -385,12 +385,7 @@ func newIndexIniter(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, typ hi
// Launch background indexer
initer.wg.Add(1)
if recover {
log.Info("History indexer is recovering", "history", lastID, "indexed", metadata.Last)
go initer.recover(lastID)
} else {
go initer.run(lastID)
}
go initer.run(lastID, recover)
return initer
}
@ -431,26 +426,147 @@ func (i *indexIniter) remain() uint64 {
}
}
func (i *indexIniter) run(lastID uint64) {
// readBlockTime reads the associated block time of the provided history ID.
func (i *indexIniter) readBlockTime(historyID uint64) (uint64, error) {
var blockNumber uint64
switch i.typ {
case typeStateHistory:
m, err := readStateHistoryMeta(i.freezer, historyID)
if err != nil {
return 0, err
}
blockNumber = m.block
case typeTrienodeHistory:
m, err := readTrienodeMetadata(i.freezer, historyID)
if err != nil {
return 0, err
}
blockNumber = m.block
}
hash := rawdb.ReadCanonicalHash(i.disk, blockNumber)
if hash == (common.Hash{}) {
return 0, errors.New("block not found")
}
header := rawdb.ReadHeader(i.disk, hash, blockNumber)
if header == nil {
return 0, errors.New("block not found")
}
return header.Time, nil
}
const (
// syncHistoryThreshold is the time window behind the chain head.
// If the block being indexed is within this window, we consider the node
// to be close enough to the tip to operate in real-time mode.
syncHistoryThreshold = 6 * time.Hour
// indexerIdleTimeoutLive is the strict inactivity limit applied once the
// indexer is synchronized with the chain head.
indexerIdleTimeoutLive = 60 * time.Second
// indexerIdleTimeoutStartup is the period allowed when the indexer has
// little or no history, for Geth's initial sync procedures (e.g., beacon
// headers downloading or something else).
indexerIdleTimeoutStartup = 1 * time.Hour
// indexerIdleDecayWindow defines the progress threshold over which the
// idle timeout linearly scales from the startup period down to the live one.
indexerIdleDecayWindow = 10000
// indexerHeartbeatInterval is the frequency at which the indexer
// checks its status and schedules background indexing operations.
//
// It is a deliberate design choice to introduce this delay instead
// of scheduling tasks immediately; this forces the indexer into
// "batch index mode" which significantly improves database I/O
// efficiency and reduces overhead.
indexerHeartbeatInterval = 15 * time.Second
)
// getTimeout calculates a dynamic inactivity threshold based on indexing
// progress. For massive datasets, this accounts for the preparation phase
// required before active chain synchronization begins (such as beacon
// header downloading).
//
// Note: In private networks with no block, this may result in a long wait.
// In such cases, the initial sync mode is expected to be terminated by
// another condition verifying the chain head has been reached. This timeout
// primarily serves as a heuristic for the network with massive datasets.
func (i *indexIniter) getTimeout() time.Duration {
current := i.indexed.Load()
if current >= indexerIdleDecayWindow {
return indexerIdleTimeoutLive
}
progress := float64(current) / float64(indexerIdleDecayWindow)
diff := float64(indexerIdleTimeoutStartup - indexerIdleTimeoutLive)
decay := time.Duration(diff * progress)
return indexerIdleTimeoutStartup - decay
}
// checkExit reports whether the initial mode has completed. It assumes that
// all local histories have been fully indexed when invoked, and determines
// whether the initial mode should exit and marking historical state access
// as available.
func (i *indexIniter) checkExit(lastEventTime time.Time) bool {
// Verify that the timestamp of the most recently indexed history
// is sufficiently close to the current time, ensuring that the
// majority of histories have been indexed in batch.
timestamp, err := i.readBlockTime(i.indexed.Load())
if err != nil {
i.log.Warn("Failed to read block time", "err", err)
return false
}
blockTime := time.Unix(int64(timestamp), 0)
// By default, 128 layers are piled up, which corresponds to roughly 25 minutes
// with Ethereum's block interval (12s). A threshold of 6 hours is chosen as a
// conservative and safe default for whatever reason.
if time.Since(blockTime) < syncHistoryThreshold {
i.log.Info("Exit the initial mode as close to chain tip", "last", i.indexed.Load(), "age", common.PrettyAge(blockTime))
return true
}
// The most recently indexed block is still at least 6 hours behind. The node
// may be syncing toward a historical block rather than the latest chain head.
// In this case, check whether the indexer has been idle for a while and exit
// the initial mode if so.
if time.Since(lastEventTime) > i.getTimeout() {
i.log.Info("Exit the initial mode due to inactivity", "last", i.indexed.Load(), "age", common.PrettyAge(blockTime))
return true
}
i.log.Debug("Histories fully indexed, waiting next wave", "last", i.indexed.Load(), "age", common.PrettyAge(blockTime))
return false
}
func (i *indexIniter) run(lastID uint64, recover bool) {
defer i.wg.Done()
// Launch background indexing thread
var (
done = make(chan struct{})
interrupt = new(atomic.Int32)
done chan struct{}
interrupt *atomic.Int32
// checkDone indicates whether all requested state histories
// have been fully indexed.
// checkDone reports whether indexing has completed for all histories.
checkDone = func() bool {
metadata := loadIndexMetadata(i.disk, i.typ)
return metadata != nil && metadata.Last == lastID
return i.indexed.Load() == lastID
}
lastEventTime = time.Now()
heartBeat = time.NewTicker(indexerHeartbeatInterval)
)
go i.index(done, interrupt, lastID)
if recover {
var aborted bool
lastID, aborted = i.recover(lastID)
if aborted {
return
}
}
for {
select {
case signal := <-i.interrupt:
lastEventTime = time.Now()
// The indexing limit can only be extended or shortened continuously.
newLastID := signal.newLastID
if newLastID != lastID+1 && newLastID != lastID-1 {
@ -467,11 +583,13 @@ func (i *indexIniter) run(lastID uint64) {
i.log.Debug("Extended history range", "last", lastID)
continue
}
// The index limit is shortened by one, interrupt the current background
// process and relaunch with new target.
interrupt.Store(1)
<-done
// The index limit is shortened, interrupt the current background
// process if it's active and update the target.
if done != nil {
interrupt.Store(1)
<-done
done, interrupt = nil, nil
}
// If all state histories, including the one to be reverted, have
// been fully indexed, unindex it here and shut down the initializer.
if checkDone() {
@ -488,28 +606,38 @@ func (i *indexIniter) run(lastID uint64) {
// Adjust the indexing target and relaunch the process
lastID = newLastID
signal.result <- nil
done, interrupt = make(chan struct{}), new(atomic.Int32)
go i.index(done, interrupt, lastID)
i.log.Debug("Shortened history range", "last", lastID)
case <-done:
if checkDone() {
done, interrupt = nil, nil
if i.checkExit(lastEventTime) {
close(i.done)
i.log.Info("Histories have been fully indexed", "last", lastID)
return
}
// Relaunch the background runner if some tasks are left
case <-heartBeat.C:
if done != nil {
continue
}
if checkDone() {
if i.checkExit(lastEventTime) {
close(i.done)
return
}
continue
}
// Any pending tasks are scheduled for background execution, driven by the
// heartbeat ticker. This interval ensures that historical data is processed
// in batches rather than individually.
done, interrupt = make(chan struct{}), new(atomic.Int32)
go i.index(done, interrupt, lastID)
case <-i.closed:
interrupt.Store(1)
i.log.Info("Waiting background history index initer to exit")
<-done
if checkDone() {
close(i.done)
if done != nil {
interrupt.Store(1)
i.log.Info("Waiting background history index initer to exit")
<-done
}
return
}
@ -571,7 +699,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID
}
return
}
i.log.Info("Start history indexing", "beginID", beginID, "lastID", lastID)
i.log.Debug("Start history indexing", "beginID", beginID, "lastID", lastID)
var (
current = beginID
@ -618,7 +746,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID
done = current - beginID
)
eta := common.CalculateETA(done, left, time.Since(start))
i.log.Info("Indexing history", "processed", done, "left", left, "elapsed", common.PrettyDuration(time.Since(start)), "eta", common.PrettyDuration(eta))
i.log.Debug("Indexing history", "processed", done, "left", left, "elapsed", common.PrettyDuration(time.Since(start)), "eta", common.PrettyDuration(eta))
}
}
i.indexed.Store(current - 1) // update indexing progress
@ -629,7 +757,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID
if err := batch.finish(true); err != nil {
i.log.Error("Failed to flush index", "err", err)
}
log.Info("State indexing interrupted")
log.Debug("State indexing interrupted")
return
}
}
@ -637,7 +765,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID
if err := batch.finish(true); err != nil {
i.log.Error("Failed to flush index", "err", err)
}
i.log.Info("Indexed history", "from", beginID, "to", lastID, "elapsed", common.PrettyDuration(time.Since(start)))
i.log.Debug("Indexed history", "from", beginID, "to", lastID, "elapsed", common.PrettyDuration(time.Since(start)))
}
// recover handles unclean shutdown recovery. After an unclean shutdown, any
@ -650,8 +778,8 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID
// by chain recovery, under the assumption that the recovered histories will be
// identical to the lost ones. Fork-awareness should be added in the future to
// correctly handle histories affected by reorgs.
func (i *indexIniter) recover(lastID uint64) {
defer i.wg.Done()
func (i *indexIniter) recover(lastID uint64) (uint64, bool) {
log.Info("History indexer is recovering", "history", lastID, "indexed", i.indexed.Load())
for {
select {
@ -672,13 +800,12 @@ func (i *indexIniter) recover(lastID uint64) {
// with the index data, indicating that index initialization is complete.
metadata := loadIndexMetadata(i.disk, i.typ)
if metadata != nil && metadata.Last == lastID {
close(i.done)
i.log.Info("History indexer is recovered", "last", lastID)
return
return lastID, false
}
case <-i.closed:
return
return 0, true
}
}
}
@ -746,7 +873,7 @@ func checkVersion(disk ethdb.KeyValueStore, typ historyType) {
// newHistoryIndexer constructs the history indexer and launches the background
// initer to complete the indexing of any remaining state histories.
func newHistoryIndexer(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, lastHistoryID uint64, typ historyType) *historyIndexer {
func newHistoryIndexer(disk ethdb.Database, freezer ethdb.AncientStore, lastHistoryID uint64, typ historyType) *historyIndexer {
checkVersion(disk, typ)
return &historyIndexer{
initer: newIndexIniter(disk, freezer, typ, lastHistoryID),