mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-02-26 15:47:21 +00:00
core/rawdb, triedb/pathdb: implement history indexer (#31156)
This pull request is part-1 for shipping the core part of archive node in PBSS mode.
This commit is contained in:
parent
ebff350a2e
commit
9c5c0e37bf
20 changed files with 2986 additions and 41 deletions
|
|
@ -1661,11 +1661,6 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
|
|||
cfg.TransactionHistory = 0
|
||||
log.Warn("Disabled transaction unindexing for archive node")
|
||||
}
|
||||
|
||||
if cfg.StateScheme != rawdb.HashScheme {
|
||||
cfg.StateScheme = rawdb.HashScheme
|
||||
log.Warn("Forcing hash state-scheme for archive mode")
|
||||
}
|
||||
}
|
||||
if ctx.IsSet(LogHistoryFlag.Name) {
|
||||
cfg.LogHistory = ctx.Uint64(LogHistoryFlag.Name)
|
||||
|
|
|
|||
|
|
@ -242,9 +242,10 @@ func (cfg *BlockChainConfig) triedbConfig(isVerkle bool) *triedb.Config {
|
|||
}
|
||||
if cfg.StateScheme == rawdb.PathScheme {
|
||||
config.PathDB = &pathdb.Config{
|
||||
StateHistory: cfg.StateHistory,
|
||||
TrieCleanSize: cfg.TrieCleanLimit * 1024 * 1024,
|
||||
StateCleanSize: cfg.SnapshotLimit * 1024 * 1024,
|
||||
StateHistory: cfg.StateHistory,
|
||||
EnableStateIndexing: cfg.ArchiveMode,
|
||||
TrieCleanSize: cfg.TrieCleanLimit * 1024 * 1024,
|
||||
StateCleanSize: cfg.SnapshotLimit * 1024 * 1024,
|
||||
|
||||
// TODO(rjl493456442): The write buffer represents the memory limit used
|
||||
// for flushing both trie data and state data to disk. The config name
|
||||
|
|
|
|||
179
core/rawdb/accessors_history.go
Normal file
179
core/rawdb/accessors_history.go
Normal file
|
|
@ -0,0 +1,179 @@
|
|||
// Copyright 2025 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package rawdb
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
)
|
||||
|
||||
// ReadStateHistoryIndexMetadata retrieves the metadata of state history index.
|
||||
func ReadStateHistoryIndexMetadata(db ethdb.KeyValueReader) []byte {
|
||||
data, _ := db.Get(headStateHistoryIndexKey)
|
||||
return data
|
||||
}
|
||||
|
||||
// WriteStateHistoryIndexMetadata stores the metadata of state history index
|
||||
// into database.
|
||||
func WriteStateHistoryIndexMetadata(db ethdb.KeyValueWriter, blob []byte) {
|
||||
if err := db.Put(headStateHistoryIndexKey, blob); err != nil {
|
||||
log.Crit("Failed to store the metadata of state history index", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// DeleteStateHistoryIndexMetadata removes the metadata of state history index.
|
||||
func DeleteStateHistoryIndexMetadata(db ethdb.KeyValueWriter) {
|
||||
if err := db.Delete(headStateHistoryIndexKey); err != nil {
|
||||
log.Crit("Failed to delete the metadata of state history index", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ReadAccountHistoryIndex retrieves the account history index with the provided
|
||||
// account address.
|
||||
func ReadAccountHistoryIndex(db ethdb.KeyValueReader, addressHash common.Hash) []byte {
|
||||
data, err := db.Get(accountHistoryIndexKey(addressHash))
|
||||
if err != nil || len(data) == 0 {
|
||||
return nil
|
||||
}
|
||||
return data
|
||||
}
|
||||
|
||||
// WriteAccountHistoryIndex writes the provided account history index into database.
|
||||
func WriteAccountHistoryIndex(db ethdb.KeyValueWriter, addressHash common.Hash, data []byte) {
|
||||
if err := db.Put(accountHistoryIndexKey(addressHash), data); err != nil {
|
||||
log.Crit("Failed to store account history index", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// DeleteAccountHistoryIndex deletes the specified account history index from
|
||||
// the database.
|
||||
func DeleteAccountHistoryIndex(db ethdb.KeyValueWriter, addressHash common.Hash) {
|
||||
if err := db.Delete(accountHistoryIndexKey(addressHash)); err != nil {
|
||||
log.Crit("Failed to delete account history index", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ReadStorageHistoryIndex retrieves the storage history index with the provided
|
||||
// account address and storage key hash.
|
||||
func ReadStorageHistoryIndex(db ethdb.KeyValueReader, addressHash common.Hash, storageHash common.Hash) []byte {
|
||||
data, err := db.Get(storageHistoryIndexKey(addressHash, storageHash))
|
||||
if err != nil || len(data) == 0 {
|
||||
return nil
|
||||
}
|
||||
return data
|
||||
}
|
||||
|
||||
// WriteStorageHistoryIndex writes the provided storage history index into database.
|
||||
func WriteStorageHistoryIndex(db ethdb.KeyValueWriter, addressHash common.Hash, storageHash common.Hash, data []byte) {
|
||||
if err := db.Put(storageHistoryIndexKey(addressHash, storageHash), data); err != nil {
|
||||
log.Crit("Failed to store storage history index", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// DeleteStorageHistoryIndex deletes the specified state index from the database.
|
||||
func DeleteStorageHistoryIndex(db ethdb.KeyValueWriter, addressHash common.Hash, storageHash common.Hash) {
|
||||
if err := db.Delete(storageHistoryIndexKey(addressHash, storageHash)); err != nil {
|
||||
log.Crit("Failed to delete storage history index", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ReadAccountHistoryIndexBlock retrieves the index block with the provided
|
||||
// account address along with the block id.
|
||||
func ReadAccountHistoryIndexBlock(db ethdb.KeyValueReader, addressHash common.Hash, blockID uint32) []byte {
|
||||
data, err := db.Get(accountHistoryIndexBlockKey(addressHash, blockID))
|
||||
if err != nil || len(data) == 0 {
|
||||
return nil
|
||||
}
|
||||
return data
|
||||
}
|
||||
|
||||
// WriteAccountHistoryIndexBlock writes the provided index block into database.
|
||||
func WriteAccountHistoryIndexBlock(db ethdb.KeyValueWriter, addressHash common.Hash, blockID uint32, data []byte) {
|
||||
if err := db.Put(accountHistoryIndexBlockKey(addressHash, blockID), data); err != nil {
|
||||
log.Crit("Failed to store account index block", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// DeleteAccountHistoryIndexBlock deletes the specified index block from the database.
|
||||
func DeleteAccountHistoryIndexBlock(db ethdb.KeyValueWriter, addressHash common.Hash, blockID uint32) {
|
||||
if err := db.Delete(accountHistoryIndexBlockKey(addressHash, blockID)); err != nil {
|
||||
log.Crit("Failed to delete account index block", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ReadStorageHistoryIndexBlock retrieves the index block with the provided state
|
||||
// identifier along with the block id.
|
||||
func ReadStorageHistoryIndexBlock(db ethdb.KeyValueReader, addressHash common.Hash, storageHash common.Hash, blockID uint32) []byte {
|
||||
data, err := db.Get(storageHistoryIndexBlockKey(addressHash, storageHash, blockID))
|
||||
if err != nil || len(data) == 0 {
|
||||
return nil
|
||||
}
|
||||
return data
|
||||
}
|
||||
|
||||
// WriteStorageHistoryIndexBlock writes the provided index block into database.
|
||||
func WriteStorageHistoryIndexBlock(db ethdb.KeyValueWriter, addressHash common.Hash, storageHash common.Hash, id uint32, data []byte) {
|
||||
if err := db.Put(storageHistoryIndexBlockKey(addressHash, storageHash, id), data); err != nil {
|
||||
log.Crit("Failed to store storage index block", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// DeleteStorageHistoryIndexBlock deletes the specified index block from the database.
|
||||
func DeleteStorageHistoryIndexBlock(db ethdb.KeyValueWriter, addressHash common.Hash, storageHash common.Hash, id uint32) {
|
||||
if err := db.Delete(storageHistoryIndexBlockKey(addressHash, storageHash, id)); err != nil {
|
||||
log.Crit("Failed to delete storage index block", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// increaseKey increase the input key by one bit. Return nil if the entire
|
||||
// addition operation overflows.
|
||||
func increaseKey(key []byte) []byte {
|
||||
for i := len(key) - 1; i >= 0; i-- {
|
||||
key[i]++
|
||||
if key[i] != 0x0 {
|
||||
return key
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteStateHistoryIndex completely removes all history indexing data, including
|
||||
// indexes for accounts and storages.
|
||||
//
|
||||
// Note, this method assumes the storage space with prefix `StateHistoryIndexPrefix`
|
||||
// is exclusively occupied by the history indexing data!
|
||||
func DeleteStateHistoryIndex(db ethdb.KeyValueRangeDeleter) {
|
||||
start := StateHistoryIndexPrefix
|
||||
limit := increaseKey(bytes.Clone(StateHistoryIndexPrefix))
|
||||
|
||||
// Try to remove the data in the range by a loop, as the leveldb
|
||||
// doesn't support the native range deletion.
|
||||
for {
|
||||
err := db.DeleteRange(start, limit)
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
if errors.Is(err, ethdb.ErrTooManyKeys) {
|
||||
continue
|
||||
}
|
||||
log.Crit("Failed to delete history index range", "err", err)
|
||||
}
|
||||
}
|
||||
|
|
@ -18,6 +18,7 @@ package rawdb
|
|||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
|
|
@ -255,6 +256,36 @@ func ReadStateHistory(db ethdb.AncientReaderOp, id uint64) ([]byte, []byte, []by
|
|||
return meta, accountIndex, storageIndex, accountData, storageData, nil
|
||||
}
|
||||
|
||||
// ReadStateHistoryList retrieves a list of state histories from database with
|
||||
// specific range. Compute the position of state history in freezer by minus one
|
||||
// since the id of first state history starts from one(zero for initial state).
|
||||
func ReadStateHistoryList(db ethdb.AncientReaderOp, start uint64, count uint64) ([][]byte, [][]byte, [][]byte, [][]byte, [][]byte, error) {
|
||||
metaList, err := db.AncientRange(stateHistoryMeta, start-1, count, 0)
|
||||
if err != nil {
|
||||
return nil, nil, nil, nil, nil, err
|
||||
}
|
||||
aIndexList, err := db.AncientRange(stateHistoryAccountIndex, start-1, count, 0)
|
||||
if err != nil {
|
||||
return nil, nil, nil, nil, nil, err
|
||||
}
|
||||
sIndexList, err := db.AncientRange(stateHistoryStorageIndex, start-1, count, 0)
|
||||
if err != nil {
|
||||
return nil, nil, nil, nil, nil, err
|
||||
}
|
||||
aDataList, err := db.AncientRange(stateHistoryAccountData, start-1, count, 0)
|
||||
if err != nil {
|
||||
return nil, nil, nil, nil, nil, err
|
||||
}
|
||||
sDataList, err := db.AncientRange(stateHistoryStorageData, start-1, count, 0)
|
||||
if err != nil {
|
||||
return nil, nil, nil, nil, nil, err
|
||||
}
|
||||
if len(metaList) != len(aIndexList) || len(metaList) != len(sIndexList) || len(metaList) != len(aDataList) || len(metaList) != len(sDataList) {
|
||||
return nil, nil, nil, nil, nil, errors.New("state history is corrupted")
|
||||
}
|
||||
return metaList, aIndexList, sIndexList, aDataList, sDataList, nil
|
||||
}
|
||||
|
||||
// WriteStateHistory writes the provided state history to database. Compute the
|
||||
// position of state history in freezer by minus one since the id of first state
|
||||
// history starts from one(zero for initial state).
|
||||
|
|
|
|||
|
|
@ -412,6 +412,9 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
|
|||
filterMapLastBlock stat
|
||||
filterMapBlockLV stat
|
||||
|
||||
// Path-mode archive data
|
||||
stateIndex stat
|
||||
|
||||
// Verkle statistics
|
||||
verkleTries stat
|
||||
verkleStateLookups stat
|
||||
|
|
@ -489,6 +492,10 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
|
|||
case bytes.HasPrefix(key, bloomBitsMetaPrefix) && len(key) < len(bloomBitsMetaPrefix)+8:
|
||||
bloomBits.Add(size)
|
||||
|
||||
// Path-based historic state indexes
|
||||
case bytes.HasPrefix(key, StateHistoryIndexPrefix) && len(key) >= len(StateHistoryIndexPrefix)+common.HashLength:
|
||||
stateIndex.Add(size)
|
||||
|
||||
// Verkle trie data is detected, determine the sub-category
|
||||
case bytes.HasPrefix(key, VerklePrefix):
|
||||
remain := key[len(VerklePrefix):]
|
||||
|
|
@ -544,6 +551,7 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
|
|||
{"Key-Value store", "Path trie state lookups", stateLookups.Size(), stateLookups.Count()},
|
||||
{"Key-Value store", "Path trie account nodes", accountTries.Size(), accountTries.Count()},
|
||||
{"Key-Value store", "Path trie storage nodes", storageTries.Size(), storageTries.Count()},
|
||||
{"Key-Value store", "Path state history indexes", stateIndex.Size(), stateIndex.Count()},
|
||||
{"Key-Value store", "Verkle trie nodes", verkleTries.Size(), verkleTries.Count()},
|
||||
{"Key-Value store", "Verkle trie state lookups", verkleStateLookups.Size(), verkleStateLookups.Count()},
|
||||
{"Key-Value store", "Trie preimages", preimages.Size(), preimages.Count()},
|
||||
|
|
@ -591,7 +599,7 @@ var knownMetadataKeys = [][]byte{
|
|||
snapshotGeneratorKey, snapshotRecoveryKey, txIndexTailKey, fastTxLookupLimitKey,
|
||||
uncleanShutdownKey, badBlockKey, transitionStatusKey, skeletonSyncStatusKey,
|
||||
persistentStateIDKey, trieJournalKey, snapshotSyncStatusKey, snapSyncStatusFlagKey,
|
||||
filterMapsRangeKey,
|
||||
filterMapsRangeKey, headStateHistoryIndexKey,
|
||||
}
|
||||
|
||||
// printChainMetadata prints out chain metadata to stderr.
|
||||
|
|
|
|||
|
|
@ -76,6 +76,10 @@ var (
|
|||
// trieJournalKey tracks the in-memory trie node layers across restarts.
|
||||
trieJournalKey = []byte("TrieJournal")
|
||||
|
||||
// headStateHistoryIndexKey tracks the ID of the latest state history that has
|
||||
// been indexed.
|
||||
headStateHistoryIndexKey = []byte("LastStateHistoryIndex")
|
||||
|
||||
// txIndexTailKey tracks the oldest block whose transactions have been indexed.
|
||||
txIndexTailKey = []byte("TransactionIndexTail")
|
||||
|
||||
|
|
@ -117,6 +121,13 @@ var (
|
|||
TrieNodeStoragePrefix = []byte("O") // TrieNodeStoragePrefix + accountHash + hexPath -> trie node
|
||||
stateIDPrefix = []byte("L") // stateIDPrefix + state root -> state id
|
||||
|
||||
// State history indexing within path-based storage scheme
|
||||
StateHistoryIndexPrefix = []byte("m") // The global prefix of state history index data
|
||||
StateHistoryAccountMetadataPrefix = []byte("ma") // StateHistoryAccountMetadataPrefix + account address hash => account metadata
|
||||
StateHistoryStorageMetadataPrefix = []byte("ms") // StateHistoryStorageMetadataPrefix + account address hash + storage slot hash => slot metadata
|
||||
StateHistoryAccountBlockPrefix = []byte("mba") // StateHistoryAccountBlockPrefix + account address hash + block_number => account block
|
||||
StateHistoryStorageBlockPrefix = []byte("mbs") // StateHistoryStorageBlockPrefix + account address hash + storage slot hash + block_number => slot block
|
||||
|
||||
// VerklePrefix is the database prefix for Verkle trie data, which includes:
|
||||
// (a) Trie nodes
|
||||
// (b) In-memory trie node journal
|
||||
|
|
@ -362,3 +373,27 @@ func filterMapBlockLVKey(number uint64) []byte {
|
|||
binary.BigEndian.PutUint64(key[l:], number)
|
||||
return key
|
||||
}
|
||||
|
||||
// accountHistoryIndexKey = StateHistoryAccountMetadataPrefix + addressHash
|
||||
func accountHistoryIndexKey(addressHash common.Hash) []byte {
|
||||
return append(StateHistoryAccountMetadataPrefix, addressHash.Bytes()...)
|
||||
}
|
||||
|
||||
// storageHistoryIndexKey = StateHistoryStorageMetadataPrefix + addressHash + storageHash
|
||||
func storageHistoryIndexKey(addressHash common.Hash, storageHash common.Hash) []byte {
|
||||
return append(append(StateHistoryStorageMetadataPrefix, addressHash.Bytes()...), storageHash.Bytes()...)
|
||||
}
|
||||
|
||||
// accountHistoryIndexBlockKey = StateHistoryAccountBlockPrefix + addressHash + blockID
|
||||
func accountHistoryIndexBlockKey(addressHash common.Hash, blockID uint32) []byte {
|
||||
var buf [4]byte
|
||||
binary.BigEndian.PutUint32(buf[:], blockID)
|
||||
return append(append(StateHistoryAccountBlockPrefix, addressHash.Bytes()...), buf[:]...)
|
||||
}
|
||||
|
||||
// storageHistoryIndexBlockKey = StateHistoryStorageBlockPrefix + addressHash + storageHash + blockID
|
||||
func storageHistoryIndexBlockKey(addressHash common.Hash, storageHash common.Hash, blockID uint32) []byte {
|
||||
var buf [4]byte
|
||||
binary.BigEndian.PutUint32(buf[:], blockID)
|
||||
return append(append(append(StateHistoryStorageBlockPrefix, addressHash.Bytes()...), storageHash.Bytes()...), buf[:]...)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -83,6 +83,7 @@ type Database struct {
|
|||
estimatedCompDebtGauge *metrics.Gauge // Gauge for tracking the number of bytes that need to be compacted
|
||||
liveCompGauge *metrics.Gauge // Gauge for tracking the number of in-progress compactions
|
||||
liveCompSizeGauge *metrics.Gauge // Gauge for tracking the size of in-progress compactions
|
||||
liveIterGauge *metrics.Gauge // Gauge for tracking the number of live database iterators
|
||||
levelsGauge []*metrics.Gauge // Gauge for tracking the number of tables in levels
|
||||
|
||||
quitLock sync.RWMutex // Mutex protecting the quit channel and the closed flag
|
||||
|
|
@ -326,6 +327,7 @@ func New(file string, cache int, handles int, namespace string, readonly bool) (
|
|||
db.estimatedCompDebtGauge = metrics.GetOrRegisterGauge(namespace+"compact/estimateDebt", nil)
|
||||
db.liveCompGauge = metrics.GetOrRegisterGauge(namespace+"compact/live/count", nil)
|
||||
db.liveCompSizeGauge = metrics.GetOrRegisterGauge(namespace+"compact/live/size", nil)
|
||||
db.liveIterGauge = metrics.GetOrRegisterGauge(namespace+"iter/count", nil)
|
||||
|
||||
// Start up the metrics gathering and return
|
||||
go db.meter(metricsGatheringInterval, namespace)
|
||||
|
|
@ -582,6 +584,7 @@ func (d *Database) meter(refresh time.Duration, namespace string) {
|
|||
d.seekCompGauge.Update(stats.Compact.ReadCount)
|
||||
d.liveCompGauge.Update(stats.Compact.NumInProgress)
|
||||
d.liveCompSizeGauge.Update(stats.Compact.InProgressBytes)
|
||||
d.liveIterGauge.Update(stats.TableIters)
|
||||
|
||||
d.liveMemTablesGauge.Update(stats.MemTable.Count)
|
||||
d.zombieMemTablesGauge.Update(stats.MemTable.ZombieCount)
|
||||
|
|
|
|||
|
|
@ -114,11 +114,12 @@ type layer interface {
|
|||
|
||||
// Config contains the settings for database.
|
||||
type Config struct {
|
||||
StateHistory uint64 // Number of recent blocks to maintain state history for
|
||||
TrieCleanSize int // Maximum memory allowance (in bytes) for caching clean trie nodes
|
||||
StateCleanSize int // Maximum memory allowance (in bytes) for caching clean state data
|
||||
WriteBufferSize int // Maximum memory allowance (in bytes) for write buffer
|
||||
ReadOnly bool // Flag whether the database is opened in read only mode
|
||||
StateHistory uint64 // Number of recent blocks to maintain state history for
|
||||
EnableStateIndexing bool // Whether to enable state history indexing for external state access
|
||||
TrieCleanSize int // Maximum memory allowance (in bytes) for caching clean trie nodes
|
||||
StateCleanSize int // Maximum memory allowance (in bytes) for caching clean state data
|
||||
WriteBufferSize int // Maximum memory allowance (in bytes) for write buffer
|
||||
ReadOnly bool // Flag whether the database is opened in read only mode
|
||||
|
||||
// Testing configurations
|
||||
SnapshotNoBuild bool // Flag Whether the state generation is allowed
|
||||
|
|
@ -149,7 +150,12 @@ func (c *Config) fields() []interface{} {
|
|||
list = append(list, "triecache", common.StorageSize(c.TrieCleanSize))
|
||||
list = append(list, "statecache", common.StorageSize(c.StateCleanSize))
|
||||
list = append(list, "buffer", common.StorageSize(c.WriteBufferSize))
|
||||
list = append(list, "history", c.StateHistory)
|
||||
|
||||
if c.StateHistory == 0 {
|
||||
list = append(list, "history", "entire chain")
|
||||
} else {
|
||||
list = append(list, "history", fmt.Sprintf("last %d blocks", c.StateHistory))
|
||||
}
|
||||
return list
|
||||
}
|
||||
|
||||
|
|
@ -213,6 +219,7 @@ type Database struct {
|
|||
tree *layerTree // The group for all known layers
|
||||
freezer ethdb.ResettableAncientStore // Freezer for storing trie histories, nil possible in tests
|
||||
lock sync.RWMutex // Lock to prevent mutations from happening at the same time
|
||||
indexer *historyIndexer // History indexer
|
||||
}
|
||||
|
||||
// New attempts to load an already existing layer from a persistent key-value
|
||||
|
|
@ -262,6 +269,11 @@ func New(diskdb ethdb.Database, config *Config, isVerkle bool) *Database {
|
|||
if err := db.setStateGenerator(); err != nil {
|
||||
log.Crit("Failed to setup the generator", "err", err)
|
||||
}
|
||||
// TODO (rjl493456442) disable the background indexing in read-only mode
|
||||
if db.freezer != nil && db.config.EnableStateIndexing {
|
||||
db.indexer = newHistoryIndexer(db.diskdb, db.freezer, db.tree.bottom().stateID())
|
||||
log.Info("Enabled state history indexing")
|
||||
}
|
||||
fields := config.fields()
|
||||
if db.isVerkle {
|
||||
fields = append(fields, "verkle", true)
|
||||
|
|
@ -299,6 +311,11 @@ func (db *Database) repairHistory() error {
|
|||
log.Crit("Failed to retrieve head of state history", "err", err)
|
||||
}
|
||||
if frozen != 0 {
|
||||
// TODO(rjl493456442) would be better to group them into a batch.
|
||||
//
|
||||
// Purge all state history indexing data first
|
||||
rawdb.DeleteStateHistoryIndexMetadata(db.diskdb)
|
||||
rawdb.DeleteStateHistoryIndex(db.diskdb)
|
||||
err := db.freezer.Reset()
|
||||
if err != nil {
|
||||
log.Crit("Failed to reset state histories", "err", err)
|
||||
|
|
@ -487,6 +504,11 @@ func (db *Database) Enable(root common.Hash) error {
|
|||
// mappings can be huge and might take a while to clear
|
||||
// them, just leave them in disk and wait for overwriting.
|
||||
if db.freezer != nil {
|
||||
// TODO(rjl493456442) would be better to group them into a batch.
|
||||
//
|
||||
// Purge all state history indexing data first
|
||||
rawdb.DeleteStateHistoryIndexMetadata(db.diskdb)
|
||||
rawdb.DeleteStateHistoryIndex(db.diskdb)
|
||||
if err := db.freezer.Reset(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -611,6 +633,10 @@ func (db *Database) Close() error {
|
|||
}
|
||||
dl.resetCache() // release the memory held by clean cache
|
||||
|
||||
// Terminate the background state history indexer
|
||||
if db.indexer != nil {
|
||||
db.indexer.close()
|
||||
}
|
||||
// Close the attached state history freezer.
|
||||
if db.freezer == nil {
|
||||
return nil
|
||||
|
|
|
|||
|
|
@ -121,15 +121,16 @@ type tester struct {
|
|||
snapStorages map[common.Hash]map[common.Hash]map[common.Hash][]byte // Keyed by the hash of account address and the hash of storage key
|
||||
}
|
||||
|
||||
func newTester(t *testing.T, historyLimit uint64, isVerkle bool, layers int) *tester {
|
||||
func newTester(t *testing.T, historyLimit uint64, isVerkle bool, layers int, enableIndex bool) *tester {
|
||||
var (
|
||||
disk, _ = rawdb.Open(rawdb.NewMemoryDatabase(), rawdb.OpenOptions{Ancient: t.TempDir()})
|
||||
db = New(disk, &Config{
|
||||
StateHistory: historyLimit,
|
||||
TrieCleanSize: 256 * 1024,
|
||||
StateCleanSize: 256 * 1024,
|
||||
WriteBufferSize: 256 * 1024,
|
||||
NoAsyncFlush: true,
|
||||
StateHistory: historyLimit,
|
||||
EnableStateIndexing: enableIndex,
|
||||
TrieCleanSize: 256 * 1024,
|
||||
StateCleanSize: 256 * 1024,
|
||||
WriteBufferSize: 256 * 1024,
|
||||
NoAsyncFlush: true,
|
||||
}, isVerkle)
|
||||
|
||||
obj = &tester{
|
||||
|
|
@ -164,6 +165,20 @@ func (t *tester) hashPreimage(hash common.Hash) common.Hash {
|
|||
return common.BytesToHash(t.preimages[hash])
|
||||
}
|
||||
|
||||
func (t *tester) extend(layers int) {
|
||||
for i := 0; i < layers; i++ {
|
||||
var parent = types.EmptyRootHash
|
||||
if len(t.roots) != 0 {
|
||||
parent = t.roots[len(t.roots)-1]
|
||||
}
|
||||
root, nodes, states := t.generate(parent, true)
|
||||
if err := t.db.Update(root, parent, uint64(i), nodes, states); err != nil {
|
||||
panic(fmt.Errorf("failed to update state changes, err: %w", err))
|
||||
}
|
||||
t.roots = append(t.roots, root)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *tester) release() {
|
||||
t.db.Close()
|
||||
t.db.diskdb.Close()
|
||||
|
|
@ -451,7 +466,7 @@ func TestDatabaseRollback(t *testing.T) {
|
|||
}()
|
||||
|
||||
// Verify state histories
|
||||
tester := newTester(t, 0, false, 32)
|
||||
tester := newTester(t, 0, false, 32, false)
|
||||
defer tester.release()
|
||||
|
||||
if err := tester.verifyHistory(); err != nil {
|
||||
|
|
@ -485,7 +500,7 @@ func TestDatabaseRecoverable(t *testing.T) {
|
|||
}()
|
||||
|
||||
var (
|
||||
tester = newTester(t, 0, false, 12)
|
||||
tester = newTester(t, 0, false, 12, false)
|
||||
index = tester.bottomIndex()
|
||||
)
|
||||
defer tester.release()
|
||||
|
|
@ -529,7 +544,7 @@ func TestDisable(t *testing.T) {
|
|||
maxDiffLayers = 128
|
||||
}()
|
||||
|
||||
tester := newTester(t, 0, false, 32)
|
||||
tester := newTester(t, 0, false, 32, false)
|
||||
defer tester.release()
|
||||
|
||||
stored := crypto.Keccak256Hash(rawdb.ReadAccountTrieNode(tester.db.diskdb, nil))
|
||||
|
|
@ -571,7 +586,7 @@ func TestCommit(t *testing.T) {
|
|||
maxDiffLayers = 128
|
||||
}()
|
||||
|
||||
tester := newTester(t, 0, false, 12)
|
||||
tester := newTester(t, 0, false, 12, false)
|
||||
defer tester.release()
|
||||
|
||||
if err := tester.db.Commit(tester.lastHash(), false); err != nil {
|
||||
|
|
@ -601,7 +616,7 @@ func TestJournal(t *testing.T) {
|
|||
maxDiffLayers = 128
|
||||
}()
|
||||
|
||||
tester := newTester(t, 0, false, 12)
|
||||
tester := newTester(t, 0, false, 12, false)
|
||||
defer tester.release()
|
||||
|
||||
if err := tester.db.Journal(tester.lastHash()); err != nil {
|
||||
|
|
@ -631,7 +646,7 @@ func TestCorruptedJournal(t *testing.T) {
|
|||
maxDiffLayers = 128
|
||||
}()
|
||||
|
||||
tester := newTester(t, 0, false, 12)
|
||||
tester := newTester(t, 0, false, 12, false)
|
||||
defer tester.release()
|
||||
|
||||
if err := tester.db.Journal(tester.lastHash()); err != nil {
|
||||
|
|
@ -679,7 +694,7 @@ func TestTailTruncateHistory(t *testing.T) {
|
|||
maxDiffLayers = 128
|
||||
}()
|
||||
|
||||
tester := newTester(t, 10, false, 12)
|
||||
tester := newTester(t, 10, false, 12, false)
|
||||
defer tester.release()
|
||||
|
||||
tester.db.Close()
|
||||
|
|
|
|||
|
|
@ -355,6 +355,12 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
|
|||
overflow = true
|
||||
oldest = bottom.stateID() - limit + 1 // track the id of history **after truncation**
|
||||
}
|
||||
// Notify the state history indexer for newly created history
|
||||
if dl.db.indexer != nil {
|
||||
if err := dl.db.indexer.extend(bottom.stateID()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
// Mark the diskLayer as stale before applying any mutations on top.
|
||||
dl.stale = true
|
||||
|
|
@ -477,6 +483,12 @@ func (dl *diskLayer) revert(h *history) (*diskLayer, error) {
|
|||
|
||||
dl.stale = true
|
||||
|
||||
// Unindex the corresponding state history
|
||||
if dl.db.indexer != nil {
|
||||
if err := dl.db.indexer.shorten(dl.id); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
// State change may be applied to node buffer, or the persistent
|
||||
// state, depends on if node buffer is empty or not. If the node
|
||||
// buffer is not empty, it means that the state transition that
|
||||
|
|
|
|||
|
|
@ -505,25 +505,41 @@ func (h *history) decode(accountData, storageData, accountIndexes, storageIndexe
|
|||
|
||||
// readHistory reads and decodes the state history object by the given id.
|
||||
func readHistory(reader ethdb.AncientReader, id uint64) (*history, error) {
|
||||
blob := rawdb.ReadStateHistoryMeta(reader, id)
|
||||
if len(blob) == 0 {
|
||||
return nil, fmt.Errorf("state history not found %d", id)
|
||||
mData, accountIndexes, storageIndexes, accountData, storageData, err := rawdb.ReadStateHistory(reader, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var m meta
|
||||
if err := m.decode(blob); err != nil {
|
||||
if err := m.decode(mData); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var (
|
||||
dec = history{meta: &m}
|
||||
accountData = rawdb.ReadStateAccountHistory(reader, id)
|
||||
storageData = rawdb.ReadStateStorageHistory(reader, id)
|
||||
accountIndexes = rawdb.ReadStateAccountIndex(reader, id)
|
||||
storageIndexes = rawdb.ReadStateStorageIndex(reader, id)
|
||||
)
|
||||
if err := dec.decode(accountData, storageData, accountIndexes, storageIndexes); err != nil {
|
||||
h := history{meta: &m}
|
||||
if err := h.decode(accountData, storageData, accountIndexes, storageIndexes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &dec, nil
|
||||
return &h, nil
|
||||
}
|
||||
|
||||
// readHistories reads and decodes a list of state histories with the specific
|
||||
// history range.
|
||||
func readHistories(freezer ethdb.AncientReader, start uint64, count uint64) ([]*history, error) {
|
||||
var histories []*history
|
||||
metaList, aIndexList, sIndexList, aDataList, sDataList, err := rawdb.ReadStateHistoryList(freezer, start, count)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i := 0; i < len(metaList); i++ {
|
||||
var m meta
|
||||
if err := m.decode(metaList[i]); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
h := history{meta: &m}
|
||||
if err := h.decode(aDataList[i], sDataList[i], aIndexList[i], sIndexList[i]); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
histories = append(histories, &h)
|
||||
}
|
||||
return histories, nil
|
||||
}
|
||||
|
||||
// writeHistory persists the state history with the provided state set.
|
||||
|
|
|
|||
436
triedb/pathdb/history_index.go
Normal file
436
triedb/pathdb/history_index.go
Normal file
|
|
@ -0,0 +1,436 @@
|
|||
// Copyright 2025 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/
|
||||
|
||||
package pathdb
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"sort"
|
||||
|
||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
)
|
||||
|
||||
// parseIndex parses the index data with the supplied byte stream. The index data
|
||||
// is a list of fixed-sized metadata. Empty metadata is regarded as invalid.
|
||||
func parseIndex(blob []byte) ([]*indexBlockDesc, error) {
|
||||
if len(blob) == 0 {
|
||||
return nil, errors.New("empty state history index")
|
||||
}
|
||||
if len(blob)%indexBlockDescSize != 0 {
|
||||
return nil, fmt.Errorf("corrupted state index, len: %d", len(blob))
|
||||
}
|
||||
var (
|
||||
lastID uint32
|
||||
descList []*indexBlockDesc
|
||||
)
|
||||
for i := 0; i < len(blob)/indexBlockDescSize; i++ {
|
||||
var desc indexBlockDesc
|
||||
desc.decode(blob[i*indexBlockDescSize : (i+1)*indexBlockDescSize])
|
||||
if desc.empty() {
|
||||
return nil, errors.New("empty state history index block")
|
||||
}
|
||||
if lastID != 0 {
|
||||
if lastID+1 != desc.id {
|
||||
return nil, fmt.Errorf("index block id is out of order, last-id: %d, this-id: %d", lastID, desc.id)
|
||||
}
|
||||
// Theoretically, order should be validated between consecutive index blocks,
|
||||
// ensuring that elements within them are strictly ordered. However, since
|
||||
// tracking the minimum element in each block has non-trivial storage overhead,
|
||||
// this check is optimistically omitted.
|
||||
//
|
||||
// TODO(rjl493456442) the minimal element can be resolved from the index block,
|
||||
// evaluate the check cost (mostly IO overhead).
|
||||
|
||||
/* if desc.min <= lastMax {
|
||||
return nil, fmt.Errorf("index block range is out of order, last-max: %d, this-min: %d", lastMax, desc.min)
|
||||
}*/
|
||||
}
|
||||
lastID = desc.id
|
||||
descList = append(descList, &desc)
|
||||
}
|
||||
return descList, nil
|
||||
}
|
||||
|
||||
// indexReader is the structure to look up the state history index records
|
||||
// associated with the specific state element.
|
||||
type indexReader struct {
|
||||
db ethdb.KeyValueReader
|
||||
descList []*indexBlockDesc
|
||||
readers map[uint32]*blockReader
|
||||
state stateIdent
|
||||
}
|
||||
|
||||
// loadIndexData loads the index data associated with the specified state.
|
||||
func loadIndexData(db ethdb.KeyValueReader, state stateIdent) ([]*indexBlockDesc, error) {
|
||||
var blob []byte
|
||||
if state.account {
|
||||
blob = rawdb.ReadAccountHistoryIndex(db, state.addressHash)
|
||||
} else {
|
||||
blob = rawdb.ReadStorageHistoryIndex(db, state.addressHash, state.storageHash)
|
||||
}
|
||||
if len(blob) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
return parseIndex(blob)
|
||||
}
|
||||
|
||||
// newIndexReader constructs a index reader for the specified state. Reader with
|
||||
// empty data is allowed.
|
||||
func newIndexReader(db ethdb.KeyValueReader, state stateIdent) (*indexReader, error) {
|
||||
descList, err := loadIndexData(db, state)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &indexReader{
|
||||
descList: descList,
|
||||
readers: make(map[uint32]*blockReader),
|
||||
db: db,
|
||||
state: state,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// refresh reloads the last section of index data to account for any additional
|
||||
// elements that may have been written to disk.
|
||||
func (r *indexReader) refresh() error {
|
||||
// Release the reader for the last section of index data, as its content
|
||||
// may have been modified by additional elements written to the disk.
|
||||
if len(r.descList) != 0 {
|
||||
last := r.descList[len(r.descList)-1]
|
||||
if !last.full() {
|
||||
delete(r.readers, last.id)
|
||||
}
|
||||
}
|
||||
descList, err := loadIndexData(r.db, r.state)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.descList = descList
|
||||
return nil
|
||||
}
|
||||
|
||||
// readGreaterThan locates the first element that is greater than the specified
|
||||
// id. If no such element is found, MaxUint64 is returned.
|
||||
func (r *indexReader) readGreaterThan(id uint64) (uint64, error) {
|
||||
index := sort.Search(len(r.descList), func(i int) bool {
|
||||
return id < r.descList[i].max
|
||||
})
|
||||
if index == len(r.descList) {
|
||||
return math.MaxUint64, nil
|
||||
}
|
||||
desc := r.descList[index]
|
||||
|
||||
br, ok := r.readers[desc.id]
|
||||
if !ok {
|
||||
var (
|
||||
err error
|
||||
blob []byte
|
||||
)
|
||||
if r.state.account {
|
||||
blob = rawdb.ReadAccountHistoryIndexBlock(r.db, r.state.addressHash, desc.id)
|
||||
} else {
|
||||
blob = rawdb.ReadStorageHistoryIndexBlock(r.db, r.state.addressHash, r.state.storageHash, desc.id)
|
||||
}
|
||||
br, err = newBlockReader(blob)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
r.readers[desc.id] = br
|
||||
}
|
||||
// The supplied ID is not greater than block.max, ensuring that an element
|
||||
// satisfying the condition can be found.
|
||||
return br.readGreaterThan(id)
|
||||
}
|
||||
|
||||
// indexWriter is responsible for writing index data for a specific state (either
|
||||
// an account or a storage slot). The state index follows a two-layer structure:
|
||||
// the first layer consists of a list of fixed-size metadata, each linked to a
|
||||
// second-layer block. The index data (monotonically increasing list of state
|
||||
// history ids) is stored in these second-layer index blocks, which are size
|
||||
// limited.
|
||||
type indexWriter struct {
|
||||
descList []*indexBlockDesc // The list of index block descriptions
|
||||
bw *blockWriter // The live index block writer
|
||||
frozen []*blockWriter // The finalized index block writers, waiting for flush
|
||||
lastID uint64 // The ID of the latest tracked history
|
||||
state stateIdent
|
||||
db ethdb.KeyValueReader
|
||||
}
|
||||
|
||||
// newIndexWriter constructs the index writer for the specified state.
|
||||
func newIndexWriter(db ethdb.KeyValueReader, state stateIdent) (*indexWriter, error) {
|
||||
var blob []byte
|
||||
if state.account {
|
||||
blob = rawdb.ReadAccountHistoryIndex(db, state.addressHash)
|
||||
} else {
|
||||
blob = rawdb.ReadStorageHistoryIndex(db, state.addressHash, state.storageHash)
|
||||
}
|
||||
if len(blob) == 0 {
|
||||
desc := newIndexBlockDesc(0)
|
||||
bw, _ := newBlockWriter(nil, desc)
|
||||
return &indexWriter{
|
||||
descList: []*indexBlockDesc{desc},
|
||||
bw: bw,
|
||||
state: state,
|
||||
db: db,
|
||||
}, nil
|
||||
}
|
||||
descList, err := parseIndex(blob)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var (
|
||||
indexBlock []byte
|
||||
lastDesc = descList[len(descList)-1]
|
||||
)
|
||||
if state.account {
|
||||
indexBlock = rawdb.ReadAccountHistoryIndexBlock(db, state.addressHash, lastDesc.id)
|
||||
} else {
|
||||
indexBlock = rawdb.ReadStorageHistoryIndexBlock(db, state.addressHash, state.storageHash, lastDesc.id)
|
||||
}
|
||||
bw, err := newBlockWriter(indexBlock, lastDesc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &indexWriter{
|
||||
descList: descList,
|
||||
lastID: lastDesc.max,
|
||||
bw: bw,
|
||||
state: state,
|
||||
db: db,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// append adds the new element into the index writer.
|
||||
func (w *indexWriter) append(id uint64) error {
|
||||
if id <= w.lastID {
|
||||
return fmt.Errorf("append element out of order, last: %d, this: %d", w.lastID, id)
|
||||
}
|
||||
if w.bw.full() {
|
||||
if err := w.rotate(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err := w.bw.append(id); err != nil {
|
||||
return err
|
||||
}
|
||||
w.lastID = id
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// rotate creates a new index block for storing index records from scratch
|
||||
// and caches the current full index block for finalization.
|
||||
func (w *indexWriter) rotate() error {
|
||||
var (
|
||||
err error
|
||||
desc = newIndexBlockDesc(w.bw.desc.id + 1)
|
||||
)
|
||||
w.frozen = append(w.frozen, w.bw)
|
||||
w.bw, err = newBlockWriter(nil, desc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
w.descList = append(w.descList, desc)
|
||||
return nil
|
||||
}
|
||||
|
||||
// finish finalizes all the frozen index block writers along with the live one
|
||||
// if it's not empty, committing the index block data and the index meta into
|
||||
// the supplied batch.
|
||||
//
|
||||
// This function is safe to be called multiple times.
|
||||
func (w *indexWriter) finish(batch ethdb.Batch) {
|
||||
var (
|
||||
writers = append(w.frozen, w.bw)
|
||||
descList = w.descList
|
||||
)
|
||||
// The live index block writer might be empty if the entire index write
|
||||
// is created from scratch, remove it from committing.
|
||||
if w.bw.empty() {
|
||||
writers = writers[:len(writers)-1]
|
||||
descList = descList[:len(descList)-1]
|
||||
}
|
||||
if len(writers) == 0 {
|
||||
return // nothing to commit
|
||||
}
|
||||
for _, bw := range writers {
|
||||
if w.state.account {
|
||||
rawdb.WriteAccountHistoryIndexBlock(batch, w.state.addressHash, bw.desc.id, bw.finish())
|
||||
} else {
|
||||
rawdb.WriteStorageHistoryIndexBlock(batch, w.state.addressHash, w.state.storageHash, bw.desc.id, bw.finish())
|
||||
}
|
||||
}
|
||||
w.frozen = nil // release all the frozen writers
|
||||
|
||||
buf := make([]byte, 0, indexBlockDescSize*len(descList))
|
||||
for _, desc := range descList {
|
||||
buf = append(buf, desc.encode()...)
|
||||
}
|
||||
if w.state.account {
|
||||
rawdb.WriteAccountHistoryIndex(batch, w.state.addressHash, buf)
|
||||
} else {
|
||||
rawdb.WriteStorageHistoryIndex(batch, w.state.addressHash, w.state.storageHash, buf)
|
||||
}
|
||||
}
|
||||
|
||||
// indexDeleter is responsible for deleting index data for a specific state.
|
||||
type indexDeleter struct {
|
||||
descList []*indexBlockDesc // The list of index block descriptions
|
||||
bw *blockWriter // The live index block writer
|
||||
dropped []uint32 // The list of index block id waiting for deleting
|
||||
lastID uint64 // The ID of the latest tracked history
|
||||
state stateIdent
|
||||
db ethdb.KeyValueReader
|
||||
}
|
||||
|
||||
// newIndexDeleter constructs the index deleter for the specified state.
|
||||
func newIndexDeleter(db ethdb.KeyValueReader, state stateIdent) (*indexDeleter, error) {
|
||||
var blob []byte
|
||||
if state.account {
|
||||
blob = rawdb.ReadAccountHistoryIndex(db, state.addressHash)
|
||||
} else {
|
||||
blob = rawdb.ReadStorageHistoryIndex(db, state.addressHash, state.storageHash)
|
||||
}
|
||||
if len(blob) == 0 {
|
||||
// TODO(rjl493456442) we can probably return an error here,
|
||||
// deleter with no data is meaningless.
|
||||
desc := newIndexBlockDesc(0)
|
||||
bw, _ := newBlockWriter(nil, desc)
|
||||
return &indexDeleter{
|
||||
descList: []*indexBlockDesc{desc},
|
||||
bw: bw,
|
||||
state: state,
|
||||
db: db,
|
||||
}, nil
|
||||
}
|
||||
descList, err := parseIndex(blob)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var (
|
||||
indexBlock []byte
|
||||
lastDesc = descList[len(descList)-1]
|
||||
)
|
||||
if state.account {
|
||||
indexBlock = rawdb.ReadAccountHistoryIndexBlock(db, state.addressHash, lastDesc.id)
|
||||
} else {
|
||||
indexBlock = rawdb.ReadStorageHistoryIndexBlock(db, state.addressHash, state.storageHash, lastDesc.id)
|
||||
}
|
||||
bw, err := newBlockWriter(indexBlock, lastDesc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &indexDeleter{
|
||||
descList: descList,
|
||||
lastID: lastDesc.max,
|
||||
bw: bw,
|
||||
state: state,
|
||||
db: db,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// empty returns an flag indicating whether the state index is empty.
|
||||
func (d *indexDeleter) empty() bool {
|
||||
return d.bw.empty() && len(d.descList) == 1
|
||||
}
|
||||
|
||||
// pop removes the last written element from the index writer.
|
||||
func (d *indexDeleter) pop(id uint64) error {
|
||||
if id == 0 {
|
||||
return fmt.Errorf("zero history ID is not valid")
|
||||
}
|
||||
if id != d.lastID {
|
||||
return fmt.Errorf("pop element out of order, last: %d, this: %d", d.lastID, id)
|
||||
}
|
||||
if err := d.bw.pop(id); err != nil {
|
||||
return err
|
||||
}
|
||||
if !d.bw.empty() {
|
||||
d.lastID = d.bw.desc.max
|
||||
return nil
|
||||
}
|
||||
// Discarding the last block writer if it becomes empty by popping an element
|
||||
d.dropped = append(d.dropped, d.descList[len(d.descList)-1].id)
|
||||
|
||||
// Reset the entire index writer if it becomes empty after popping an element
|
||||
if d.empty() {
|
||||
d.lastID = 0
|
||||
return nil
|
||||
}
|
||||
d.descList = d.descList[:len(d.descList)-1]
|
||||
|
||||
// Open the previous block writer for deleting
|
||||
var (
|
||||
indexBlock []byte
|
||||
lastDesc = d.descList[len(d.descList)-1]
|
||||
)
|
||||
if d.state.account {
|
||||
indexBlock = rawdb.ReadAccountHistoryIndexBlock(d.db, d.state.addressHash, lastDesc.id)
|
||||
} else {
|
||||
indexBlock = rawdb.ReadStorageHistoryIndexBlock(d.db, d.state.addressHash, d.state.storageHash, lastDesc.id)
|
||||
}
|
||||
bw, err := newBlockWriter(indexBlock, lastDesc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.bw = bw
|
||||
d.lastID = bw.desc.max
|
||||
return nil
|
||||
}
|
||||
|
||||
// finish deletes the empty index blocks and updates the index meta.
|
||||
//
|
||||
// This function is safe to be called multiple times.
|
||||
func (d *indexDeleter) finish(batch ethdb.Batch) {
|
||||
for _, id := range d.dropped {
|
||||
if d.state.account {
|
||||
rawdb.DeleteAccountHistoryIndexBlock(batch, d.state.addressHash, id)
|
||||
} else {
|
||||
rawdb.DeleteStorageHistoryIndexBlock(batch, d.state.addressHash, d.state.storageHash, id)
|
||||
}
|
||||
}
|
||||
d.dropped = nil
|
||||
|
||||
// Flush the content of last block writer, regardless it's dirty or not
|
||||
if !d.bw.empty() {
|
||||
if d.state.account {
|
||||
rawdb.WriteAccountHistoryIndexBlock(batch, d.state.addressHash, d.bw.desc.id, d.bw.finish())
|
||||
} else {
|
||||
rawdb.WriteStorageHistoryIndexBlock(batch, d.state.addressHash, d.state.storageHash, d.bw.desc.id, d.bw.finish())
|
||||
}
|
||||
}
|
||||
// Flush the index metadata into the supplied batch
|
||||
if d.empty() {
|
||||
if d.state.account {
|
||||
rawdb.DeleteAccountHistoryIndex(batch, d.state.addressHash)
|
||||
} else {
|
||||
rawdb.DeleteStorageHistoryIndex(batch, d.state.addressHash, d.state.storageHash)
|
||||
}
|
||||
} else {
|
||||
buf := make([]byte, 0, indexBlockDescSize*len(d.descList))
|
||||
for _, desc := range d.descList {
|
||||
buf = append(buf, desc.encode()...)
|
||||
}
|
||||
if d.state.account {
|
||||
rawdb.WriteAccountHistoryIndex(batch, d.state.addressHash, buf)
|
||||
} else {
|
||||
rawdb.WriteStorageHistoryIndex(batch, d.state.addressHash, d.state.storageHash, buf)
|
||||
}
|
||||
}
|
||||
}
|
||||
405
triedb/pathdb/history_index_block.go
Normal file
405
triedb/pathdb/history_index_block.go
Normal file
|
|
@ -0,0 +1,405 @@
|
|||
// Copyright 2025 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/
|
||||
|
||||
package pathdb
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"sort"
|
||||
)
|
||||
|
||||
const (
|
||||
indexBlockDescSize = 14 // The size of index block descriptor
|
||||
indexBlockEntriesCap = 4096 // The maximum number of entries can be grouped in a block
|
||||
indexBlockRestartLen = 256 // The restart interval length of index block
|
||||
historyIndexBatch = 1_000_000 // The number of state history indexes for constructing or deleting as batch
|
||||
)
|
||||
|
||||
// indexBlockDesc represents a descriptor for an index block, which contains a
|
||||
// list of state mutation records associated with a specific state (either an
|
||||
// account or a storage slot).
|
||||
type indexBlockDesc struct {
|
||||
max uint64 // The maximum state ID retained within the block
|
||||
entries uint16 // The number of state mutation records retained within the block
|
||||
id uint32 // The id of the index block
|
||||
}
|
||||
|
||||
func newIndexBlockDesc(id uint32) *indexBlockDesc {
|
||||
return &indexBlockDesc{id: id}
|
||||
}
|
||||
|
||||
// empty indicates whether the block is empty with no element retained.
|
||||
func (d *indexBlockDesc) empty() bool {
|
||||
return d.entries == 0
|
||||
}
|
||||
|
||||
// full indicates whether the number of elements in the block exceeds the
|
||||
// preconfigured limit.
|
||||
func (d *indexBlockDesc) full() bool {
|
||||
return d.entries >= indexBlockEntriesCap
|
||||
}
|
||||
|
||||
// encode packs index block descriptor into byte stream.
|
||||
func (d *indexBlockDesc) encode() []byte {
|
||||
var buf [indexBlockDescSize]byte
|
||||
binary.BigEndian.PutUint64(buf[0:8], d.max)
|
||||
binary.BigEndian.PutUint16(buf[8:10], d.entries)
|
||||
binary.BigEndian.PutUint32(buf[10:14], d.id)
|
||||
return buf[:]
|
||||
}
|
||||
|
||||
// decode unpacks index block descriptor from byte stream.
|
||||
func (d *indexBlockDesc) decode(blob []byte) {
|
||||
d.max = binary.BigEndian.Uint64(blob[:8])
|
||||
d.entries = binary.BigEndian.Uint16(blob[8:10])
|
||||
d.id = binary.BigEndian.Uint32(blob[10:14])
|
||||
}
|
||||
|
||||
// parseIndexBlock parses the index block with the supplied byte stream.
|
||||
// The index block format can be illustrated as below:
|
||||
//
|
||||
// +---->+------------------+
|
||||
// | | Chunk1 |
|
||||
// | +------------------+
|
||||
// | | ...... |
|
||||
// | +-->+------------------+
|
||||
// | | | ChunkN |
|
||||
// | | +------------------+
|
||||
// +-|---| Restart1 |
|
||||
// | | Restart... | 2N bytes
|
||||
// +---| RestartN |
|
||||
// +------------------+
|
||||
// | Restart count | 1 byte
|
||||
// +------------------+
|
||||
//
|
||||
// - Chunk list: A list of data chunks
|
||||
// - Restart list: A list of 2-byte pointers, each pointing to the start position of a chunk
|
||||
// - Restart count: The number of restarts in the block, stored at the end of the block (1 byte)
|
||||
//
|
||||
// Note: the pointer is encoded as a uint16, which is sufficient within a chunk.
|
||||
// A uint16 can cover offsets in the range [0, 65536), which is more than enough
|
||||
// to store 4096 integers.
|
||||
//
|
||||
// Each chunk begins with the full value of the first integer, followed by
|
||||
// subsequent integers representing the differences between the current value
|
||||
// and the preceding one. Integers are encoded with variable-size for best
|
||||
// storage efficiency. Each chunk can be illustrated as below.
|
||||
//
|
||||
// Restart ---> +----------------+
|
||||
// | Full integer |
|
||||
// +----------------+
|
||||
// | Diff with prev |
|
||||
// +----------------+
|
||||
// | ... |
|
||||
// +----------------+
|
||||
// | Diff with prev |
|
||||
// +----------------+
|
||||
//
|
||||
// Empty index block is regarded as invalid.
|
||||
func parseIndexBlock(blob []byte) ([]uint16, []byte, error) {
|
||||
if len(blob) < 1 {
|
||||
return nil, nil, fmt.Errorf("corrupted index block, len: %d", len(blob))
|
||||
}
|
||||
restartLen := blob[len(blob)-1]
|
||||
if restartLen == 0 {
|
||||
return nil, nil, errors.New("corrupted index block, no restart")
|
||||
}
|
||||
tailLen := int(restartLen)*2 + 1
|
||||
if len(blob) < tailLen {
|
||||
return nil, nil, fmt.Errorf("truncated restarts, size: %d, restarts: %d", len(blob), restartLen)
|
||||
}
|
||||
restarts := make([]uint16, 0, restartLen)
|
||||
for i := int(restartLen); i > 0; i-- {
|
||||
restart := binary.BigEndian.Uint16(blob[len(blob)-1-2*i:])
|
||||
restarts = append(restarts, restart)
|
||||
}
|
||||
// Validate that restart points are strictly ordered and within the valid
|
||||
// data range.
|
||||
var prev uint16
|
||||
for i := 0; i < len(restarts); i++ {
|
||||
if i != 0 {
|
||||
if restarts[i] <= prev {
|
||||
return nil, nil, fmt.Errorf("restart out of order, prev: %d, next: %d", prev, restarts[i])
|
||||
}
|
||||
}
|
||||
if int(restarts[i]) >= len(blob)-tailLen {
|
||||
return nil, nil, fmt.Errorf("invalid restart position, restart: %d, size: %d", restarts[i], len(blob)-tailLen)
|
||||
}
|
||||
prev = restarts[i]
|
||||
}
|
||||
return restarts, blob[:len(blob)-tailLen], nil
|
||||
}
|
||||
|
||||
// blockReader is the reader to access the element within a block.
|
||||
type blockReader struct {
|
||||
restarts []uint16
|
||||
data []byte
|
||||
}
|
||||
|
||||
// newBlockReader constructs the block reader with the supplied block data.
|
||||
func newBlockReader(blob []byte) (*blockReader, error) {
|
||||
restarts, data, err := parseIndexBlock(blob)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &blockReader{
|
||||
restarts: restarts,
|
||||
data: data, // safe to own the slice
|
||||
}, nil
|
||||
}
|
||||
|
||||
// readGreaterThan locates the first element in the block that is greater than
|
||||
// the specified value. If no such element is found, MaxUint64 is returned.
|
||||
func (br *blockReader) readGreaterThan(id uint64) (uint64, error) {
|
||||
var err error
|
||||
index := sort.Search(len(br.restarts), func(i int) bool {
|
||||
item, n := binary.Uvarint(br.data[br.restarts[i]:])
|
||||
if n <= 0 {
|
||||
err = fmt.Errorf("failed to decode item at restart %d", br.restarts[i])
|
||||
}
|
||||
return item > id
|
||||
})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if index == 0 {
|
||||
item, _ := binary.Uvarint(br.data[br.restarts[0]:])
|
||||
return item, nil
|
||||
}
|
||||
var (
|
||||
start int
|
||||
limit int
|
||||
result uint64
|
||||
)
|
||||
if index == len(br.restarts) {
|
||||
// The element being searched falls within the last restart section,
|
||||
// there is no guarantee such element can be found.
|
||||
start = int(br.restarts[len(br.restarts)-1])
|
||||
limit = len(br.data)
|
||||
} else {
|
||||
// The element being searched falls within the non-last restart section,
|
||||
// such element can be found for sure.
|
||||
start = int(br.restarts[index-1])
|
||||
limit = int(br.restarts[index])
|
||||
}
|
||||
pos := start
|
||||
for pos < limit {
|
||||
x, n := binary.Uvarint(br.data[pos:])
|
||||
if pos == start {
|
||||
result = x
|
||||
} else {
|
||||
result += x
|
||||
}
|
||||
if result > id {
|
||||
return result, nil
|
||||
}
|
||||
pos += n
|
||||
}
|
||||
// The element which is greater than specified id is not found.
|
||||
if index == len(br.restarts) {
|
||||
return math.MaxUint64, nil
|
||||
}
|
||||
// The element which is the first one greater than the specified id
|
||||
// is exactly the one located at the restart point.
|
||||
item, _ := binary.Uvarint(br.data[br.restarts[index]:])
|
||||
return item, nil
|
||||
}
|
||||
|
||||
type blockWriter struct {
|
||||
desc *indexBlockDesc // Descriptor of the block
|
||||
restarts []uint16 // Offsets into the data slice, marking the start of each section
|
||||
scratch []byte // Buffer used for encoding full integers or value differences
|
||||
data []byte // Aggregated encoded data slice
|
||||
}
|
||||
|
||||
func newBlockWriter(blob []byte, desc *indexBlockDesc) (*blockWriter, error) {
|
||||
scratch := make([]byte, binary.MaxVarintLen64)
|
||||
if len(blob) == 0 {
|
||||
return &blockWriter{
|
||||
desc: desc,
|
||||
scratch: scratch,
|
||||
data: make([]byte, 0, 1024),
|
||||
}, nil
|
||||
}
|
||||
restarts, data, err := parseIndexBlock(blob)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &blockWriter{
|
||||
desc: desc,
|
||||
restarts: restarts,
|
||||
scratch: scratch,
|
||||
data: data, // safe to own the slice
|
||||
}, nil
|
||||
}
|
||||
|
||||
// append adds a new element to the block. The new element must be greater than
|
||||
// the previous one. The provided ID is assumed to always be greater than 0.
|
||||
func (b *blockWriter) append(id uint64) error {
|
||||
if id == 0 {
|
||||
return errors.New("invalid zero id")
|
||||
}
|
||||
if id <= b.desc.max {
|
||||
return fmt.Errorf("append element out of order, last: %d, this: %d", b.desc.max, id)
|
||||
}
|
||||
// Rotate the current restart section if it's full
|
||||
if b.desc.entries%indexBlockRestartLen == 0 {
|
||||
// Save the offset within the data slice as the restart point
|
||||
// for the next section.
|
||||
b.restarts = append(b.restarts, uint16(len(b.data)))
|
||||
|
||||
// The restart point item can either be encoded in variable
|
||||
// size or fixed size. Although variable-size encoding is
|
||||
// slightly slower (2ns per operation), it is still relatively
|
||||
// fast, therefore, it's picked for better space efficiency.
|
||||
//
|
||||
// The first element in a restart range is encoded using its
|
||||
// full value.
|
||||
n := binary.PutUvarint(b.scratch[0:], id)
|
||||
b.data = append(b.data, b.scratch[:n]...)
|
||||
} else {
|
||||
// The current section is not full, append the element.
|
||||
// The element which is not the first one in the section
|
||||
// is encoded using the value difference from the preceding
|
||||
// element.
|
||||
n := binary.PutUvarint(b.scratch[0:], id-b.desc.max)
|
||||
b.data = append(b.data, b.scratch[:n]...)
|
||||
}
|
||||
b.desc.entries++
|
||||
|
||||
// The state history ID must be greater than 0.
|
||||
//if b.desc.min == 0 {
|
||||
// b.desc.min = id
|
||||
//}
|
||||
b.desc.max = id
|
||||
return nil
|
||||
}
|
||||
|
||||
// scanSection traverses the specified section and terminates if fn returns true.
|
||||
func (b *blockWriter) scanSection(section int, fn func(uint64, int) bool) {
|
||||
var (
|
||||
value uint64
|
||||
start = int(b.restarts[section])
|
||||
pos = start
|
||||
limit int
|
||||
)
|
||||
if section == len(b.restarts)-1 {
|
||||
limit = len(b.data)
|
||||
} else {
|
||||
limit = int(b.restarts[section+1])
|
||||
}
|
||||
for pos < limit {
|
||||
x, n := binary.Uvarint(b.data[pos:])
|
||||
if pos == start {
|
||||
value = x
|
||||
} else {
|
||||
value += x
|
||||
}
|
||||
if fn(value, pos) {
|
||||
return
|
||||
}
|
||||
pos += n
|
||||
}
|
||||
}
|
||||
|
||||
// sectionLast returns the last element in the specified section.
|
||||
func (b *blockWriter) sectionLast(section int) uint64 {
|
||||
var n uint64
|
||||
b.scanSection(section, func(v uint64, _ int) bool {
|
||||
n = v
|
||||
return false
|
||||
})
|
||||
return n
|
||||
}
|
||||
|
||||
// sectionSearch looks up the specified value in the given section,
|
||||
// the position and the preceding value will be returned if found.
|
||||
func (b *blockWriter) sectionSearch(section int, n uint64) (found bool, prev uint64, pos int) {
|
||||
b.scanSection(section, func(v uint64, p int) bool {
|
||||
if n == v {
|
||||
pos = p
|
||||
found = true
|
||||
return true // terminate iteration
|
||||
}
|
||||
prev = v
|
||||
return false // continue iteration
|
||||
})
|
||||
return found, prev, pos
|
||||
}
|
||||
|
||||
// pop removes the last element from the block. The assumption is held that block
|
||||
// writer must be non-empty.
|
||||
func (b *blockWriter) pop(id uint64) error {
|
||||
if id == 0 {
|
||||
return errors.New("invalid zero id")
|
||||
}
|
||||
if id != b.desc.max {
|
||||
return fmt.Errorf("pop element out of order, last: %d, this: %d", b.desc.max, id)
|
||||
}
|
||||
// If there is only one entry left, the entire block should be reset
|
||||
if b.desc.entries == 1 {
|
||||
//b.desc.min = 0
|
||||
b.desc.max = 0
|
||||
b.desc.entries = 0
|
||||
b.restarts = nil
|
||||
b.data = b.data[:0]
|
||||
return nil
|
||||
}
|
||||
// Pop the last restart section if the section becomes empty after removing
|
||||
// one element.
|
||||
if b.desc.entries%indexBlockRestartLen == 1 {
|
||||
b.data = b.data[:b.restarts[len(b.restarts)-1]]
|
||||
b.restarts = b.restarts[:len(b.restarts)-1]
|
||||
b.desc.max = b.sectionLast(len(b.restarts) - 1)
|
||||
b.desc.entries -= 1
|
||||
return nil
|
||||
}
|
||||
// Look up the element preceding the one to be popped, in order to update
|
||||
// the maximum element in the block.
|
||||
found, prev, pos := b.sectionSearch(len(b.restarts)-1, id)
|
||||
if !found {
|
||||
return fmt.Errorf("pop element is not found, last: %d, this: %d", b.desc.max, id)
|
||||
}
|
||||
b.desc.max = prev
|
||||
b.data = b.data[:pos]
|
||||
b.desc.entries -= 1
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *blockWriter) empty() bool {
|
||||
return b.desc.empty()
|
||||
}
|
||||
|
||||
func (b *blockWriter) full() bool {
|
||||
return b.desc.full()
|
||||
}
|
||||
|
||||
// finish finalizes the index block encoding by appending the encoded restart points
|
||||
// and the restart counter to the end of the block.
|
||||
//
|
||||
// This function is safe to be called multiple times.
|
||||
func (b *blockWriter) finish() []byte {
|
||||
var buf []byte
|
||||
for _, number := range b.restarts {
|
||||
binary.BigEndian.PutUint16(b.scratch[:2], number)
|
||||
buf = append(buf, b.scratch[:2]...)
|
||||
}
|
||||
buf = append(buf, byte(len(b.restarts)))
|
||||
return append(b.data, buf...)
|
||||
}
|
||||
216
triedb/pathdb/history_index_block_test.go
Normal file
216
triedb/pathdb/history_index_block_test.go
Normal file
|
|
@ -0,0 +1,216 @@
|
|||
// Copyright 2025 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/
|
||||
|
||||
package pathdb
|
||||
|
||||
import (
|
||||
"math"
|
||||
"math/rand"
|
||||
"slices"
|
||||
"sort"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestBlockReaderBasic(t *testing.T) {
|
||||
elements := []uint64{
|
||||
1, 5, 10, 11, 20,
|
||||
}
|
||||
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0))
|
||||
for i := 0; i < len(elements); i++ {
|
||||
bw.append(elements[i])
|
||||
}
|
||||
|
||||
br, err := newBlockReader(bw.finish())
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to construct the block reader, %v", err)
|
||||
}
|
||||
cases := []struct {
|
||||
value uint64
|
||||
result uint64
|
||||
}{
|
||||
{0, 1},
|
||||
{1, 5},
|
||||
{10, 11},
|
||||
{19, 20},
|
||||
{20, math.MaxUint64},
|
||||
{21, math.MaxUint64},
|
||||
}
|
||||
for _, c := range cases {
|
||||
got, err := br.readGreaterThan(c.value)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error, got %v", err)
|
||||
}
|
||||
if got != c.result {
|
||||
t.Fatalf("Unexpected result, got %v, wanted %v", got, c.result)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestBlockReaderLarge(t *testing.T) {
|
||||
var elements []uint64
|
||||
for i := 0; i < 1000; i++ {
|
||||
elements = append(elements, rand.Uint64())
|
||||
}
|
||||
slices.Sort(elements)
|
||||
|
||||
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0))
|
||||
for i := 0; i < len(elements); i++ {
|
||||
bw.append(elements[i])
|
||||
}
|
||||
|
||||
br, err := newBlockReader(bw.finish())
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to construct the block reader, %v", err)
|
||||
}
|
||||
for i := 0; i < 100; i++ {
|
||||
value := rand.Uint64()
|
||||
pos := sort.Search(len(elements), func(i int) bool {
|
||||
return elements[i] > value
|
||||
})
|
||||
got, err := br.readGreaterThan(value)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error, got %v", err)
|
||||
}
|
||||
if pos == len(elements) {
|
||||
if got != math.MaxUint64 {
|
||||
t.Fatalf("Unexpected result, got %d, wanted math.MaxUint64", got)
|
||||
}
|
||||
} else if got != elements[pos] {
|
||||
t.Fatalf("Unexpected result, got %d, wanted %d", got, elements[pos])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestBlockWriterBasic(t *testing.T) {
|
||||
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0))
|
||||
if !bw.empty() {
|
||||
t.Fatal("expected empty block")
|
||||
}
|
||||
bw.append(2)
|
||||
if err := bw.append(1); err == nil {
|
||||
t.Fatal("out-of-order insertion is not expected")
|
||||
}
|
||||
for i := 0; i < 10; i++ {
|
||||
bw.append(uint64(i + 3))
|
||||
}
|
||||
|
||||
bw, err := newBlockWriter(bw.finish(), newIndexBlockDesc(0))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to construct the block writer, %v", err)
|
||||
}
|
||||
for i := 0; i < 10; i++ {
|
||||
if err := bw.append(uint64(i + 100)); err != nil {
|
||||
t.Fatalf("Failed to append value %d: %v", i, err)
|
||||
}
|
||||
}
|
||||
bw.finish()
|
||||
}
|
||||
|
||||
func TestBlockWriterDelete(t *testing.T) {
|
||||
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0))
|
||||
for i := 0; i < 10; i++ {
|
||||
bw.append(uint64(i + 1))
|
||||
}
|
||||
// Pop unknown id, the request should be rejected
|
||||
if err := bw.pop(100); err == nil {
|
||||
t.Fatal("Expect error to occur for unknown id")
|
||||
}
|
||||
for i := 10; i >= 1; i-- {
|
||||
if err := bw.pop(uint64(i)); err != nil {
|
||||
t.Fatalf("Unexpected error for element popping, %v", err)
|
||||
}
|
||||
empty := i == 1
|
||||
if empty != bw.empty() {
|
||||
t.Fatalf("Emptiness is not matched, want: %T, got: %T", empty, bw.empty())
|
||||
}
|
||||
newMax := uint64(i - 1)
|
||||
if bw.desc.max != newMax {
|
||||
t.Fatalf("Maxmium element is not matched, want: %d, got: %d", newMax, bw.desc.max)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestBlcokWriterDeleteWithData(t *testing.T) {
|
||||
elements := []uint64{
|
||||
1, 5, 10, 11, 20,
|
||||
}
|
||||
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0))
|
||||
for i := 0; i < len(elements); i++ {
|
||||
bw.append(elements[i])
|
||||
}
|
||||
|
||||
// Re-construct the block writer with data
|
||||
desc := &indexBlockDesc{
|
||||
id: 0,
|
||||
max: 20,
|
||||
entries: 5,
|
||||
}
|
||||
bw, err := newBlockWriter(bw.finish(), desc)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to construct block writer %v", err)
|
||||
}
|
||||
for i := len(elements) - 1; i > 0; i-- {
|
||||
if err := bw.pop(elements[i]); err != nil {
|
||||
t.Fatalf("Failed to pop element, %v", err)
|
||||
}
|
||||
newTail := elements[i-1]
|
||||
|
||||
// Ensure the element can still be queried with no issue
|
||||
br, err := newBlockReader(bw.finish())
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to construct the block reader, %v", err)
|
||||
}
|
||||
cases := []struct {
|
||||
value uint64
|
||||
result uint64
|
||||
}{
|
||||
{0, 1},
|
||||
{1, 5},
|
||||
{10, 11},
|
||||
{19, 20},
|
||||
{20, math.MaxUint64},
|
||||
{21, math.MaxUint64},
|
||||
}
|
||||
for _, c := range cases {
|
||||
want := c.result
|
||||
if c.value >= newTail {
|
||||
want = math.MaxUint64
|
||||
}
|
||||
got, err := br.readGreaterThan(c.value)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error, got %v", err)
|
||||
}
|
||||
if got != want {
|
||||
t.Fatalf("Unexpected result, got %v, wanted %v", got, want)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestCorruptedIndexBlock(t *testing.T) {
|
||||
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0))
|
||||
for i := 0; i < 10; i++ {
|
||||
bw.append(uint64(i + 1))
|
||||
}
|
||||
buf := bw.finish()
|
||||
|
||||
// Mutate the buffer manually
|
||||
buf[len(buf)-1]++
|
||||
_, err := newBlockWriter(buf, newIndexBlockDesc(0))
|
||||
if err == nil {
|
||||
t.Fatal("Corrupted index block data is not detected")
|
||||
}
|
||||
}
|
||||
292
triedb/pathdb/history_index_test.go
Normal file
292
triedb/pathdb/history_index_test.go
Normal file
|
|
@ -0,0 +1,292 @@
|
|||
// Copyright 2025 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/
|
||||
|
||||
package pathdb
|
||||
|
||||
import (
|
||||
"math"
|
||||
"math/rand"
|
||||
"slices"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
)
|
||||
|
||||
func TestIndexReaderBasic(t *testing.T) {
|
||||
elements := []uint64{
|
||||
1, 5, 10, 11, 20,
|
||||
}
|
||||
db := rawdb.NewMemoryDatabase()
|
||||
bw, _ := newIndexWriter(db, newAccountIdent(common.Hash{0xa}))
|
||||
for i := 0; i < len(elements); i++ {
|
||||
bw.append(elements[i])
|
||||
}
|
||||
batch := db.NewBatch()
|
||||
bw.finish(batch)
|
||||
batch.Write()
|
||||
|
||||
br, err := newIndexReader(db, newAccountIdent(common.Hash{0xa}))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to construct the index reader, %v", err)
|
||||
}
|
||||
cases := []struct {
|
||||
value uint64
|
||||
result uint64
|
||||
}{
|
||||
{0, 1},
|
||||
{1, 5},
|
||||
{10, 11},
|
||||
{19, 20},
|
||||
{20, math.MaxUint64},
|
||||
{21, math.MaxUint64},
|
||||
}
|
||||
for _, c := range cases {
|
||||
got, err := br.readGreaterThan(c.value)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error, got %v", err)
|
||||
}
|
||||
if got != c.result {
|
||||
t.Fatalf("Unexpected result, got %v, wanted %v", got, c.result)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestIndexReaderLarge(t *testing.T) {
|
||||
var elements []uint64
|
||||
for i := 0; i < 10*indexBlockEntriesCap; i++ {
|
||||
elements = append(elements, rand.Uint64())
|
||||
}
|
||||
slices.Sort(elements)
|
||||
|
||||
db := rawdb.NewMemoryDatabase()
|
||||
bw, _ := newIndexWriter(db, newAccountIdent(common.Hash{0xa}))
|
||||
for i := 0; i < len(elements); i++ {
|
||||
bw.append(elements[i])
|
||||
}
|
||||
batch := db.NewBatch()
|
||||
bw.finish(batch)
|
||||
batch.Write()
|
||||
|
||||
br, err := newIndexReader(db, newAccountIdent(common.Hash{0xa}))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to construct the index reader, %v", err)
|
||||
}
|
||||
for i := 0; i < 100; i++ {
|
||||
value := rand.Uint64()
|
||||
pos := sort.Search(len(elements), func(i int) bool {
|
||||
return elements[i] > value
|
||||
})
|
||||
got, err := br.readGreaterThan(value)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error, got %v", err)
|
||||
}
|
||||
if pos == len(elements) {
|
||||
if got != math.MaxUint64 {
|
||||
t.Fatalf("Unexpected result, got %d, wanted math.MaxUint64", got)
|
||||
}
|
||||
} else if got != elements[pos] {
|
||||
t.Fatalf("Unexpected result, got %d, wanted %d", got, elements[pos])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestEmptyIndexReader(t *testing.T) {
|
||||
br, err := newIndexReader(rawdb.NewMemoryDatabase(), newAccountIdent(common.Hash{0xa}))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to construct the index reader, %v", err)
|
||||
}
|
||||
res, err := br.readGreaterThan(100)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to query, %v", err)
|
||||
}
|
||||
if res != math.MaxUint64 {
|
||||
t.Fatalf("Unexpected result, got %d, wanted math.MaxUint64", res)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIndexWriterBasic(t *testing.T) {
|
||||
db := rawdb.NewMemoryDatabase()
|
||||
iw, _ := newIndexWriter(db, newAccountIdent(common.Hash{0xa}))
|
||||
iw.append(2)
|
||||
if err := iw.append(1); err == nil {
|
||||
t.Fatal("out-of-order insertion is not expected")
|
||||
}
|
||||
for i := 0; i < 10; i++ {
|
||||
iw.append(uint64(i + 3))
|
||||
}
|
||||
batch := db.NewBatch()
|
||||
iw.finish(batch)
|
||||
batch.Write()
|
||||
|
||||
iw, err := newIndexWriter(db, newAccountIdent(common.Hash{0xa}))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to construct the block writer, %v", err)
|
||||
}
|
||||
for i := 0; i < 10; i++ {
|
||||
if err := iw.append(uint64(i + 100)); err != nil {
|
||||
t.Fatalf("Failed to append item, %v", err)
|
||||
}
|
||||
}
|
||||
iw.finish(db.NewBatch())
|
||||
}
|
||||
|
||||
func TestIndexWriterDelete(t *testing.T) {
|
||||
db := rawdb.NewMemoryDatabase()
|
||||
iw, _ := newIndexWriter(db, newAccountIdent(common.Hash{0xa}))
|
||||
for i := 0; i < indexBlockEntriesCap*4; i++ {
|
||||
iw.append(uint64(i + 1))
|
||||
}
|
||||
batch := db.NewBatch()
|
||||
iw.finish(batch)
|
||||
batch.Write()
|
||||
|
||||
// Delete unknown id, the request should be rejected
|
||||
id, _ := newIndexDeleter(db, newAccountIdent(common.Hash{0xa}))
|
||||
if err := id.pop(indexBlockEntriesCap * 5); err == nil {
|
||||
t.Fatal("Expect error to occur for unknown id")
|
||||
}
|
||||
for i := indexBlockEntriesCap * 4; i >= 1; i-- {
|
||||
if err := id.pop(uint64(i)); err != nil {
|
||||
t.Fatalf("Unexpected error for element popping, %v", err)
|
||||
}
|
||||
if id.lastID != uint64(i-1) {
|
||||
t.Fatalf("Unexpected lastID, want: %d, got: %d", uint64(i-1), iw.lastID)
|
||||
}
|
||||
if rand.Intn(10) == 0 {
|
||||
batch := db.NewBatch()
|
||||
id.finish(batch)
|
||||
batch.Write()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestBatchIndexerWrite(t *testing.T) {
|
||||
var (
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
batch = newBatchIndexer(db, false)
|
||||
histories = makeHistories(10)
|
||||
)
|
||||
for i, h := range histories {
|
||||
if err := batch.process(h, uint64(i+1)); err != nil {
|
||||
t.Fatalf("Failed to process history, %v", err)
|
||||
}
|
||||
}
|
||||
if err := batch.finish(true); err != nil {
|
||||
t.Fatalf("Failed to finish batch indexer, %v", err)
|
||||
}
|
||||
metadata := loadIndexMetadata(db)
|
||||
if metadata == nil || metadata.Last != uint64(10) {
|
||||
t.Fatal("Unexpected index position")
|
||||
}
|
||||
var (
|
||||
accounts = make(map[common.Hash][]uint64)
|
||||
storages = make(map[common.Hash]map[common.Hash][]uint64)
|
||||
)
|
||||
for i, h := range histories {
|
||||
for _, addr := range h.accountList {
|
||||
addrHash := crypto.Keccak256Hash(addr.Bytes())
|
||||
accounts[addrHash] = append(accounts[addrHash], uint64(i+1))
|
||||
|
||||
if _, ok := storages[addrHash]; !ok {
|
||||
storages[addrHash] = make(map[common.Hash][]uint64)
|
||||
}
|
||||
for _, slot := range h.storageList[addr] {
|
||||
storages[addrHash][slot] = append(storages[addrHash][slot], uint64(i+1))
|
||||
}
|
||||
}
|
||||
}
|
||||
for addrHash, indexes := range accounts {
|
||||
ir, _ := newIndexReader(db, newAccountIdent(addrHash))
|
||||
for i := 0; i < len(indexes)-1; i++ {
|
||||
n, err := ir.readGreaterThan(indexes[i])
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to read index, %v", err)
|
||||
}
|
||||
if n != indexes[i+1] {
|
||||
t.Fatalf("Unexpected result, want %d, got %d", indexes[i+1], n)
|
||||
}
|
||||
}
|
||||
n, err := ir.readGreaterThan(indexes[len(indexes)-1])
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to read index, %v", err)
|
||||
}
|
||||
if n != math.MaxUint64 {
|
||||
t.Fatalf("Unexpected result, want math.MaxUint64, got %d", n)
|
||||
}
|
||||
}
|
||||
for addrHash, slots := range storages {
|
||||
for slotHash, indexes := range slots {
|
||||
ir, _ := newIndexReader(db, newStorageIdent(addrHash, slotHash))
|
||||
for i := 0; i < len(indexes)-1; i++ {
|
||||
n, err := ir.readGreaterThan(indexes[i])
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to read index, %v", err)
|
||||
}
|
||||
if n != indexes[i+1] {
|
||||
t.Fatalf("Unexpected result, want %d, got %d", indexes[i+1], n)
|
||||
}
|
||||
}
|
||||
n, err := ir.readGreaterThan(indexes[len(indexes)-1])
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to read index, %v", err)
|
||||
}
|
||||
if n != math.MaxUint64 {
|
||||
t.Fatalf("Unexpected result, want math.MaxUint64, got %d", n)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestBatchIndexerDelete(t *testing.T) {
|
||||
var (
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
bw = newBatchIndexer(db, false)
|
||||
histories = makeHistories(10)
|
||||
)
|
||||
// Index histories
|
||||
for i, h := range histories {
|
||||
if err := bw.process(h, uint64(i+1)); err != nil {
|
||||
t.Fatalf("Failed to process history, %v", err)
|
||||
}
|
||||
}
|
||||
if err := bw.finish(true); err != nil {
|
||||
t.Fatalf("Failed to finish batch indexer, %v", err)
|
||||
}
|
||||
|
||||
// Unindex histories
|
||||
bd := newBatchIndexer(db, true)
|
||||
for i := len(histories) - 1; i >= 0; i-- {
|
||||
if err := bd.process(histories[i], uint64(i+1)); err != nil {
|
||||
t.Fatalf("Failed to process history, %v", err)
|
||||
}
|
||||
}
|
||||
if err := bd.finish(true); err != nil {
|
||||
t.Fatalf("Failed to finish batch indexer, %v", err)
|
||||
}
|
||||
|
||||
metadata := loadIndexMetadata(db)
|
||||
if metadata != nil {
|
||||
t.Fatal("Unexpected index position")
|
||||
}
|
||||
it := db.NewIterator(rawdb.StateHistoryIndexPrefix, nil)
|
||||
for it.Next() {
|
||||
t.Fatal("Leftover history index data")
|
||||
}
|
||||
it.Release()
|
||||
}
|
||||
613
triedb/pathdb/history_indexer.go
Normal file
613
triedb/pathdb/history_indexer.go
Normal file
|
|
@ -0,0 +1,613 @@
|
|||
// Copyright 2025 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/
|
||||
|
||||
package pathdb
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
const (
|
||||
// The batch size for reading state histories
|
||||
historyReadBatch = 1000
|
||||
|
||||
stateIndexV0 = uint8(0) // initial version of state index structure
|
||||
stateIndexVersion = stateIndexV0 // the current state index version
|
||||
)
|
||||
|
||||
type indexMetadata struct {
|
||||
Version uint8
|
||||
Last uint64
|
||||
}
|
||||
|
||||
func loadIndexMetadata(db ethdb.KeyValueReader) *indexMetadata {
|
||||
blob := rawdb.ReadStateHistoryIndexMetadata(db)
|
||||
if len(blob) == 0 {
|
||||
return nil
|
||||
}
|
||||
var m indexMetadata
|
||||
if err := rlp.DecodeBytes(blob, &m); err != nil {
|
||||
log.Error("Failed to decode index metadata", "err", err)
|
||||
return nil
|
||||
}
|
||||
return &m
|
||||
}
|
||||
|
||||
func storeIndexMetadata(db ethdb.KeyValueWriter, last uint64) {
|
||||
var m indexMetadata
|
||||
m.Version = stateIndexVersion
|
||||
m.Last = last
|
||||
blob, err := rlp.EncodeToBytes(m)
|
||||
if err != nil {
|
||||
log.Crit("Failed to encode index metadata", "err", err)
|
||||
}
|
||||
rawdb.WriteStateHistoryIndexMetadata(db, blob)
|
||||
}
|
||||
|
||||
// batchIndexer is a structure designed to perform batch indexing or unindexing
|
||||
// of state histories atomically.
|
||||
type batchIndexer struct {
|
||||
accounts map[common.Hash][]uint64 // History ID list, Keyed by account address
|
||||
storages map[common.Hash]map[common.Hash][]uint64 // History ID list, Keyed by account address and the hash of raw storage key
|
||||
counter int // The counter of processed states
|
||||
delete bool // Index or unindex mode
|
||||
lastID uint64 // The ID of latest processed history
|
||||
db ethdb.KeyValueStore
|
||||
}
|
||||
|
||||
// newBatchIndexer constructs the batch indexer with the supplied mode.
|
||||
func newBatchIndexer(db ethdb.KeyValueStore, delete bool) *batchIndexer {
|
||||
return &batchIndexer{
|
||||
accounts: make(map[common.Hash][]uint64),
|
||||
storages: make(map[common.Hash]map[common.Hash][]uint64),
|
||||
delete: delete,
|
||||
db: db,
|
||||
}
|
||||
}
|
||||
|
||||
// process iterates through the accounts and their associated storage slots in the
|
||||
// state history, tracking the mapping between state and history IDs.
|
||||
func (b *batchIndexer) process(h *history, historyID uint64) error {
|
||||
for _, address := range h.accountList {
|
||||
addrHash := crypto.Keccak256Hash(address.Bytes())
|
||||
b.counter += 1
|
||||
b.accounts[addrHash] = append(b.accounts[addrHash], historyID)
|
||||
|
||||
for _, slotKey := range h.storageList[address] {
|
||||
b.counter += 1
|
||||
if _, ok := b.storages[addrHash]; !ok {
|
||||
b.storages[addrHash] = make(map[common.Hash][]uint64)
|
||||
}
|
||||
// The hash of the storage slot key is used as the identifier because the
|
||||
// legacy history does not include the raw storage key, therefore, the
|
||||
// conversion from storage key to hash is necessary for non-v0 histories.
|
||||
slotHash := slotKey
|
||||
if h.meta.version != stateHistoryV0 {
|
||||
slotHash = crypto.Keccak256Hash(slotKey.Bytes())
|
||||
}
|
||||
b.storages[addrHash][slotHash] = append(b.storages[addrHash][slotHash], historyID)
|
||||
}
|
||||
}
|
||||
b.lastID = historyID
|
||||
return b.finish(false)
|
||||
}
|
||||
|
||||
// finish writes the accumulated state indexes into the disk if either the
|
||||
// memory limitation is reached or it's requested forcibly.
|
||||
func (b *batchIndexer) finish(force bool) error {
|
||||
if b.counter == 0 {
|
||||
return nil
|
||||
}
|
||||
if !force && b.counter < historyIndexBatch {
|
||||
return nil
|
||||
}
|
||||
var (
|
||||
batch = b.db.NewBatch()
|
||||
batchMu sync.RWMutex
|
||||
eg errgroup.Group
|
||||
)
|
||||
eg.SetLimit(runtime.NumCPU())
|
||||
|
||||
for addrHash, idList := range b.accounts {
|
||||
eg.Go(func() error {
|
||||
if !b.delete {
|
||||
iw, err := newIndexWriter(b.db, newAccountIdent(addrHash))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, n := range idList {
|
||||
if err := iw.append(n); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
batchMu.Lock()
|
||||
iw.finish(batch)
|
||||
batchMu.Unlock()
|
||||
} else {
|
||||
id, err := newIndexDeleter(b.db, newAccountIdent(addrHash))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, n := range idList {
|
||||
if err := id.pop(n); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
batchMu.Lock()
|
||||
id.finish(batch)
|
||||
batchMu.Unlock()
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
for addrHash, slots := range b.storages {
|
||||
for storageHash, idList := range slots {
|
||||
eg.Go(func() error {
|
||||
if !b.delete {
|
||||
iw, err := newIndexWriter(b.db, newStorageIdent(addrHash, storageHash))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, n := range idList {
|
||||
if err := iw.append(n); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
batchMu.Lock()
|
||||
iw.finish(batch)
|
||||
batchMu.Unlock()
|
||||
} else {
|
||||
id, err := newIndexDeleter(b.db, newStorageIdent(addrHash, storageHash))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, n := range idList {
|
||||
if err := id.pop(n); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
batchMu.Lock()
|
||||
id.finish(batch)
|
||||
batchMu.Unlock()
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
}
|
||||
if err := eg.Wait(); err != nil {
|
||||
return err
|
||||
}
|
||||
// Update the position of last indexed state history
|
||||
if !b.delete {
|
||||
storeIndexMetadata(batch, b.lastID)
|
||||
} else {
|
||||
if b.lastID == 1 {
|
||||
rawdb.DeleteStateHistoryIndexMetadata(batch)
|
||||
} else {
|
||||
storeIndexMetadata(batch, b.lastID-1)
|
||||
}
|
||||
}
|
||||
if err := batch.Write(); err != nil {
|
||||
return err
|
||||
}
|
||||
b.counter = 0
|
||||
b.accounts = make(map[common.Hash][]uint64)
|
||||
b.storages = make(map[common.Hash]map[common.Hash][]uint64)
|
||||
return nil
|
||||
}
|
||||
|
||||
// indexSingle processes the state history with the specified ID for indexing.
|
||||
func indexSingle(historyID uint64, db ethdb.KeyValueStore, freezer ethdb.AncientReader) error {
|
||||
defer func(start time.Time) {
|
||||
indexHistoryTimer.UpdateSince(start)
|
||||
}(time.Now())
|
||||
|
||||
metadata := loadIndexMetadata(db)
|
||||
if metadata == nil || metadata.Last+1 != historyID {
|
||||
last := "null"
|
||||
if metadata != nil {
|
||||
last = fmt.Sprintf("%v", metadata.Last)
|
||||
}
|
||||
return fmt.Errorf("history indexing is out of order, last: %s, requested: %d", last, historyID)
|
||||
}
|
||||
h, err := readHistory(freezer, historyID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b := newBatchIndexer(db, false)
|
||||
if err := b.process(h, historyID); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := b.finish(true); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Debug("Indexed state history", "id", historyID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// unindexSingle processes the state history with the specified ID for unindexing.
|
||||
func unindexSingle(historyID uint64, db ethdb.KeyValueStore, freezer ethdb.AncientReader) error {
|
||||
defer func(start time.Time) {
|
||||
unindexHistoryTimer.UpdateSince(start)
|
||||
}(time.Now())
|
||||
|
||||
metadata := loadIndexMetadata(db)
|
||||
if metadata == nil || metadata.Last != historyID {
|
||||
last := "null"
|
||||
if metadata != nil {
|
||||
last = fmt.Sprintf("%v", metadata.Last)
|
||||
}
|
||||
return fmt.Errorf("history unindexing is out of order, last: %s, requested: %d", last, historyID)
|
||||
}
|
||||
h, err := readHistory(freezer, historyID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b := newBatchIndexer(db, true)
|
||||
if err := b.process(h, historyID); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := b.finish(true); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Debug("Unindexed state history", "id", historyID)
|
||||
return nil
|
||||
}
|
||||
|
||||
type interruptSignal struct {
|
||||
newLastID uint64
|
||||
result chan error
|
||||
}
|
||||
|
||||
// indexIniter is responsible for completing the indexing of remaining state
|
||||
// histories in batch. It runs as a one-time background thread and terminates
|
||||
// once all available state histories are indexed.
|
||||
//
|
||||
// Afterward, new state histories should be indexed synchronously alongside
|
||||
// the state data itself, ensuring both the history and its index are available.
|
||||
// If a state history is removed due to a rollback, the associated indexes should
|
||||
// be unmarked accordingly.
|
||||
type indexIniter struct {
|
||||
disk ethdb.KeyValueStore
|
||||
freezer ethdb.AncientStore
|
||||
interrupt chan *interruptSignal
|
||||
done chan struct{}
|
||||
closed chan struct{}
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
func newIndexIniter(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, lastID uint64) *indexIniter {
|
||||
initer := &indexIniter{
|
||||
disk: disk,
|
||||
freezer: freezer,
|
||||
interrupt: make(chan *interruptSignal),
|
||||
done: make(chan struct{}),
|
||||
closed: make(chan struct{}),
|
||||
}
|
||||
initer.wg.Add(1)
|
||||
go initer.run(lastID)
|
||||
return initer
|
||||
}
|
||||
|
||||
func (i *indexIniter) close() {
|
||||
select {
|
||||
case <-i.closed:
|
||||
return
|
||||
default:
|
||||
close(i.closed)
|
||||
i.wg.Wait()
|
||||
}
|
||||
}
|
||||
|
||||
func (i *indexIniter) inited() bool {
|
||||
select {
|
||||
case <-i.closed:
|
||||
return false
|
||||
case <-i.done:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func (i *indexIniter) run(lastID uint64) {
|
||||
defer i.wg.Done()
|
||||
|
||||
// Launch background indexing thread
|
||||
var (
|
||||
done = make(chan struct{})
|
||||
interrupt = new(atomic.Int32)
|
||||
|
||||
// checkDone indicates whether all requested state histories
|
||||
// have been fully indexed.
|
||||
checkDone = func() bool {
|
||||
metadata := loadIndexMetadata(i.disk)
|
||||
return metadata != nil && metadata.Last == lastID
|
||||
}
|
||||
)
|
||||
go i.index(done, interrupt, lastID)
|
||||
|
||||
for {
|
||||
select {
|
||||
case signal := <-i.interrupt:
|
||||
// The indexing limit can only be extended or shortened continuously.
|
||||
if signal.newLastID != lastID+1 && signal.newLastID != lastID-1 {
|
||||
signal.result <- fmt.Errorf("invalid history id, last: %d, got: %d", lastID, signal.newLastID)
|
||||
continue
|
||||
}
|
||||
// The index limit is extended by one, update the limit without
|
||||
// interrupting the current background process.
|
||||
if signal.newLastID == lastID+1 {
|
||||
lastID = signal.newLastID
|
||||
signal.result <- nil
|
||||
log.Debug("Extended state history range", "last", lastID)
|
||||
continue
|
||||
}
|
||||
// The index limit is shortened by one, interrupt the current background
|
||||
// process and relaunch with new target.
|
||||
interrupt.Store(1)
|
||||
<-done
|
||||
|
||||
// If all state histories, including the one to be reverted, have
|
||||
// been fully indexed, unindex it here and shut down the initializer.
|
||||
if checkDone() {
|
||||
log.Info("Truncate the extra history", "id", lastID)
|
||||
if err := unindexSingle(lastID, i.disk, i.freezer); err != nil {
|
||||
signal.result <- err
|
||||
return
|
||||
}
|
||||
close(i.done)
|
||||
signal.result <- nil
|
||||
log.Info("State histories have been fully indexed", "last", lastID-1)
|
||||
return
|
||||
}
|
||||
// Adjust the indexing target and relaunch the process
|
||||
lastID = signal.newLastID
|
||||
done, interrupt = make(chan struct{}), new(atomic.Int32)
|
||||
go i.index(done, interrupt, lastID)
|
||||
log.Debug("Shortened state history range", "last", lastID)
|
||||
|
||||
case <-done:
|
||||
if checkDone() {
|
||||
close(i.done)
|
||||
log.Info("State histories have been fully indexed", "last", lastID)
|
||||
return
|
||||
}
|
||||
// Relaunch the background runner if some tasks are left
|
||||
done, interrupt = make(chan struct{}), new(atomic.Int32)
|
||||
go i.index(done, interrupt, lastID)
|
||||
|
||||
case <-i.closed:
|
||||
interrupt.Store(1)
|
||||
log.Info("Waiting background history index initer to exit")
|
||||
<-done
|
||||
|
||||
if checkDone() {
|
||||
close(i.done)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// next returns the ID of the next state history to be indexed.
|
||||
func (i *indexIniter) next() (uint64, error) {
|
||||
tail, err := i.freezer.Tail()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
tailID := tail + 1 // compute the id of the oldest history
|
||||
|
||||
// Start indexing from scratch if nothing has been indexed
|
||||
metadata := loadIndexMetadata(i.disk)
|
||||
if metadata == nil {
|
||||
log.Debug("Initialize state history indexing from scratch", "id", tailID)
|
||||
return tailID, nil
|
||||
}
|
||||
// Resume indexing from the last interrupted position
|
||||
if metadata.Last+1 >= tailID {
|
||||
log.Debug("Resume state history indexing", "id", metadata.Last+1, "tail", tailID)
|
||||
return metadata.Last + 1, nil
|
||||
}
|
||||
// History has been shortened without indexing. Discard the gapped segment
|
||||
// in the history and shift to the first available element.
|
||||
//
|
||||
// The missing indexes corresponding to the gapped histories won't be visible.
|
||||
// It's fine to leave them unindexed.
|
||||
log.Info("History gap detected, discard old segment", "oldHead", metadata.Last, "newHead", tailID)
|
||||
return tailID, nil
|
||||
}
|
||||
|
||||
func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID uint64) {
|
||||
defer close(done)
|
||||
|
||||
beginID, err := i.next()
|
||||
if err != nil {
|
||||
log.Error("Failed to find next state history for indexing", "err", err)
|
||||
return
|
||||
}
|
||||
// All available state histories have been indexed, and the last indexed one
|
||||
// exceeds the most recent available state history. This situation may occur
|
||||
// when the state is reverted manually (chain.SetHead) or the deep reorg is
|
||||
// encountered. In such cases, no indexing should be scheduled.
|
||||
if beginID > lastID {
|
||||
log.Debug("State history is fully indexed", "last", lastID)
|
||||
return
|
||||
}
|
||||
log.Info("Start history indexing", "beginID", beginID, "lastID", lastID)
|
||||
|
||||
var (
|
||||
current = beginID
|
||||
start = time.Now()
|
||||
logged = time.Now()
|
||||
batch = newBatchIndexer(i.disk, false)
|
||||
)
|
||||
for current <= lastID {
|
||||
count := lastID - current + 1
|
||||
if count > historyReadBatch {
|
||||
count = historyReadBatch
|
||||
}
|
||||
histories, err := readHistories(i.freezer, current, count)
|
||||
if err != nil {
|
||||
// The history read might fall if the history is truncated from
|
||||
// head due to revert operation.
|
||||
log.Error("Failed to read history for indexing", "current", current, "count", count, "err", err)
|
||||
return
|
||||
}
|
||||
for _, h := range histories {
|
||||
if err := batch.process(h, current); err != nil {
|
||||
log.Error("Failed to index history", "err", err)
|
||||
return
|
||||
}
|
||||
current += 1
|
||||
|
||||
// Occasionally report the indexing progress
|
||||
if time.Since(logged) > time.Second*8 {
|
||||
logged = time.Now()
|
||||
|
||||
var (
|
||||
left = lastID - current + 1
|
||||
done = current - beginID
|
||||
speed = done/uint64(time.Since(start)/time.Millisecond+1) + 1 // +1s to avoid division by zero
|
||||
)
|
||||
// Override the ETA if larger than the largest until now
|
||||
eta := time.Duration(left/speed) * time.Millisecond
|
||||
log.Info("Indexing state history", "processed", done, "left", left, "eta", common.PrettyDuration(eta))
|
||||
}
|
||||
}
|
||||
// Check interruption signal and abort process if it's fired
|
||||
if interrupt != nil {
|
||||
if signal := interrupt.Load(); signal != 0 {
|
||||
if err := batch.finish(true); err != nil {
|
||||
log.Error("Failed to flush index", "err", err)
|
||||
}
|
||||
log.Info("State indexing interrupted")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
if err := batch.finish(true); err != nil {
|
||||
log.Error("Failed to flush index", "err", err)
|
||||
}
|
||||
log.Info("Indexed state history", "from", beginID, "to", lastID, "elapsed", common.PrettyDuration(time.Since(start)))
|
||||
}
|
||||
|
||||
// historyIndexer manages the indexing and unindexing of state histories,
|
||||
// providing access to historical states.
|
||||
//
|
||||
// Upon initialization, historyIndexer starts a one-time background process
|
||||
// to complete the indexing of any remaining state histories. Once this
|
||||
// process is finished, all state histories are marked as fully indexed,
|
||||
// enabling handling of requests for historical states. Thereafter, any new
|
||||
// state histories must be indexed or unindexed synchronously, ensuring that
|
||||
// the history index is created or removed along with the corresponding
|
||||
// state history.
|
||||
type historyIndexer struct {
|
||||
initer *indexIniter
|
||||
disk ethdb.KeyValueStore
|
||||
freezer ethdb.AncientStore
|
||||
}
|
||||
|
||||
// checkVersion checks whether the index data in the database matches the version.
|
||||
func checkVersion(disk ethdb.KeyValueStore) {
|
||||
blob := rawdb.ReadStateHistoryIndexMetadata(disk)
|
||||
if len(blob) == 0 {
|
||||
return
|
||||
}
|
||||
var m indexMetadata
|
||||
err := rlp.DecodeBytes(blob, &m)
|
||||
if err == nil && m.Version == stateIndexVersion {
|
||||
return
|
||||
}
|
||||
// TODO(rjl493456442) would be better to group them into a batch.
|
||||
rawdb.DeleteStateHistoryIndexMetadata(disk)
|
||||
rawdb.DeleteStateHistoryIndex(disk)
|
||||
|
||||
version := "unknown"
|
||||
if err == nil {
|
||||
version = fmt.Sprintf("%d", m.Version)
|
||||
}
|
||||
log.Info("Cleaned up obsolete state history index", "version", version, "want", stateIndexVersion)
|
||||
}
|
||||
|
||||
// newHistoryIndexer constructs the history indexer and launches the background
|
||||
// initer to complete the indexing of any remaining state histories.
|
||||
func newHistoryIndexer(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, lastHistoryID uint64) *historyIndexer {
|
||||
checkVersion(disk)
|
||||
return &historyIndexer{
|
||||
initer: newIndexIniter(disk, freezer, lastHistoryID),
|
||||
disk: disk,
|
||||
freezer: freezer,
|
||||
}
|
||||
}
|
||||
|
||||
func (i *historyIndexer) close() {
|
||||
i.initer.close()
|
||||
}
|
||||
|
||||
// inited returns a flag indicating whether the existing state histories
|
||||
// have been fully indexed, in other words, whether they are available
|
||||
// for external access.
|
||||
func (i *historyIndexer) inited() bool {
|
||||
return i.initer.inited()
|
||||
}
|
||||
|
||||
// extend sends the notification that new state history with specified ID
|
||||
// has been written into the database and is ready for indexing.
|
||||
func (i *historyIndexer) extend(historyID uint64) error {
|
||||
signal := &interruptSignal{
|
||||
newLastID: historyID,
|
||||
result: make(chan error, 1),
|
||||
}
|
||||
select {
|
||||
case <-i.initer.closed:
|
||||
return errors.New("indexer is closed")
|
||||
case <-i.initer.done:
|
||||
return indexSingle(historyID, i.disk, i.freezer)
|
||||
case i.initer.interrupt <- signal:
|
||||
return <-signal.result
|
||||
}
|
||||
}
|
||||
|
||||
// shorten sends the notification that state history with specified ID
|
||||
// is about to be deleted from the database and should be unindexed.
|
||||
func (i *historyIndexer) shorten(historyID uint64) error {
|
||||
signal := &interruptSignal{
|
||||
newLastID: historyID - 1,
|
||||
result: make(chan error, 1),
|
||||
}
|
||||
select {
|
||||
case <-i.initer.closed:
|
||||
return errors.New("indexer is closed")
|
||||
case <-i.initer.done:
|
||||
return unindexSingle(historyID, i.disk, i.freezer)
|
||||
case i.initer.interrupt <- signal:
|
||||
return <-signal.result
|
||||
}
|
||||
}
|
||||
375
triedb/pathdb/history_reader.go
Normal file
375
triedb/pathdb/history_reader.go
Normal file
|
|
@ -0,0 +1,375 @@
|
|||
// Copyright 2025 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/
|
||||
|
||||
package pathdb
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"sort"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
)
|
||||
|
||||
// stateIdent represents the identifier of a state element, which can be
|
||||
// either an account or a storage slot.
|
||||
type stateIdent struct {
|
||||
account bool
|
||||
|
||||
// The hash of the account address. This is used instead of the raw account
|
||||
// address is to align the traversal order with the Merkle-Patricia-Trie.
|
||||
addressHash common.Hash
|
||||
|
||||
// The hash of the storage slot key. This is used instead of the raw slot key
|
||||
// because, in legacy state histories (prior to the Cancun fork), the slot
|
||||
// identifier is the hash of the key, and the original key (preimage) cannot
|
||||
// be recovered. To maintain backward compatibility, the key hash is used.
|
||||
//
|
||||
// Meanwhile, using the storage key hash also preserve the traversal order
|
||||
// with Merkle-Patricia-Trie.
|
||||
//
|
||||
// This field is null if the identifier refers to account data.
|
||||
storageHash common.Hash
|
||||
}
|
||||
|
||||
// String returns the string format state identifier.
|
||||
func (ident stateIdent) String() string {
|
||||
if ident.account {
|
||||
return ident.addressHash.Hex()
|
||||
}
|
||||
return ident.addressHash.Hex() + ident.storageHash.Hex()
|
||||
}
|
||||
|
||||
// newAccountIdent constructs a state identifier for an account.
|
||||
func newAccountIdent(addressHash common.Hash) stateIdent {
|
||||
return stateIdent{
|
||||
account: true,
|
||||
addressHash: addressHash,
|
||||
}
|
||||
}
|
||||
|
||||
// newStorageIdent constructs a state identifier for a storage slot.
|
||||
// The address denotes the address of the associated account;
|
||||
// the storageHash denotes the hash of the raw storage slot key;
|
||||
func newStorageIdent(addressHash common.Hash, storageHash common.Hash) stateIdent {
|
||||
return stateIdent{
|
||||
addressHash: addressHash,
|
||||
storageHash: storageHash,
|
||||
}
|
||||
}
|
||||
|
||||
// stateIdentQuery is the extension of stateIdent by adding the raw storage key.
|
||||
type stateIdentQuery struct {
|
||||
stateIdent
|
||||
|
||||
address common.Address
|
||||
storageKey common.Hash
|
||||
}
|
||||
|
||||
// newAccountIdentQuery constructs a state identifier for an account.
|
||||
func newAccountIdentQuery(address common.Address, addressHash common.Hash) stateIdentQuery {
|
||||
return stateIdentQuery{
|
||||
stateIdent: stateIdent{
|
||||
account: true,
|
||||
addressHash: addressHash,
|
||||
},
|
||||
address: address,
|
||||
}
|
||||
}
|
||||
|
||||
// newStorageIdentQuery constructs a state identifier for a storage slot.
|
||||
// the address denotes the address of the associated account;
|
||||
// the addressHash denotes the address hash of the associated account;
|
||||
// the storageKey denotes the raw storage slot key;
|
||||
// the storageHash denotes the hash of the raw storage slot key;
|
||||
func newStorageIdentQuery(address common.Address, addressHash common.Hash, storageKey common.Hash, storageHash common.Hash) stateIdentQuery {
|
||||
return stateIdentQuery{
|
||||
stateIdent: stateIdent{
|
||||
addressHash: addressHash,
|
||||
storageHash: storageHash,
|
||||
},
|
||||
address: address,
|
||||
storageKey: storageKey,
|
||||
}
|
||||
}
|
||||
|
||||
// indexReaderWithLimitTag is a wrapper around indexReader that includes an
|
||||
// additional index position. This position represents the ID of the last
|
||||
// indexed state history at the time the reader was created, implying that
|
||||
// indexes beyond this position are unavailable.
|
||||
type indexReaderWithLimitTag struct {
|
||||
reader *indexReader
|
||||
limit uint64
|
||||
db ethdb.KeyValueReader
|
||||
}
|
||||
|
||||
// newIndexReaderWithLimitTag constructs a index reader with indexing position.
|
||||
func newIndexReaderWithLimitTag(db ethdb.KeyValueReader, state stateIdent) (*indexReaderWithLimitTag, error) {
|
||||
// Read the last indexed ID before the index reader construction
|
||||
metadata := loadIndexMetadata(db)
|
||||
if metadata == nil {
|
||||
return nil, errors.New("state history hasn't been indexed yet")
|
||||
}
|
||||
r, err := newIndexReader(db, state)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &indexReaderWithLimitTag{
|
||||
reader: r,
|
||||
limit: metadata.Last,
|
||||
db: db,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// readGreaterThan locates the first element that is greater than the specified
|
||||
// id. If no such element is found, MaxUint64 is returned.
|
||||
//
|
||||
// Note: It is possible that additional histories have been indexed since the
|
||||
// reader was created. The reader should be refreshed as needed to load the
|
||||
// latest indexed data from disk.
|
||||
func (r *indexReaderWithLimitTag) readGreaterThan(id uint64, lastID uint64) (uint64, error) {
|
||||
// Mark the index reader as stale if the tracked indexing position moves
|
||||
// backward. This can occur if the pathdb is reverted and certain state
|
||||
// histories are unindexed. For simplicity, the reader is marked as stale
|
||||
// instead of being refreshed, as this scenario is highly unlikely.
|
||||
if r.limit > lastID {
|
||||
return 0, fmt.Errorf("index reader is stale, limit: %d, last-state-id: %d", r.limit, lastID)
|
||||
}
|
||||
// Try to find the element which is greater than the specified target
|
||||
res, err := r.reader.readGreaterThan(id)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
// Short circuit if the element is found within the current index
|
||||
if res != math.MaxUint64 {
|
||||
return res, nil
|
||||
}
|
||||
// The element was not found, and no additional histories have been indexed.
|
||||
// Return a not-found result.
|
||||
if r.limit == lastID {
|
||||
return res, nil
|
||||
}
|
||||
// Refresh the index reader and attempt again. If the latest indexed position
|
||||
// is even below the ID of the disk layer, it indicates that state histories
|
||||
// are being removed. In this case, it would theoretically be better to block
|
||||
// the state rollback operation synchronously until all readers are released.
|
||||
// Given that it's very unlikely to occur and users try to perform historical
|
||||
// state queries while reverting the states at the same time. Simply returning
|
||||
// an error should be sufficient for now.
|
||||
metadata := loadIndexMetadata(r.db)
|
||||
if metadata == nil || metadata.Last < lastID {
|
||||
return 0, errors.New("state history hasn't been indexed yet")
|
||||
}
|
||||
if err := r.reader.refresh(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
r.limit = metadata.Last
|
||||
|
||||
return r.reader.readGreaterThan(id)
|
||||
}
|
||||
|
||||
// historyReader is the structure to access historic state data.
|
||||
type historyReader struct {
|
||||
disk ethdb.KeyValueReader
|
||||
freezer ethdb.AncientReader
|
||||
readers map[string]*indexReaderWithLimitTag
|
||||
}
|
||||
|
||||
// newHistoryReader constructs the history reader with the supplied db.
|
||||
func newHistoryReader(disk ethdb.KeyValueReader, freezer ethdb.AncientReader) *historyReader {
|
||||
return &historyReader{
|
||||
disk: disk,
|
||||
freezer: freezer,
|
||||
readers: make(map[string]*indexReaderWithLimitTag),
|
||||
}
|
||||
}
|
||||
|
||||
// readAccountMetadata resolves the account metadata within the specified
|
||||
// state history.
|
||||
func (r *historyReader) readAccountMetadata(address common.Address, historyID uint64) ([]byte, error) {
|
||||
blob := rawdb.ReadStateAccountIndex(r.freezer, historyID)
|
||||
if len(blob) == 0 {
|
||||
return nil, fmt.Errorf("account index is truncated, historyID: %d", historyID)
|
||||
}
|
||||
if len(blob)%accountIndexSize != 0 {
|
||||
return nil, fmt.Errorf("account index is corrupted, historyID: %d, size: %d", historyID, len(blob))
|
||||
}
|
||||
n := len(blob) / accountIndexSize
|
||||
|
||||
pos := sort.Search(n, func(i int) bool {
|
||||
h := blob[accountIndexSize*i : accountIndexSize*i+common.HashLength]
|
||||
return bytes.Compare(h, address.Bytes()) >= 0
|
||||
})
|
||||
if pos == n {
|
||||
return nil, fmt.Errorf("account %#x is not found", address)
|
||||
}
|
||||
offset := accountIndexSize * pos
|
||||
if address != common.BytesToAddress(blob[offset:offset+common.AddressLength]) {
|
||||
return nil, fmt.Errorf("account %#x is not found", address)
|
||||
}
|
||||
return blob[offset : accountIndexSize*(pos+1)], nil
|
||||
}
|
||||
|
||||
// readStorageMetadata resolves the storage slot metadata within the specified
|
||||
// state history.
|
||||
func (r *historyReader) readStorageMetadata(storageKey common.Hash, storageHash common.Hash, historyID uint64, slotOffset, slotNumber int) ([]byte, error) {
|
||||
// TODO(rj493456442) optimize it with partial read
|
||||
blob := rawdb.ReadStateStorageIndex(r.freezer, historyID)
|
||||
if len(blob) == 0 {
|
||||
return nil, fmt.Errorf("storage index is truncated, historyID: %d", historyID)
|
||||
}
|
||||
if len(blob)%slotIndexSize != 0 {
|
||||
return nil, fmt.Errorf("storage indices is corrupted, historyID: %d, size: %d", historyID, len(blob))
|
||||
}
|
||||
if slotIndexSize*(slotOffset+slotNumber) > len(blob) {
|
||||
return nil, fmt.Errorf("storage indices is truncated, historyID: %d, size: %d, offset: %d, length: %d", historyID, len(blob), slotOffset, slotNumber)
|
||||
}
|
||||
subSlice := blob[slotIndexSize*slotOffset : slotIndexSize*(slotOffset+slotNumber)]
|
||||
|
||||
// TODO(rj493456442) get rid of the metadata resolution
|
||||
var (
|
||||
m meta
|
||||
target common.Hash
|
||||
)
|
||||
blob = rawdb.ReadStateHistoryMeta(r.freezer, historyID)
|
||||
if err := m.decode(blob); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if m.version == stateHistoryV0 {
|
||||
target = storageHash
|
||||
} else {
|
||||
target = storageKey
|
||||
}
|
||||
pos := sort.Search(slotNumber, func(i int) bool {
|
||||
slotID := subSlice[slotIndexSize*i : slotIndexSize*i+common.HashLength]
|
||||
return bytes.Compare(slotID, target.Bytes()) >= 0
|
||||
})
|
||||
if pos == slotNumber {
|
||||
return nil, fmt.Errorf("storage metadata is not found, slot key: %#x, historyID: %d", storageKey, historyID)
|
||||
}
|
||||
offset := slotIndexSize * pos
|
||||
if target != common.BytesToHash(subSlice[offset:offset+common.HashLength]) {
|
||||
return nil, fmt.Errorf("storage metadata is not found, slot key: %#x, historyID: %d", storageKey, historyID)
|
||||
}
|
||||
return subSlice[offset : slotIndexSize*(pos+1)], nil
|
||||
}
|
||||
|
||||
// readAccount retrieves the account data from the specified state history.
|
||||
func (r *historyReader) readAccount(address common.Address, historyID uint64) ([]byte, error) {
|
||||
metadata, err := r.readAccountMetadata(address, historyID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
length := int(metadata[common.AddressLength]) // one byte for account data length
|
||||
offset := int(binary.BigEndian.Uint32(metadata[common.AddressLength+1 : common.AddressLength+5])) // four bytes for the account data offset
|
||||
|
||||
// TODO(rj493456442) optimize it with partial read
|
||||
data := rawdb.ReadStateAccountHistory(r.freezer, historyID)
|
||||
if len(data) < length+offset {
|
||||
return nil, fmt.Errorf("account data is truncated, address: %#x, historyID: %d, size: %d, offset: %d, len: %d", address, historyID, len(data), offset, length)
|
||||
}
|
||||
return data[offset : offset+length], nil
|
||||
}
|
||||
|
||||
// readStorage retrieves the storage slot data from the specified state history.
|
||||
func (r *historyReader) readStorage(address common.Address, storageKey common.Hash, storageHash common.Hash, historyID uint64) ([]byte, error) {
|
||||
metadata, err := r.readAccountMetadata(address, historyID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// slotIndexOffset:
|
||||
// The offset of storage indices associated with the specified account.
|
||||
// slotIndexNumber:
|
||||
// The number of storage indices associated with the specified account.
|
||||
slotIndexOffset := int(binary.BigEndian.Uint32(metadata[common.AddressLength+5 : common.AddressLength+9]))
|
||||
slotIndexNumber := int(binary.BigEndian.Uint32(metadata[common.AddressLength+9 : common.AddressLength+13]))
|
||||
|
||||
slotMetadata, err := r.readStorageMetadata(storageKey, storageHash, historyID, slotIndexOffset, slotIndexNumber)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
length := int(slotMetadata[common.HashLength]) // one byte for slot data length
|
||||
offset := int(binary.BigEndian.Uint32(slotMetadata[common.HashLength+1 : common.HashLength+5])) // four bytes for slot data offset
|
||||
|
||||
// TODO(rj493456442) optimize it with partial read
|
||||
data := rawdb.ReadStateStorageHistory(r.freezer, historyID)
|
||||
if len(data) < offset+length {
|
||||
return nil, fmt.Errorf("storage data is truncated, address: %#x, key: %#x, historyID: %d, size: %d, offset: %d, len: %d", address, storageKey, historyID, len(data), offset, length)
|
||||
}
|
||||
return data[offset : offset+length], nil
|
||||
}
|
||||
|
||||
// read retrieves the state element data associated with the stateID.
|
||||
// stateID: represents the ID of the state of the specified version;
|
||||
// lastID: represents the ID of the latest/newest state history;
|
||||
// latestValue: represents the state value at the current disk layer with ID == lastID;
|
||||
func (r *historyReader) read(state stateIdentQuery, stateID uint64, lastID uint64, latestValue []byte) ([]byte, error) {
|
||||
tail, err := r.freezer.Tail()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// stateID == tail is allowed, as the first history object preserved
|
||||
// is tail+1
|
||||
if stateID < tail {
|
||||
return nil, errors.New("historical state has been pruned")
|
||||
}
|
||||
|
||||
// To serve the request, all state histories from stateID+1 to lastID
|
||||
// must be indexed. It's not supposed to happen unless system is very
|
||||
// wrong.
|
||||
metadata := loadIndexMetadata(r.disk)
|
||||
if metadata == nil || metadata.Last < lastID {
|
||||
indexed := "null"
|
||||
if metadata != nil {
|
||||
indexed = fmt.Sprintf("%d", metadata.Last)
|
||||
}
|
||||
return nil, fmt.Errorf("state history is not fully indexed, requested: %d, indexed: %s", stateID, indexed)
|
||||
}
|
||||
|
||||
// Construct the index reader to locate the corresponding history for
|
||||
// state retrieval
|
||||
ir, ok := r.readers[state.String()]
|
||||
if !ok {
|
||||
ir, err = newIndexReaderWithLimitTag(r.disk, state.stateIdent)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r.readers[state.String()] = ir
|
||||
}
|
||||
historyID, err := ir.readGreaterThan(stateID, lastID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// The state was not found in the state histories, as it has not been modified
|
||||
// since stateID. Use the data from the associated disk layer instead.
|
||||
if historyID == math.MaxUint64 {
|
||||
return latestValue, nil
|
||||
}
|
||||
// Resolve data from the specified state history object. Notably, since the history
|
||||
// reader operates completely asynchronously with the indexer/unindexer, it's possible
|
||||
// that the associated state histories are no longer available due to a rollback.
|
||||
// Such truncation should be captured by the state resolver below, rather than returning
|
||||
// invalid data.
|
||||
if state.account {
|
||||
return r.readAccount(state.address, historyID)
|
||||
}
|
||||
return r.readStorage(state.address, state.storageKey, state.storageHash, historyID)
|
||||
}
|
||||
159
triedb/pathdb/history_reader_test.go
Normal file
159
triedb/pathdb/history_reader_test.go
Normal file
|
|
@ -0,0 +1,159 @@
|
|||
// Copyright 2025 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/
|
||||
|
||||
package pathdb
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||
)
|
||||
|
||||
func waitIndexing(db *Database) {
|
||||
for {
|
||||
metadata := loadIndexMetadata(db.diskdb)
|
||||
if metadata != nil && metadata.Last >= db.tree.bottom().stateID() {
|
||||
return
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
func checkHistoricState(env *tester, root common.Hash, hr *historyReader) error {
|
||||
// Short circuit if the historical state is no longer available
|
||||
if rawdb.ReadStateID(env.db.diskdb, root) == nil {
|
||||
return nil
|
||||
}
|
||||
var (
|
||||
dl = env.db.tree.bottom()
|
||||
stateID = rawdb.ReadStateID(env.db.diskdb, root)
|
||||
accounts = env.snapAccounts[root]
|
||||
storages = env.snapStorages[root]
|
||||
)
|
||||
for addrHash, accountData := range accounts {
|
||||
latest, _ := dl.account(addrHash, 0)
|
||||
blob, err := hr.read(newAccountIdentQuery(env.accountPreimage(addrHash), addrHash), *stateID, dl.stateID(), latest)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !bytes.Equal(accountData, blob) {
|
||||
return fmt.Errorf("wrong account data, expected %x, got %x", accountData, blob)
|
||||
}
|
||||
}
|
||||
for i := 0; i < len(env.roots); i++ {
|
||||
if env.roots[i] == root {
|
||||
break
|
||||
}
|
||||
// Find all accounts deleted in the past, ensure the associated data is null
|
||||
for addrHash := range env.snapAccounts[env.roots[i]] {
|
||||
if _, ok := accounts[addrHash]; !ok {
|
||||
latest, _ := dl.account(addrHash, 0)
|
||||
blob, err := hr.read(newAccountIdentQuery(env.accountPreimage(addrHash), addrHash), *stateID, dl.stateID(), latest)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(blob) != 0 {
|
||||
return fmt.Errorf("wrong account data, expected null, got %x", blob)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
for addrHash, slots := range storages {
|
||||
for slotHash, slotData := range slots {
|
||||
latest, _ := dl.storage(addrHash, slotHash, 0)
|
||||
blob, err := hr.read(newStorageIdentQuery(env.accountPreimage(addrHash), addrHash, env.hashPreimage(slotHash), slotHash), *stateID, dl.stateID(), latest)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !bytes.Equal(slotData, blob) {
|
||||
return fmt.Errorf("wrong storage data, expected %x, got %x", slotData, blob)
|
||||
}
|
||||
}
|
||||
}
|
||||
for i := 0; i < len(env.roots); i++ {
|
||||
if env.roots[i] == root {
|
||||
break
|
||||
}
|
||||
// Find all storage slots deleted in the past, ensure the associated data is null
|
||||
for addrHash, slots := range env.snapStorages[env.roots[i]] {
|
||||
for slotHash := range slots {
|
||||
_, ok := storages[addrHash]
|
||||
if ok {
|
||||
_, ok = storages[addrHash][slotHash]
|
||||
}
|
||||
if !ok {
|
||||
latest, _ := dl.storage(addrHash, slotHash, 0)
|
||||
blob, err := hr.read(newStorageIdentQuery(env.accountPreimage(addrHash), addrHash, env.hashPreimage(slotHash), slotHash), *stateID, dl.stateID(), latest)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(blob) != 0 {
|
||||
return fmt.Errorf("wrong storage data, expected null, got %x", blob)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestHistoryReader(t *testing.T) {
|
||||
testHistoryReader(t, 0) // with all histories reserved
|
||||
testHistoryReader(t, 10) // with latest 10 histories reserved
|
||||
}
|
||||
|
||||
func testHistoryReader(t *testing.T, historyLimit uint64) {
|
||||
maxDiffLayers = 4
|
||||
defer func() {
|
||||
maxDiffLayers = 128
|
||||
}()
|
||||
//log.SetDefault(log.NewLogger(log.NewTerminalHandlerWithLevel(os.Stderr, log.LevelDebug, true)))
|
||||
|
||||
env := newTester(t, historyLimit, false, 64, true)
|
||||
defer env.release()
|
||||
waitIndexing(env.db)
|
||||
|
||||
var (
|
||||
roots = env.roots
|
||||
dRoot = env.db.tree.bottom().rootHash()
|
||||
hr = newHistoryReader(env.db.diskdb, env.db.freezer)
|
||||
)
|
||||
for _, root := range roots {
|
||||
if root == dRoot {
|
||||
break
|
||||
}
|
||||
if err := checkHistoricState(env, root, hr); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Pile up more histories on top, ensuring the historic reader is not affected
|
||||
env.extend(4)
|
||||
waitIndexing(env.db)
|
||||
|
||||
for _, root := range roots {
|
||||
if root == dRoot {
|
||||
break
|
||||
}
|
||||
if err := checkHistoricState(env, root, hr); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -73,8 +73,14 @@ var (
|
|||
historyDataBytesMeter = metrics.NewRegisteredMeter("pathdb/history/bytes/data", nil)
|
||||
historyIndexBytesMeter = metrics.NewRegisteredMeter("pathdb/history/bytes/index", nil)
|
||||
|
||||
indexHistoryTimer = metrics.NewRegisteredResettingTimer("pathdb/history/index/time", nil)
|
||||
unindexHistoryTimer = metrics.NewRegisteredResettingTimer("pathdb/history/unindex/time", nil)
|
||||
|
||||
lookupAddLayerTimer = metrics.NewRegisteredResettingTimer("pathdb/lookup/add/time", nil)
|
||||
lookupRemoveLayerTimer = metrics.NewRegisteredResettingTimer("pathdb/lookup/remove/time", nil)
|
||||
|
||||
historicalAccountReadTimer = metrics.NewRegisteredResettingTimer("pathdb/history/account/reads", nil)
|
||||
historicalStorageReadTimer = metrics.NewRegisteredResettingTimer("pathdb/history/storage/reads", nil)
|
||||
)
|
||||
|
||||
// Metrics in generation
|
||||
|
|
|
|||
|
|
@ -19,10 +19,13 @@ package pathdb
|
|||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/ethereum/go-ethereum/triedb/database"
|
||||
|
|
@ -192,3 +195,122 @@ func (db *Database) StateReader(root common.Hash) (database.StateReader, error)
|
|||
layer: layer,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// HistoricalStateReader is a wrapper over history reader, providing access to
|
||||
// historical state.
|
||||
type HistoricalStateReader struct {
|
||||
db *Database
|
||||
reader *historyReader
|
||||
id uint64
|
||||
}
|
||||
|
||||
// HistoricReader constructs a reader for accessing the requested historic state.
|
||||
func (db *Database) HistoricReader(root common.Hash) (*HistoricalStateReader, error) {
|
||||
// Bail out if the state history hasn't been fully indexed
|
||||
if db.indexer == nil || !db.indexer.inited() {
|
||||
return nil, errors.New("state histories haven't been fully indexed yet")
|
||||
}
|
||||
if db.freezer == nil {
|
||||
return nil, errors.New("state histories are not available")
|
||||
}
|
||||
// States at the current disk layer or above are directly accessible via
|
||||
// db.StateReader.
|
||||
//
|
||||
// States older than the current disk layer (including the disk layer
|
||||
// itself) are available through historic state access.
|
||||
//
|
||||
// Note: the requested state may refer to a stale historic state that has
|
||||
// already been pruned. This function does not validate availability, as
|
||||
// underlying states may be pruned dynamically. Validity is checked during
|
||||
// each actual state retrieval.
|
||||
id := rawdb.ReadStateID(db.diskdb, root)
|
||||
if id == nil {
|
||||
return nil, fmt.Errorf("state %#x is not available", root)
|
||||
}
|
||||
return &HistoricalStateReader{
|
||||
id: *id,
|
||||
db: db,
|
||||
reader: newHistoryReader(db.diskdb, db.freezer),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// AccountRLP directly retrieves the account RLP associated with a particular
|
||||
// address in the slim data format. An error will be returned if the read
|
||||
// operation exits abnormally. Specifically, if the layer is already stale.
|
||||
//
|
||||
// Note:
|
||||
// - the returned account is not a copy, please don't modify it.
|
||||
// - no error will be returned if the requested account is not found in database.
|
||||
func (r *HistoricalStateReader) AccountRLP(address common.Address) ([]byte, error) {
|
||||
defer func(start time.Time) {
|
||||
historicalAccountReadTimer.UpdateSince(start)
|
||||
}(time.Now())
|
||||
|
||||
// TODO(rjl493456442): Theoretically, the obtained disk layer could become stale
|
||||
// within a very short time window.
|
||||
//
|
||||
// While reading the account data while holding `db.tree.lock` can resolve
|
||||
// this issue, but it will introduce a heavy contention over the lock.
|
||||
//
|
||||
// Let's optimistically assume the situation is very unlikely to happen,
|
||||
// and try to define a low granularity lock if the current approach doesn't
|
||||
// work later.
|
||||
dl := r.db.tree.bottom()
|
||||
hash := crypto.Keccak256Hash(address.Bytes())
|
||||
latest, err := dl.account(hash, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return r.reader.read(newAccountIdentQuery(address, hash), r.id, dl.stateID(), latest)
|
||||
}
|
||||
|
||||
// Account directly retrieves the account associated with a particular address in
|
||||
// the slim data format. An error will be returned if the read operation exits
|
||||
// abnormally. Specifically, if the layer is already stale.
|
||||
//
|
||||
// No error will be returned if the requested account is not found in database
|
||||
func (r *HistoricalStateReader) Account(address common.Address) (*types.SlimAccount, error) {
|
||||
blob, err := r.AccountRLP(address)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(blob) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
account := new(types.SlimAccount)
|
||||
if err := rlp.DecodeBytes(blob, account); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return account, nil
|
||||
}
|
||||
|
||||
// Storage directly retrieves the storage data associated with a particular key,
|
||||
// within a particular account. An error will be returned if the read operation
|
||||
// exits abnormally. Specifically, if the layer is already stale.
|
||||
//
|
||||
// Note:
|
||||
// - the returned storage data is not a copy, please don't modify it.
|
||||
// - no error will be returned if the requested slot is not found in database.
|
||||
func (r *HistoricalStateReader) Storage(address common.Address, key common.Hash) ([]byte, error) {
|
||||
defer func(start time.Time) {
|
||||
historicalStorageReadTimer.UpdateSince(start)
|
||||
}(time.Now())
|
||||
|
||||
// TODO(rjl493456442): Theoretically, the obtained disk layer could become stale
|
||||
// within a very short time window.
|
||||
//
|
||||
// While reading the account data while holding `db.tree.lock` can resolve
|
||||
// this issue, but it will introduce a heavy contention over the lock.
|
||||
//
|
||||
// Let's optimistically assume the situation is very unlikely to happen,
|
||||
// and try to define a low granularity lock if the current approach doesn't
|
||||
// work later.
|
||||
dl := r.db.tree.bottom()
|
||||
addrHash := crypto.Keccak256Hash(address.Bytes())
|
||||
keyHash := crypto.Keccak256Hash(key.Bytes())
|
||||
latest, err := dl.storage(addrHash, keyHash, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return r.reader.read(newStorageIdentQuery(address, addrHash, key, keyHash), r.id, dl.stateID(), latest)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue