all: incorporate state history indexing status into eth_syncing response (#32099)
Some checks are pending
/ Linux Build (push) Waiting to run
/ Linux Build (arm) (push) Waiting to run
/ Docker Image (push) Waiting to run

This pull request tracks the state indexing progress in eth_syncing
RPC response, i.e. we will return non-null syncing status until indexing
has finished.
This commit is contained in:
rjl493456442 2025-06-26 23:20:20 +08:00 committed by GitHub
parent 36bcc24fbe
commit 0c90e4bda0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 88 additions and 2 deletions

View file

@ -426,6 +426,11 @@ func (bc *BlockChain) TxIndexProgress() (TxIndexProgress, error) {
return bc.txIndexer.txIndexProgress(), nil
}
// StateIndexProgress returns the historical state indexing progress.
func (bc *BlockChain) StateIndexProgress() (uint64, error) {
return bc.triedb.IndexProgress()
}
// HistoryPruningCutoff returns the configured history pruning point.
// Blocks before this might not be available in the database.
func (bc *BlockChain) HistoryPruningCutoff() (uint64, common.Hash) {

View file

@ -403,6 +403,10 @@ func (b *EthAPIBackend) SyncProgress(ctx context.Context) ethereum.SyncProgress
prog.TxIndexFinishedBlocks = txProg.Indexed
prog.TxIndexRemainingBlocks = txProg.Remaining
}
remain, err := b.eth.blockchain.StateIndexProgress()
if err == nil {
prog.StateIndexRemaining = remain
}
return prog
}

View file

@ -81,6 +81,10 @@ func (api *DownloaderAPI) eventLoop() {
prog.TxIndexFinishedBlocks = txProg.Indexed
prog.TxIndexRemainingBlocks = txProg.Remaining
}
remain, err := api.chain.StateIndexProgress()
if err == nil {
prog.StateIndexRemaining = remain
}
return prog
}
)

View file

@ -789,6 +789,7 @@ type rpcProgress struct {
HealingBytecode hexutil.Uint64
TxIndexFinishedBlocks hexutil.Uint64
TxIndexRemainingBlocks hexutil.Uint64
StateIndexRemaining hexutil.Uint64
}
func (p *rpcProgress) toSyncProgress() *ethereum.SyncProgress {
@ -815,5 +816,6 @@ func (p *rpcProgress) toSyncProgress() *ethereum.SyncProgress {
HealingBytecode: uint64(p.HealingBytecode),
TxIndexFinishedBlocks: uint64(p.TxIndexFinishedBlocks),
TxIndexRemainingBlocks: uint64(p.TxIndexRemainingBlocks),
StateIndexRemaining: uint64(p.StateIndexRemaining),
}
}

View file

@ -1510,6 +1510,9 @@ func (s *SyncState) TxIndexFinishedBlocks() hexutil.Uint64 {
func (s *SyncState) TxIndexRemainingBlocks() hexutil.Uint64 {
return hexutil.Uint64(s.progress.TxIndexRemainingBlocks)
}
func (s *SyncState) StateIndexRemaining() hexutil.Uint64 {
return hexutil.Uint64(s.progress.StateIndexRemaining)
}
// Syncing returns false in case the node is currently not syncing with the network. It can be up-to-date or has not
// yet received the latest block headers from its peers. In case it is synchronizing:

View file

@ -124,6 +124,9 @@ type SyncProgress struct {
// "transaction indexing" fields
TxIndexFinishedBlocks uint64 // Number of blocks whose transactions are already indexed
TxIndexRemainingBlocks uint64 // Number of blocks whose transactions are not indexed yet
// "historical state indexing" fields
StateIndexRemaining uint64 // Number of states remain unindexed
}
// Done returns the indicator if the initial sync is finished or not.
@ -131,7 +134,7 @@ func (prog SyncProgress) Done() bool {
if prog.CurrentBlock < prog.HighestBlock {
return false
}
return prog.TxIndexRemainingBlocks == 0
return prog.TxIndexRemainingBlocks == 0 && prog.StateIndexRemaining == 0
}
// ChainSyncReader wraps access to the node's current sync status. If there's no

View file

@ -170,6 +170,7 @@ func (api *EthereumAPI) Syncing(ctx context.Context) (interface{}, error) {
"healingBytecode": hexutil.Uint64(progress.HealingBytecode),
"txIndexFinishedBlocks": hexutil.Uint64(progress.TxIndexFinishedBlocks),
"txIndexRemainingBlocks": hexutil.Uint64(progress.TxIndexRemainingBlocks),
"stateIndexRemaining": hexutil.Uint64(progress.StateIndexRemaining),
}, nil
}

