mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-02-26 07:37:20 +00:00
core, eth, internal, triedb/pathdb: enable eth_getProofs for history (#32727)
This PR enables the `eth_getProofs ` endpoint against the historical states.
This commit is contained in:
parent
35922bcd33
commit
1022c7637d
24 changed files with 756 additions and 121 deletions
|
|
@ -121,6 +121,7 @@ if one is set. Otherwise it prints the genesis from the datadir.`,
|
|||
utils.LogExportCheckpointsFlag,
|
||||
utils.StateHistoryFlag,
|
||||
utils.TrienodeHistoryFlag,
|
||||
utils.TrienodeHistoryFullValueCheckpointFlag,
|
||||
}, utils.DatabaseFlags, debug.Flags),
|
||||
Before: func(ctx *cli.Context) error {
|
||||
flags.MigrateGlobalFlags(ctx)
|
||||
|
|
|
|||
|
|
@ -95,6 +95,7 @@ var (
|
|||
utils.LogExportCheckpointsFlag,
|
||||
utils.StateHistoryFlag,
|
||||
utils.TrienodeHistoryFlag,
|
||||
utils.TrienodeHistoryFullValueCheckpointFlag,
|
||||
utils.LightKDFFlag,
|
||||
utils.EthRequiredBlocksFlag,
|
||||
utils.LegacyWhitelistFlag, // deprecated
|
||||
|
|
|
|||
|
|
@ -33,7 +33,6 @@ require (
|
|||
github.com/tklauser/go-sysconf v0.3.12 // indirect
|
||||
github.com/tklauser/numcpus v0.6.1 // indirect
|
||||
golang.org/x/crypto v0.36.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect
|
||||
golang.org/x/sync v0.12.0 // indirect
|
||||
golang.org/x/sys v0.39.0 // indirect
|
||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||
|
|
|
|||
|
|
@ -301,6 +301,12 @@ var (
|
|||
Value: ethconfig.Defaults.TrienodeHistory,
|
||||
Category: flags.StateCategory,
|
||||
}
|
||||
TrienodeHistoryFullValueCheckpointFlag = &cli.UintFlag{
|
||||
Name: "history.trienode.full-value-checkpoint",
|
||||
Usage: "The frequency of full-value encoding. Every n-th node is stored in full-value format; all other nodes are stored as diffs relative to their predecessor",
|
||||
Value: uint(ethconfig.Defaults.NodeFullValueCheckpoint),
|
||||
Category: flags.StateCategory,
|
||||
}
|
||||
TransactionHistoryFlag = &cli.Uint64Flag{
|
||||
Name: "history.transactions",
|
||||
Usage: "Number of recent blocks to maintain transactions index for (default = about one year, 0 = entire chain)",
|
||||
|
|
@ -1714,6 +1720,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
|
|||
if ctx.IsSet(TrienodeHistoryFlag.Name) {
|
||||
cfg.TrienodeHistory = ctx.Int64(TrienodeHistoryFlag.Name)
|
||||
}
|
||||
if ctx.IsSet(TrienodeHistoryFullValueCheckpointFlag.Name) {
|
||||
cfg.NodeFullValueCheckpoint = uint32(ctx.Uint(TrienodeHistoryFullValueCheckpointFlag.Name))
|
||||
}
|
||||
if ctx.IsSet(StateSchemeFlag.Name) {
|
||||
cfg.StateScheme = ctx.String(StateSchemeFlag.Name)
|
||||
}
|
||||
|
|
@ -2318,16 +2327,17 @@ func MakeChain(ctx *cli.Context, stack *node.Node, readonly bool) (*core.BlockCh
|
|||
Fatalf("%v", err)
|
||||
}
|
||||
options := &core.BlockChainConfig{
|
||||
TrieCleanLimit: ethconfig.Defaults.TrieCleanCache,
|
||||
NoPrefetch: ctx.Bool(CacheNoPrefetchFlag.Name),
|
||||
TrieDirtyLimit: ethconfig.Defaults.TrieDirtyCache,
|
||||
ArchiveMode: ctx.String(GCModeFlag.Name) == "archive",
|
||||
TrieTimeLimit: ethconfig.Defaults.TrieTimeout,
|
||||
SnapshotLimit: ethconfig.Defaults.SnapshotCache,
|
||||
Preimages: ctx.Bool(CachePreimagesFlag.Name),
|
||||
StateScheme: scheme,
|
||||
StateHistory: ctx.Uint64(StateHistoryFlag.Name),
|
||||
TrienodeHistory: ctx.Int64(TrienodeHistoryFlag.Name),
|
||||
TrieCleanLimit: ethconfig.Defaults.TrieCleanCache,
|
||||
NoPrefetch: ctx.Bool(CacheNoPrefetchFlag.Name),
|
||||
TrieDirtyLimit: ethconfig.Defaults.TrieDirtyCache,
|
||||
ArchiveMode: ctx.String(GCModeFlag.Name) == "archive",
|
||||
TrieTimeLimit: ethconfig.Defaults.TrieTimeout,
|
||||
SnapshotLimit: ethconfig.Defaults.SnapshotCache,
|
||||
Preimages: ctx.Bool(CachePreimagesFlag.Name),
|
||||
StateScheme: scheme,
|
||||
StateHistory: ctx.Uint64(StateHistoryFlag.Name),
|
||||
TrienodeHistory: ctx.Int64(TrienodeHistoryFlag.Name),
|
||||
NodeFullValueCheckpoint: uint32(ctx.Uint(TrienodeHistoryFullValueCheckpointFlag.Name)),
|
||||
|
||||
// Disable transaction indexing/unindexing.
|
||||
TxLookupLimit: -1,
|
||||
|
|
|
|||
|
|
@ -182,6 +182,12 @@ type BlockChainConfig struct {
|
|||
// If set to -1, no trienode history will be retained;
|
||||
TrienodeHistory int64
|
||||
|
||||
// The frequency of full-value encoding. For example, a value of 16 means
|
||||
// that, on average, for a given trie node across its 16 consecutive historical
|
||||
// versions, only one version is stored in full format, while the others
|
||||
// are stored in diff mode for storage compression.
|
||||
NodeFullValueCheckpoint uint32
|
||||
|
||||
// State snapshot related options
|
||||
SnapshotLimit int // Memory allowance (MB) to use for caching snapshot entries in memory
|
||||
SnapshotNoBuild bool // Whether the background generation is allowed
|
||||
|
|
@ -259,18 +265,22 @@ func (cfg *BlockChainConfig) triedbConfig(isVerkle bool) *triedb.Config {
|
|||
}
|
||||
if cfg.StateScheme == rawdb.PathScheme {
|
||||
config.PathDB = &pathdb.Config{
|
||||
StateHistory: cfg.StateHistory,
|
||||
TrienodeHistory: cfg.TrienodeHistory,
|
||||
EnableStateIndexing: cfg.ArchiveMode,
|
||||
TrieCleanSize: cfg.TrieCleanLimit * 1024 * 1024,
|
||||
StateCleanSize: cfg.SnapshotLimit * 1024 * 1024,
|
||||
JournalDirectory: cfg.TrieJournalDirectory,
|
||||
|
||||
TrieCleanSize: cfg.TrieCleanLimit * 1024 * 1024,
|
||||
StateCleanSize: cfg.SnapshotLimit * 1024 * 1024,
|
||||
// TODO(rjl493456442): The write buffer represents the memory limit used
|
||||
// for flushing both trie data and state data to disk. The config name
|
||||
// should be updated to eliminate the confusion.
|
||||
WriteBufferSize: cfg.TrieDirtyLimit * 1024 * 1024,
|
||||
NoAsyncFlush: cfg.TrieNoAsyncFlush,
|
||||
WriteBufferSize: cfg.TrieDirtyLimit * 1024 * 1024,
|
||||
JournalDirectory: cfg.TrieJournalDirectory,
|
||||
|
||||
// Historical state configurations
|
||||
StateHistory: cfg.StateHistory,
|
||||
TrienodeHistory: cfg.TrienodeHistory,
|
||||
EnableStateIndexing: cfg.ArchiveMode,
|
||||
FullValueCheckpoint: cfg.NodeFullValueCheckpoint,
|
||||
|
||||
// Testing configurations
|
||||
NoAsyncFlush: cfg.TrieNoAsyncFlush,
|
||||
}
|
||||
}
|
||||
return config
|
||||
|
|
|
|||
|
|
@ -429,7 +429,8 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
|
|||
filterMapBlockLV stat
|
||||
|
||||
// Path-mode archive data
|
||||
stateIndex stat
|
||||
stateIndex stat
|
||||
trienodeIndex stat
|
||||
|
||||
// Verkle statistics
|
||||
verkleTries stat
|
||||
|
|
@ -524,8 +525,19 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
|
|||
bloomBits.add(size)
|
||||
|
||||
// Path-based historic state indexes
|
||||
case bytes.HasPrefix(key, StateHistoryIndexPrefix) && len(key) >= len(StateHistoryIndexPrefix)+common.HashLength:
|
||||
case bytes.HasPrefix(key, StateHistoryAccountMetadataPrefix) && len(key) == len(StateHistoryAccountMetadataPrefix)+common.HashLength:
|
||||
stateIndex.add(size)
|
||||
case bytes.HasPrefix(key, StateHistoryStorageMetadataPrefix) && len(key) == len(StateHistoryStorageMetadataPrefix)+2*common.HashLength:
|
||||
stateIndex.add(size)
|
||||
case bytes.HasPrefix(key, StateHistoryAccountBlockPrefix) && len(key) == len(StateHistoryAccountBlockPrefix)+common.HashLength+4:
|
||||
stateIndex.add(size)
|
||||
case bytes.HasPrefix(key, StateHistoryStorageBlockPrefix) && len(key) == len(StateHistoryStorageBlockPrefix)+2*common.HashLength+4:
|
||||
stateIndex.add(size)
|
||||
|
||||
case bytes.HasPrefix(key, TrienodeHistoryMetadataPrefix) && len(key) >= len(TrienodeHistoryMetadataPrefix)+common.HashLength:
|
||||
trienodeIndex.add(size)
|
||||
case bytes.HasPrefix(key, TrienodeHistoryBlockPrefix) && len(key) >= len(TrienodeHistoryBlockPrefix)+common.HashLength+4:
|
||||
trienodeIndex.add(size)
|
||||
|
||||
// Verkle trie data is detected, determine the sub-category
|
||||
case bytes.HasPrefix(key, VerklePrefix):
|
||||
|
|
@ -622,12 +634,13 @@ func InspectDatabase(db ethdb.Database, keyPrefix, keyStart []byte) error {
|
|||
{"Key-Value store", "Path trie state lookups", stateLookups.sizeString(), stateLookups.countString()},
|
||||
{"Key-Value store", "Path trie account nodes", accountTries.sizeString(), accountTries.countString()},
|
||||
{"Key-Value store", "Path trie storage nodes", storageTries.sizeString(), storageTries.countString()},
|
||||
{"Key-Value store", "Path state history indexes", stateIndex.sizeString(), stateIndex.countString()},
|
||||
{"Key-Value store", "Verkle trie nodes", verkleTries.sizeString(), verkleTries.countString()},
|
||||
{"Key-Value store", "Verkle trie state lookups", verkleStateLookups.sizeString(), verkleStateLookups.countString()},
|
||||
{"Key-Value store", "Trie preimages", preimages.sizeString(), preimages.countString()},
|
||||
{"Key-Value store", "Account snapshot", accountSnaps.sizeString(), accountSnaps.countString()},
|
||||
{"Key-Value store", "Storage snapshot", storageSnaps.sizeString(), storageSnaps.countString()},
|
||||
{"Key-Value store", "Historical state index", stateIndex.sizeString(), stateIndex.countString()},
|
||||
{"Key-Value store", "Historical trie index", trienodeIndex.sizeString(), trienodeIndex.countString()},
|
||||
{"Key-Value store", "Beacon sync headers", beaconHeaders.sizeString(), beaconHeaders.countString()},
|
||||
{"Key-Value store", "Clique snapshots", cliqueSnaps.sizeString(), cliqueSnaps.countString()},
|
||||
{"Key-Value store", "Singleton metadata", metadata.sizeString(), metadata.countString()},
|
||||
|
|
@ -672,7 +685,7 @@ var knownMetadataKeys = [][]byte{
|
|||
snapshotGeneratorKey, snapshotRecoveryKey, txIndexTailKey, fastTxLookupLimitKey,
|
||||
uncleanShutdownKey, badBlockKey, transitionStatusKey, skeletonSyncStatusKey,
|
||||
persistentStateIDKey, trieJournalKey, snapshotSyncStatusKey, snapSyncStatusFlagKey,
|
||||
filterMapsRangeKey, headStateHistoryIndexKey, VerkleTransitionStatePrefix,
|
||||
filterMapsRangeKey, headStateHistoryIndexKey, headTrienodeHistoryIndexKey, VerkleTransitionStatePrefix,
|
||||
}
|
||||
|
||||
// printChainMetadata prints out chain metadata to stderr.
|
||||
|
|
|
|||
|
|
@ -17,40 +17,39 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/common/lru"
|
||||
"github.com/ethereum/go-ethereum/core/state/snapshot"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/ethereum/go-ethereum/trie"
|
||||
"github.com/ethereum/go-ethereum/triedb"
|
||||
"github.com/ethereum/go-ethereum/triedb/database"
|
||||
"github.com/ethereum/go-ethereum/triedb/pathdb"
|
||||
)
|
||||
|
||||
// historicReader wraps a historical state reader defined in path database,
|
||||
// providing historic state serving over the path scheme.
|
||||
//
|
||||
// TODO(rjl493456442): historicReader is not thread-safe and does not fully
|
||||
// comply with the StateReader interface requirements, needs to be fixed.
|
||||
// Currently, it is only used in a non-concurrent context, so it is safe for now.
|
||||
type historicReader struct {
|
||||
// historicStateReader implements StateReader, wrapping a historical state reader
|
||||
// defined in path database and provide historic state serving over the path scheme.
|
||||
type historicStateReader struct {
|
||||
reader *pathdb.HistoricalStateReader
|
||||
lock sync.Mutex // Lock for protecting concurrent read
|
||||
}
|
||||
|
||||
// newHistoricReader constructs a reader for historic state serving.
|
||||
func newHistoricReader(r *pathdb.HistoricalStateReader) *historicReader {
|
||||
return &historicReader{reader: r}
|
||||
// newHistoricStateReader constructs a reader for historical state serving.
|
||||
func newHistoricStateReader(r *pathdb.HistoricalStateReader) *historicStateReader {
|
||||
return &historicStateReader{reader: r}
|
||||
}
|
||||
|
||||
// Account implements StateReader, retrieving the account specified by the address.
|
||||
//
|
||||
// An error will be returned if the associated snapshot is already stale or
|
||||
// the requested account is not yet covered by the snapshot.
|
||||
//
|
||||
// The returned account might be nil if it's not existent.
|
||||
func (r *historicReader) Account(addr common.Address) (*types.StateAccount, error) {
|
||||
func (r *historicStateReader) Account(addr common.Address) (*types.StateAccount, error) {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
|
||||
account, err := r.reader.Account(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -80,7 +79,10 @@ func (r *historicReader) Account(addr common.Address) (*types.StateAccount, erro
|
|||
// the requested storage slot is not yet covered by the snapshot.
|
||||
//
|
||||
// The returned storage slot might be empty if it's not existent.
|
||||
func (r *historicReader) Storage(addr common.Address, key common.Hash) (common.Hash, error) {
|
||||
func (r *historicStateReader) Storage(addr common.Address, key common.Hash) (common.Hash, error) {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
|
||||
blob, err := r.reader.Storage(addr, key)
|
||||
if err != nil {
|
||||
return common.Hash{}, err
|
||||
|
|
@ -97,6 +99,125 @@ func (r *historicReader) Storage(addr common.Address, key common.Hash) (common.H
|
|||
return slot, nil
|
||||
}
|
||||
|
||||
// historicTrieOpener is a wrapper of pathdb.HistoricalNodeReader, implementing
|
||||
// the database.NodeDatabase by adding NodeReader function.
|
||||
type historicTrieOpener struct {
|
||||
root common.Hash
|
||||
reader *pathdb.HistoricalNodeReader
|
||||
}
|
||||
|
||||
// newHistoricTrieOpener constructs the historical trie opener.
|
||||
func newHistoricTrieOpener(root common.Hash, reader *pathdb.HistoricalNodeReader) *historicTrieOpener {
|
||||
return &historicTrieOpener{
|
||||
root: root,
|
||||
reader: reader,
|
||||
}
|
||||
}
|
||||
|
||||
// NodeReader implements database.NodeDatabase, returning a node reader of a
|
||||
// specified state.
|
||||
func (o *historicTrieOpener) NodeReader(root common.Hash) (database.NodeReader, error) {
|
||||
if root != o.root {
|
||||
return nil, fmt.Errorf("state %x is not available", root)
|
||||
}
|
||||
return o.reader, nil
|
||||
}
|
||||
|
||||
// historicalTrieReader wraps a historical node reader defined in path database,
|
||||
// providing historical node serving over the path scheme.
|
||||
type historicalTrieReader struct {
|
||||
root common.Hash
|
||||
opener *historicTrieOpener
|
||||
tr Trie
|
||||
|
||||
subRoots map[common.Address]common.Hash // Set of storage roots, cached when the account is resolved
|
||||
subTries map[common.Address]Trie // Group of storage tries, cached when it's resolved
|
||||
lock sync.Mutex // Lock for protecting concurrent read
|
||||
}
|
||||
|
||||
// newHistoricalTrieReader constructs a reader for historical trie node serving.
|
||||
func newHistoricalTrieReader(root common.Hash, r *pathdb.HistoricalNodeReader) (*historicalTrieReader, error) {
|
||||
opener := newHistoricTrieOpener(root, r)
|
||||
tr, err := trie.NewStateTrie(trie.StateTrieID(root), opener)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &historicalTrieReader{
|
||||
root: root,
|
||||
opener: opener,
|
||||
tr: tr,
|
||||
subRoots: make(map[common.Address]common.Hash),
|
||||
subTries: make(map[common.Address]Trie),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// account is the inner version of Account and assumes the r.lock is already held.
|
||||
func (r *historicalTrieReader) account(addr common.Address) (*types.StateAccount, error) {
|
||||
account, err := r.tr.GetAccount(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if account == nil {
|
||||
r.subRoots[addr] = types.EmptyRootHash
|
||||
} else {
|
||||
r.subRoots[addr] = account.Root
|
||||
}
|
||||
return account, nil
|
||||
}
|
||||
|
||||
// Account implements StateReader, retrieving the account specified by the address.
|
||||
//
|
||||
// An error will be returned if the associated snapshot is already stale or
|
||||
// the requested account is not yet covered by the snapshot.
|
||||
//
|
||||
// The returned account might be nil if it's not existent.
|
||||
func (r *historicalTrieReader) Account(addr common.Address) (*types.StateAccount, error) {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
|
||||
return r.account(addr)
|
||||
}
|
||||
|
||||
// Storage implements StateReader, retrieving the storage slot specified by the
|
||||
// address and slot key.
|
||||
//
|
||||
// An error will be returned if the associated snapshot is already stale or
|
||||
// the requested storage slot is not yet covered by the snapshot.
|
||||
//
|
||||
// The returned storage slot might be empty if it's not existent.
|
||||
func (r *historicalTrieReader) Storage(addr common.Address, key common.Hash) (common.Hash, error) {
|
||||
r.lock.Lock()
|
||||
defer r.lock.Unlock()
|
||||
|
||||
tr, found := r.subTries[addr]
|
||||
if !found {
|
||||
root, ok := r.subRoots[addr]
|
||||
|
||||
// The storage slot is accessed without account caching. It's unexpected
|
||||
// behavior but try to resolve the account first anyway.
|
||||
if !ok {
|
||||
_, err := r.account(addr)
|
||||
if err != nil {
|
||||
return common.Hash{}, err
|
||||
}
|
||||
root = r.subRoots[addr]
|
||||
}
|
||||
var err error
|
||||
tr, err = trie.NewStateTrie(trie.StorageTrieID(r.root, crypto.Keccak256Hash(addr.Bytes()), root), r.opener)
|
||||
if err != nil {
|
||||
return common.Hash{}, err
|
||||
}
|
||||
r.subTries[addr] = tr
|
||||
}
|
||||
ret, err := tr.GetStorage(addr, key.Bytes())
|
||||
if err != nil {
|
||||
return common.Hash{}, err
|
||||
}
|
||||
var value common.Hash
|
||||
value.SetBytes(ret)
|
||||
return value, nil
|
||||
}
|
||||
|
||||
// HistoricDB is the implementation of Database interface, with the ability to
|
||||
// access historical state.
|
||||
type HistoricDB struct {
|
||||
|
|
@ -118,22 +239,54 @@ func NewHistoricDatabase(disk ethdb.KeyValueStore, triedb *triedb.Database) *His
|
|||
|
||||
// Reader implements Database interface, returning a reader of the specific state.
|
||||
func (db *HistoricDB) Reader(stateRoot common.Hash) (Reader, error) {
|
||||
hr, err := db.triedb.HistoricReader(stateRoot)
|
||||
var readers []StateReader
|
||||
sr, err := db.triedb.HistoricStateReader(stateRoot)
|
||||
if err == nil {
|
||||
readers = append(readers, newHistoricStateReader(sr))
|
||||
}
|
||||
nr, err := db.triedb.HistoricNodeReader(stateRoot)
|
||||
if err == nil {
|
||||
tr, err := newHistoricalTrieReader(stateRoot, nr)
|
||||
if err == nil {
|
||||
readers = append(readers, tr)
|
||||
}
|
||||
}
|
||||
if len(readers) == 0 {
|
||||
return nil, fmt.Errorf("historical state %x is not available", stateRoot)
|
||||
}
|
||||
combined, err := newMultiStateReader(readers...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return newReader(newCachingCodeReader(db.disk, db.codeCache, db.codeSizeCache), newHistoricReader(hr)), nil
|
||||
return newReader(newCachingCodeReader(db.disk, db.codeCache, db.codeSizeCache), combined), nil
|
||||
}
|
||||
|
||||
// OpenTrie opens the main account trie. It's not supported by historic database.
|
||||
func (db *HistoricDB) OpenTrie(root common.Hash) (Trie, error) {
|
||||
return nil, errors.New("not implemented")
|
||||
nr, err := db.triedb.HistoricNodeReader(root)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tr, err := trie.NewStateTrie(trie.StateTrieID(root), newHistoricTrieOpener(root, nr))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return tr, nil
|
||||
}
|
||||
|
||||
// OpenStorageTrie opens the storage trie of an account. It's not supported by
|
||||
// historic database.
|
||||
func (db *HistoricDB) OpenStorageTrie(stateRoot common.Hash, address common.Address, root common.Hash, trie Trie) (Trie, error) {
|
||||
return nil, errors.New("not implemented")
|
||||
func (db *HistoricDB) OpenStorageTrie(stateRoot common.Hash, address common.Address, root common.Hash, _ Trie) (Trie, error) {
|
||||
nr, err := db.triedb.HistoricNodeReader(stateRoot)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
id := trie.StorageTrieID(stateRoot, crypto.Keccak256Hash(address.Bytes()), root)
|
||||
tr, err := trie.NewStateTrie(id, newHistoricTrieOpener(stateRoot, nr))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return tr, nil
|
||||
}
|
||||
|
||||
// TrieDB returns the underlying trie database for managing trie nodes.
|
||||
|
|
|
|||
|
|
@ -222,18 +222,19 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
|
|||
}
|
||||
var (
|
||||
options = &core.BlockChainConfig{
|
||||
TrieCleanLimit: config.TrieCleanCache,
|
||||
NoPrefetch: config.NoPrefetch,
|
||||
TrieDirtyLimit: config.TrieDirtyCache,
|
||||
ArchiveMode: config.NoPruning,
|
||||
TrieTimeLimit: config.TrieTimeout,
|
||||
SnapshotLimit: config.SnapshotCache,
|
||||
Preimages: config.Preimages,
|
||||
StateHistory: config.StateHistory,
|
||||
TrienodeHistory: config.TrienodeHistory,
|
||||
StateScheme: scheme,
|
||||
ChainHistoryMode: config.HistoryMode,
|
||||
TxLookupLimit: int64(min(config.TransactionHistory, math.MaxInt64)),
|
||||
TrieCleanLimit: config.TrieCleanCache,
|
||||
NoPrefetch: config.NoPrefetch,
|
||||
TrieDirtyLimit: config.TrieDirtyCache,
|
||||
ArchiveMode: config.NoPruning,
|
||||
TrieTimeLimit: config.TrieTimeout,
|
||||
SnapshotLimit: config.SnapshotCache,
|
||||
Preimages: config.Preimages,
|
||||
StateHistory: config.StateHistory,
|
||||
TrienodeHistory: config.TrienodeHistory,
|
||||
NodeFullValueCheckpoint: config.NodeFullValueCheckpoint,
|
||||
StateScheme: scheme,
|
||||
ChainHistoryMode: config.HistoryMode,
|
||||
TxLookupLimit: int64(min(config.TransactionHistory, math.MaxInt64)),
|
||||
VmConfig: vm.Config{
|
||||
EnablePreimageRecording: config.EnablePreimageRecording,
|
||||
EnableWitnessStats: config.EnableWitnessStats,
|
||||
|
|
|
|||
|
|
@ -35,6 +35,7 @@ import (
|
|||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/miner"
|
||||
"github.com/ethereum/go-ethereum/params"
|
||||
"github.com/ethereum/go-ethereum/triedb/pathdb"
|
||||
)
|
||||
|
||||
// FullNodeGPO contains default gasprice oracle settings for full node.
|
||||
|
|
@ -49,32 +50,33 @@ var FullNodeGPO = gasprice.Config{
|
|||
|
||||
// Defaults contains default settings for use on the Ethereum main net.
|
||||
var Defaults = Config{
|
||||
HistoryMode: history.KeepAll,
|
||||
SyncMode: SnapSync,
|
||||
NetworkId: 0, // enable auto configuration of networkID == chainID
|
||||
TxLookupLimit: 2350000,
|
||||
TransactionHistory: 2350000,
|
||||
LogHistory: 2350000,
|
||||
StateHistory: params.FullImmutabilityThreshold,
|
||||
TrienodeHistory: -1,
|
||||
DatabaseCache: 512,
|
||||
TrieCleanCache: 154,
|
||||
TrieDirtyCache: 256,
|
||||
TrieTimeout: 60 * time.Minute,
|
||||
SnapshotCache: 102,
|
||||
FilterLogCacheSize: 32,
|
||||
LogQueryLimit: 1000,
|
||||
Miner: miner.DefaultConfig,
|
||||
TxPool: legacypool.DefaultConfig,
|
||||
BlobPool: blobpool.DefaultConfig,
|
||||
RPCGasCap: 50000000,
|
||||
RPCEVMTimeout: 5 * time.Second,
|
||||
GPO: FullNodeGPO,
|
||||
RPCTxFeeCap: 1, // 1 ether
|
||||
TxSyncDefaultTimeout: 20 * time.Second,
|
||||
TxSyncMaxTimeout: 1 * time.Minute,
|
||||
SlowBlockThreshold: time.Second * 2,
|
||||
RangeLimit: 0,
|
||||
HistoryMode: history.KeepAll,
|
||||
SyncMode: SnapSync,
|
||||
NetworkId: 0, // enable auto configuration of networkID == chainID
|
||||
TxLookupLimit: 2350000,
|
||||
TransactionHistory: 2350000,
|
||||
LogHistory: 2350000,
|
||||
StateHistory: pathdb.Defaults.StateHistory,
|
||||
TrienodeHistory: pathdb.Defaults.TrienodeHistory,
|
||||
NodeFullValueCheckpoint: pathdb.Defaults.FullValueCheckpoint,
|
||||
DatabaseCache: 512,
|
||||
TrieCleanCache: 154,
|
||||
TrieDirtyCache: 256,
|
||||
TrieTimeout: 60 * time.Minute,
|
||||
SnapshotCache: 102,
|
||||
FilterLogCacheSize: 32,
|
||||
LogQueryLimit: 1000,
|
||||
Miner: miner.DefaultConfig,
|
||||
TxPool: legacypool.DefaultConfig,
|
||||
BlobPool: blobpool.DefaultConfig,
|
||||
RPCGasCap: 50000000,
|
||||
RPCEVMTimeout: 5 * time.Second,
|
||||
GPO: FullNodeGPO,
|
||||
RPCTxFeeCap: 1, // 1 ether
|
||||
TxSyncDefaultTimeout: 20 * time.Second,
|
||||
TxSyncMaxTimeout: 1 * time.Minute,
|
||||
SlowBlockThreshold: time.Second * 2,
|
||||
RangeLimit: 0,
|
||||
}
|
||||
|
||||
//go:generate go run github.com/fjl/gencodec -type Config -formats toml -out gen_config.go
|
||||
|
|
@ -112,6 +114,12 @@ type Config struct {
|
|||
StateHistory uint64 `toml:",omitempty"` // The maximum number of blocks from head whose state histories are reserved.
|
||||
TrienodeHistory int64 `toml:",omitempty"` // Number of blocks from the chain head for which trienode histories are retained
|
||||
|
||||
// The frequency of full-value encoding. For example, a value of 16 means
|
||||
// that, on average, for a given trie node across its 16 consecutive historical
|
||||
// versions, only one version is stored in full format, while the others
|
||||
// are stored in diff mode for storage compression.
|
||||
NodeFullValueCheckpoint uint32 `toml:",omitempty"`
|
||||
|
||||
// State scheme represents the scheme used to store ethereum states and trie
|
||||
// nodes on top. It can be 'hash', 'path', or none which means use the scheme
|
||||
// consistent with persistent state.
|
||||
|
|
|
|||
|
|
@ -32,6 +32,7 @@ func (c Config) MarshalTOML() (interface{}, error) {
|
|||
LogExportCheckpoints string
|
||||
StateHistory uint64 `toml:",omitempty"`
|
||||
TrienodeHistory int64 `toml:",omitempty"`
|
||||
NodeFullValueCheckpoint uint32 `toml:",omitempty"`
|
||||
StateScheme string `toml:",omitempty"`
|
||||
RequiredBlocks map[uint64]common.Hash `toml:"-"`
|
||||
SlowBlockThreshold time.Duration `toml:",omitempty"`
|
||||
|
|
@ -84,6 +85,7 @@ func (c Config) MarshalTOML() (interface{}, error) {
|
|||
enc.LogExportCheckpoints = c.LogExportCheckpoints
|
||||
enc.StateHistory = c.StateHistory
|
||||
enc.TrienodeHistory = c.TrienodeHistory
|
||||
enc.NodeFullValueCheckpoint = c.NodeFullValueCheckpoint
|
||||
enc.StateScheme = c.StateScheme
|
||||
enc.RequiredBlocks = c.RequiredBlocks
|
||||
enc.SlowBlockThreshold = c.SlowBlockThreshold
|
||||
|
|
@ -140,6 +142,7 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error {
|
|||
LogExportCheckpoints *string
|
||||
StateHistory *uint64 `toml:",omitempty"`
|
||||
TrienodeHistory *int64 `toml:",omitempty"`
|
||||
NodeFullValueCheckpoint *uint32 `toml:",omitempty"`
|
||||
StateScheme *string `toml:",omitempty"`
|
||||
RequiredBlocks map[uint64]common.Hash `toml:"-"`
|
||||
SlowBlockThreshold *time.Duration `toml:",omitempty"`
|
||||
|
|
@ -225,6 +228,9 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error {
|
|||
if dec.TrienodeHistory != nil {
|
||||
c.TrienodeHistory = *dec.TrienodeHistory
|
||||
}
|
||||
if dec.NodeFullValueCheckpoint != nil {
|
||||
c.NodeFullValueCheckpoint = *dec.NodeFullValueCheckpoint
|
||||
}
|
||||
if dec.StateScheme != nil {
|
||||
c.StateScheme = *dec.StateScheme
|
||||
}
|
||||
|
|
|
|||
|
|
@ -47,7 +47,6 @@ import (
|
|||
"github.com/ethereum/go-ethereum/params"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
"github.com/ethereum/go-ethereum/trie"
|
||||
)
|
||||
|
||||
// estimateGasErrorRatio is the amount of overestimation eth_estimateGas is
|
||||
|
|
@ -382,8 +381,7 @@ func (api *BlockChainAPI) GetProof(ctx context.Context, address common.Address,
|
|||
if len(keys) > 0 {
|
||||
var storageTrie state.Trie
|
||||
if storageRoot != types.EmptyRootHash && storageRoot != (common.Hash{}) {
|
||||
id := trie.StorageTrieID(header.Root, crypto.Keccak256Hash(address.Bytes()), storageRoot)
|
||||
st, err := trie.NewStateTrie(id, statedb.Database().TrieDB())
|
||||
st, err := statedb.Database().OpenStorageTrie(header.Root, address, storageRoot, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -414,7 +412,7 @@ func (api *BlockChainAPI) GetProof(ctx context.Context, address common.Address,
|
|||
}
|
||||
}
|
||||
// Create the accountProof.
|
||||
tr, err := trie.NewStateTrie(trie.StateTrieID(header.Root), statedb.Database().TrieDB())
|
||||
tr, err := statedb.Database().OpenTrie(header.Root)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -153,6 +153,11 @@ func CountValues(b []byte) (int, error) {
|
|||
}
|
||||
|
||||
// SplitListValues extracts the raw elements from the list RLP-encoding blob.
|
||||
//
|
||||
// Note: the returned slice must not be modified, as it shares the same
|
||||
// backing array as the original slice. It's acceptable to deep-copy the elements
|
||||
// out if necessary, but let's stick with this approach for less allocation
|
||||
// overhead.
|
||||
func SplitListValues(b []byte) ([][]byte, error) {
|
||||
b, _, err := SplitList(b)
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -129,8 +129,8 @@ func (db *Database) StateReader(blockRoot common.Hash) (database.StateReader, er
|
|||
return db.backend.StateReader(blockRoot)
|
||||
}
|
||||
|
||||
// HistoricReader constructs a reader for accessing the requested historic state.
|
||||
func (db *Database) HistoricReader(root common.Hash) (*pathdb.HistoricalStateReader, error) {
|
||||
// HistoricStateReader constructs a reader for accessing the requested historic state.
|
||||
func (db *Database) HistoricStateReader(root common.Hash) (*pathdb.HistoricalStateReader, error) {
|
||||
pdb, ok := db.backend.(*pathdb.Database)
|
||||
if !ok {
|
||||
return nil, errors.New("not supported")
|
||||
|
|
@ -138,6 +138,15 @@ func (db *Database) HistoricReader(root common.Hash) (*pathdb.HistoricalStateRea
|
|||
return pdb.HistoricReader(root)
|
||||
}
|
||||
|
||||
// HistoricNodeReader constructs a reader for accessing the historical trie node.
|
||||
func (db *Database) HistoricNodeReader(root common.Hash) (*pathdb.HistoricalNodeReader, error) {
|
||||
pdb, ok := db.backend.(*pathdb.Database)
|
||||
if !ok {
|
||||
return nil, errors.New("not supported")
|
||||
}
|
||||
return pdb.HistoricNodeReader(root)
|
||||
}
|
||||
|
||||
// Update performs a state transition by committing dirty nodes contained in the
|
||||
// given set in order to update state from the specified parent to the specified
|
||||
// root. The held pre-images accumulated up to this point will be flushed in case
|
||||
|
|
|
|||
|
|
@ -43,6 +43,26 @@ const (
|
|||
// Do not increase the buffer size arbitrarily, otherwise the system
|
||||
// pause time will increase when the database writes happen.
|
||||
defaultBufferSize = 64 * 1024 * 1024
|
||||
|
||||
// maxFullValueCheckpoint defines the maximum allowed encoding frequency (1/16)
|
||||
// for storing nodes in full format. With this setting, a node may be written
|
||||
// to the trienode history as a full value at the specified frequency.
|
||||
//
|
||||
// Note that the frequency is not strict: the actual decision is probabilistic.
|
||||
// Only the overall long-term full-value encoding rate is enforced.
|
||||
//
|
||||
// Values beyond this limit are considered ineffective, as the trienode history
|
||||
// is already well compressed. Increasing it further will only degrade read
|
||||
// performance linearly.
|
||||
maxFullValueCheckpoint = 16
|
||||
|
||||
// defaultFullValueCheckpoint defines the default full-value encoding frequency
|
||||
// (1/8) for storing nodes in full format. With this setting, nodes may be
|
||||
// written to the trienode history as full values at the specified rate.
|
||||
//
|
||||
// This strikes a balance between effective compression of the trienode history
|
||||
// and acceptable read performance.
|
||||
defaultFullValueCheckpoint = 8
|
||||
)
|
||||
|
||||
var (
|
||||
|
|
@ -54,6 +74,7 @@ var (
|
|||
var Defaults = &Config{
|
||||
StateHistory: params.FullImmutabilityThreshold,
|
||||
TrienodeHistory: -1,
|
||||
FullValueCheckpoint: defaultFullValueCheckpoint,
|
||||
EnableStateIndexing: false,
|
||||
TrieCleanSize: defaultTrieCleanSize,
|
||||
StateCleanSize: defaultStateCleanSize,
|
||||
|
|
@ -62,22 +83,26 @@ var Defaults = &Config{
|
|||
|
||||
// ReadOnly is the config in order to open database in read only mode.
|
||||
var ReadOnly = &Config{
|
||||
ReadOnly: true,
|
||||
TrienodeHistory: -1,
|
||||
TrieCleanSize: defaultTrieCleanSize,
|
||||
StateCleanSize: defaultStateCleanSize,
|
||||
ReadOnly: true,
|
||||
TrienodeHistory: -1,
|
||||
TrieCleanSize: defaultTrieCleanSize,
|
||||
StateCleanSize: defaultStateCleanSize,
|
||||
FullValueCheckpoint: defaultFullValueCheckpoint,
|
||||
}
|
||||
|
||||
// Config contains the settings for database.
|
||||
type Config struct {
|
||||
TrieCleanSize int // Maximum memory allowance (in bytes) for caching clean trie data
|
||||
StateCleanSize int // Maximum memory allowance (in bytes) for caching clean state data
|
||||
WriteBufferSize int // Maximum memory allowance (in bytes) for write buffer
|
||||
ReadOnly bool // Flag whether the database is opened in read only mode
|
||||
JournalDirectory string // Absolute path of journal directory (null means the journal data is persisted in key-value store)
|
||||
|
||||
// Historical state configurations
|
||||
StateHistory uint64 // Number of recent blocks to maintain state history for, 0: full chain
|
||||
TrienodeHistory int64 // Number of recent blocks to maintain trienode history for, 0: full chain, negative: disable
|
||||
EnableStateIndexing bool // Whether to enable state history indexing for external state access
|
||||
TrieCleanSize int // Maximum memory allowance (in bytes) for caching clean trie data
|
||||
StateCleanSize int // Maximum memory allowance (in bytes) for caching clean state data
|
||||
WriteBufferSize int // Maximum memory allowance (in bytes) for write buffer
|
||||
ReadOnly bool // Flag whether the database is opened in read only mode
|
||||
JournalDirectory string // Absolute path of journal directory (null means the journal data is persisted in key-value store)
|
||||
FullValueCheckpoint uint32 // The rate at which trie nodes are encoded in full-value format
|
||||
|
||||
// Testing configurations
|
||||
SnapshotNoBuild bool // Flag Whether the state generation is disabled
|
||||
|
|
@ -93,6 +118,14 @@ func (c *Config) sanitize() *Config {
|
|||
log.Warn("Sanitizing invalid node buffer size", "provided", common.StorageSize(conf.WriteBufferSize), "updated", common.StorageSize(maxBufferSize))
|
||||
conf.WriteBufferSize = maxBufferSize
|
||||
}
|
||||
if conf.FullValueCheckpoint > maxFullValueCheckpoint {
|
||||
log.Warn("Sanitizing trienode history full value checkpoint", "provided", conf.FullValueCheckpoint, "updated", maxFullValueCheckpoint)
|
||||
conf.FullValueCheckpoint = maxFullValueCheckpoint
|
||||
}
|
||||
if conf.FullValueCheckpoint == 0 {
|
||||
conf.FullValueCheckpoint = 1
|
||||
log.Info("Disabling diff mode trie node history encoding")
|
||||
}
|
||||
return &conf
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -346,8 +346,9 @@ func (dl *diskLayer) writeHistory(typ historyType, diff *diffLayer) (bool, error
|
|||
case typeTrienodeHistory:
|
||||
freezer = dl.db.trienodeFreezer
|
||||
indexer = dl.db.trienodeIndexer
|
||||
writeFunc = writeTrienodeHistory
|
||||
|
||||
writeFunc = func(writer ethdb.AncientWriter, diff *diffLayer) error {
|
||||
return writeTrienodeHistory(writer, diff, dl.db.config.FullValueCheckpoint)
|
||||
}
|
||||
// Skip the history commit if the trienode history is not permitted
|
||||
if dl.db.config.TrienodeHistory < 0 {
|
||||
return false, nil
|
||||
|
|
|
|||
|
|
@ -158,8 +158,6 @@ func newStorageIdent(addressHash common.Hash, storageHash common.Hash) stateIden
|
|||
// newTrienodeIdent constructs a state identifier for a trie node.
|
||||
// The address denotes the address hash of the associated account;
|
||||
// the path denotes the path of the node within the trie;
|
||||
//
|
||||
// nolint:unused
|
||||
func newTrienodeIdent(addressHash common.Hash, path string) stateIdent {
|
||||
return stateIdent{
|
||||
typ: typeTrienode,
|
||||
|
|
|
|||
|
|
@ -609,3 +609,48 @@ func (it *indexIterator) Error() error {
|
|||
func (it *indexIterator) ID() uint64 {
|
||||
return it.blockIt.ID()
|
||||
}
|
||||
|
||||
// seqIter is a trivial iterator over a contiguous run of unsigned integers.
// After construction with newSeqIter(last), the iterable range is [0, last],
// tracked internally with an exclusive upper bound.
type seqIter struct {
	cur  uint64 // id the iterator is currently positioned at
	end  uint64 // exclusive upper bound; the last yielded element is end-1
	done bool   // set once the sequence has been exhausted
}

// newSeqIter creates an iterator over the integers [0, last].
func newSeqIter(last uint64) *seqIter {
	return &seqIter{end: last + 1}
}

// SeekGT repositions the iterator at the smallest element strictly greater
// than id. It returns false (and marks the iterator exhausted) when no such
// element exists within the range.
func (it *seqIter) SeekGT(id uint64) bool {
	next := id + 1
	if next >= it.end {
		it.done = true
		return false
	}
	it.cur, it.done = next, false
	return true
}

// Next advances the iterator by one element, returning false once the
// sequence has been fully consumed.
func (it *seqIter) Next() bool {
	if it.done {
		return false
	}
	if it.cur+1 >= it.end {
		// The current element was the final one
		it.done = true
		return false
	}
	it.cur++
	return true
}

// ID returns the element the iterator is currently positioned at.
func (it *seqIter) ID() uint64 { return it.cur }

// Error always returns nil; plain sequence traversal cannot fail, and
// exhaustion is not treated as an error.
func (it *seqIter) Error() error { return nil }
|
||||
|
|
|
|||
|
|
@ -313,3 +313,49 @@ func TestIndexIteratorTraversal(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSeqIterBasicIteration(t *testing.T) {
|
||||
it := newSeqIter(5) // iterates over [1..5]
|
||||
it.SeekGT(0)
|
||||
|
||||
var (
|
||||
got []uint64
|
||||
expected = []uint64{1, 2, 3, 4, 5}
|
||||
)
|
||||
got = append(got, it.ID())
|
||||
for it.Next() {
|
||||
got = append(got, it.ID())
|
||||
}
|
||||
if len(got) != len(expected) {
|
||||
t.Fatalf("iteration length mismatch: got %v, expected %v", got, expected)
|
||||
}
|
||||
for i := range expected {
|
||||
if got[i] != expected[i] {
|
||||
t.Fatalf("element mismatch at %d: got %d, expected %d", i, got[i], expected[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSeqIterSeekGT(t *testing.T) {
|
||||
it := newSeqIter(5)
|
||||
|
||||
tests := []struct {
|
||||
input uint64
|
||||
ok bool
|
||||
expected uint64
|
||||
}{
|
||||
{0, true, 1},
|
||||
{1, true, 2},
|
||||
{4, true, 5},
|
||||
{5, false, 0}, // 6 is out of range
|
||||
}
|
||||
for _, tt := range tests {
|
||||
ok := it.SeekGT(tt.input)
|
||||
if ok != tt.ok {
|
||||
t.Fatalf("SeekGT(%d) ok mismatch: got %v, expected %v", tt.input, ok, tt.ok)
|
||||
}
|
||||
if ok && it.ID() != tt.expected {
|
||||
t.Fatalf("SeekGT(%d) positioned at %d, expected %d", tt.input, it.ID(), tt.expected)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,7 +29,6 @@ import (
|
|||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"golang.org/x/exp/maps"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
|
|
@ -247,8 +246,8 @@ func (b *batchIndexer) finish(force bool) error {
|
|||
log.Debug("Committed batch indexer", "type", b.typ, "entries", len(b.index), "records", b.pending, "size", common.StorageSize(batchSize), "elapsed", common.PrettyDuration(time.Since(start)))
|
||||
|
||||
b.pending = 0
|
||||
maps.Clear(b.index)
|
||||
maps.Clear(b.ext)
|
||||
clear(b.index)
|
||||
clear(b.ext)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -22,11 +22,17 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"slices"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// indexReaderWithLimitTag is a wrapper around indexReader that includes an
|
||||
|
|
@ -260,9 +266,204 @@ func (r *stateHistoryReader) read(state stateIdentQuery, stateID uint64, lastID
|
|||
return r.readStorage(state.address, state.storageKey, state.storageHash, historyID)
|
||||
}
|
||||
|
||||
// trienodeReader is the structure to access historical trienode data.
|
||||
type trienodeReader struct {
|
||||
disk ethdb.KeyValueReader
|
||||
freezer ethdb.AncientReader
|
||||
readConcurrency int // The concurrency used to load trie node data from history
|
||||
}
|
||||
|
||||
// newTrienodeReader constructs the history reader with the supplied db
|
||||
// for accessing historical trie nodes.
|
||||
func newTrienodeReader(disk ethdb.KeyValueReader, freezer ethdb.AncientReader, readConcurrency int) *trienodeReader {
|
||||
return &trienodeReader{
|
||||
disk: disk,
|
||||
freezer: freezer,
|
||||
readConcurrency: readConcurrency,
|
||||
}
|
||||
}
|
||||
|
||||
// readTrienode retrieves the trienode data from the specified trienode history.
|
||||
func (r *trienodeReader) readTrienode(addrHash common.Hash, path string, historyID uint64) ([]byte, error) {
|
||||
tr, err := newTrienodeHistoryReader(historyID, r.freezer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return tr.read(addrHash, path)
|
||||
}
|
||||
|
||||
// assembleNode takes a complete node value as the base and applies a list of
|
||||
// mutation records to assemble the final node value accordingly.
|
||||
func assembleNode(blob []byte, elements [][][]byte, indices [][]int) ([]byte, error) {
|
||||
if len(elements) == 0 && len(indices) == 0 {
|
||||
return blob, nil
|
||||
}
|
||||
children, err := rlp.SplitListValues(blob)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i := 0; i < len(elements); i++ {
|
||||
for j, pos := range indices[i] {
|
||||
children[pos] = elements[i][j]
|
||||
}
|
||||
}
|
||||
return rlp.MergeListValues(children)
|
||||
}
|
||||
|
||||
// resultQueue is a mutex-guarded, position-addressed collection of byte
// blobs, growing on demand to accommodate out-of-order writes from
// concurrent producers.
type resultQueue struct {
	data [][]byte
	lock sync.Mutex
}

// newResultQueue creates a queue pre-sized to hold `size` slots, with spare
// capacity for growth.
func newResultQueue(size int) *resultQueue {
	return &resultQueue{
		data: make([][]byte, size, size*2),
	}
}

// set stores data at the given slot, extending the queue if the slot lies
// beyond the current length. Safe for concurrent use.
func (q *resultQueue) set(data []byte, pos int) {
	q.lock.Lock()
	defer q.lock.Unlock()

	if pos >= len(q.data) {
		required := pos + 1
		if cap(q.data) >= required {
			// Spare capacity available, just extend the visible length
			q.data = q.data[:required]
		} else {
			// Reallocate with headroom and carry over the existing slots
			grown := make([][]byte, required, required*2)
			copy(grown, q.data)
			q.data = grown
		}
	}
	q.data[pos] = data
}
|
||||
|
||||
// readOptimized resolves the historical value of the trie node identified by
// state, walking its mutation history forward from the iterator's current
// position.
//
// History entries are fetched concurrently (bounded by r.readConcurrency) as
// an optimistic readahead: fetching stops as soon as an entry encoded in
// full-value format is observed, since that checkpoint provides a complete
// base value and later entries are not needed for reconstruction.
//
// Parameters:
//   - state: identifier of the trie node (account hash + node path)
//   - it: iterator over the IDs of histories containing mutation records for
//     this node, already positioned at the first relevant entry
//   - latestValue: the node value at the current disk layer, used as the base
//     if no full-value checkpoint is found in the traversed range
//
// NOTE(review): when a goroutine hits a read error it terminates the loop
// without calling queue.set, leaving that slot nil; the assembly loop below
// would fail on a nil slot unless a full-value checkpoint appears at an
// earlier position. Presumably the false-positive record can only occur
// after all required data — confirm.
func (r *trienodeReader) readOptimized(state stateIdent, it HistoryIndexIterator, latestValue []byte) ([]byte, error) {
	var (
		elements [][][]byte // decoded diff records, in fetch (ascending-ID) order
		indices  [][]int    // child positions matching each diff record
		blob     = latestValue

		eg    errgroup.Group
		seq   int         // number of fetches issued; also the next queue slot
		term  atomic.Bool // signals the readahead loop to stop issuing fetches
		queue = newResultQueue(r.readConcurrency * 2)
	)
	eg.SetLimit(r.readConcurrency)

	for {
		// Capture the history ID and the queue slot before spawning, so the
		// closure does not race with the loop advancing them.
		id, pos := it.ID(), seq
		seq += 1

		eg.Go(func() error {
			// In optimistic readahead mode, it is theoretically possible to encounter a
			// NotFound error, where the trie node does not actually exist and the iterator
			// reports a false-positive mutation record. Terminate the iterator if so, as
			// all the necessary data (checkpoints and all diffs) required has already been
			// fetched.
			data, err := r.readTrienode(state.addressHash, state.path, id)
			if err != nil {
				term.Store(true)
				log.Debug("Failed to read the trienode", "err", err)
				return nil
			}
			full, _, err := decodeNodeFull(data)
			if err != nil {
				term.Store(true)
				return err
			}
			if full {
				// A full-value checkpoint was reached; no further history
				// entries are needed.
				term.Store(true)
			}
			queue.set(data, pos)
			return nil
		})
		if term.Load() || !it.Next() {
			break
		}
	}
	// Wait for all in-flight fetches; a decode failure aborts the read.
	if err := eg.Wait(); err != nil {
		return nil, err
	}
	if err := it.Error(); err != nil {
		return nil, err
	}
	// Assemble the fetched records in issue order until the first full-value
	// entry, which replaces latestValue as the reconstruction base.
	for i := 0; i < seq; i++ {
		isComplete, fullBlob, err := decodeNodeFull(queue.data[i])
		if err != nil {
			return nil, err
		}
		// Terminate the loop if the node with full value has been found
		if isComplete {
			blob = fullBlob
			break
		}
		// Decode the partial encoded node and keep iterating the node history
		// until the node with full value being reached.
		element, index, err := decodeNodeCompressed(queue.data[i])
		if err != nil {
			return nil, err
		}
		elements, indices = append(elements, element), append(indices, index)
	}
	// Records were gathered in ascending history order; reverse them before
	// assembly. NOTE(review): the required application order is fixed by
	// encodeNodeHistory's record convention — verify the reversal against it.
	slices.Reverse(elements)
	slices.Reverse(indices)
	return assembleNode(blob, elements, indices)
}
|
||||
|
||||
// read retrieves the trie node data associated with the stateID.
|
||||
// stateID: represents the ID of the state of the specified version;
|
||||
// lastID: represents the ID of the latest/newest trie node history;
|
||||
// latestValue: represents the trie node value at the current disk layer with ID == lastID;
|
||||
func (r *trienodeReader) read(state stateIdent, stateID uint64, lastID uint64, latestValue []byte) ([]byte, error) {
|
||||
_, err := checkStateAvail(state, typeTrienodeHistory, r.freezer, stateID, lastID, r.disk)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Construct the index iterator to traverse the trienode history
|
||||
var (
|
||||
scheme *indexScheme
|
||||
it HistoryIndexIterator
|
||||
)
|
||||
if state.addressHash == (common.Hash{}) {
|
||||
scheme = accountIndexScheme
|
||||
} else {
|
||||
scheme = storageIndexScheme
|
||||
}
|
||||
if state.addressHash == (common.Hash{}) && state.path == "" {
|
||||
it = newSeqIter(lastID)
|
||||
} else {
|
||||
chunkID, nodeID := scheme.splitPathLast(state.path)
|
||||
|
||||
queryIdent := state
|
||||
queryIdent.path = chunkID
|
||||
ir, err := newIndexReader(r.disk, queryIdent, scheme.getBitmapSize(len(chunkID)))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
filter := extFilter(nodeID)
|
||||
it = ir.newIterator(&filter)
|
||||
}
|
||||
// Move the iterator to the first element whose id is greater than
|
||||
// the given number.
|
||||
found := it.SeekGT(stateID)
|
||||
if err := it.Error(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// The state was not found in the trie node histories, as it has not been
|
||||
// modified since stateID. Use the data from the associated disk layer
|
||||
// instead (full value node as always)
|
||||
if !found {
|
||||
return latestValue, nil
|
||||
}
|
||||
return r.readOptimized(state, it, latestValue)
|
||||
}
|
||||
|
||||
// checkStateAvail determines whether the requested historical state is available
|
||||
// for accessing. What's more, it also returns the ID of the latest indexed history
|
||||
// entry for subsequent usage.
|
||||
//
|
||||
// TODO(rjl493456442) it's really expensive to perform the check for every state
|
||||
// retrieval, please rework this later.
|
||||
func checkStateAvail(state stateIdent, exptyp historyType, freezer ethdb.AncientReader, stateID uint64, lastID uint64, db ethdb.KeyValueReader) (uint64, error) {
|
||||
if toHistoryType(state.typ) != exptyp {
|
||||
return 0, fmt.Errorf("unsupported history type: %d, want: %v", toHistoryType(state.typ), exptyp)
|
||||
|
|
|
|||
|
|
@ -436,6 +436,7 @@ func decodeSingle(keySection []byte, onValue func([]byte, int, int) error) ([]st
|
|||
}
|
||||
key = unsharedKey
|
||||
} else {
|
||||
// TODO(rjl493456442) mitigate the allocation pressure.
|
||||
if int(nShared) > len(prevKey) {
|
||||
return nil, fmt.Errorf("unexpected shared key prefix: %d, prefix key length: %d", nShared, len(prevKey))
|
||||
}
|
||||
|
|
@ -556,7 +557,11 @@ type singleTrienodeHistoryReader struct {
|
|||
valueInternalOffsets map[string]iRange // value offset within the single trie data
|
||||
}
|
||||
|
||||
// TODO(rjl493456442): This function performs a large number of allocations.
|
||||
// Given the large data size, a byte pool could be used to mitigate this.
|
||||
func newSingleTrienodeHistoryReader(id uint64, reader ethdb.AncientReader, keyRange iRange, valueRange iRange) (*singleTrienodeHistoryReader, error) {
|
||||
// TODO(rjl493456442) the data size is known in advance, allocating the
|
||||
// dedicated byte slices from the pool.
|
||||
keyData, err := rawdb.ReadTrienodeHistoryKeySection(reader, id, uint64(keyRange.start), uint64(keyRange.len()))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -672,9 +677,13 @@ func (r *trienodeHistoryReader) read(owner common.Hash, path string) ([]byte, er
|
|||
}
|
||||
|
||||
// writeTrienodeHistory persists the trienode history associated with the given diff layer.
|
||||
func writeTrienodeHistory(writer ethdb.AncientWriter, dl *diffLayer) error {
|
||||
func writeTrienodeHistory(writer ethdb.AncientWriter, dl *diffLayer, rate uint32) error {
|
||||
start := time.Now()
|
||||
h := newTrienodeHistory(dl.rootHash(), dl.parent.rootHash(), dl.block, dl.nodes.nodeOrigin)
|
||||
nodes, err := dl.nodes.encodeNodeHistory(dl.root, rate)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
h := newTrienodeHistory(dl.rootHash(), dl.parent.rootHash(), dl.block, nodes)
|
||||
header, keySection, valueSection, err := h.encode()
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
|||
|
|
@ -85,8 +85,9 @@ var (
|
|||
lookupAddLayerTimer = metrics.NewRegisteredResettingTimer("pathdb/lookup/add/time", nil)
|
||||
lookupRemoveLayerTimer = metrics.NewRegisteredResettingTimer("pathdb/lookup/remove/time", nil)
|
||||
|
||||
historicalAccountReadTimer = metrics.NewRegisteredResettingTimer("pathdb/history/account/reads", nil)
|
||||
historicalStorageReadTimer = metrics.NewRegisteredResettingTimer("pathdb/history/storage/reads", nil)
|
||||
historicalAccountReadTimer = metrics.NewRegisteredResettingTimer("pathdb/history/account/reads", nil)
|
||||
historicalStorageReadTimer = metrics.NewRegisteredResettingTimer("pathdb/history/storage/reads", nil)
|
||||
historicalTrienodeReadTimer = metrics.NewRegisteredResettingTimer("pathdb/history/trienode/reads", nil)
|
||||
)
|
||||
|
||||
// Metrics in generation
|
||||
|
|
|
|||
|
|
@ -517,6 +517,8 @@ func encodeNodeCompressed(addExtension bool, elements [][]byte, indices []int) [
|
|||
//
|
||||
// - metadata byte layout (1 byte): 0b0
|
||||
// - node value
|
||||
//
|
||||
// TODO(rjl493456442) it's not allocation efficient, please improve it.
|
||||
func encodeNodeFull(value []byte) []byte {
|
||||
enc := make([]byte, len(value)+1)
|
||||
copy(enc[1:], value)
|
||||
|
|
@ -596,21 +598,17 @@ func decodeNodeCompressed(data []byte) ([][]byte, []int, error) {
|
|||
}
|
||||
|
||||
// decodeNodeFull decodes the byte stream of full value trie node.
|
||||
func decodeNodeFull(data []byte) ([]byte, error) {
|
||||
func decodeNodeFull(data []byte) (bool, []byte, error) {
|
||||
if len(data) < 1 {
|
||||
return nil, errors.New("invalid data: too short")
|
||||
return false, nil, errors.New("invalid data: too short")
|
||||
}
|
||||
flag := data[0]
|
||||
if flag != byte(0) {
|
||||
return nil, errors.New("invalid data: compressed node value")
|
||||
return false, nil, nil
|
||||
}
|
||||
return data[1:], nil
|
||||
return true, data[1:], nil
|
||||
}
|
||||
|
||||
// encodeFullFrequency specifies the frequency (1/16) for encoding node in
|
||||
// full format. TODO(rjl493456442) making it configurable.
|
||||
const encodeFullFrequency = 16
|
||||
|
||||
// encodeNodeHistory encodes the history of a node. Typically, the original values
|
||||
// of dirty nodes serve as the history, but this can lead to significant storage
|
||||
// overhead.
|
||||
|
|
@ -626,7 +624,7 @@ const encodeFullFrequency = 16
|
|||
// history records, which is computationally and IO intensive. To mitigate this, we
|
||||
// periodically record the full value of a node as a checkpoint. The frequency of
|
||||
// these checkpoints is a tradeoff between the compression rate and read overhead.
|
||||
func (s *nodeSetWithOrigin) encodeNodeHistory(root common.Hash) (map[common.Hash]map[string][]byte, error) {
|
||||
func (s *nodeSetWithOrigin) encodeNodeHistory(root common.Hash, rate uint32) (map[common.Hash]map[string][]byte, error) {
|
||||
var (
|
||||
// the set of all encoded node history elements
|
||||
nodes = make(map[common.Hash]map[string][]byte)
|
||||
|
|
@ -644,7 +642,7 @@ func (s *nodeSetWithOrigin) encodeNodeHistory(root common.Hash) (map[common.Hash
|
|||
h.Write(root.Bytes())
|
||||
h.Write(owner.Bytes())
|
||||
h.Write([]byte(path))
|
||||
return h.Sum32()%uint32(encodeFullFrequency) == 0
|
||||
return h.Sum32()%rate == 0
|
||||
}
|
||||
)
|
||||
for owner, origins := range s.nodeOrigin {
|
||||
|
|
@ -664,6 +662,9 @@ func (s *nodeSetWithOrigin) encodeNodeHistory(root common.Hash) (map[common.Hash
|
|||
}
|
||||
encodeFull := encodeFullValue(owner, path)
|
||||
if !encodeFull {
|
||||
// TODO(rjl493456442) the diff-mode reencoding can take non-trivial
|
||||
// time, like 1-2ms per block, is there any way to mitigate the overhead?
|
||||
|
||||
// Partial encoding is required, try to find the node diffs and
|
||||
// fallback to the full-value encoding if fails.
|
||||
//
|
||||
|
|
|
|||
|
|
@ -318,3 +318,90 @@ func (r *HistoricalStateReader) Storage(address common.Address, key common.Hash)
|
|||
}
|
||||
return r.reader.read(newStorageIdentQuery(address, addrHash, key, keyHash), r.id, dl.stateID(), latest)
|
||||
}
|
||||
|
||||
// HistoricalNodeReader is a wrapper over history reader, providing access to
|
||||
// historical trie node data.
|
||||
type HistoricalNodeReader struct {
|
||||
db *Database
|
||||
reader *trienodeReader
|
||||
id uint64
|
||||
}
|
||||
|
||||
// HistoricNodeReader constructs a reader for accessing the requested historic state.
|
||||
func (db *Database) HistoricNodeReader(root common.Hash) (*HistoricalNodeReader, error) {
|
||||
// Bail out if the state history hasn't been fully indexed
|
||||
if db.trienodeIndexer == nil || db.trienodeFreezer == nil {
|
||||
return nil, fmt.Errorf("historical state %x is not available", root)
|
||||
}
|
||||
if !db.trienodeIndexer.inited() {
|
||||
return nil, errors.New("trienode histories haven't been fully indexed yet")
|
||||
}
|
||||
// - States at the current disk layer or above are directly accessible
|
||||
// via `db.NodeReader`.
|
||||
//
|
||||
// - States older than the current disk layer (including the disk layer
|
||||
// itself) are available via `db.HistoricalNodeReader`.
|
||||
id := rawdb.ReadStateID(db.diskdb, root)
|
||||
if id == nil {
|
||||
return nil, fmt.Errorf("state %#x is not available", root)
|
||||
}
|
||||
// Ensure the requested trienode history is canonical, states on side chain
|
||||
// are not accessible.
|
||||
meta, err := readTrienodeMetadata(db.trienodeFreezer, *id+1)
|
||||
if err != nil {
|
||||
return nil, err // e.g., the referred trienode history has been pruned
|
||||
}
|
||||
if meta.parent != root {
|
||||
return nil, fmt.Errorf("state %#x is not canonincal", root)
|
||||
}
|
||||
return &HistoricalNodeReader{
|
||||
id: *id,
|
||||
db: db,
|
||||
reader: newTrienodeReader(db.diskdb, db.trienodeFreezer, int(db.config.FullValueCheckpoint)),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Node directly retrieves the trie node data associated with a particular path,
|
||||
// within a particular account. An error will be returned if the read operation
|
||||
// exits abnormally. Specifically, if the layer is already stale.
|
||||
//
|
||||
// Note:
|
||||
// - the returned trie node data is not a copy, please don't modify it.
|
||||
// - an error will be returned if the requested trie node is not found in database.
|
||||
func (r *HistoricalNodeReader) Node(owner common.Hash, path []byte, hash common.Hash) ([]byte, error) {
|
||||
defer func(start time.Time) {
|
||||
historicalTrienodeReadTimer.UpdateSince(start)
|
||||
}(time.Now())
|
||||
|
||||
// TODO(rjl493456442): Theoretically, the obtained disk layer could become stale
|
||||
// within a very short time window.
|
||||
//
|
||||
// While reading the account data while holding `db.tree.lock` can resolve
|
||||
// this issue, but it will introduce a heavy contention over the lock.
|
||||
//
|
||||
// Let's optimistically assume the situation is very unlikely to happen,
|
||||
// and try to define a low granularity lock if the current approach doesn't
|
||||
// work later.
|
||||
dl := r.db.tree.bottom()
|
||||
latest, h, _, err := dl.node(owner, path, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if h == hash {
|
||||
return latest, nil
|
||||
}
|
||||
blob, err := r.reader.read(newTrienodeIdent(owner, string(path)), r.id, dl.stateID(), latest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Error out if the local one is inconsistent with the target.
|
||||
if crypto.Keccak256Hash(blob) != hash {
|
||||
blobHex := "nil"
|
||||
if len(blob) > 0 {
|
||||
blobHex = hexutil.Encode(blob)
|
||||
}
|
||||
log.Error("Unexpected historical trie node", "owner", owner.Hex(), "path", path, "blob", blobHex)
|
||||
return nil, fmt.Errorf("unexpected historical trie node: (%x %v), blob: %s", owner, path, blobHex)
|
||||
}
|
||||
return blob, nil
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue