diff --git a/core/blockchain_reader.go b/core/blockchain_reader.go index 046d96063d..0734baab35 100644 --- a/core/blockchain_reader.go +++ b/core/blockchain_reader.go @@ -426,6 +426,11 @@ func (bc *BlockChain) TxIndexProgress() (TxIndexProgress, error) { return bc.txIndexer.txIndexProgress(), nil } +// StateIndexProgress returns the historical state indexing progress. +func (bc *BlockChain) StateIndexProgress() (uint64, error) { + return bc.triedb.IndexProgress() +} + // HistoryPruningCutoff returns the configured history pruning point. // Blocks before this might not be available in the database. func (bc *BlockChain) HistoryPruningCutoff() (uint64, common.Hash) { diff --git a/eth/api_backend.go b/eth/api_backend.go index 99fd4c0aa0..bd90f611f1 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -403,6 +403,10 @@ func (b *EthAPIBackend) SyncProgress(ctx context.Context) ethereum.SyncProgress prog.TxIndexFinishedBlocks = txProg.Indexed prog.TxIndexRemainingBlocks = txProg.Remaining } + remain, err := b.eth.blockchain.StateIndexProgress() + if err == nil { + prog.StateIndexRemaining = remain + } return prog } diff --git a/eth/downloader/api.go b/eth/downloader/api.go index ac175672a0..c98f9a2c3f 100644 --- a/eth/downloader/api.go +++ b/eth/downloader/api.go @@ -81,6 +81,10 @@ func (api *DownloaderAPI) eventLoop() { prog.TxIndexFinishedBlocks = txProg.Indexed prog.TxIndexRemainingBlocks = txProg.Remaining } + remain, err := api.chain.StateIndexProgress() + if err == nil { + prog.StateIndexRemaining = remain + } return prog } ) diff --git a/ethclient/ethclient.go b/ethclient/ethclient.go index 9d0e0d5b52..1195929f7d 100644 --- a/ethclient/ethclient.go +++ b/ethclient/ethclient.go @@ -789,6 +789,7 @@ type rpcProgress struct { HealingBytecode hexutil.Uint64 TxIndexFinishedBlocks hexutil.Uint64 TxIndexRemainingBlocks hexutil.Uint64 + StateIndexRemaining hexutil.Uint64 } func (p *rpcProgress) toSyncProgress() *ethereum.SyncProgress { @@ -815,5 +816,6 @@ func (p *rpcProgress) toSyncProgress() *ethereum.SyncProgress { HealingBytecode: uint64(p.HealingBytecode), TxIndexFinishedBlocks: uint64(p.TxIndexFinishedBlocks), TxIndexRemainingBlocks: uint64(p.TxIndexRemainingBlocks), + StateIndexRemaining: uint64(p.StateIndexRemaining), } } diff --git a/graphql/graphql.go b/graphql/graphql.go index e23e6fcb0e..6738cd913b 100644 --- a/graphql/graphql.go +++ b/graphql/graphql.go @@ -1510,6 +1510,9 @@ func (s *SyncState) TxIndexFinishedBlocks() hexutil.Uint64 { func (s *SyncState) TxIndexRemainingBlocks() hexutil.Uint64 { return hexutil.Uint64(s.progress.TxIndexRemainingBlocks) } +func (s *SyncState) StateIndexRemaining() hexutil.Uint64 { + return hexutil.Uint64(s.progress.StateIndexRemaining) +} // Syncing returns false in case the node is currently not syncing with the network. It can be up-to-date or has not // yet received the latest block headers from its peers. In case it is synchronizing: diff --git a/interfaces.go b/interfaces.go index 54a215d6e7..be5b970851 100644 --- a/interfaces.go +++ b/interfaces.go @@ -124,6 +124,9 @@ type SyncProgress struct { // "transaction indexing" fields TxIndexFinishedBlocks uint64 // Number of blocks whose transactions are already indexed TxIndexRemainingBlocks uint64 // Number of blocks whose transactions are not indexed yet + + // "historical state indexing" fields + StateIndexRemaining uint64 // Number of states remain unindexed } // Done returns the indicator if the initial sync is finished or not. @@ -131,7 +134,7 @@ func (prog SyncProgress) Done() bool { if prog.CurrentBlock < prog.HighestBlock { return false } - return prog.TxIndexRemainingBlocks == 0 + return prog.TxIndexRemainingBlocks == 0 && prog.StateIndexRemaining == 0 } // ChainSyncReader wraps access to the node's current sync status. If there's no diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index db0297ae5a..8f9442acee 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -170,6 +170,7 @@ func (api *EthereumAPI) Syncing(ctx context.Context) (interface{}, error) { "healingBytecode": hexutil.Uint64(progress.HealingBytecode), "txIndexFinishedBlocks": hexutil.Uint64(progress.TxIndexFinishedBlocks), "txIndexRemainingBlocks": hexutil.Uint64(progress.TxIndexRemainingBlocks), + "stateIndexRemaining": hexutil.Uint64(progress.StateIndexRemaining), }, nil } diff --git a/internal/jsre/deps/web3.js b/internal/jsre/deps/web3.js index 0aa738a2af..0071d7eb7d 100644 --- a/internal/jsre/deps/web3.js +++ b/internal/jsre/deps/web3.js @@ -3977,6 +3977,7 @@ var outputSyncingFormatter = function(result) { result.healingBytecode = utils.toDecimal(result.healingBytecode); result.txIndexFinishedBlocks = utils.toDecimal(result.txIndexFinishedBlocks); result.txIndexRemainingBlocks = utils.toDecimal(result.txIndexRemainingBlocks); + result.stateIndexRemaining = utils.toDecimal(result.stateIndexRemaining) return result; }; diff --git a/triedb/database.go b/triedb/database.go index 12b8856d32..e2f4334d6e 100644 --- a/triedb/database.go +++ b/triedb/database.go @@ -356,6 +356,16 @@ func (db *Database) StorageIterator(root common.Hash, account common.Hash, seek return pdb.StorageIterator(root, account, seek) } +// IndexProgress returns the indexing progress made so far. It provides the +// number of states that remain unindexed. +func (db *Database) IndexProgress() (uint64, error) { + pdb, ok := db.backend.(*pathdb.Database) + if !ok { + return 0, errors.New("not supported") + } + return pdb.IndexProgress() +} + // IsVerkle returns the indicator if the database is holding a verkle tree. func (db *Database) IsVerkle() bool { return db.config.IsVerkle diff --git a/triedb/pathdb/database.go b/triedb/pathdb/database.go index 8932f3f7f8..7c8c327484 100644 --- a/triedb/pathdb/database.go +++ b/triedb/pathdb/database.go @@ -700,6 +700,15 @@ func (db *Database) HistoryRange() (uint64, uint64, error) { return historyRange(db.freezer) } +// IndexProgress returns the indexing progress made so far. It provides the +// number of states that remain unindexed. +func (db *Database) IndexProgress() (uint64, error) { + if db.indexer == nil { + return 0, nil + } + return db.indexer.progress() +} + // AccountIterator creates a new account iterator for the specified root hash and // seeks to a starting account hash. func (db *Database) AccountIterator(root common.Hash, seek common.Hash) (AccountIterator, error) { diff --git a/triedb/pathdb/history_indexer.go b/triedb/pathdb/history_indexer.go index 577eb86b78..6df74de61c 100644 --- a/triedb/pathdb/history_indexer.go +++ b/triedb/pathdb/history_indexer.go @@ -305,7 +305,12 @@ type indexIniter struct { interrupt chan *interruptSignal done chan struct{} closed chan struct{} - wg sync.WaitGroup + + // indexing progress + indexed atomic.Uint64 // the id of latest indexed state + last atomic.Uint64 // the id of the target state to be indexed + + wg sync.WaitGroup } func newIndexIniter(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, lastID uint64) *indexIniter { @@ -316,6 +321,14 @@ func newIndexIniter(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, lastID done: make(chan struct{}), closed: make(chan struct{}), } + // Load indexing progress + initer.last.Store(lastID) + metadata := loadIndexMetadata(disk) + if metadata != nil { + initer.indexed.Store(metadata.Last) + } + + // Launch background indexer initer.wg.Add(1) go initer.run(lastID) return initer @@ -342,6 +355,22 @@ func (i *indexIniter) inited() bool { } } +func (i *indexIniter) remain() uint64 { + select { + case <-i.closed: + return 0 + case <-i.done: + return 0 + default: + last, indexed := i.last.Load(), i.indexed.Load() + if last < indexed { + log.Error("Invalid state indexing range", "last", last, "indexed", indexed) + return 0 + } + return last - indexed + } +} + func (i *indexIniter) run(lastID uint64) { defer i.wg.Done() @@ -367,6 +396,8 @@ func (i *indexIniter) run(lastID uint64) { signal.result <- fmt.Errorf("invalid history id, last: %d, got: %d", lastID, signal.newLastID) continue } + i.last.Store(signal.newLastID) // update indexing range + // The index limit is extended by one, update the limit without // interrupting the current background process. if signal.newLastID == lastID+1 { @@ -507,6 +538,8 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID log.Info("Indexing state history", "processed", done, "left", left, "elapsed", common.PrettyDuration(time.Since(start)), "eta", common.PrettyDuration(eta)) } } + i.indexed.Store(current - 1) // update indexing progress + // Check interruption signal and abort process if it's fired if interrupt != nil { if signal := interrupt.Load(); signal != 0 { @@ -617,3 +650,14 @@ func (i *historyIndexer) shorten(historyID uint64) error { return <-signal.result } } + +// progress returns the indexing progress made so far. It provides the number +// of states that remain unindexed. +func (i *historyIndexer) progress() (uint64, error) { + select { + case <-i.initer.closed: + return 0, errors.New("indexer is closed") + default: + return i.initer.remain(), nil + } +}