View file

@ -3977,6 +3977,7 @@ var outputSyncingFormatter = function(result) {
result.healingBytecode = utils.toDecimal(result.healingBytecode);
result.txIndexFinishedBlocks = utils.toDecimal(result.txIndexFinishedBlocks);
result.txIndexRemainingBlocks = utils.toDecimal(result.txIndexRemainingBlocks);
result.stateIndexRemaining = utils.toDecimal(result.stateIndexRemaining)
return result;
};

View file

@ -356,6 +356,16 @@ func (db *Database) StorageIterator(root common.Hash, account common.Hash, seek
return pdb.StorageIterator(root, account, seek)
}
// IndexProgress returns the indexing progress made so far. It provides the
// number of states that remain unindexed.
func (db *Database) IndexProgress() (uint64, error) {
pdb, ok := db.backend.(*pathdb.Database)
if !ok {
return 0, errors.New("not supported")
}
return pdb.IndexProgress()
}
// IsVerkle returns the indicator if the database is holding a verkle tree.
func (db *Database) IsVerkle() bool {
return db.config.IsVerkle

View file

@ -700,6 +700,15 @@ func (db *Database) HistoryRange() (uint64, uint64, error) {
return historyRange(db.freezer)
}
// IndexProgress returns the indexing progress made so far. It provides the
// number of states that remain unindexed.
func (db *Database) IndexProgress() (uint64, error) {
if db.indexer == nil {
return 0, nil
}
return db.indexer.progress()
}
// AccountIterator creates a new account iterator for the specified root hash and
// seeks to a starting account hash.
func (db *Database) AccountIterator(root common.Hash, seek common.Hash) (AccountIterator, error) {

View file

@ -305,7 +305,12 @@ type indexIniter struct {
interrupt chan *interruptSignal
done chan struct{}
closed chan struct{}
wg sync.WaitGroup
// indexing progress
indexed atomic.Uint64 // the id of latest indexed state
last atomic.Uint64 // the id of the target state to be indexed
wg sync.WaitGroup
}
func newIndexIniter(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, lastID uint64) *indexIniter {
@ -316,6 +321,14 @@ func newIndexIniter(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, lastID
done: make(chan struct{}),
closed: make(chan struct{}),
}
// Load indexing progress
initer.last.Store(lastID)
metadata := loadIndexMetadata(disk)
if metadata != nil {
initer.indexed.Store(metadata.Last)
}
// Launch background indexer
initer.wg.Add(1)
go initer.run(lastID)
return initer
@ -342,6 +355,22 @@ func (i *indexIniter) inited() bool {
}
}
func (i *indexIniter) remain() uint64 {
select {
case <-i.closed:
return 0
case <-i.done:
return 0
default:
last, indexed := i.last.Load(), i.indexed.Load()
if last < indexed {
log.Error("Invalid state indexing range", "last", last, "indexed", indexed)
return 0
}
return last - indexed
}
}
func (i *indexIniter) run(lastID uint64) {
defer i.wg.Done()
@ -367,6 +396,8 @@ func (i *indexIniter) run(lastID uint64) {
signal.result <- fmt.Errorf("invalid history id, last: %d, got: %d", lastID, signal.newLastID)
continue
}
i.last.Store(signal.newLastID) // update indexing range
// The index limit is extended by one, update the limit without
// interrupting the current background process.
if signal.newLastID == lastID+1 {
@ -507,6 +538,8 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID
log.Info("Indexing state history", "processed", done, "left", left, "elapsed", common.PrettyDuration(time.Since(start)), "eta", common.PrettyDuration(eta))
}
}
i.indexed.Store(current - 1) // update indexing progress
// Check interruption signal and abort process if it's fired
if interrupt != nil {
if signal := interrupt.Load(); signal != 0 {
@ -617,3 +650,14 @@ func (i *historyIndexer) shorten(historyID uint64) error {
return <-signal.result
}
}
// progress returns the indexing progress made so far. It provides the number
// of states that remain unindexed.
func (i *historyIndexer) progress() (uint64, error) {
select {
case <-i.initer.closed:
return 0, errors.New("indexer is closed")
default:
return i.initer.remain(), nil
}
}