diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 2412964a3a..ba98ec4169 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1661,11 +1661,6 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { cfg.TransactionHistory = 0 log.Warn("Disabled transaction unindexing for archive node") } - - if cfg.StateScheme != rawdb.HashScheme { - cfg.StateScheme = rawdb.HashScheme - log.Warn("Forcing hash state-scheme for archive mode") - } } if ctx.IsSet(LogHistoryFlag.Name) { cfg.LogHistory = ctx.Uint64(LogHistoryFlag.Name) diff --git a/core/blockchain.go b/core/blockchain.go index 3bb681d7c3..d8b101f616 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -242,9 +242,10 @@ func (cfg *BlockChainConfig) triedbConfig(isVerkle bool) *triedb.Config { } if cfg.StateScheme == rawdb.PathScheme { config.PathDB = &pathdb.Config{ - StateHistory: cfg.StateHistory, - TrieCleanSize: cfg.TrieCleanLimit * 1024 * 1024, - StateCleanSize: cfg.SnapshotLimit * 1024 * 1024, + StateHistory: cfg.StateHistory, + EnableStateIndexing: cfg.ArchiveMode, + TrieCleanSize: cfg.TrieCleanLimit * 1024 * 1024, + StateCleanSize: cfg.SnapshotLimit * 1024 * 1024, // TODO(rjl493456442): The write buffer represents the memory limit used // for flushing both trie data and state data to disk. The config name diff --git a/core/rawdb/accessors_history.go b/core/rawdb/accessors_history.go new file mode 100644 index 0000000000..cf1073f387 --- /dev/null +++ b/core/rawdb/accessors_history.go @@ -0,0 +1,179 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "bytes" + "errors" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" +) + +// ReadStateHistoryIndexMetadata retrieves the metadata of state history index. +func ReadStateHistoryIndexMetadata(db ethdb.KeyValueReader) []byte { + data, _ := db.Get(headStateHistoryIndexKey) + return data +} + +// WriteStateHistoryIndexMetadata stores the metadata of state history index +// into database. +func WriteStateHistoryIndexMetadata(db ethdb.KeyValueWriter, blob []byte) { + if err := db.Put(headStateHistoryIndexKey, blob); err != nil { + log.Crit("Failed to store the metadata of state history index", "err", err) + } +} + +// DeleteStateHistoryIndexMetadata removes the metadata of state history index. +func DeleteStateHistoryIndexMetadata(db ethdb.KeyValueWriter) { + if err := db.Delete(headStateHistoryIndexKey); err != nil { + log.Crit("Failed to delete the metadata of state history index", "err", err) + } +} + +// ReadAccountHistoryIndex retrieves the account history index with the provided +// account address. +func ReadAccountHistoryIndex(db ethdb.KeyValueReader, addressHash common.Hash) []byte { + data, err := db.Get(accountHistoryIndexKey(addressHash)) + if err != nil || len(data) == 0 { + return nil + } + return data +} + +// WriteAccountHistoryIndex writes the provided account history index into database. 
+func WriteAccountHistoryIndex(db ethdb.KeyValueWriter, addressHash common.Hash, data []byte) { + if err := db.Put(accountHistoryIndexKey(addressHash), data); err != nil { + log.Crit("Failed to store account history index", "err", err) + } +} + +// DeleteAccountHistoryIndex deletes the specified account history index from +// the database. +func DeleteAccountHistoryIndex(db ethdb.KeyValueWriter, addressHash common.Hash) { + if err := db.Delete(accountHistoryIndexKey(addressHash)); err != nil { + log.Crit("Failed to delete account history index", "err", err) + } +} + +// ReadStorageHistoryIndex retrieves the storage history index with the provided +// account address and storage key hash. +func ReadStorageHistoryIndex(db ethdb.KeyValueReader, addressHash common.Hash, storageHash common.Hash) []byte { + data, err := db.Get(storageHistoryIndexKey(addressHash, storageHash)) + if err != nil || len(data) == 0 { + return nil + } + return data +} + +// WriteStorageHistoryIndex writes the provided storage history index into database. +func WriteStorageHistoryIndex(db ethdb.KeyValueWriter, addressHash common.Hash, storageHash common.Hash, data []byte) { + if err := db.Put(storageHistoryIndexKey(addressHash, storageHash), data); err != nil { + log.Crit("Failed to store storage history index", "err", err) + } +} + +// DeleteStorageHistoryIndex deletes the specified state index from the database. +func DeleteStorageHistoryIndex(db ethdb.KeyValueWriter, addressHash common.Hash, storageHash common.Hash) { + if err := db.Delete(storageHistoryIndexKey(addressHash, storageHash)); err != nil { + log.Crit("Failed to delete storage history index", "err", err) + } +} + +// ReadAccountHistoryIndexBlock retrieves the index block with the provided +// account address along with the block id. 
+func ReadAccountHistoryIndexBlock(db ethdb.KeyValueReader, addressHash common.Hash, blockID uint32) []byte { + data, err := db.Get(accountHistoryIndexBlockKey(addressHash, blockID)) + if err != nil || len(data) == 0 { + return nil + } + return data +} + +// WriteAccountHistoryIndexBlock writes the provided index block into database. +func WriteAccountHistoryIndexBlock(db ethdb.KeyValueWriter, addressHash common.Hash, blockID uint32, data []byte) { + if err := db.Put(accountHistoryIndexBlockKey(addressHash, blockID), data); err != nil { + log.Crit("Failed to store account index block", "err", err) + } +} + +// DeleteAccountHistoryIndexBlock deletes the specified index block from the database. +func DeleteAccountHistoryIndexBlock(db ethdb.KeyValueWriter, addressHash common.Hash, blockID uint32) { + if err := db.Delete(accountHistoryIndexBlockKey(addressHash, blockID)); err != nil { + log.Crit("Failed to delete account index block", "err", err) + } +} + +// ReadStorageHistoryIndexBlock retrieves the index block with the provided state +// identifier along with the block id. +func ReadStorageHistoryIndexBlock(db ethdb.KeyValueReader, addressHash common.Hash, storageHash common.Hash, blockID uint32) []byte { + data, err := db.Get(storageHistoryIndexBlockKey(addressHash, storageHash, blockID)) + if err != nil || len(data) == 0 { + return nil + } + return data +} + +// WriteStorageHistoryIndexBlock writes the provided index block into database. +func WriteStorageHistoryIndexBlock(db ethdb.KeyValueWriter, addressHash common.Hash, storageHash common.Hash, id uint32, data []byte) { + if err := db.Put(storageHistoryIndexBlockKey(addressHash, storageHash, id), data); err != nil { + log.Crit("Failed to store storage index block", "err", err) + } +} + +// DeleteStorageHistoryIndexBlock deletes the specified index block from the database. 
+func DeleteStorageHistoryIndexBlock(db ethdb.KeyValueWriter, addressHash common.Hash, storageHash common.Hash, id uint32) { + if err := db.Delete(storageHistoryIndexBlockKey(addressHash, storageHash, id)); err != nil { + log.Crit("Failed to delete storage index block", "err", err) + } +} + +// increaseKey increases the input key by one (incrementing the last byte and +// propagating the carry). Return nil if the entire +// addition operation overflows. +func increaseKey(key []byte) []byte { + for i := len(key) - 1; i >= 0; i-- { + key[i]++ + if key[i] != 0x0 { + return key + } + } + return nil +} + +// DeleteStateHistoryIndex completely removes all history indexing data, including +// indexes for accounts and storages. +// +// Note, this method assumes the storage space with prefix `StateHistoryIndexPrefix` +// is exclusively occupied by the history indexing data! +func DeleteStateHistoryIndex(db ethdb.KeyValueRangeDeleter) { + start := StateHistoryIndexPrefix + limit := increaseKey(bytes.Clone(StateHistoryIndexPrefix)) + + // Try to remove the data in the range by a loop, as the leveldb + // doesn't support the native range deletion. + for { + err := db.DeleteRange(start, limit) + if err == nil { + return + } + if errors.Is(err, ethdb.ErrTooManyKeys) { + continue + } + log.Crit("Failed to delete history index range", "err", err) + } +} diff --git a/core/rawdb/accessors_state.go b/core/rawdb/accessors_state.go index 41e15debe9..7d7b37641b 100644 --- a/core/rawdb/accessors_state.go +++ b/core/rawdb/accessors_state.go @@ -18,6 +18,7 @@ package rawdb import ( "encoding/binary" + "errors" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethdb" @@ -255,6 +256,36 @@ func ReadStateHistory(db ethdb.AncientReaderOp, id uint64) ([]byte, []byte, []by return meta, accountIndex, storageIndex, accountData, storageData, nil } +// ReadStateHistoryList retrieves a list of state histories from database with +// specific range. 
Compute the position of state history in freezer by minus one +// since the id of first state history starts from one(zero for initial state). +func ReadStateHistoryList(db ethdb.AncientReaderOp, start uint64, count uint64) ([][]byte, [][]byte, [][]byte, [][]byte, [][]byte, error) { + metaList, err := db.AncientRange(stateHistoryMeta, start-1, count, 0) + if err != nil { + return nil, nil, nil, nil, nil, err + } + aIndexList, err := db.AncientRange(stateHistoryAccountIndex, start-1, count, 0) + if err != nil { + return nil, nil, nil, nil, nil, err + } + sIndexList, err := db.AncientRange(stateHistoryStorageIndex, start-1, count, 0) + if err != nil { + return nil, nil, nil, nil, nil, err + } + aDataList, err := db.AncientRange(stateHistoryAccountData, start-1, count, 0) + if err != nil { + return nil, nil, nil, nil, nil, err + } + sDataList, err := db.AncientRange(stateHistoryStorageData, start-1, count, 0) + if err != nil { + return nil, nil, nil, nil, nil, err + } + if len(metaList) != len(aIndexList) || len(metaList) != len(sIndexList) || len(metaList) != len(aDataList) || len(metaList) != len(sDataList) { + return nil, nil, nil, nil, nil, errors.New("state history is corrupted") + } + return metaList, aIndexList, sIndexList, aDataList, sDataList, nil +} + // WriteStateHistory writes the provided state history to database. Compute the // position of state history in freezer by minus one since the id of first state // history starts from one(zero for initial state). 
diff --git a/core/rawdb/database.go b/core/rawdb/database.go index 8854336327..9681c39c58 100644 --- a/core/rawdb/database.go +++ b/core/rawdb/database.go @@ -412,6 +412,9 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { filterMapLastBlock stat filterMapBlockLV stat + // Path-mode archive data + stateIndex stat + // Verkle statistics verkleTries stat verkleStateLookups stat @@ -489,6 +492,10 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { case bytes.HasPrefix(key, bloomBitsMetaPrefix) && len(key) < len(bloomBitsMetaPrefix)+8: bloomBits.Add(size) + // Path-based historic state indexes + case bytes.HasPrefix(key, StateHistoryIndexPrefix) && len(key) >= len(StateHistoryIndexPrefix)+common.HashLength: + stateIndex.Add(size) + // Verkle trie data is detected, determine the sub-category case bytes.HasPrefix(key, VerklePrefix): remain := key[len(VerklePrefix):] @@ -544,6 +551,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error { {"Key-Value store", "Path trie state lookups", stateLookups.Size(), stateLookups.Count()}, {"Key-Value store", "Path trie account nodes", accountTries.Size(), accountTries.Count()}, {"Key-Value store", "Path trie storage nodes", storageTries.Size(), storageTries.Count()}, + {"Key-Value store", "Path state history indexes", stateIndex.Size(), stateIndex.Count()}, {"Key-Value store", "Verkle trie nodes", verkleTries.Size(), verkleTries.Count()}, {"Key-Value store", "Verkle trie state lookups", verkleStateLookups.Size(), verkleStateLookups.Count()}, {"Key-Value store", "Trie preimages", preimages.Size(), preimages.Count()}, @@ -591,7 +599,7 @@ var knownMetadataKeys = [][]byte{ snapshotGeneratorKey, snapshotRecoveryKey, txIndexTailKey, fastTxLookupLimitKey, uncleanShutdownKey, badBlockKey, transitionStatusKey, skeletonSyncStatusKey, persistentStateIDKey, trieJournalKey, snapshotSyncStatusKey, snapSyncStatusFlagKey, - filterMapsRangeKey, + filterMapsRangeKey, 
headStateHistoryIndexKey, } // printChainMetadata prints out chain metadata to stderr. diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index fa125cecc0..eec25a8042 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -76,6 +76,10 @@ var ( // trieJournalKey tracks the in-memory trie node layers across restarts. trieJournalKey = []byte("TrieJournal") + // headStateHistoryIndexKey tracks the ID of the latest state history that has + // been indexed. + headStateHistoryIndexKey = []byte("LastStateHistoryIndex") + // txIndexTailKey tracks the oldest block whose transactions have been indexed. txIndexTailKey = []byte("TransactionIndexTail") @@ -117,6 +121,13 @@ var ( TrieNodeStoragePrefix = []byte("O") // TrieNodeStoragePrefix + accountHash + hexPath -> trie node stateIDPrefix = []byte("L") // stateIDPrefix + state root -> state id + // State history indexing within path-based storage scheme + StateHistoryIndexPrefix = []byte("m") // The global prefix of state history index data + StateHistoryAccountMetadataPrefix = []byte("ma") // StateHistoryAccountMetadataPrefix + account address hash => account metadata + StateHistoryStorageMetadataPrefix = []byte("ms") // StateHistoryStorageMetadataPrefix + account address hash + storage slot hash => slot metadata + StateHistoryAccountBlockPrefix = []byte("mba") // StateHistoryAccountBlockPrefix + account address hash + block_number => account block + StateHistoryStorageBlockPrefix = []byte("mbs") // StateHistoryStorageBlockPrefix + account address hash + storage slot hash + block_number => slot block + // VerklePrefix is the database prefix for Verkle trie data, which includes: // (a) Trie nodes // (b) In-memory trie node journal @@ -362,3 +373,27 @@ func filterMapBlockLVKey(number uint64) []byte { binary.BigEndian.PutUint64(key[l:], number) return key } + +// accountHistoryIndexKey = StateHistoryAccountMetadataPrefix + addressHash +func accountHistoryIndexKey(addressHash common.Hash) []byte { + return 
append(StateHistoryAccountMetadataPrefix, addressHash.Bytes()...) +} + +// storageHistoryIndexKey = StateHistoryStorageMetadataPrefix + addressHash + storageHash +func storageHistoryIndexKey(addressHash common.Hash, storageHash common.Hash) []byte { + return append(append(StateHistoryStorageMetadataPrefix, addressHash.Bytes()...), storageHash.Bytes()...) +} + +// accountHistoryIndexBlockKey = StateHistoryAccountBlockPrefix + addressHash + blockID +func accountHistoryIndexBlockKey(addressHash common.Hash, blockID uint32) []byte { + var buf [4]byte + binary.BigEndian.PutUint32(buf[:], blockID) + return append(append(StateHistoryAccountBlockPrefix, addressHash.Bytes()...), buf[:]...) +} + +// storageHistoryIndexBlockKey = StateHistoryStorageBlockPrefix + addressHash + storageHash + blockID +func storageHistoryIndexBlockKey(addressHash common.Hash, storageHash common.Hash, blockID uint32) []byte { + var buf [4]byte + binary.BigEndian.PutUint32(buf[:], blockID) + return append(append(append(StateHistoryStorageBlockPrefix, addressHash.Bytes()...), storageHash.Bytes()...), buf[:]...) 
+} diff --git a/ethdb/pebble/pebble.go b/ethdb/pebble/pebble.go index ac9eb9bdba..58a521f6fb 100644 --- a/ethdb/pebble/pebble.go +++ b/ethdb/pebble/pebble.go @@ -83,6 +83,7 @@ type Database struct { estimatedCompDebtGauge *metrics.Gauge // Gauge for tracking the number of bytes that need to be compacted liveCompGauge *metrics.Gauge // Gauge for tracking the number of in-progress compactions liveCompSizeGauge *metrics.Gauge // Gauge for tracking the size of in-progress compactions + liveIterGauge *metrics.Gauge // Gauge for tracking the number of live database iterators levelsGauge []*metrics.Gauge // Gauge for tracking the number of tables in levels quitLock sync.RWMutex // Mutex protecting the quit channel and the closed flag @@ -326,6 +327,7 @@ func New(file string, cache int, handles int, namespace string, readonly bool) ( db.estimatedCompDebtGauge = metrics.GetOrRegisterGauge(namespace+"compact/estimateDebt", nil) db.liveCompGauge = metrics.GetOrRegisterGauge(namespace+"compact/live/count", nil) db.liveCompSizeGauge = metrics.GetOrRegisterGauge(namespace+"compact/live/size", nil) + db.liveIterGauge = metrics.GetOrRegisterGauge(namespace+"iter/count", nil) // Start up the metrics gathering and return go db.meter(metricsGatheringInterval, namespace) @@ -582,6 +584,7 @@ func (d *Database) meter(refresh time.Duration, namespace string) { d.seekCompGauge.Update(stats.Compact.ReadCount) d.liveCompGauge.Update(stats.Compact.NumInProgress) d.liveCompSizeGauge.Update(stats.Compact.InProgressBytes) + d.liveIterGauge.Update(stats.TableIters) d.liveMemTablesGauge.Update(stats.MemTable.Count) d.zombieMemTablesGauge.Update(stats.MemTable.ZombieCount) diff --git a/triedb/pathdb/database.go b/triedb/pathdb/database.go index 9795d81e63..8932f3f7f8 100644 --- a/triedb/pathdb/database.go +++ b/triedb/pathdb/database.go @@ -114,11 +114,12 @@ type layer interface { // Config contains the settings for database. 
type Config struct { - StateHistory uint64 // Number of recent blocks to maintain state history for - TrieCleanSize int // Maximum memory allowance (in bytes) for caching clean trie nodes - StateCleanSize int // Maximum memory allowance (in bytes) for caching clean state data - WriteBufferSize int // Maximum memory allowance (in bytes) for write buffer - ReadOnly bool // Flag whether the database is opened in read only mode + StateHistory uint64 // Number of recent blocks to maintain state history for + EnableStateIndexing bool // Whether to enable state history indexing for external state access + TrieCleanSize int // Maximum memory allowance (in bytes) for caching clean trie nodes + StateCleanSize int // Maximum memory allowance (in bytes) for caching clean state data + WriteBufferSize int // Maximum memory allowance (in bytes) for write buffer + ReadOnly bool // Flag whether the database is opened in read only mode // Testing configurations SnapshotNoBuild bool // Flag Whether the state generation is allowed @@ -149,7 +150,12 @@ func (c *Config) fields() []interface{} { list = append(list, "triecache", common.StorageSize(c.TrieCleanSize)) list = append(list, "statecache", common.StorageSize(c.StateCleanSize)) list = append(list, "buffer", common.StorageSize(c.WriteBufferSize)) - list = append(list, "history", c.StateHistory) + + if c.StateHistory == 0 { + list = append(list, "history", "entire chain") + } else { + list = append(list, "history", fmt.Sprintf("last %d blocks", c.StateHistory)) + } return list } @@ -213,6 +219,7 @@ type Database struct { tree *layerTree // The group for all known layers freezer ethdb.ResettableAncientStore // Freezer for storing trie histories, nil possible in tests lock sync.RWMutex // Lock to prevent mutations from happening at the same time + indexer *historyIndexer // History indexer } // New attempts to load an already existing layer from a persistent key-value @@ -262,6 +269,11 @@ func New(diskdb ethdb.Database, config 
*Config, isVerkle bool) *Database { if err := db.setStateGenerator(); err != nil { log.Crit("Failed to setup the generator", "err", err) } + // TODO (rjl493456442) disable the background indexing in read-only mode + if db.freezer != nil && db.config.EnableStateIndexing { + db.indexer = newHistoryIndexer(db.diskdb, db.freezer, db.tree.bottom().stateID()) + log.Info("Enabled state history indexing") + } fields := config.fields() if db.isVerkle { fields = append(fields, "verkle", true) @@ -299,6 +311,11 @@ func (db *Database) repairHistory() error { log.Crit("Failed to retrieve head of state history", "err", err) } if frozen != 0 { + // TODO(rjl493456442) would be better to group them into a batch. + // + // Purge all state history indexing data first + rawdb.DeleteStateHistoryIndexMetadata(db.diskdb) + rawdb.DeleteStateHistoryIndex(db.diskdb) err := db.freezer.Reset() if err != nil { log.Crit("Failed to reset state histories", "err", err) @@ -487,6 +504,11 @@ func (db *Database) Enable(root common.Hash) error { // mappings can be huge and might take a while to clear // them, just leave them in disk and wait for overwriting. if db.freezer != nil { + // TODO(rjl493456442) would be better to group them into a batch. + // + // Purge all state history indexing data first + rawdb.DeleteStateHistoryIndexMetadata(db.diskdb) + rawdb.DeleteStateHistoryIndex(db.diskdb) if err := db.freezer.Reset(); err != nil { return err } @@ -611,6 +633,10 @@ func (db *Database) Close() error { } dl.resetCache() // release the memory held by clean cache + // Terminate the background state history indexer + if db.indexer != nil { + db.indexer.close() + } // Close the attached state history freezer. 
if db.freezer == nil { return nil diff --git a/triedb/pathdb/database_test.go b/triedb/pathdb/database_test.go index 6fc099015c..2982202009 100644 --- a/triedb/pathdb/database_test.go +++ b/triedb/pathdb/database_test.go @@ -121,15 +121,16 @@ type tester struct { snapStorages map[common.Hash]map[common.Hash]map[common.Hash][]byte // Keyed by the hash of account address and the hash of storage key } -func newTester(t *testing.T, historyLimit uint64, isVerkle bool, layers int) *tester { +func newTester(t *testing.T, historyLimit uint64, isVerkle bool, layers int, enableIndex bool) *tester { var ( disk, _ = rawdb.Open(rawdb.NewMemoryDatabase(), rawdb.OpenOptions{Ancient: t.TempDir()}) db = New(disk, &Config{ - StateHistory: historyLimit, - TrieCleanSize: 256 * 1024, - StateCleanSize: 256 * 1024, - WriteBufferSize: 256 * 1024, - NoAsyncFlush: true, + StateHistory: historyLimit, + EnableStateIndexing: enableIndex, + TrieCleanSize: 256 * 1024, + StateCleanSize: 256 * 1024, + WriteBufferSize: 256 * 1024, + NoAsyncFlush: true, }, isVerkle) obj = &tester{ @@ -164,6 +165,20 @@ func (t *tester) hashPreimage(hash common.Hash) common.Hash { return common.BytesToHash(t.preimages[hash]) } +func (t *tester) extend(layers int) { + for i := 0; i < layers; i++ { + var parent = types.EmptyRootHash + if len(t.roots) != 0 { + parent = t.roots[len(t.roots)-1] + } + root, nodes, states := t.generate(parent, true) + if err := t.db.Update(root, parent, uint64(i), nodes, states); err != nil { + panic(fmt.Errorf("failed to update state changes, err: %w", err)) + } + t.roots = append(t.roots, root) + } +} + func (t *tester) release() { t.db.Close() t.db.diskdb.Close() @@ -451,7 +466,7 @@ func TestDatabaseRollback(t *testing.T) { }() // Verify state histories - tester := newTester(t, 0, false, 32) + tester := newTester(t, 0, false, 32, false) defer tester.release() if err := tester.verifyHistory(); err != nil { @@ -485,7 +500,7 @@ func TestDatabaseRecoverable(t *testing.T) { }() var ( - tester 
= newTester(t, 0, false, 12) + tester = newTester(t, 0, false, 12, false) index = tester.bottomIndex() ) defer tester.release() @@ -529,7 +544,7 @@ func TestDisable(t *testing.T) { maxDiffLayers = 128 }() - tester := newTester(t, 0, false, 32) + tester := newTester(t, 0, false, 32, false) defer tester.release() stored := crypto.Keccak256Hash(rawdb.ReadAccountTrieNode(tester.db.diskdb, nil)) @@ -571,7 +586,7 @@ func TestCommit(t *testing.T) { maxDiffLayers = 128 }() - tester := newTester(t, 0, false, 12) + tester := newTester(t, 0, false, 12, false) defer tester.release() if err := tester.db.Commit(tester.lastHash(), false); err != nil { @@ -601,7 +616,7 @@ func TestJournal(t *testing.T) { maxDiffLayers = 128 }() - tester := newTester(t, 0, false, 12) + tester := newTester(t, 0, false, 12, false) defer tester.release() if err := tester.db.Journal(tester.lastHash()); err != nil { @@ -631,7 +646,7 @@ func TestCorruptedJournal(t *testing.T) { maxDiffLayers = 128 }() - tester := newTester(t, 0, false, 12) + tester := newTester(t, 0, false, 12, false) defer tester.release() if err := tester.db.Journal(tester.lastHash()); err != nil { @@ -679,7 +694,7 @@ func TestTailTruncateHistory(t *testing.T) { maxDiffLayers = 128 }() - tester := newTester(t, 10, false, 12) + tester := newTester(t, 10, false, 12, false) defer tester.release() tester.db.Close() diff --git a/triedb/pathdb/disklayer.go b/triedb/pathdb/disklayer.go index 494164987f..06f0a7285f 100644 --- a/triedb/pathdb/disklayer.go +++ b/triedb/pathdb/disklayer.go @@ -355,6 +355,12 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) { overflow = true oldest = bottom.stateID() - limit + 1 // track the id of history **after truncation** } + // Notify the state history indexer for newly created history + if dl.db.indexer != nil { + if err := dl.db.indexer.extend(bottom.stateID()); err != nil { + return nil, err + } + } } // Mark the diskLayer as stale before applying any mutations on top. 
dl.stale = true @@ -477,6 +483,12 @@ func (dl *diskLayer) revert(h *history) (*diskLayer, error) { dl.stale = true + // Unindex the corresponding state history + if dl.db.indexer != nil { + if err := dl.db.indexer.shorten(dl.id); err != nil { + return nil, err + } + } // State change may be applied to node buffer, or the persistent // state, depends on if node buffer is empty or not. If the node // buffer is not empty, it means that the state transition that diff --git a/triedb/pathdb/history.go b/triedb/pathdb/history.go index 63c9152bf7..47f224170d 100644 --- a/triedb/pathdb/history.go +++ b/triedb/pathdb/history.go @@ -505,25 +505,41 @@ func (h *history) decode(accountData, storageData, accountIndexes, storageIndexe // readHistory reads and decodes the state history object by the given id. func readHistory(reader ethdb.AncientReader, id uint64) (*history, error) { - blob := rawdb.ReadStateHistoryMeta(reader, id) - if len(blob) == 0 { - return nil, fmt.Errorf("state history not found %d", id) + mData, accountIndexes, storageIndexes, accountData, storageData, err := rawdb.ReadStateHistory(reader, id) + if err != nil { + return nil, err } var m meta - if err := m.decode(blob); err != nil { + if err := m.decode(mData); err != nil { return nil, err } - var ( - dec = history{meta: &m} - accountData = rawdb.ReadStateAccountHistory(reader, id) - storageData = rawdb.ReadStateStorageHistory(reader, id) - accountIndexes = rawdb.ReadStateAccountIndex(reader, id) - storageIndexes = rawdb.ReadStateStorageIndex(reader, id) - ) - if err := dec.decode(accountData, storageData, accountIndexes, storageIndexes); err != nil { + h := history{meta: &m} + if err := h.decode(accountData, storageData, accountIndexes, storageIndexes); err != nil { return nil, err } - return &dec, nil + return &h, nil +} + +// readHistories reads and decodes a list of state histories with the specific +// history range. 
+func readHistories(freezer ethdb.AncientReader, start uint64, count uint64) ([]*history, error) { + var histories []*history + metaList, aIndexList, sIndexList, aDataList, sDataList, err := rawdb.ReadStateHistoryList(freezer, start, count) + if err != nil { + return nil, err + } + for i := 0; i < len(metaList); i++ { + var m meta + if err := m.decode(metaList[i]); err != nil { + return nil, err + } + h := history{meta: &m} + if err := h.decode(aDataList[i], sDataList[i], aIndexList[i], sIndexList[i]); err != nil { + return nil, err + } + histories = append(histories, &h) + } + return histories, nil } // writeHistory persists the state history with the provided state set. diff --git a/triedb/pathdb/history_index.go b/triedb/pathdb/history_index.go new file mode 100644 index 0000000000..f79581b38b --- /dev/null +++ b/triedb/pathdb/history_index.go @@ -0,0 +1,436 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see = indexBlockEntriesCap +} + +// encode packs index block descriptor into byte stream. 
+func (d *indexBlockDesc) encode() []byte { + var buf [indexBlockDescSize]byte + binary.BigEndian.PutUint64(buf[0:8], d.max) + binary.BigEndian.PutUint16(buf[8:10], d.entries) + binary.BigEndian.PutUint32(buf[10:14], d.id) + return buf[:] +} + +// decode unpacks index block descriptor from byte stream. +func (d *indexBlockDesc) decode(blob []byte) { + d.max = binary.BigEndian.Uint64(blob[:8]) + d.entries = binary.BigEndian.Uint16(blob[8:10]) + d.id = binary.BigEndian.Uint32(blob[10:14]) +} + +// parseIndexBlock parses the index block with the supplied byte stream. +// The index block format can be illustrated as below: +// +// +---->+------------------+ +// | | Chunk1 | +// | +------------------+ +// | | ...... | +// | +-->+------------------+ +// | | | ChunkN | +// | | +------------------+ +// +-|---| Restart1 | +// | | Restart... | 2N bytes +// +---| RestartN | +// +------------------+ +// | Restart count | 1 byte +// +------------------+ +// +// - Chunk list: A list of data chunks +// - Restart list: A list of 2-byte pointers, each pointing to the start position of a chunk +// - Restart count: The number of restarts in the block, stored at the end of the block (1 byte) +// +// Note: the pointer is encoded as a uint16, which is sufficient within a chunk. +// A uint16 can cover offsets in the range [0, 65536), which is more than enough +// to store 4096 integers. +// +// Each chunk begins with the full value of the first integer, followed by +// subsequent integers representing the differences between the current value +// and the preceding one. Integers are encoded with variable-size for best +// storage efficiency. Each chunk can be illustrated as below. +// +// Restart ---> +----------------+ +// | Full integer | +// +----------------+ +// | Diff with prev | +// +----------------+ +// | ... | +// +----------------+ +// | Diff with prev | +// +----------------+ +// +// Empty index block is regarded as invalid. 
+func parseIndexBlock(blob []byte) ([]uint16, []byte, error) { + if len(blob) < 1 { + return nil, nil, fmt.Errorf("corrupted index block, len: %d", len(blob)) + } + restartLen := blob[len(blob)-1] + if restartLen == 0 { + return nil, nil, errors.New("corrupted index block, no restart") + } + tailLen := int(restartLen)*2 + 1 + if len(blob) < tailLen { + return nil, nil, fmt.Errorf("truncated restarts, size: %d, restarts: %d", len(blob), restartLen) + } + restarts := make([]uint16, 0, restartLen) + for i := int(restartLen); i > 0; i-- { + restart := binary.BigEndian.Uint16(blob[len(blob)-1-2*i:]) + restarts = append(restarts, restart) + } + // Validate that restart points are strictly ordered and within the valid + // data range. + var prev uint16 + for i := 0; i < len(restarts); i++ { + if i != 0 { + if restarts[i] <= prev { + return nil, nil, fmt.Errorf("restart out of order, prev: %d, next: %d", prev, restarts[i]) + } + } + if int(restarts[i]) >= len(blob)-tailLen { + return nil, nil, fmt.Errorf("invalid restart position, restart: %d, size: %d", restarts[i], len(blob)-tailLen) + } + prev = restarts[i] + } + return restarts, blob[:len(blob)-tailLen], nil +} + +// blockReader is the reader to access the element within a block. +type blockReader struct { + restarts []uint16 + data []byte +} + +// newBlockReader constructs the block reader with the supplied block data. +func newBlockReader(blob []byte) (*blockReader, error) { + restarts, data, err := parseIndexBlock(blob) + if err != nil { + return nil, err + } + return &blockReader{ + restarts: restarts, + data: data, // safe to own the slice + }, nil +} + +// readGreaterThan locates the first element in the block that is greater than +// the specified value. If no such element is found, MaxUint64 is returned. 
+func (br *blockReader) readGreaterThan(id uint64) (uint64, error) { + var err error + index := sort.Search(len(br.restarts), func(i int) bool { + item, n := binary.Uvarint(br.data[br.restarts[i]:]) + if n <= 0 { + err = fmt.Errorf("failed to decode item at restart %d", br.restarts[i]) + } + return item > id + }) + if err != nil { + return 0, err + } + if index == 0 { + item, _ := binary.Uvarint(br.data[br.restarts[0]:]) + return item, nil + } + var ( + start int + limit int + result uint64 + ) + if index == len(br.restarts) { + // The element being searched falls within the last restart section, + // there is no guarantee such element can be found. + start = int(br.restarts[len(br.restarts)-1]) + limit = len(br.data) + } else { + // The element being searched falls within the non-last restart section, + // such element can be found for sure. + start = int(br.restarts[index-1]) + limit = int(br.restarts[index]) + } + pos := start + for pos < limit { + x, n := binary.Uvarint(br.data[pos:]) + if pos == start { + result = x + } else { + result += x + } + if result > id { + return result, nil + } + pos += n + } + // The element which is greater than specified id is not found. + if index == len(br.restarts) { + return math.MaxUint64, nil + } + // The element which is the first one greater than the specified id + // is exactly the one located at the restart point. 
+ item, _ := binary.Uvarint(br.data[br.restarts[index]:]) + return item, nil +} + +type blockWriter struct { + desc *indexBlockDesc // Descriptor of the block + restarts []uint16 // Offsets into the data slice, marking the start of each section + scratch []byte // Buffer used for encoding full integers or value differences + data []byte // Aggregated encoded data slice +} + +func newBlockWriter(blob []byte, desc *indexBlockDesc) (*blockWriter, error) { + scratch := make([]byte, binary.MaxVarintLen64) + if len(blob) == 0 { + return &blockWriter{ + desc: desc, + scratch: scratch, + data: make([]byte, 0, 1024), + }, nil + } + restarts, data, err := parseIndexBlock(blob) + if err != nil { + return nil, err + } + return &blockWriter{ + desc: desc, + restarts: restarts, + scratch: scratch, + data: data, // safe to own the slice + }, nil +} + +// append adds a new element to the block. The new element must be greater than +// the previous one. The provided ID is assumed to always be greater than 0. +func (b *blockWriter) append(id uint64) error { + if id == 0 { + return errors.New("invalid zero id") + } + if id <= b.desc.max { + return fmt.Errorf("append element out of order, last: %d, this: %d", b.desc.max, id) + } + // Rotate the current restart section if it's full + if b.desc.entries%indexBlockRestartLen == 0 { + // Save the offset within the data slice as the restart point + // for the next section. + b.restarts = append(b.restarts, uint16(len(b.data))) + + // The restart point item can either be encoded in variable + // size or fixed size. Although variable-size encoding is + // slightly slower (2ns per operation), it is still relatively + // fast, therefore, it's picked for better space efficiency. + // + // The first element in a restart range is encoded using its + // full value. + n := binary.PutUvarint(b.scratch[0:], id) + b.data = append(b.data, b.scratch[:n]...) + } else { + // The current section is not full, append the element. 
+ // The element which is not the first one in the section + // is encoded using the value difference from the preceding + // element. + n := binary.PutUvarint(b.scratch[0:], id-b.desc.max) + b.data = append(b.data, b.scratch[:n]...) + } + b.desc.entries++ + + // The state history ID must be greater than 0. + //if b.desc.min == 0 { + // b.desc.min = id + //} + b.desc.max = id + return nil +} + +// scanSection traverses the specified section and terminates if fn returns true. +func (b *blockWriter) scanSection(section int, fn func(uint64, int) bool) { + var ( + value uint64 + start = int(b.restarts[section]) + pos = start + limit int + ) + if section == len(b.restarts)-1 { + limit = len(b.data) + } else { + limit = int(b.restarts[section+1]) + } + for pos < limit { + x, n := binary.Uvarint(b.data[pos:]) + if pos == start { + value = x + } else { + value += x + } + if fn(value, pos) { + return + } + pos += n + } +} + +// sectionLast returns the last element in the specified section. +func (b *blockWriter) sectionLast(section int) uint64 { + var n uint64 + b.scanSection(section, func(v uint64, _ int) bool { + n = v + return false + }) + return n +} + +// sectionSearch looks up the specified value in the given section, +// the position and the preceding value will be returned if found. +func (b *blockWriter) sectionSearch(section int, n uint64) (found bool, prev uint64, pos int) { + b.scanSection(section, func(v uint64, p int) bool { + if n == v { + pos = p + found = true + return true // terminate iteration + } + prev = v + return false // continue iteration + }) + return found, prev, pos +} + +// pop removes the last element from the block. The assumption is held that block +// writer must be non-empty. 
+func (b *blockWriter) pop(id uint64) error { + if id == 0 { + return errors.New("invalid zero id") + } + if id != b.desc.max { + return fmt.Errorf("pop element out of order, last: %d, this: %d", b.desc.max, id) + } + // If there is only one entry left, the entire block should be reset + if b.desc.entries == 1 { + //b.desc.min = 0 + b.desc.max = 0 + b.desc.entries = 0 + b.restarts = nil + b.data = b.data[:0] + return nil + } + // Pop the last restart section if the section becomes empty after removing + // one element. + if b.desc.entries%indexBlockRestartLen == 1 { + b.data = b.data[:b.restarts[len(b.restarts)-1]] + b.restarts = b.restarts[:len(b.restarts)-1] + b.desc.max = b.sectionLast(len(b.restarts) - 1) + b.desc.entries -= 1 + return nil + } + // Look up the element preceding the one to be popped, in order to update + // the maximum element in the block. + found, prev, pos := b.sectionSearch(len(b.restarts)-1, id) + if !found { + return fmt.Errorf("pop element is not found, last: %d, this: %d", b.desc.max, id) + } + b.desc.max = prev + b.data = b.data[:pos] + b.desc.entries -= 1 + return nil +} + +func (b *blockWriter) empty() bool { + return b.desc.empty() +} + +func (b *blockWriter) full() bool { + return b.desc.full() +} + +// finish finalizes the index block encoding by appending the encoded restart points +// and the restart counter to the end of the block. +// +// This function is safe to be called multiple times. +func (b *blockWriter) finish() []byte { + var buf []byte + for _, number := range b.restarts { + binary.BigEndian.PutUint16(b.scratch[:2], number) + buf = append(buf, b.scratch[:2]...) + } + buf = append(buf, byte(len(b.restarts))) + return append(b.data, buf...) 
+} diff --git a/triedb/pathdb/history_index_block_test.go b/triedb/pathdb/history_index_block_test.go new file mode 100644 index 0000000000..173387b447 --- /dev/null +++ b/triedb/pathdb/history_index_block_test.go @@ -0,0 +1,216 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see value + }) + got, err := br.readGreaterThan(value) + if err != nil { + t.Fatalf("Unexpected error, got %v", err) + } + if pos == len(elements) { + if got != math.MaxUint64 { + t.Fatalf("Unexpected result, got %d, wanted math.MaxUint64", got) + } + } else if got != elements[pos] { + t.Fatalf("Unexpected result, got %d, wanted %d", got, elements[pos]) + } + } +} + +func TestBlockWriterBasic(t *testing.T) { + bw, _ := newBlockWriter(nil, newIndexBlockDesc(0)) + if !bw.empty() { + t.Fatal("expected empty block") + } + bw.append(2) + if err := bw.append(1); err == nil { + t.Fatal("out-of-order insertion is not expected") + } + for i := 0; i < 10; i++ { + bw.append(uint64(i + 3)) + } + + bw, err := newBlockWriter(bw.finish(), newIndexBlockDesc(0)) + if err != nil { + t.Fatalf("Failed to construct the block writer, %v", err) + } + for i := 0; i < 10; i++ { + if err := bw.append(uint64(i + 100)); err != nil { + t.Fatalf("Failed to append value %d: %v", i, err) + } + } + bw.finish() +} 
+ +func TestBlockWriterDelete(t *testing.T) { + bw, _ := newBlockWriter(nil, newIndexBlockDesc(0)) + for i := 0; i < 10; i++ { + bw.append(uint64(i + 1)) + } + // Pop unknown id, the request should be rejected + if err := bw.pop(100); err == nil { + t.Fatal("Expect error to occur for unknown id") + } + for i := 10; i >= 1; i-- { + if err := bw.pop(uint64(i)); err != nil { + t.Fatalf("Unexpected error for element popping, %v", err) + } + empty := i == 1 + if empty != bw.empty() { + t.Fatalf("Emptiness is not matched, want: %T, got: %T", empty, bw.empty()) + } + newMax := uint64(i - 1) + if bw.desc.max != newMax { + t.Fatalf("Maxmium element is not matched, want: %d, got: %d", newMax, bw.desc.max) + } + } +} + +func TestBlcokWriterDeleteWithData(t *testing.T) { + elements := []uint64{ + 1, 5, 10, 11, 20, + } + bw, _ := newBlockWriter(nil, newIndexBlockDesc(0)) + for i := 0; i < len(elements); i++ { + bw.append(elements[i]) + } + + // Re-construct the block writer with data + desc := &indexBlockDesc{ + id: 0, + max: 20, + entries: 5, + } + bw, err := newBlockWriter(bw.finish(), desc) + if err != nil { + t.Fatalf("Failed to construct block writer %v", err) + } + for i := len(elements) - 1; i > 0; i-- { + if err := bw.pop(elements[i]); err != nil { + t.Fatalf("Failed to pop element, %v", err) + } + newTail := elements[i-1] + + // Ensure the element can still be queried with no issue + br, err := newBlockReader(bw.finish()) + if err != nil { + t.Fatalf("Failed to construct the block reader, %v", err) + } + cases := []struct { + value uint64 + result uint64 + }{ + {0, 1}, + {1, 5}, + {10, 11}, + {19, 20}, + {20, math.MaxUint64}, + {21, math.MaxUint64}, + } + for _, c := range cases { + want := c.result + if c.value >= newTail { + want = math.MaxUint64 + } + got, err := br.readGreaterThan(c.value) + if err != nil { + t.Fatalf("Unexpected error, got %v", err) + } + if got != want { + t.Fatalf("Unexpected result, got %v, wanted %v", got, want) + } + } + } +} + +func 
TestCorruptedIndexBlock(t *testing.T) { + bw, _ := newBlockWriter(nil, newIndexBlockDesc(0)) + for i := 0; i < 10; i++ { + bw.append(uint64(i + 1)) + } + buf := bw.finish() + + // Mutate the buffer manually + buf[len(buf)-1]++ + _, err := newBlockWriter(buf, newIndexBlockDesc(0)) + if err == nil { + t.Fatal("Corrupted index block data is not detected") + } +} diff --git a/triedb/pathdb/history_index_test.go b/triedb/pathdb/history_index_test.go new file mode 100644 index 0000000000..7b24b86fd6 --- /dev/null +++ b/triedb/pathdb/history_index_test.go @@ -0,0 +1,292 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. 
If not, see value + }) + got, err := br.readGreaterThan(value) + if err != nil { + t.Fatalf("Unexpected error, got %v", err) + } + if pos == len(elements) { + if got != math.MaxUint64 { + t.Fatalf("Unexpected result, got %d, wanted math.MaxUint64", got) + } + } else if got != elements[pos] { + t.Fatalf("Unexpected result, got %d, wanted %d", got, elements[pos]) + } + } +} + +func TestEmptyIndexReader(t *testing.T) { + br, err := newIndexReader(rawdb.NewMemoryDatabase(), newAccountIdent(common.Hash{0xa})) + if err != nil { + t.Fatalf("Failed to construct the index reader, %v", err) + } + res, err := br.readGreaterThan(100) + if err != nil { + t.Fatalf("Failed to query, %v", err) + } + if res != math.MaxUint64 { + t.Fatalf("Unexpected result, got %d, wanted math.MaxUint64", res) + } +} + +func TestIndexWriterBasic(t *testing.T) { + db := rawdb.NewMemoryDatabase() + iw, _ := newIndexWriter(db, newAccountIdent(common.Hash{0xa})) + iw.append(2) + if err := iw.append(1); err == nil { + t.Fatal("out-of-order insertion is not expected") + } + for i := 0; i < 10; i++ { + iw.append(uint64(i + 3)) + } + batch := db.NewBatch() + iw.finish(batch) + batch.Write() + + iw, err := newIndexWriter(db, newAccountIdent(common.Hash{0xa})) + if err != nil { + t.Fatalf("Failed to construct the block writer, %v", err) + } + for i := 0; i < 10; i++ { + if err := iw.append(uint64(i + 100)); err != nil { + t.Fatalf("Failed to append item, %v", err) + } + } + iw.finish(db.NewBatch()) +} + +func TestIndexWriterDelete(t *testing.T) { + db := rawdb.NewMemoryDatabase() + iw, _ := newIndexWriter(db, newAccountIdent(common.Hash{0xa})) + for i := 0; i < indexBlockEntriesCap*4; i++ { + iw.append(uint64(i + 1)) + } + batch := db.NewBatch() + iw.finish(batch) + batch.Write() + + // Delete unknown id, the request should be rejected + id, _ := newIndexDeleter(db, newAccountIdent(common.Hash{0xa})) + if err := id.pop(indexBlockEntriesCap * 5); err == nil { + t.Fatal("Expect error to occur for unknown id") 
+ } + for i := indexBlockEntriesCap * 4; i >= 1; i-- { + if err := id.pop(uint64(i)); err != nil { + t.Fatalf("Unexpected error for element popping, %v", err) + } + if id.lastID != uint64(i-1) { + t.Fatalf("Unexpected lastID, want: %d, got: %d", uint64(i-1), iw.lastID) + } + if rand.Intn(10) == 0 { + batch := db.NewBatch() + id.finish(batch) + batch.Write() + } + } +} + +func TestBatchIndexerWrite(t *testing.T) { + var ( + db = rawdb.NewMemoryDatabase() + batch = newBatchIndexer(db, false) + histories = makeHistories(10) + ) + for i, h := range histories { + if err := batch.process(h, uint64(i+1)); err != nil { + t.Fatalf("Failed to process history, %v", err) + } + } + if err := batch.finish(true); err != nil { + t.Fatalf("Failed to finish batch indexer, %v", err) + } + metadata := loadIndexMetadata(db) + if metadata == nil || metadata.Last != uint64(10) { + t.Fatal("Unexpected index position") + } + var ( + accounts = make(map[common.Hash][]uint64) + storages = make(map[common.Hash]map[common.Hash][]uint64) + ) + for i, h := range histories { + for _, addr := range h.accountList { + addrHash := crypto.Keccak256Hash(addr.Bytes()) + accounts[addrHash] = append(accounts[addrHash], uint64(i+1)) + + if _, ok := storages[addrHash]; !ok { + storages[addrHash] = make(map[common.Hash][]uint64) + } + for _, slot := range h.storageList[addr] { + storages[addrHash][slot] = append(storages[addrHash][slot], uint64(i+1)) + } + } + } + for addrHash, indexes := range accounts { + ir, _ := newIndexReader(db, newAccountIdent(addrHash)) + for i := 0; i < len(indexes)-1; i++ { + n, err := ir.readGreaterThan(indexes[i]) + if err != nil { + t.Fatalf("Failed to read index, %v", err) + } + if n != indexes[i+1] { + t.Fatalf("Unexpected result, want %d, got %d", indexes[i+1], n) + } + } + n, err := ir.readGreaterThan(indexes[len(indexes)-1]) + if err != nil { + t.Fatalf("Failed to read index, %v", err) + } + if n != math.MaxUint64 { + t.Fatalf("Unexpected result, want math.MaxUint64, got 
%d", n) + } + } + for addrHash, slots := range storages { + for slotHash, indexes := range slots { + ir, _ := newIndexReader(db, newStorageIdent(addrHash, slotHash)) + for i := 0; i < len(indexes)-1; i++ { + n, err := ir.readGreaterThan(indexes[i]) + if err != nil { + t.Fatalf("Failed to read index, %v", err) + } + if n != indexes[i+1] { + t.Fatalf("Unexpected result, want %d, got %d", indexes[i+1], n) + } + } + n, err := ir.readGreaterThan(indexes[len(indexes)-1]) + if err != nil { + t.Fatalf("Failed to read index, %v", err) + } + if n != math.MaxUint64 { + t.Fatalf("Unexpected result, want math.MaxUint64, got %d", n) + } + } + } +} + +func TestBatchIndexerDelete(t *testing.T) { + var ( + db = rawdb.NewMemoryDatabase() + bw = newBatchIndexer(db, false) + histories = makeHistories(10) + ) + // Index histories + for i, h := range histories { + if err := bw.process(h, uint64(i+1)); err != nil { + t.Fatalf("Failed to process history, %v", err) + } + } + if err := bw.finish(true); err != nil { + t.Fatalf("Failed to finish batch indexer, %v", err) + } + + // Unindex histories + bd := newBatchIndexer(db, true) + for i := len(histories) - 1; i >= 0; i-- { + if err := bd.process(histories[i], uint64(i+1)); err != nil { + t.Fatalf("Failed to process history, %v", err) + } + } + if err := bd.finish(true); err != nil { + t.Fatalf("Failed to finish batch indexer, %v", err) + } + + metadata := loadIndexMetadata(db) + if metadata != nil { + t.Fatal("Unexpected index position") + } + it := db.NewIterator(rawdb.StateHistoryIndexPrefix, nil) + for it.Next() { + t.Fatal("Leftover history index data") + } + it.Release() +} diff --git a/triedb/pathdb/history_indexer.go b/triedb/pathdb/history_indexer.go new file mode 100644 index 0000000000..b09804ce9d --- /dev/null +++ b/triedb/pathdb/history_indexer.go @@ -0,0 +1,613 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of the go-ethereum library. 
+// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see = tailID { + log.Debug("Resume state history indexing", "id", metadata.Last+1, "tail", tailID) + return metadata.Last + 1, nil + } + // History has been shortened without indexing. Discard the gapped segment + // in the history and shift to the first available element. + // + // The missing indexes corresponding to the gapped histories won't be visible. + // It's fine to leave them unindexed. + log.Info("History gap detected, discard old segment", "oldHead", metadata.Last, "newHead", tailID) + return tailID, nil +} + +func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID uint64) { + defer close(done) + + beginID, err := i.next() + if err != nil { + log.Error("Failed to find next state history for indexing", "err", err) + return + } + // All available state histories have been indexed, and the last indexed one + // exceeds the most recent available state history. This situation may occur + // when the state is reverted manually (chain.SetHead) or the deep reorg is + // encountered. In such cases, no indexing should be scheduled. 
+ if beginID > lastID { + log.Debug("State history is fully indexed", "last", lastID) + return + } + log.Info("Start history indexing", "beginID", beginID, "lastID", lastID) + + var ( + current = beginID + start = time.Now() + logged = time.Now() + batch = newBatchIndexer(i.disk, false) + ) + for current <= lastID { + count := lastID - current + 1 + if count > historyReadBatch { + count = historyReadBatch + } + histories, err := readHistories(i.freezer, current, count) + if err != nil { + // The history read might fall if the history is truncated from + // head due to revert operation. + log.Error("Failed to read history for indexing", "current", current, "count", count, "err", err) + return + } + for _, h := range histories { + if err := batch.process(h, current); err != nil { + log.Error("Failed to index history", "err", err) + return + } + current += 1 + + // Occasionally report the indexing progress + if time.Since(logged) > time.Second*8 { + logged = time.Now() + + var ( + left = lastID - current + 1 + done = current - beginID + speed = done/uint64(time.Since(start)/time.Millisecond+1) + 1 // +1s to avoid division by zero + ) + // Override the ETA if larger than the largest until now + eta := time.Duration(left/speed) * time.Millisecond + log.Info("Indexing state history", "processed", done, "left", left, "eta", common.PrettyDuration(eta)) + } + } + // Check interruption signal and abort process if it's fired + if interrupt != nil { + if signal := interrupt.Load(); signal != 0 { + if err := batch.finish(true); err != nil { + log.Error("Failed to flush index", "err", err) + } + log.Info("State indexing interrupted") + return + } + } + } + if err := batch.finish(true); err != nil { + log.Error("Failed to flush index", "err", err) + } + log.Info("Indexed state history", "from", beginID, "to", lastID, "elapsed", common.PrettyDuration(time.Since(start))) +} + +// historyIndexer manages the indexing and unindexing of state histories, +// providing access to 
historical states. +// +// Upon initialization, historyIndexer starts a one-time background process +// to complete the indexing of any remaining state histories. Once this +// process is finished, all state histories are marked as fully indexed, +// enabling handling of requests for historical states. Thereafter, any new +// state histories must be indexed or unindexed synchronously, ensuring that +// the history index is created or removed along with the corresponding +// state history. +type historyIndexer struct { + initer *indexIniter + disk ethdb.KeyValueStore + freezer ethdb.AncientStore +} + +// checkVersion checks whether the index data in the database matches the version. +func checkVersion(disk ethdb.KeyValueStore) { + blob := rawdb.ReadStateHistoryIndexMetadata(disk) + if len(blob) == 0 { + return + } + var m indexMetadata + err := rlp.DecodeBytes(blob, &m) + if err == nil && m.Version == stateIndexVersion { + return + } + // TODO(rjl493456442) would be better to group them into a batch. + rawdb.DeleteStateHistoryIndexMetadata(disk) + rawdb.DeleteStateHistoryIndex(disk) + + version := "unknown" + if err == nil { + version = fmt.Sprintf("%d", m.Version) + } + log.Info("Cleaned up obsolete state history index", "version", version, "want", stateIndexVersion) +} + +// newHistoryIndexer constructs the history indexer and launches the background +// initer to complete the indexing of any remaining state histories. +func newHistoryIndexer(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, lastHistoryID uint64) *historyIndexer { + checkVersion(disk) + return &historyIndexer{ + initer: newIndexIniter(disk, freezer, lastHistoryID), + disk: disk, + freezer: freezer, + } +} + +func (i *historyIndexer) close() { + i.initer.close() +} + +// inited returns a flag indicating whether the existing state histories +// have been fully indexed, in other words, whether they are available +// for external access. 
+func (i *historyIndexer) inited() bool { + return i.initer.inited() +} + +// extend sends the notification that new state history with specified ID +// has been written into the database and is ready for indexing. +func (i *historyIndexer) extend(historyID uint64) error { + signal := &interruptSignal{ + newLastID: historyID, + result: make(chan error, 1), + } + select { + case <-i.initer.closed: + return errors.New("indexer is closed") + case <-i.initer.done: + return indexSingle(historyID, i.disk, i.freezer) + case i.initer.interrupt <- signal: + return <-signal.result + } +} + +// shorten sends the notification that state history with specified ID +// is about to be deleted from the database and should be unindexed. +func (i *historyIndexer) shorten(historyID uint64) error { + signal := &interruptSignal{ + newLastID: historyID - 1, + result: make(chan error, 1), + } + select { + case <-i.initer.closed: + return errors.New("indexer is closed") + case <-i.initer.done: + return unindexSingle(historyID, i.disk, i.freezer) + case i.initer.interrupt <- signal: + return <-signal.result + } +} diff --git a/triedb/pathdb/history_reader.go b/triedb/pathdb/history_reader.go new file mode 100644 index 0000000000..9471ab423d --- /dev/null +++ b/triedb/pathdb/history_reader.go @@ -0,0 +1,375 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. 
+// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see lastID { + return 0, fmt.Errorf("index reader is stale, limit: %d, last-state-id: %d", r.limit, lastID) + } + // Try to find the element which is greater than the specified target + res, err := r.reader.readGreaterThan(id) + if err != nil { + return 0, err + } + // Short circuit if the element is found within the current index + if res != math.MaxUint64 { + return res, nil + } + // The element was not found, and no additional histories have been indexed. + // Return a not-found result. + if r.limit == lastID { + return res, nil + } + // Refresh the index reader and attempt again. If the latest indexed position + // is even below the ID of the disk layer, it indicates that state histories + // are being removed. In this case, it would theoretically be better to block + // the state rollback operation synchronously until all readers are released. + // Given that it's very unlikely to occur and users try to perform historical + // state queries while reverting the states at the same time. Simply returning + // an error should be sufficient for now. + metadata := loadIndexMetadata(r.db) + if metadata == nil || metadata.Last < lastID { + return 0, errors.New("state history hasn't been indexed yet") + } + if err := r.reader.refresh(); err != nil { + return 0, err + } + r.limit = metadata.Last + + return r.reader.readGreaterThan(id) +} + +// historyReader is the structure to access historic state data. +type historyReader struct { + disk ethdb.KeyValueReader + freezer ethdb.AncientReader + readers map[string]*indexReaderWithLimitTag +} + +// newHistoryReader constructs the history reader with the supplied db. 
+func newHistoryReader(disk ethdb.KeyValueReader, freezer ethdb.AncientReader) *historyReader { + return &historyReader{ + disk: disk, + freezer: freezer, + readers: make(map[string]*indexReaderWithLimitTag), + } +} + +// readAccountMetadata resolves the account metadata within the specified +// state history. +func (r *historyReader) readAccountMetadata(address common.Address, historyID uint64) ([]byte, error) { + blob := rawdb.ReadStateAccountIndex(r.freezer, historyID) + if len(blob) == 0 { + return nil, fmt.Errorf("account index is truncated, historyID: %d", historyID) + } + if len(blob)%accountIndexSize != 0 { + return nil, fmt.Errorf("account index is corrupted, historyID: %d, size: %d", historyID, len(blob)) + } + n := len(blob) / accountIndexSize + + pos := sort.Search(n, func(i int) bool { + h := blob[accountIndexSize*i : accountIndexSize*i+common.HashLength] + return bytes.Compare(h, address.Bytes()) >= 0 + }) + if pos == n { + return nil, fmt.Errorf("account %#x is not found", address) + } + offset := accountIndexSize * pos + if address != common.BytesToAddress(blob[offset:offset+common.AddressLength]) { + return nil, fmt.Errorf("account %#x is not found", address) + } + return blob[offset : accountIndexSize*(pos+1)], nil +} + +// readStorageMetadata resolves the storage slot metadata within the specified +// state history. 
+func (r *historyReader) readStorageMetadata(storageKey common.Hash, storageHash common.Hash, historyID uint64, slotOffset, slotNumber int) ([]byte, error) { + // TODO(rj493456442) optimize it with partial read + blob := rawdb.ReadStateStorageIndex(r.freezer, historyID) + if len(blob) == 0 { + return nil, fmt.Errorf("storage index is truncated, historyID: %d", historyID) + } + if len(blob)%slotIndexSize != 0 { + return nil, fmt.Errorf("storage indices is corrupted, historyID: %d, size: %d", historyID, len(blob)) + } + if slotIndexSize*(slotOffset+slotNumber) > len(blob) { + return nil, fmt.Errorf("storage indices is truncated, historyID: %d, size: %d, offset: %d, length: %d", historyID, len(blob), slotOffset, slotNumber) + } + subSlice := blob[slotIndexSize*slotOffset : slotIndexSize*(slotOffset+slotNumber)] + + // TODO(rj493456442) get rid of the metadata resolution + var ( + m meta + target common.Hash + ) + blob = rawdb.ReadStateHistoryMeta(r.freezer, historyID) + if err := m.decode(blob); err != nil { + return nil, err + } + if m.version == stateHistoryV0 { + target = storageHash + } else { + target = storageKey + } + pos := sort.Search(slotNumber, func(i int) bool { + slotID := subSlice[slotIndexSize*i : slotIndexSize*i+common.HashLength] + return bytes.Compare(slotID, target.Bytes()) >= 0 + }) + if pos == slotNumber { + return nil, fmt.Errorf("storage metadata is not found, slot key: %#x, historyID: %d", storageKey, historyID) + } + offset := slotIndexSize * pos + if target != common.BytesToHash(subSlice[offset:offset+common.HashLength]) { + return nil, fmt.Errorf("storage metadata is not found, slot key: %#x, historyID: %d", storageKey, historyID) + } + return subSlice[offset : slotIndexSize*(pos+1)], nil +} + +// readAccount retrieves the account data from the specified state history. 
// readAccount retrieves the account data from the state history with the
// specified ID. The per-account metadata locates the data (one length byte
// and a four-byte offset) inside the flat account-history blob.
func (r *historyReader) readAccount(address common.Address, historyID uint64) ([]byte, error) {
	metadata, err := r.readAccountMetadata(address, historyID)
	if err != nil {
		return nil, err
	}
	length := int(metadata[common.AddressLength])                                                     // one byte for account data length
	offset := int(binary.BigEndian.Uint32(metadata[common.AddressLength+1 : common.AddressLength+5])) // four bytes for the account data offset

	// TODO(rj493456442) optimize it with partial read
	data := rawdb.ReadStateAccountHistory(r.freezer, historyID)
	if len(data) < length+offset {
		return nil, fmt.Errorf("account data is truncated, address: %#x, historyID: %d, size: %d, offset: %d, len: %d", address, historyID, len(data), offset, length)
	}
	return data[offset : offset+length], nil
}

// readStorage retrieves the storage slot data from the specified state history.
func (r *historyReader) readStorage(address common.Address, storageKey common.Hash, storageHash common.Hash, historyID uint64) ([]byte, error) {
	metadata, err := r.readAccountMetadata(address, historyID)
	if err != nil {
		return nil, err
	}
	// slotIndexOffset:
	//   The offset of storage indices associated with the specified account.
	// slotIndexNumber:
	//   The number of storage indices associated with the specified account.
	slotIndexOffset := int(binary.BigEndian.Uint32(metadata[common.AddressLength+5 : common.AddressLength+9]))
	slotIndexNumber := int(binary.BigEndian.Uint32(metadata[common.AddressLength+9 : common.AddressLength+13]))

	slotMetadata, err := r.readStorageMetadata(storageKey, storageHash, historyID, slotIndexOffset, slotIndexNumber)
	if err != nil {
		return nil, err
	}
	length := int(slotMetadata[common.HashLength])                                                  // one byte for slot data length
	offset := int(binary.BigEndian.Uint32(slotMetadata[common.HashLength+1 : common.HashLength+5])) // four bytes for slot data offset

	// TODO(rj493456442) optimize it with partial read
	data := rawdb.ReadStateStorageHistory(r.freezer, historyID)
	if len(data) < offset+length {
		return nil, fmt.Errorf("storage data is truncated, address: %#x, key: %#x, historyID: %d, size: %d, offset: %d, len: %d", address, storageKey, historyID, len(data), offset, length)
	}
	return data[offset : offset+length], nil
}

// read retrieves the state element data associated with the stateID.
// stateID: represents the ID of the state of the specified version;
// lastID: represents the ID of the latest/newest state history;
// latestValue: represents the state value at the current disk layer with ID == lastID;
func (r *historyReader) read(state stateIdentQuery, stateID uint64, lastID uint64, latestValue []byte) ([]byte, error) {
	tail, err := r.freezer.Tail()
	if err != nil {
		return nil, err
	}
	// stateID == tail is allowed, as the first history object preserved
	// is tail+1
	if stateID < tail {
		return nil, errors.New("historical state has been pruned")
	}

	// To serve the request, all state histories from stateID+1 to lastID
	// must be indexed. It's not supposed to happen unless system is very
	// wrong.
	metadata := loadIndexMetadata(r.disk)
	if metadata == nil || metadata.Last < lastID {
		indexed := "null"
		if metadata != nil {
			indexed = fmt.Sprintf("%d", metadata.Last)
		}
		return nil, fmt.Errorf("state history is not fully indexed, requested: %d, indexed: %s", stateID, indexed)
	}

	// Construct the index reader to locate the corresponding history for
	// state retrieval. Readers are cached per state identifier so repeated
	// queries against the same account/slot reuse the parsed index.
	ir, ok := r.readers[state.String()]
	if !ok {
		ir, err = newIndexReaderWithLimitTag(r.disk, state.stateIdent)
		if err != nil {
			return nil, err
		}
		r.readers[state.String()] = ir
	}
	historyID, err := ir.readGreaterThan(stateID, lastID)
	if err != nil {
		return nil, err
	}
	// The state was not found in the state histories, as it has not been modified
	// since stateID. Use the data from the associated disk layer instead.
	if historyID == math.MaxUint64 {
		return latestValue, nil
	}
	// Resolve data from the specified state history object. Notably, since the history
	// reader operates completely asynchronously with the indexer/unindexer, it's possible
	// that the associated state histories are no longer available due to a rollback.
	// Such truncation should be captured by the state resolver below, rather than returning
	// invalid data.
	if state.account {
		return r.readAccount(state.address, historyID)
	}
	return r.readStorage(state.address, state.storageKey, state.storageHash, historyID)
}
diff --git a/triedb/pathdb/history_reader_test.go b/triedb/pathdb/history_reader_test.go
new file mode 100644
index 0000000000..4356490f23
--- /dev/null
+++ b/triedb/pathdb/history_reader_test.go
@@ -0,0 +1,159 @@
// Copyright 2025 The go-ethereum Authors
// This file is part of the go-ethereum library.
+// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see = db.tree.bottom().stateID() { + return + } + time.Sleep(100 * time.Millisecond) + } +} + +func checkHistoricState(env *tester, root common.Hash, hr *historyReader) error { + // Short circuit if the historical state is no longer available + if rawdb.ReadStateID(env.db.diskdb, root) == nil { + return nil + } + var ( + dl = env.db.tree.bottom() + stateID = rawdb.ReadStateID(env.db.diskdb, root) + accounts = env.snapAccounts[root] + storages = env.snapStorages[root] + ) + for addrHash, accountData := range accounts { + latest, _ := dl.account(addrHash, 0) + blob, err := hr.read(newAccountIdentQuery(env.accountPreimage(addrHash), addrHash), *stateID, dl.stateID(), latest) + if err != nil { + return err + } + if !bytes.Equal(accountData, blob) { + return fmt.Errorf("wrong account data, expected %x, got %x", accountData, blob) + } + } + for i := 0; i < len(env.roots); i++ { + if env.roots[i] == root { + break + } + // Find all accounts deleted in the past, ensure the associated data is null + for addrHash := range env.snapAccounts[env.roots[i]] { + if _, ok := accounts[addrHash]; !ok { + latest, _ := dl.account(addrHash, 0) + blob, err := hr.read(newAccountIdentQuery(env.accountPreimage(addrHash), addrHash), *stateID, dl.stateID(), latest) + if err != nil { + return err + } + if len(blob) != 0 { + 
return fmt.Errorf("wrong account data, expected null, got %x", blob) + } + } + } + } + for addrHash, slots := range storages { + for slotHash, slotData := range slots { + latest, _ := dl.storage(addrHash, slotHash, 0) + blob, err := hr.read(newStorageIdentQuery(env.accountPreimage(addrHash), addrHash, env.hashPreimage(slotHash), slotHash), *stateID, dl.stateID(), latest) + if err != nil { + return err + } + if !bytes.Equal(slotData, blob) { + return fmt.Errorf("wrong storage data, expected %x, got %x", slotData, blob) + } + } + } + for i := 0; i < len(env.roots); i++ { + if env.roots[i] == root { + break + } + // Find all storage slots deleted in the past, ensure the associated data is null + for addrHash, slots := range env.snapStorages[env.roots[i]] { + for slotHash := range slots { + _, ok := storages[addrHash] + if ok { + _, ok = storages[addrHash][slotHash] + } + if !ok { + latest, _ := dl.storage(addrHash, slotHash, 0) + blob, err := hr.read(newStorageIdentQuery(env.accountPreimage(addrHash), addrHash, env.hashPreimage(slotHash), slotHash), *stateID, dl.stateID(), latest) + if err != nil { + return err + } + if len(blob) != 0 { + return fmt.Errorf("wrong storage data, expected null, got %x", blob) + } + } + } + } + } + return nil +} + +func TestHistoryReader(t *testing.T) { + testHistoryReader(t, 0) // with all histories reserved + testHistoryReader(t, 10) // with latest 10 histories reserved +} + +func testHistoryReader(t *testing.T, historyLimit uint64) { + maxDiffLayers = 4 + defer func() { + maxDiffLayers = 128 + }() + //log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelDebug, true))) + + env := newTester(t, historyLimit, false, 64, true) + defer env.release() + waitIndexing(env.db) + + var ( + roots = env.roots + dRoot = env.db.tree.bottom().rootHash() + hr = newHistoryReader(env.db.diskdb, env.db.freezer) + ) + for _, root := range roots { + if root == dRoot { + break + } + if err := checkHistoricState(env, root, hr); err 
!= nil { + t.Fatal(err) + } + } + + // Pile up more histories on top, ensuring the historic reader is not affected + env.extend(4) + waitIndexing(env.db) + + for _, root := range roots { + if root == dRoot { + break + } + if err := checkHistoricState(env, root, hr); err != nil { + t.Fatal(err) + } + } +} diff --git a/triedb/pathdb/metrics.go b/triedb/pathdb/metrics.go index 6d40c5713b..779f9d813f 100644 --- a/triedb/pathdb/metrics.go +++ b/triedb/pathdb/metrics.go @@ -73,8 +73,14 @@ var ( historyDataBytesMeter = metrics.NewRegisteredMeter("pathdb/history/bytes/data", nil) historyIndexBytesMeter = metrics.NewRegisteredMeter("pathdb/history/bytes/index", nil) + indexHistoryTimer = metrics.NewRegisteredResettingTimer("pathdb/history/index/time", nil) + unindexHistoryTimer = metrics.NewRegisteredResettingTimer("pathdb/history/unindex/time", nil) + lookupAddLayerTimer = metrics.NewRegisteredResettingTimer("pathdb/lookup/add/time", nil) lookupRemoveLayerTimer = metrics.NewRegisteredResettingTimer("pathdb/lookup/remove/time", nil) + + historicalAccountReadTimer = metrics.NewRegisteredResettingTimer("pathdb/history/account/reads", nil) + historicalStorageReadTimer = metrics.NewRegisteredResettingTimer("pathdb/history/storage/reads", nil) ) // Metrics in generation diff --git a/triedb/pathdb/reader.go b/triedb/pathdb/reader.go index bc72db34e3..b7b18f13f9 100644 --- a/triedb/pathdb/reader.go +++ b/triedb/pathdb/reader.go @@ -19,10 +19,13 @@ package pathdb import ( "errors" "fmt" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/triedb/database" @@ -192,3 +195,122 @@ func (db *Database) StateReader(root common.Hash) (database.StateReader, error) layer: layer, }, nil } + +// 
// HistoricalStateReader is a wrapper over history reader, providing access to
// historical state.
type HistoricalStateReader struct {
	db     *Database      // backing path database, used to reach the current disk layer
	reader *historyReader // resolver for state values recorded in state histories
	id     uint64         // state ID of the requested historic root
}

// HistoricReader constructs a reader for accessing the requested historic state.
func (db *Database) HistoricReader(root common.Hash) (*HistoricalStateReader, error) {
	// Bail out if the state history hasn't been fully indexed
	if db.indexer == nil || !db.indexer.inited() {
		return nil, errors.New("state histories haven't been fully indexed yet")
	}
	if db.freezer == nil {
		return nil, errors.New("state histories are not available")
	}
	// States at the current disk layer or above are directly accessible via
	// db.StateReader.
	//
	// States older than the current disk layer (including the disk layer
	// itself) are available through historic state access.
	//
	// Note: the requested state may refer to a stale historic state that has
	// already been pruned. This function does not validate availability, as
	// underlying states may be pruned dynamically. Validity is checked during
	// each actual state retrieval.
	id := rawdb.ReadStateID(db.diskdb, root)
	if id == nil {
		return nil, fmt.Errorf("state %#x is not available", root)
	}
	return &HistoricalStateReader{
		id:     *id,
		db:     db,
		reader: newHistoryReader(db.diskdb, db.freezer),
	}, nil
}

// AccountRLP directly retrieves the account RLP associated with a particular
// address in the slim data format. An error will be returned if the read
// operation exits abnormally. Specifically, if the layer is already stale.
//
// Note:
// - the returned account is not a copy, please don't modify it.
// - no error will be returned if the requested account is not found in database.
func (r *HistoricalStateReader) AccountRLP(address common.Address) ([]byte, error) {
	defer func(start time.Time) {
		historicalAccountReadTimer.UpdateSince(start)
	}(time.Now())

	// TODO(rjl493456442): Theoretically, the obtained disk layer could become stale
	// within a very short time window.
	//
	// While reading the account data while holding `db.tree.lock` can resolve
	// this issue, but it will introduce a heavy contention over the lock.
	//
	// Let's optimistically assume the situation is very unlikely to happen,
	// and try to define a low granularity lock if the current approach doesn't
	// work later.
	dl := r.db.tree.bottom()
	hash := crypto.Keccak256Hash(address.Bytes())
	latest, err := dl.account(hash, 0)
	if err != nil {
		return nil, err
	}
	return r.reader.read(newAccountIdentQuery(address, hash), r.id, dl.stateID(), latest)
}

// Account directly retrieves the account associated with a particular address in
// the slim data format. An error will be returned if the read operation exits
// abnormally. Specifically, if the layer is already stale.
//
// No error will be returned if the requested account is not found in database
func (r *HistoricalStateReader) Account(address common.Address) (*types.SlimAccount, error) {
	blob, err := r.AccountRLP(address)
	if err != nil {
		return nil, err
	}
	if len(blob) == 0 {
		return nil, nil
	}
	account := new(types.SlimAccount)
	if err := rlp.DecodeBytes(blob, account); err != nil {
		// NOTE(review): a decode failure here is treated as fatal rather than
		// returned — presumably it signals corrupted history data; confirm
		// this matches the convention of the non-historic reader.
		panic(err)
	}
	return account, nil
}

// Storage directly retrieves the storage data associated with a particular key,
// within a particular account. An error will be returned if the read operation
// exits abnormally. Specifically, if the layer is already stale.
//
// Note:
// - the returned storage data is not a copy, please don't modify it.
// - no error will be returned if the requested slot is not found in database.
func (r *HistoricalStateReader) Storage(address common.Address, key common.Hash) ([]byte, error) {
	defer func(start time.Time) {
		historicalStorageReadTimer.UpdateSince(start)
	}(time.Now())

	// TODO(rjl493456442): Theoretically, the obtained disk layer could become stale
	// within a very short time window.
	//
	// While reading the account data while holding `db.tree.lock` can resolve
	// this issue, but it will introduce a heavy contention over the lock.
	//
	// Let's optimistically assume the situation is very unlikely to happen,
	// and try to define a low granularity lock if the current approach doesn't
	// work later.
	dl := r.db.tree.bottom()
	addrHash := crypto.Keccak256Hash(address.Bytes())
	keyHash := crypto.Keccak256Hash(key.Bytes())
	latest, err := dl.storage(addrHash, keyHash, 0)
	if err != nil {
		return nil, err
	}
	return r.reader.read(newStorageIdentQuery(address, addrHash, key, keyHash), r.id, dl.stateID(), latest)
}