mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-02-26 07:37:20 +00:00
triedb/pathdb: generalize the history indexer (#32523)
This pull request is based on #32306 and is the second part of shipping trienode history. Specifically, it generalizes the existing index mechanism, making it usable by both state history and trienode history in the near future.
This commit is contained in:
parent
2d3704c4d8
commit
21769f3474
11 changed files with 468 additions and 345 deletions
|
|
@ -189,7 +189,7 @@ func New(diskdb ethdb.Database, config *Config, isVerkle bool) *Database {
|
|||
}
|
||||
// TODO (rjl493456442) disable the background indexing in read-only mode
|
||||
if db.stateFreezer != nil && db.config.EnableStateIndexing {
|
||||
db.stateIndexer = newHistoryIndexer(db.diskdb, db.stateFreezer, db.tree.bottom().stateID())
|
||||
db.stateIndexer = newHistoryIndexer(db.diskdb, db.stateFreezer, db.tree.bottom().stateID(), typeStateHistory)
|
||||
log.Info("Enabled state history indexing")
|
||||
}
|
||||
fields := config.fields()
|
||||
|
|
@ -245,7 +245,7 @@ func (db *Database) repairHistory() error {
|
|||
}
|
||||
// Truncate the extra state histories above in freezer in case it's not
|
||||
// aligned with the disk layer. It might happen after an unclean shutdown.
|
||||
pruned, err := truncateFromHead(db.stateFreezer, id)
|
||||
pruned, err := truncateFromHead(db.stateFreezer, typeStateHistory, id)
|
||||
if err != nil {
|
||||
log.Crit("Failed to truncate extra state histories", "err", err)
|
||||
}
|
||||
|
|
@ -448,7 +448,7 @@ func (db *Database) Enable(root common.Hash) error {
|
|||
// 2. Re-initialize the indexer so it starts indexing from the new state root.
|
||||
if db.stateIndexer != nil && db.stateFreezer != nil && db.config.EnableStateIndexing {
|
||||
db.stateIndexer.close()
|
||||
db.stateIndexer = newHistoryIndexer(db.diskdb, db.stateFreezer, db.tree.bottom().stateID())
|
||||
db.stateIndexer = newHistoryIndexer(db.diskdb, db.stateFreezer, db.tree.bottom().stateID(), typeStateHistory)
|
||||
log.Info("Re-enabled state history indexing")
|
||||
}
|
||||
log.Info("Rebuilt trie database", "root", root)
|
||||
|
|
@ -502,7 +502,7 @@ func (db *Database) Recover(root common.Hash) error {
|
|||
if err := db.diskdb.SyncKeyValue(); err != nil {
|
||||
return err
|
||||
}
|
||||
_, err := truncateFromHead(db.stateFreezer, dl.stateID())
|
||||
_, err := truncateFromHead(db.stateFreezer, typeStateHistory, dl.stateID())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -378,7 +378,7 @@ func (dl *diskLayer) writeStateHistory(diff *diffLayer) (bool, error) {
|
|||
log.Debug("Skip tail truncation", "persistentID", persistentID, "tailID", tail+1, "headID", diff.stateID(), "limit", limit)
|
||||
return true, nil
|
||||
}
|
||||
pruned, err := truncateFromTail(dl.db.stateFreezer, newFirst-1)
|
||||
pruned, err := truncateFromTail(dl.db.stateFreezer, typeStateHistory, newFirst-1)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,11 +19,147 @@ package pathdb
|
|||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"iter"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
)
|
||||
|
||||
// historyType identifies the kind of historical data being handled.
type historyType uint8

const (
	// typeStateHistory marks history data that records account or storage changes.
	typeStateHistory historyType = 0
)

// String implements fmt.Stringer and returns a human-readable name for the
// history type. Unrecognized values are rendered with their numeric code.
func (h historyType) String() string {
	if h == typeStateHistory {
		return "state"
	}
	return fmt.Sprintf("unknown type: %d", h)
}
|
||||
|
||||
// elementType identifies the kind of state element.
type elementType uint8

const (
	typeAccount elementType = 0 // account data
	typeStorage elementType = 1 // storage slot data
)

// String implements fmt.Stringer and returns a human-readable name for the
// element type. Unrecognized values are rendered with their numeric code.
func (e elementType) String() string {
	if e == typeAccount {
		return "account"
	}
	if e == typeStorage {
		return "storage"
	}
	return fmt.Sprintf("unknown element type: %d", e)
}
|
||||
|
||||
// toHistoryType maps an element type to its corresponding history type.
|
||||
func toHistoryType(typ elementType) historyType {
|
||||
if typ == typeAccount || typ == typeStorage {
|
||||
return typeStateHistory
|
||||
}
|
||||
panic(fmt.Sprintf("unknown element type %v", typ))
|
||||
}
|
||||
|
||||
// stateIdent represents the identifier of a state element, which can be
|
||||
// an account or a storage slot, as indicated by the typ field.
|
||||
type stateIdent struct {
|
||||
typ elementType
|
||||
|
||||
|
||||
// The hash of the account address. This is used instead of the raw account
|
||||
// address to align the traversal order with the Merkle-Patricia-Trie.
|
||||
addressHash common.Hash
|
||||
|
||||
|
||||
// The hash of the storage slot key. This is used instead of the raw slot key
|
||||
// because, in legacy state histories (prior to the Cancun fork), the slot
|
||||
// identifier is the hash of the key, and the original key (preimage) cannot
|
||||
// be recovered. To maintain backward compatibility, the key hash is used.
|
||||
//
|
||||
// Meanwhile, using the storage key hash also preserves the traversal order
|
||||
// of the Merkle-Patricia-Trie.
|
||||
//
|
||||
// This field is empty if the identifier refers to an account or a trie node.
|
||||
storageHash common.Hash
|
||||
|
||||
}
|
||||
|
||||
// String returns the string format state identifier.
|
||||
func (ident stateIdent) String() string {
|
||||
if ident.typ == typeAccount {
|
||||
return ident.addressHash.Hex()
|
||||
}
|
||||
return ident.addressHash.Hex() + ident.storageHash.Hex()
|
||||
}
|
||||
|
||||
// newAccountIdent constructs a state identifier for an account.
|
||||
func newAccountIdent(addressHash common.Hash) stateIdent {
|
||||
return stateIdent{
|
||||
typ: typeAccount,
|
||||
addressHash: addressHash,
|
||||
}
|
||||
}
|
||||
|
||||
// newStorageIdent constructs a state identifier for a storage slot.
|
||||
// The address denotes the address hash of the associated account;
|
||||
// the storageHash denotes the hash of the raw storage slot key;
|
||||
func newStorageIdent(addressHash common.Hash, storageHash common.Hash) stateIdent {
|
||||
return stateIdent{
|
||||
typ: typeStorage,
|
||||
addressHash: addressHash,
|
||||
storageHash: storageHash,
|
||||
}
|
||||
}
|
||||
|
||||
// stateIdentQuery extends stateIdent with the raw account address and the
|
||||
// raw storage key, alongside the hashes carried by the embedded identifier.
|
||||
type stateIdentQuery struct {
|
||||
stateIdent
|
||||
|
||||
|
||||
address common.Address
|
||||
storageKey common.Hash
|
||||
}
|
||||
|
||||
// newAccountIdentQuery constructs a state identifier for an account.
|
||||
func newAccountIdentQuery(address common.Address, addressHash common.Hash) stateIdentQuery {
|
||||
return stateIdentQuery{
|
||||
stateIdent: newAccountIdent(addressHash),
|
||||
address: address,
|
||||
}
|
||||
}
|
||||
|
||||
// newStorageIdentQuery constructs a state identifier for a storage slot.
|
||||
// the address denotes the address of the associated account;
|
||||
// the addressHash denotes the address hash of the associated account;
|
||||
// the storageKey denotes the raw storage slot key;
|
||||
// the storageHash denotes the hash of the raw storage slot key;
|
||||
func newStorageIdentQuery(address common.Address, addressHash common.Hash, storageKey common.Hash, storageHash common.Hash) stateIdentQuery {
|
||||
return stateIdentQuery{
|
||||
stateIdent: newStorageIdent(addressHash, storageHash),
|
||||
address: address,
|
||||
storageKey: storageKey,
|
||||
}
|
||||
}
|
||||
|
||||
// history defines the interface of historical data, implemented by stateHistory
|
||||
// and, in the near future, by trienodeHistory.
|
||||
type history interface {
|
||||
// typ returns the type of the historical data held in the history.
|
||||
typ() historyType
|
||||
|
||||
|
||||
// forEach returns an iterator over the state identifiers in the history.
|
||||
forEach() iter.Seq[stateIdent]
|
||||
}
|
||||
|
||||
var (
|
||||
errHeadTruncationOutOfRange = errors.New("history head truncation out of range")
|
||||
errTailTruncationOutOfRange = errors.New("history tail truncation out of range")
|
||||
|
|
@ -31,7 +167,7 @@ var (
|
|||
|
||||
// truncateFromHead removes excess elements from the head of the freezer based
|
||||
// on the given parameters. It returns the number of items that were removed.
|
||||
func truncateFromHead(store ethdb.AncientStore, nhead uint64) (int, error) {
|
||||
func truncateFromHead(store ethdb.AncientStore, typ historyType, nhead uint64) (int, error) {
|
||||
ohead, err := store.Ancients()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
|
|
@ -40,11 +176,11 @@ func truncateFromHead(store ethdb.AncientStore, nhead uint64) (int, error) {
|
|||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
log.Info("Truncating from head", "ohead", ohead, "tail", otail, "nhead", nhead)
|
||||
log.Info("Truncating from head", "type", typ.String(), "ohead", ohead, "tail", otail, "nhead", nhead)
|
||||
|
||||
// Ensure that the truncation target falls within the valid range.
|
||||
if ohead < nhead || nhead < otail {
|
||||
return 0, fmt.Errorf("%w, tail: %d, head: %d, target: %d", errHeadTruncationOutOfRange, otail, ohead, nhead)
|
||||
return 0, fmt.Errorf("%w, %s, tail: %d, head: %d, target: %d", errHeadTruncationOutOfRange, typ, otail, ohead, nhead)
|
||||
}
|
||||
// Short circuit if nothing to truncate.
|
||||
if ohead == nhead {
|
||||
|
|
@ -61,7 +197,7 @@ func truncateFromHead(store ethdb.AncientStore, nhead uint64) (int, error) {
|
|||
|
||||
// truncateFromTail removes excess elements from the end of the freezer based
|
||||
// on the given parameters. It returns the number of items that were removed.
|
||||
func truncateFromTail(store ethdb.AncientStore, ntail uint64) (int, error) {
|
||||
func truncateFromTail(store ethdb.AncientStore, typ historyType, ntail uint64) (int, error) {
|
||||
ohead, err := store.Ancients()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
|
|
@ -72,7 +208,7 @@ func truncateFromTail(store ethdb.AncientStore, ntail uint64) (int, error) {
|
|||
}
|
||||
// Ensure that the truncation target falls within the valid range.
|
||||
if otail > ntail || ntail > ohead {
|
||||
return 0, fmt.Errorf("%w, tail: %d, head: %d, target: %d", errTailTruncationOutOfRange, otail, ohead, ntail)
|
||||
return 0, fmt.Errorf("%w, %s, tail: %d, head: %d, target: %d", errTailTruncationOutOfRange, typ, otail, ohead, ntail)
|
||||
}
|
||||
// Short circuit if nothing to truncate.
|
||||
if otail == ntail {
|
||||
|
|
|
|||
|
|
@ -78,12 +78,7 @@ type indexReader struct {
|
|||
|
||||
// loadIndexData loads the index data associated with the specified state.
|
||||
func loadIndexData(db ethdb.KeyValueReader, state stateIdent) ([]*indexBlockDesc, error) {
|
||||
var blob []byte
|
||||
if state.account {
|
||||
blob = rawdb.ReadAccountHistoryIndex(db, state.addressHash)
|
||||
} else {
|
||||
blob = rawdb.ReadStorageHistoryIndex(db, state.addressHash, state.storageHash)
|
||||
}
|
||||
blob := readStateIndex(state, db)
|
||||
if len(blob) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
|
@ -137,15 +132,8 @@ func (r *indexReader) readGreaterThan(id uint64) (uint64, error) {
|
|||
|
||||
br, ok := r.readers[desc.id]
|
||||
if !ok {
|
||||
var (
|
||||
err error
|
||||
blob []byte
|
||||
)
|
||||
if r.state.account {
|
||||
blob = rawdb.ReadAccountHistoryIndexBlock(r.db, r.state.addressHash, desc.id)
|
||||
} else {
|
||||
blob = rawdb.ReadStorageHistoryIndexBlock(r.db, r.state.addressHash, r.state.storageHash, desc.id)
|
||||
}
|
||||
var err error
|
||||
blob := readStateIndexBlock(r.state, r.db, desc.id)
|
||||
br, err = newBlockReader(blob)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
|
|
@ -174,12 +162,7 @@ type indexWriter struct {
|
|||
|
||||
// newIndexWriter constructs the index writer for the specified state.
|
||||
func newIndexWriter(db ethdb.KeyValueReader, state stateIdent) (*indexWriter, error) {
|
||||
var blob []byte
|
||||
if state.account {
|
||||
blob = rawdb.ReadAccountHistoryIndex(db, state.addressHash)
|
||||
} else {
|
||||
blob = rawdb.ReadStorageHistoryIndex(db, state.addressHash, state.storageHash)
|
||||
}
|
||||
blob := readStateIndex(state, db)
|
||||
if len(blob) == 0 {
|
||||
desc := newIndexBlockDesc(0)
|
||||
bw, _ := newBlockWriter(nil, desc)
|
||||
|
|
@ -194,15 +177,8 @@ func newIndexWriter(db ethdb.KeyValueReader, state stateIdent) (*indexWriter, er
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var (
|
||||
indexBlock []byte
|
||||
lastDesc = descList[len(descList)-1]
|
||||
)
|
||||
if state.account {
|
||||
indexBlock = rawdb.ReadAccountHistoryIndexBlock(db, state.addressHash, lastDesc.id)
|
||||
} else {
|
||||
indexBlock = rawdb.ReadStorageHistoryIndexBlock(db, state.addressHash, state.storageHash, lastDesc.id)
|
||||
}
|
||||
lastDesc := descList[len(descList)-1]
|
||||
indexBlock := readStateIndexBlock(state, db, lastDesc.id)
|
||||
bw, err := newBlockWriter(indexBlock, lastDesc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -270,11 +246,7 @@ func (w *indexWriter) finish(batch ethdb.Batch) {
|
|||
return // nothing to commit
|
||||
}
|
||||
for _, bw := range writers {
|
||||
if w.state.account {
|
||||
rawdb.WriteAccountHistoryIndexBlock(batch, w.state.addressHash, bw.desc.id, bw.finish())
|
||||
} else {
|
||||
rawdb.WriteStorageHistoryIndexBlock(batch, w.state.addressHash, w.state.storageHash, bw.desc.id, bw.finish())
|
||||
}
|
||||
writeStateIndexBlock(w.state, batch, bw.desc.id, bw.finish())
|
||||
}
|
||||
w.frozen = nil // release all the frozen writers
|
||||
|
||||
|
|
@ -282,11 +254,7 @@ func (w *indexWriter) finish(batch ethdb.Batch) {
|
|||
for _, desc := range descList {
|
||||
buf = append(buf, desc.encode()...)
|
||||
}
|
||||
if w.state.account {
|
||||
rawdb.WriteAccountHistoryIndex(batch, w.state.addressHash, buf)
|
||||
} else {
|
||||
rawdb.WriteStorageHistoryIndex(batch, w.state.addressHash, w.state.storageHash, buf)
|
||||
}
|
||||
writeStateIndex(w.state, batch, buf)
|
||||
}
|
||||
|
||||
// indexDeleter is responsible for deleting index data for a specific state.
|
||||
|
|
@ -301,12 +269,7 @@ type indexDeleter struct {
|
|||
|
||||
// newIndexDeleter constructs the index deleter for the specified state.
|
||||
func newIndexDeleter(db ethdb.KeyValueReader, state stateIdent) (*indexDeleter, error) {
|
||||
var blob []byte
|
||||
if state.account {
|
||||
blob = rawdb.ReadAccountHistoryIndex(db, state.addressHash)
|
||||
} else {
|
||||
blob = rawdb.ReadStorageHistoryIndex(db, state.addressHash, state.storageHash)
|
||||
}
|
||||
blob := readStateIndex(state, db)
|
||||
if len(blob) == 0 {
|
||||
// TODO(rjl493456442) we can probably return an error here,
|
||||
// deleter with no data is meaningless.
|
||||
|
|
@ -323,15 +286,8 @@ func newIndexDeleter(db ethdb.KeyValueReader, state stateIdent) (*indexDeleter,
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var (
|
||||
indexBlock []byte
|
||||
lastDesc = descList[len(descList)-1]
|
||||
)
|
||||
if state.account {
|
||||
indexBlock = rawdb.ReadAccountHistoryIndexBlock(db, state.addressHash, lastDesc.id)
|
||||
} else {
|
||||
indexBlock = rawdb.ReadStorageHistoryIndexBlock(db, state.addressHash, state.storageHash, lastDesc.id)
|
||||
}
|
||||
lastDesc := descList[len(descList)-1]
|
||||
indexBlock := readStateIndexBlock(state, db, lastDesc.id)
|
||||
bw, err := newBlockWriter(indexBlock, lastDesc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -376,15 +332,8 @@ func (d *indexDeleter) pop(id uint64) error {
|
|||
d.descList = d.descList[:len(d.descList)-1]
|
||||
|
||||
// Open the previous block writer for deleting
|
||||
var (
|
||||
indexBlock []byte
|
||||
lastDesc = d.descList[len(d.descList)-1]
|
||||
)
|
||||
if d.state.account {
|
||||
indexBlock = rawdb.ReadAccountHistoryIndexBlock(d.db, d.state.addressHash, lastDesc.id)
|
||||
} else {
|
||||
indexBlock = rawdb.ReadStorageHistoryIndexBlock(d.db, d.state.addressHash, d.state.storageHash, lastDesc.id)
|
||||
}
|
||||
lastDesc := d.descList[len(d.descList)-1]
|
||||
indexBlock := readStateIndexBlock(d.state, d.db, lastDesc.id)
|
||||
bw, err := newBlockWriter(indexBlock, lastDesc)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
@ -399,38 +348,100 @@ func (d *indexDeleter) pop(id uint64) error {
|
|||
// This function is safe to be called multiple times.
|
||||
func (d *indexDeleter) finish(batch ethdb.Batch) {
|
||||
for _, id := range d.dropped {
|
||||
if d.state.account {
|
||||
rawdb.DeleteAccountHistoryIndexBlock(batch, d.state.addressHash, id)
|
||||
} else {
|
||||
rawdb.DeleteStorageHistoryIndexBlock(batch, d.state.addressHash, d.state.storageHash, id)
|
||||
}
|
||||
deleteStateIndexBlock(d.state, batch, id)
|
||||
}
|
||||
d.dropped = nil
|
||||
|
||||
// Flush the content of the last block writer, regardless of whether it's dirty or not
|
||||
if !d.bw.empty() {
|
||||
if d.state.account {
|
||||
rawdb.WriteAccountHistoryIndexBlock(batch, d.state.addressHash, d.bw.desc.id, d.bw.finish())
|
||||
} else {
|
||||
rawdb.WriteStorageHistoryIndexBlock(batch, d.state.addressHash, d.state.storageHash, d.bw.desc.id, d.bw.finish())
|
||||
}
|
||||
writeStateIndexBlock(d.state, batch, d.bw.desc.id, d.bw.finish())
|
||||
}
|
||||
// Flush the index metadata into the supplied batch
|
||||
if d.empty() {
|
||||
if d.state.account {
|
||||
rawdb.DeleteAccountHistoryIndex(batch, d.state.addressHash)
|
||||
} else {
|
||||
rawdb.DeleteStorageHistoryIndex(batch, d.state.addressHash, d.state.storageHash)
|
||||
}
|
||||
deleteStateIndex(d.state, batch)
|
||||
} else {
|
||||
buf := make([]byte, 0, indexBlockDescSize*len(d.descList))
|
||||
for _, desc := range d.descList {
|
||||
buf = append(buf, desc.encode()...)
|
||||
}
|
||||
if d.state.account {
|
||||
rawdb.WriteAccountHistoryIndex(batch, d.state.addressHash, buf)
|
||||
} else {
|
||||
rawdb.WriteStorageHistoryIndex(batch, d.state.addressHash, d.state.storageHash, buf)
|
||||
}
|
||||
writeStateIndex(d.state, batch, buf)
|
||||
}
|
||||
}
|
||||
|
||||
// readStateIndex retrieves the index metadata for the given state identifier.
|
||||
// This function is shared by accounts and storage slots.
|
||||
func readStateIndex(ident stateIdent, db ethdb.KeyValueReader) []byte {
|
||||
switch ident.typ {
|
||||
case typeAccount:
|
||||
return rawdb.ReadAccountHistoryIndex(db, ident.addressHash)
|
||||
case typeStorage:
|
||||
return rawdb.ReadStorageHistoryIndex(db, ident.addressHash, ident.storageHash)
|
||||
default:
|
||||
panic(fmt.Errorf("unknown type: %v", ident.typ))
|
||||
}
|
||||
}
|
||||
|
||||
// writeStateIndex writes the provided index metadata into database with the
|
||||
// given state identifier. This function is shared by accounts and storage slots.
|
||||
func writeStateIndex(ident stateIdent, db ethdb.KeyValueWriter, data []byte) {
|
||||
switch ident.typ {
|
||||
case typeAccount:
|
||||
rawdb.WriteAccountHistoryIndex(db, ident.addressHash, data)
|
||||
case typeStorage:
|
||||
rawdb.WriteStorageHistoryIndex(db, ident.addressHash, ident.storageHash, data)
|
||||
default:
|
||||
panic(fmt.Errorf("unknown type: %v", ident.typ))
|
||||
}
|
||||
}
|
||||
|
||||
// deleteStateIndex removes the index metadata for the given state identifier.
|
||||
// This function is shared by accounts and storage slots.
|
||||
func deleteStateIndex(ident stateIdent, db ethdb.KeyValueWriter) {
|
||||
switch ident.typ {
|
||||
case typeAccount:
|
||||
rawdb.DeleteAccountHistoryIndex(db, ident.addressHash)
|
||||
case typeStorage:
|
||||
rawdb.DeleteStorageHistoryIndex(db, ident.addressHash, ident.storageHash)
|
||||
default:
|
||||
panic(fmt.Errorf("unknown type: %v", ident.typ))
|
||||
}
|
||||
}
|
||||
|
||||
// readStateIndexBlock retrieves the index block for the given state identifier
|
||||
// and block ID. This function is shared by accounts and storage slots.
|
||||
func readStateIndexBlock(ident stateIdent, db ethdb.KeyValueReader, id uint32) []byte {
|
||||
switch ident.typ {
|
||||
case typeAccount:
|
||||
return rawdb.ReadAccountHistoryIndexBlock(db, ident.addressHash, id)
|
||||
case typeStorage:
|
||||
return rawdb.ReadStorageHistoryIndexBlock(db, ident.addressHash, ident.storageHash, id)
|
||||
default:
|
||||
panic(fmt.Errorf("unknown type: %v", ident.typ))
|
||||
}
|
||||
}
|
||||
|
||||
// writeStateIndexBlock writes the provided index block into database with the
|
||||
// given state identifier. This function is shared by accounts and storage slots.
|
||||
func writeStateIndexBlock(ident stateIdent, db ethdb.KeyValueWriter, id uint32, data []byte) {
|
||||
switch ident.typ {
|
||||
case typeAccount:
|
||||
rawdb.WriteAccountHistoryIndexBlock(db, ident.addressHash, id, data)
|
||||
case typeStorage:
|
||||
rawdb.WriteStorageHistoryIndexBlock(db, ident.addressHash, ident.storageHash, id, data)
|
||||
default:
|
||||
panic(fmt.Errorf("unknown type: %v", ident.typ))
|
||||
}
|
||||
}
|
||||
|
||||
// deleteStateIndexBlock removes the index block from database with the given
|
||||
// state identifier. This function is shared by accounts and storage slots.
|
||||
func deleteStateIndexBlock(ident stateIdent, db ethdb.KeyValueWriter, id uint32) {
|
||||
switch ident.typ {
|
||||
case typeAccount:
|
||||
rawdb.DeleteAccountHistoryIndexBlock(db, ident.addressHash, id)
|
||||
case typeStorage:
|
||||
rawdb.DeleteStorageHistoryIndexBlock(db, ident.addressHash, ident.storageHash, id)
|
||||
default:
|
||||
panic(fmt.Errorf("unknown type: %v", ident.typ))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -179,7 +179,7 @@ func TestIndexWriterDelete(t *testing.T) {
|
|||
func TestBatchIndexerWrite(t *testing.T) {
|
||||
var (
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
batch = newBatchIndexer(db, false)
|
||||
batch = newBatchIndexer(db, false, typeStateHistory)
|
||||
histories = makeStateHistories(10)
|
||||
)
|
||||
for i, h := range histories {
|
||||
|
|
@ -190,7 +190,7 @@ func TestBatchIndexerWrite(t *testing.T) {
|
|||
if err := batch.finish(true); err != nil {
|
||||
t.Fatalf("Failed to finish batch indexer, %v", err)
|
||||
}
|
||||
metadata := loadIndexMetadata(db)
|
||||
metadata := loadIndexMetadata(db, typeStateHistory)
|
||||
if metadata == nil || metadata.Last != uint64(10) {
|
||||
t.Fatal("Unexpected index position")
|
||||
}
|
||||
|
|
@ -256,7 +256,7 @@ func TestBatchIndexerWrite(t *testing.T) {
|
|||
func TestBatchIndexerDelete(t *testing.T) {
|
||||
var (
|
||||
db = rawdb.NewMemoryDatabase()
|
||||
bw = newBatchIndexer(db, false)
|
||||
bw = newBatchIndexer(db, false, typeStateHistory)
|
||||
histories = makeStateHistories(10)
|
||||
)
|
||||
// Index histories
|
||||
|
|
@ -270,7 +270,7 @@ func TestBatchIndexerDelete(t *testing.T) {
|
|||
}
|
||||
|
||||
// Unindex histories
|
||||
bd := newBatchIndexer(db, true)
|
||||
bd := newBatchIndexer(db, true, typeStateHistory)
|
||||
for i := len(histories) - 1; i >= 0; i-- {
|
||||
if err := bd.process(histories[i], uint64(i+1)); err != nil {
|
||||
t.Fatalf("Failed to process history, %v", err)
|
||||
|
|
@ -280,7 +280,7 @@ func TestBatchIndexerDelete(t *testing.T) {
|
|||
t.Fatalf("Failed to finish batch indexer, %v", err)
|
||||
}
|
||||
|
||||
metadata := loadIndexMetadata(db)
|
||||
metadata := loadIndexMetadata(db, typeStateHistory)
|
||||
if metadata != nil {
|
||||
t.Fatal("Unexpected index position")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,7 +26,6 @@ import (
|
|||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
|
|
@ -41,13 +40,32 @@ const (
|
|||
stateIndexVersion = stateIndexV0 // the current state index version
|
||||
)
|
||||
|
||||
// indexVersion returns the latest index version for the given history type.
|
||||
// It panics if the history type is unknown.
|
||||
func indexVersion(typ historyType) uint8 {
|
||||
switch typ {
|
||||
case typeStateHistory:
|
||||
return stateIndexVersion
|
||||
default:
|
||||
panic(fmt.Errorf("unknown history type: %d", typ))
|
||||
}
|
||||
}
|
||||
|
||||
// indexMetadata describes the metadata of the historical data index.
|
||||
type indexMetadata struct {
|
||||
Version uint8
|
||||
Last uint64
|
||||
}
|
||||
|
||||
func loadIndexMetadata(db ethdb.KeyValueReader) *indexMetadata {
|
||||
blob := rawdb.ReadStateHistoryIndexMetadata(db)
|
||||
// loadIndexMetadata reads the metadata of the specific history index.
|
||||
func loadIndexMetadata(db ethdb.KeyValueReader, typ historyType) *indexMetadata {
|
||||
var blob []byte
|
||||
switch typ {
|
||||
case typeStateHistory:
|
||||
blob = rawdb.ReadStateHistoryIndexMetadata(db)
|
||||
default:
|
||||
panic(fmt.Errorf("unknown history type %d", typ))
|
||||
}
|
||||
if len(blob) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
|
@ -59,91 +77,94 @@ func loadIndexMetadata(db ethdb.KeyValueReader) *indexMetadata {
|
|||
return &m
|
||||
}
|
||||
|
||||
func storeIndexMetadata(db ethdb.KeyValueWriter, last uint64) {
|
||||
var m indexMetadata
|
||||
m.Version = stateIndexVersion
|
||||
m.Last = last
|
||||
// storeIndexMetadata stores the metadata of the specific history index.
|
||||
func storeIndexMetadata(db ethdb.KeyValueWriter, typ historyType, last uint64) {
|
||||
m := indexMetadata{
|
||||
Version: indexVersion(typ),
|
||||
Last: last,
|
||||
}
|
||||
blob, err := rlp.EncodeToBytes(m)
|
||||
if err != nil {
|
||||
log.Crit("Failed to encode index metadata", "err", err)
|
||||
panic(fmt.Errorf("fail to encode index metadata, %v", err))
|
||||
}
|
||||
rawdb.WriteStateHistoryIndexMetadata(db, blob)
|
||||
switch typ {
|
||||
case typeStateHistory:
|
||||
rawdb.WriteStateHistoryIndexMetadata(db, blob)
|
||||
default:
|
||||
panic(fmt.Errorf("unknown history type %d", typ))
|
||||
}
|
||||
log.Debug("Written index metadata", "type", typ, "last", last)
|
||||
}
|
||||
|
||||
// batchIndexer is a structure designed to perform batch indexing or unindexing
|
||||
// of state histories atomically.
|
||||
// deleteIndexMetadata deletes the metadata of the specific history index.
|
||||
func deleteIndexMetadata(db ethdb.KeyValueWriter, typ historyType) {
|
||||
switch typ {
|
||||
case typeStateHistory:
|
||||
rawdb.DeleteStateHistoryIndexMetadata(db)
|
||||
default:
|
||||
panic(fmt.Errorf("unknown history type %d", typ))
|
||||
}
|
||||
log.Debug("Deleted index metadata", "type", typ)
|
||||
}
|
||||
|
||||
// batchIndexer is responsible for performing batch indexing or unindexing
|
||||
// of historical data (e.g., state or trie node changes) atomically.
|
||||
type batchIndexer struct {
|
||||
accounts map[common.Hash][]uint64 // History ID list, Keyed by the hash of account address
|
||||
storages map[common.Hash]map[common.Hash][]uint64 // History ID list, Keyed by the hash of account address and the hash of raw storage key
|
||||
counter int // The counter of processed states
|
||||
delete bool // Index or unindex mode
|
||||
lastID uint64 // The ID of latest processed history
|
||||
db ethdb.KeyValueStore
|
||||
index map[stateIdent][]uint64 // List of history IDs for tracked state entry
|
||||
pending int // Number of entries processed in the current batch.
|
||||
delete bool // Operation mode: true for unindex, false for index.
|
||||
lastID uint64 // ID of the most recently processed history.
|
||||
typ historyType // Type of history being processed (e.g., state or trienode).
|
||||
db ethdb.KeyValueStore // Key-value database used to store or delete index data.
|
||||
}
|
||||
|
||||
// newBatchIndexer constructs the batch indexer with the supplied mode.
|
||||
func newBatchIndexer(db ethdb.KeyValueStore, delete bool) *batchIndexer {
|
||||
func newBatchIndexer(db ethdb.KeyValueStore, delete bool, typ historyType) *batchIndexer {
|
||||
return &batchIndexer{
|
||||
accounts: make(map[common.Hash][]uint64),
|
||||
storages: make(map[common.Hash]map[common.Hash][]uint64),
|
||||
delete: delete,
|
||||
db: db,
|
||||
index: make(map[stateIdent][]uint64),
|
||||
delete: delete,
|
||||
typ: typ,
|
||||
db: db,
|
||||
}
|
||||
}
|
||||
|
||||
// process iterates through the accounts and their associated storage slots in the
|
||||
// state history, tracking the mapping between state and history IDs.
|
||||
func (b *batchIndexer) process(h *stateHistory, historyID uint64) error {
|
||||
for _, address := range h.accountList {
|
||||
addrHash := crypto.Keccak256Hash(address.Bytes())
|
||||
b.counter += 1
|
||||
b.accounts[addrHash] = append(b.accounts[addrHash], historyID)
|
||||
|
||||
for _, slotKey := range h.storageList[address] {
|
||||
b.counter += 1
|
||||
if _, ok := b.storages[addrHash]; !ok {
|
||||
b.storages[addrHash] = make(map[common.Hash][]uint64)
|
||||
}
|
||||
// The hash of the storage slot key is used as the identifier because the
|
||||
// legacy history does not include the raw storage key, therefore, the
|
||||
// conversion from storage key to hash is necessary for non-v0 histories.
|
||||
slotHash := slotKey
|
||||
if h.meta.version != stateHistoryV0 {
|
||||
slotHash = crypto.Keccak256Hash(slotKey.Bytes())
|
||||
}
|
||||
b.storages[addrHash][slotHash] = append(b.storages[addrHash][slotHash], historyID)
|
||||
}
|
||||
// process traverses the state entries within the provided history and tracks the mutation
|
||||
// records for them.
|
||||
func (b *batchIndexer) process(h history, id uint64) error {
|
||||
for ident := range h.forEach() {
|
||||
b.index[ident] = append(b.index[ident], id)
|
||||
b.pending++
|
||||
}
|
||||
b.lastID = historyID
|
||||
b.lastID = id
|
||||
|
||||
return b.finish(false)
|
||||
}
|
||||
|
||||
// finish writes the accumulated state indexes into the disk if either the
|
||||
// memory limitation is reached or it's requested forcibly.
|
||||
func (b *batchIndexer) finish(force bool) error {
|
||||
if b.counter == 0 {
|
||||
if b.pending == 0 {
|
||||
return nil
|
||||
}
|
||||
if !force && b.counter < historyIndexBatch {
|
||||
if !force && b.pending < historyIndexBatch {
|
||||
return nil
|
||||
}
|
||||
var (
|
||||
batch = b.db.NewBatch()
|
||||
batchMu sync.RWMutex
|
||||
storages int
|
||||
start = time.Now()
|
||||
eg errgroup.Group
|
||||
batch = b.db.NewBatch()
|
||||
batchMu sync.RWMutex
|
||||
start = time.Now()
|
||||
eg errgroup.Group
|
||||
)
|
||||
eg.SetLimit(runtime.NumCPU())
|
||||
|
||||
for addrHash, idList := range b.accounts {
|
||||
for ident, list := range b.index {
|
||||
eg.Go(func() error {
|
||||
if !b.delete {
|
||||
iw, err := newIndexWriter(b.db, newAccountIdent(addrHash))
|
||||
iw, err := newIndexWriter(b.db, ident)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, n := range idList {
|
||||
for _, n := range list {
|
||||
if err := iw.append(n); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -152,11 +173,11 @@ func (b *batchIndexer) finish(force bool) error {
|
|||
iw.finish(batch)
|
||||
batchMu.Unlock()
|
||||
} else {
|
||||
id, err := newIndexDeleter(b.db, newAccountIdent(addrHash))
|
||||
id, err := newIndexDeleter(b.db, ident)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, n := range idList {
|
||||
for _, n := range list {
|
||||
if err := id.pop(n); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -168,72 +189,36 @@ func (b *batchIndexer) finish(force bool) error {
|
|||
return nil
|
||||
})
|
||||
}
|
||||
for addrHash, slots := range b.storages {
|
||||
storages += len(slots)
|
||||
for storageHash, idList := range slots {
|
||||
eg.Go(func() error {
|
||||
if !b.delete {
|
||||
iw, err := newIndexWriter(b.db, newStorageIdent(addrHash, storageHash))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, n := range idList {
|
||||
if err := iw.append(n); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
batchMu.Lock()
|
||||
iw.finish(batch)
|
||||
batchMu.Unlock()
|
||||
} else {
|
||||
id, err := newIndexDeleter(b.db, newStorageIdent(addrHash, storageHash))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, n := range idList {
|
||||
if err := id.pop(n); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
batchMu.Lock()
|
||||
id.finish(batch)
|
||||
batchMu.Unlock()
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
}
|
||||
if err := eg.Wait(); err != nil {
|
||||
return err
|
||||
}
|
||||
// Update the position of last indexed state history
|
||||
if !b.delete {
|
||||
storeIndexMetadata(batch, b.lastID)
|
||||
storeIndexMetadata(batch, b.typ, b.lastID)
|
||||
} else {
|
||||
if b.lastID == 1 {
|
||||
rawdb.DeleteStateHistoryIndexMetadata(batch)
|
||||
deleteIndexMetadata(batch, b.typ)
|
||||
} else {
|
||||
storeIndexMetadata(batch, b.lastID-1)
|
||||
storeIndexMetadata(batch, b.typ, b.lastID-1)
|
||||
}
|
||||
}
|
||||
if err := batch.Write(); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Debug("Committed batch indexer", "accounts", len(b.accounts), "storages", storages, "records", b.counter, "elapsed", common.PrettyDuration(time.Since(start)))
|
||||
b.counter = 0
|
||||
b.accounts = make(map[common.Hash][]uint64)
|
||||
b.storages = make(map[common.Hash]map[common.Hash][]uint64)
|
||||
log.Debug("Committed batch indexer", "type", b.typ, "entries", len(b.index), "records", b.pending, "elapsed", common.PrettyDuration(time.Since(start)))
|
||||
b.pending = 0
|
||||
b.index = make(map[stateIdent][]uint64)
|
||||
return nil
|
||||
}
|
||||
|
||||
// indexSingle processes the state history with the specified ID for indexing.
|
||||
func indexSingle(historyID uint64, db ethdb.KeyValueStore, freezer ethdb.AncientReader) error {
|
||||
func indexSingle(historyID uint64, db ethdb.KeyValueStore, freezer ethdb.AncientReader, typ historyType) error {
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
indexHistoryTimer.UpdateSince(start)
|
||||
}()
|
||||
|
||||
metadata := loadIndexMetadata(db)
|
||||
metadata := loadIndexMetadata(db, typ)
|
||||
if metadata == nil || metadata.Last+1 != historyID {
|
||||
last := "null"
|
||||
if metadata != nil {
|
||||
|
|
@ -241,29 +226,37 @@ func indexSingle(historyID uint64, db ethdb.KeyValueStore, freezer ethdb.Ancient
|
|||
}
|
||||
return fmt.Errorf("history indexing is out of order, last: %s, requested: %d", last, historyID)
|
||||
}
|
||||
h, err := readStateHistory(freezer, historyID)
|
||||
var (
|
||||
err error
|
||||
h history
|
||||
b = newBatchIndexer(db, false, typ)
|
||||
)
|
||||
if typ == typeStateHistory {
|
||||
h, err = readStateHistory(freezer, historyID)
|
||||
} else {
|
||||
// h, err = readTrienodeHistory(freezer, historyID)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b := newBatchIndexer(db, false)
|
||||
if err := b.process(h, historyID); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := b.finish(true); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Debug("Indexed state history", "id", historyID, "elapsed", common.PrettyDuration(time.Since(start)))
|
||||
log.Debug("Indexed history", "type", typ, "id", historyID, "elapsed", common.PrettyDuration(time.Since(start)))
|
||||
return nil
|
||||
}
|
||||
|
||||
// unindexSingle processes the state history with the specified ID for unindexing.
|
||||
func unindexSingle(historyID uint64, db ethdb.KeyValueStore, freezer ethdb.AncientReader) error {
|
||||
func unindexSingle(historyID uint64, db ethdb.KeyValueStore, freezer ethdb.AncientReader, typ historyType) error {
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
unindexHistoryTimer.UpdateSince(start)
|
||||
}()
|
||||
|
||||
metadata := loadIndexMetadata(db)
|
||||
metadata := loadIndexMetadata(db, typ)
|
||||
if metadata == nil || metadata.Last != historyID {
|
||||
last := "null"
|
||||
if metadata != nil {
|
||||
|
|
@ -271,18 +264,26 @@ func unindexSingle(historyID uint64, db ethdb.KeyValueStore, freezer ethdb.Ancie
|
|||
}
|
||||
return fmt.Errorf("history unindexing is out of order, last: %s, requested: %d", last, historyID)
|
||||
}
|
||||
h, err := readStateHistory(freezer, historyID)
|
||||
var (
|
||||
err error
|
||||
h history
|
||||
)
|
||||
b := newBatchIndexer(db, true, typ)
|
||||
if typ == typeStateHistory {
|
||||
h, err = readStateHistory(freezer, historyID)
|
||||
} else {
|
||||
// h, err = readTrienodeHistory(freezer, historyID)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
b := newBatchIndexer(db, true)
|
||||
if err := b.process(h, historyID); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := b.finish(true); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Debug("Unindexed state history", "id", historyID, "elapsed", common.PrettyDuration(time.Since(start)))
|
||||
log.Debug("Unindexed history", "type", typ, "id", historyID, "elapsed", common.PrettyDuration(time.Since(start)))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -305,6 +306,8 @@ type indexIniter struct {
|
|||
interrupt chan *interruptSignal
|
||||
done chan struct{}
|
||||
closed chan struct{}
|
||||
typ historyType
|
||||
log log.Logger // Contextual logger with the history type injected
|
||||
|
||||
// indexing progress
|
||||
indexed atomic.Uint64 // the id of latest indexed state
|
||||
|
|
@ -313,18 +316,20 @@ type indexIniter struct {
|
|||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
func newIndexIniter(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, lastID uint64) *indexIniter {
|
||||
func newIndexIniter(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, typ historyType, lastID uint64) *indexIniter {
|
||||
initer := &indexIniter{
|
||||
disk: disk,
|
||||
freezer: freezer,
|
||||
interrupt: make(chan *interruptSignal),
|
||||
done: make(chan struct{}),
|
||||
closed: make(chan struct{}),
|
||||
typ: typ,
|
||||
log: log.New("type", typ.String()),
|
||||
}
|
||||
// Load indexing progress
|
||||
var recover bool
|
||||
initer.last.Store(lastID)
|
||||
metadata := loadIndexMetadata(disk)
|
||||
metadata := loadIndexMetadata(disk, typ)
|
||||
if metadata != nil {
|
||||
initer.indexed.Store(metadata.Last)
|
||||
recover = metadata.Last > lastID
|
||||
|
|
@ -371,7 +376,7 @@ func (i *indexIniter) remain() uint64 {
|
|||
default:
|
||||
last, indexed := i.last.Load(), i.indexed.Load()
|
||||
if last < indexed {
|
||||
log.Warn("State indexer is in recovery", "indexed", indexed, "last", last)
|
||||
i.log.Warn("State indexer is in recovery", "indexed", indexed, "last", last)
|
||||
return indexed - last
|
||||
}
|
||||
return last - indexed
|
||||
|
|
@ -389,7 +394,7 @@ func (i *indexIniter) run(lastID uint64) {
|
|||
// checkDone indicates whether all requested state histories
|
||||
// have been fully indexed.
|
||||
checkDone = func() bool {
|
||||
metadata := loadIndexMetadata(i.disk)
|
||||
metadata := loadIndexMetadata(i.disk, i.typ)
|
||||
return metadata != nil && metadata.Last == lastID
|
||||
}
|
||||
)
|
||||
|
|
@ -411,7 +416,7 @@ func (i *indexIniter) run(lastID uint64) {
|
|||
if newLastID == lastID+1 {
|
||||
lastID = newLastID
|
||||
signal.result <- nil
|
||||
log.Debug("Extended state history range", "last", lastID)
|
||||
i.log.Debug("Extended history range", "last", lastID)
|
||||
continue
|
||||
}
|
||||
// The index limit is shortened by one, interrupt the current background
|
||||
|
|
@ -422,14 +427,14 @@ func (i *indexIniter) run(lastID uint64) {
|
|||
// If all state histories, including the one to be reverted, have
|
||||
// been fully indexed, unindex it here and shut down the initializer.
|
||||
if checkDone() {
|
||||
log.Info("Truncate the extra history", "id", lastID)
|
||||
if err := unindexSingle(lastID, i.disk, i.freezer); err != nil {
|
||||
i.log.Info("Truncate the extra history", "id", lastID)
|
||||
if err := unindexSingle(lastID, i.disk, i.freezer, i.typ); err != nil {
|
||||
signal.result <- err
|
||||
return
|
||||
}
|
||||
close(i.done)
|
||||
signal.result <- nil
|
||||
log.Info("State histories have been fully indexed", "last", lastID-1)
|
||||
i.log.Info("Histories have been fully indexed", "last", lastID-1)
|
||||
return
|
||||
}
|
||||
// Adjust the indexing target and relaunch the process
|
||||
|
|
@ -438,12 +443,12 @@ func (i *indexIniter) run(lastID uint64) {
|
|||
|
||||
done, interrupt = make(chan struct{}), new(atomic.Int32)
|
||||
go i.index(done, interrupt, lastID)
|
||||
log.Debug("Shortened state history range", "last", lastID)
|
||||
i.log.Debug("Shortened history range", "last", lastID)
|
||||
|
||||
case <-done:
|
||||
if checkDone() {
|
||||
close(i.done)
|
||||
log.Info("State histories have been fully indexed", "last", lastID)
|
||||
i.log.Info("Histories have been fully indexed", "last", lastID)
|
||||
return
|
||||
}
|
||||
// Relaunch the background runner if some tasks are left
|
||||
|
|
@ -452,7 +457,7 @@ func (i *indexIniter) run(lastID uint64) {
|
|||
|
||||
case <-i.closed:
|
||||
interrupt.Store(1)
|
||||
log.Info("Waiting background history index initer to exit")
|
||||
i.log.Info("Waiting background history index initer to exit")
|
||||
<-done
|
||||
|
||||
if checkDone() {
|
||||
|
|
@ -472,14 +477,14 @@ func (i *indexIniter) next() (uint64, error) {
|
|||
tailID := tail + 1 // compute the id of the oldest history
|
||||
|
||||
// Start indexing from scratch if nothing has been indexed
|
||||
metadata := loadIndexMetadata(i.disk)
|
||||
metadata := loadIndexMetadata(i.disk, i.typ)
|
||||
if metadata == nil {
|
||||
log.Debug("Initialize state history indexing from scratch", "id", tailID)
|
||||
i.log.Debug("Initialize history indexing from scratch", "id", tailID)
|
||||
return tailID, nil
|
||||
}
|
||||
// Resume indexing from the last interrupted position
|
||||
if metadata.Last+1 >= tailID {
|
||||
log.Debug("Resume state history indexing", "id", metadata.Last+1, "tail", tailID)
|
||||
i.log.Debug("Resume history indexing", "id", metadata.Last+1, "tail", tailID)
|
||||
return metadata.Last + 1, nil
|
||||
}
|
||||
// History has been shortened without indexing. Discard the gapped segment
|
||||
|
|
@ -487,7 +492,7 @@ func (i *indexIniter) next() (uint64, error) {
|
|||
//
|
||||
// The missing indexes corresponding to the gapped histories won't be visible.
|
||||
// It's fine to leave them unindexed.
|
||||
log.Info("History gap detected, discard old segment", "oldHead", metadata.Last, "newHead", tailID)
|
||||
i.log.Info("History gap detected, discard old segment", "oldHead", metadata.Last, "newHead", tailID)
|
||||
return tailID, nil
|
||||
}
|
||||
|
||||
|
|
@ -496,7 +501,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID
|
|||
|
||||
beginID, err := i.next()
|
||||
if err != nil {
|
||||
log.Error("Failed to find next state history for indexing", "err", err)
|
||||
i.log.Error("Failed to find next history for indexing", "err", err)
|
||||
return
|
||||
}
|
||||
// All available state histories have been indexed, and the last indexed one
|
||||
|
|
@ -511,36 +516,47 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID
|
|||
//
|
||||
// This step is essential to avoid spinning up indexing thread
|
||||
// endlessly until a history object is produced.
|
||||
storeIndexMetadata(i.disk, 0)
|
||||
log.Info("Initialized history indexing flag")
|
||||
storeIndexMetadata(i.disk, i.typ, 0)
|
||||
i.log.Info("Initialized history indexing flag")
|
||||
} else {
|
||||
log.Debug("State history is fully indexed", "last", lastID)
|
||||
i.log.Debug("History is fully indexed", "last", lastID)
|
||||
}
|
||||
return
|
||||
}
|
||||
log.Info("Start history indexing", "beginID", beginID, "lastID", lastID)
|
||||
i.log.Info("Start history indexing", "beginID", beginID, "lastID", lastID)
|
||||
|
||||
var (
|
||||
current = beginID
|
||||
start = time.Now()
|
||||
logged = time.Now()
|
||||
batch = newBatchIndexer(i.disk, false)
|
||||
batch = newBatchIndexer(i.disk, false, i.typ)
|
||||
)
|
||||
for current <= lastID {
|
||||
count := lastID - current + 1
|
||||
if count > historyReadBatch {
|
||||
count = historyReadBatch
|
||||
}
|
||||
histories, err := readStateHistories(i.freezer, current, count)
|
||||
if err != nil {
|
||||
// The history read might fall if the history is truncated from
|
||||
// head due to revert operation.
|
||||
log.Error("Failed to read history for indexing", "current", current, "count", count, "err", err)
|
||||
return
|
||||
var histories []history
|
||||
if i.typ == typeStateHistory {
|
||||
histories, err = readStateHistories(i.freezer, current, count)
|
||||
if err != nil {
|
||||
// The history read might fail if the history is truncated from
|
||||
// head due to revert operation.
|
||||
i.log.Error("Failed to read history for indexing", "current", current, "count", count, "err", err)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
// histories, err = readTrienodeHistories(i.freezer, current, count)
|
||||
// if err != nil {
|
||||
// // The history read might fall if the history is truncated from
|
||||
// // head due to revert operation.
|
||||
// i.log.Error("Failed to read history for indexing", "current", current, "count", count, "err", err)
|
||||
// return
|
||||
// }
|
||||
}
|
||||
for _, h := range histories {
|
||||
if err := batch.process(h, current); err != nil {
|
||||
log.Error("Failed to index history", "err", err)
|
||||
i.log.Error("Failed to index history", "err", err)
|
||||
return
|
||||
}
|
||||
current += 1
|
||||
|
|
@ -554,7 +570,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID
|
|||
done = current - beginID
|
||||
)
|
||||
eta := common.CalculateETA(done, left, time.Since(start))
|
||||
log.Info("Indexing state history", "processed", done, "left", left, "elapsed", common.PrettyDuration(time.Since(start)), "eta", common.PrettyDuration(eta))
|
||||
i.log.Info("Indexing state history", "processed", done, "left", left, "elapsed", common.PrettyDuration(time.Since(start)), "eta", common.PrettyDuration(eta))
|
||||
}
|
||||
}
|
||||
i.indexed.Store(current - 1) // update indexing progress
|
||||
|
|
@ -563,7 +579,7 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID
|
|||
if interrupt != nil {
|
||||
if signal := interrupt.Load(); signal != 0 {
|
||||
if err := batch.finish(true); err != nil {
|
||||
log.Error("Failed to flush index", "err", err)
|
||||
i.log.Error("Failed to flush index", "err", err)
|
||||
}
|
||||
log.Info("State indexing interrupted")
|
||||
return
|
||||
|
|
@ -571,9 +587,9 @@ func (i *indexIniter) index(done chan struct{}, interrupt *atomic.Int32, lastID
|
|||
}
|
||||
}
|
||||
if err := batch.finish(true); err != nil {
|
||||
log.Error("Failed to flush index", "err", err)
|
||||
i.log.Error("Failed to flush index", "err", err)
|
||||
}
|
||||
log.Info("Indexed state history", "from", beginID, "to", lastID, "elapsed", common.PrettyDuration(time.Since(start)))
|
||||
i.log.Info("Indexed history", "from", beginID, "to", lastID, "elapsed", common.PrettyDuration(time.Since(start)))
|
||||
}
|
||||
|
||||
// recover handles unclean shutdown recovery. After an unclean shutdown, any
|
||||
|
|
@ -602,14 +618,14 @@ func (i *indexIniter) recover(lastID uint64) {
|
|||
lastID = newLastID
|
||||
signal.result <- nil
|
||||
i.last.Store(newLastID)
|
||||
log.Debug("Updated history index flag", "last", lastID)
|
||||
i.log.Debug("Updated history index flag", "last", lastID)
|
||||
|
||||
// Terminate the recovery routine once the histories are fully aligned
|
||||
// with the index data, indicating that index initialization is complete.
|
||||
metadata := loadIndexMetadata(i.disk)
|
||||
metadata := loadIndexMetadata(i.disk, i.typ)
|
||||
if metadata != nil && metadata.Last == lastID {
|
||||
close(i.done)
|
||||
log.Info("History indexer is recovered", "last", lastID)
|
||||
i.log.Info("History indexer is recovered", "last", lastID)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -631,21 +647,31 @@ func (i *indexIniter) recover(lastID uint64) {
|
|||
// state history.
|
||||
type historyIndexer struct {
|
||||
initer *indexIniter
|
||||
typ historyType
|
||||
disk ethdb.KeyValueStore
|
||||
freezer ethdb.AncientStore
|
||||
}
|
||||
|
||||
// checkVersion checks whether the index data in the database matches the version.
|
||||
func checkVersion(disk ethdb.KeyValueStore) {
|
||||
blob := rawdb.ReadStateHistoryIndexMetadata(disk)
|
||||
func checkVersion(disk ethdb.KeyValueStore, typ historyType) {
|
||||
var blob []byte
|
||||
if typ == typeStateHistory {
|
||||
blob = rawdb.ReadStateHistoryIndexMetadata(disk)
|
||||
} else {
|
||||
panic(fmt.Errorf("unknown history type: %v", typ))
|
||||
}
|
||||
// Short circuit if metadata is not found, re-index is required
|
||||
// from scratch.
|
||||
if len(blob) == 0 {
|
||||
return
|
||||
}
|
||||
// Short circuit if the metadata is found and the version is matched
|
||||
var m indexMetadata
|
||||
err := rlp.DecodeBytes(blob, &m)
|
||||
if err == nil && m.Version == stateIndexVersion {
|
||||
return
|
||||
}
|
||||
// Version is not matched, prune the existing data and re-index from scratch
|
||||
version := "unknown"
|
||||
if err == nil {
|
||||
version = fmt.Sprintf("%d", m.Version)
|
||||
|
|
@ -662,10 +688,11 @@ func checkVersion(disk ethdb.KeyValueStore) {
|
|||
|
||||
// newHistoryIndexer constructs the history indexer and launches the background
|
||||
// initer to complete the indexing of any remaining state histories.
|
||||
func newHistoryIndexer(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, lastHistoryID uint64) *historyIndexer {
|
||||
checkVersion(disk)
|
||||
func newHistoryIndexer(disk ethdb.KeyValueStore, freezer ethdb.AncientStore, lastHistoryID uint64, typ historyType) *historyIndexer {
|
||||
checkVersion(disk, typ)
|
||||
return &historyIndexer{
|
||||
initer: newIndexIniter(disk, freezer, lastHistoryID),
|
||||
initer: newIndexIniter(disk, freezer, typ, lastHistoryID),
|
||||
typ: typ,
|
||||
disk: disk,
|
||||
freezer: freezer,
|
||||
}
|
||||
|
|
@ -693,7 +720,7 @@ func (i *historyIndexer) extend(historyID uint64) error {
|
|||
case <-i.initer.closed:
|
||||
return errors.New("indexer is closed")
|
||||
case <-i.initer.done:
|
||||
return indexSingle(historyID, i.disk, i.freezer)
|
||||
return indexSingle(historyID, i.disk, i.freezer, i.typ)
|
||||
case i.initer.interrupt <- signal:
|
||||
return <-signal.result
|
||||
}
|
||||
|
|
@ -710,7 +737,7 @@ func (i *historyIndexer) shorten(historyID uint64) error {
|
|||
case <-i.initer.closed:
|
||||
return errors.New("indexer is closed")
|
||||
case <-i.initer.done:
|
||||
return unindexSingle(historyID, i.disk, i.freezer)
|
||||
return unindexSingle(historyID, i.disk, i.freezer, i.typ)
|
||||
case i.initer.interrupt <- signal:
|
||||
return <-signal.result
|
||||
}
|
||||
|
|
|
|||
|
|
@ -38,7 +38,7 @@ func TestHistoryIndexerShortenDeadlock(t *testing.T) {
|
|||
rawdb.WriteStateHistory(freezer, uint64(i+1), h.meta.encode(), accountIndex, storageIndex, accountData, storageData)
|
||||
}
|
||||
// As a workaround, assign a future block to keep the initer running indefinitely
|
||||
indexer := newHistoryIndexer(db, freezer, 200)
|
||||
indexer := newHistoryIndexer(db, freezer, 200, typeStateHistory)
|
||||
defer indexer.close()
|
||||
|
||||
done := make(chan error, 1)
|
||||
|
|
|
|||
|
|
@ -29,88 +29,6 @@ import (
|
|||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
)
|
||||
|
||||
// stateIdent represents the identifier of a state element, which can be
|
||||
// either an account or a storage slot.
|
||||
type stateIdent struct {
|
||||
account bool
|
||||
|
||||
// The hash of the account address. This is used instead of the raw account
|
||||
// address is to align the traversal order with the Merkle-Patricia-Trie.
|
||||
addressHash common.Hash
|
||||
|
||||
// The hash of the storage slot key. This is used instead of the raw slot key
|
||||
// because, in legacy state histories (prior to the Cancun fork), the slot
|
||||
// identifier is the hash of the key, and the original key (preimage) cannot
|
||||
// be recovered. To maintain backward compatibility, the key hash is used.
|
||||
//
|
||||
// Meanwhile, using the storage key hash also preserve the traversal order
|
||||
// with Merkle-Patricia-Trie.
|
||||
//
|
||||
// This field is null if the identifier refers to account data.
|
||||
storageHash common.Hash
|
||||
}
|
||||
|
||||
// String returns the string format state identifier.
|
||||
func (ident stateIdent) String() string {
|
||||
if ident.account {
|
||||
return ident.addressHash.Hex()
|
||||
}
|
||||
return ident.addressHash.Hex() + ident.storageHash.Hex()
|
||||
}
|
||||
|
||||
// newAccountIdent constructs a state identifier for an account.
|
||||
func newAccountIdent(addressHash common.Hash) stateIdent {
|
||||
return stateIdent{
|
||||
account: true,
|
||||
addressHash: addressHash,
|
||||
}
|
||||
}
|
||||
|
||||
// newStorageIdent constructs a state identifier for a storage slot.
|
||||
// The address denotes the address of the associated account;
|
||||
// the storageHash denotes the hash of the raw storage slot key;
|
||||
func newStorageIdent(addressHash common.Hash, storageHash common.Hash) stateIdent {
|
||||
return stateIdent{
|
||||
addressHash: addressHash,
|
||||
storageHash: storageHash,
|
||||
}
|
||||
}
|
||||
|
||||
// stateIdentQuery is the extension of stateIdent by adding the raw storage key.
|
||||
type stateIdentQuery struct {
|
||||
stateIdent
|
||||
|
||||
address common.Address
|
||||
storageKey common.Hash
|
||||
}
|
||||
|
||||
// newAccountIdentQuery constructs a state identifier for an account.
|
||||
func newAccountIdentQuery(address common.Address, addressHash common.Hash) stateIdentQuery {
|
||||
return stateIdentQuery{
|
||||
stateIdent: stateIdent{
|
||||
account: true,
|
||||
addressHash: addressHash,
|
||||
},
|
||||
address: address,
|
||||
}
|
||||
}
|
||||
|
||||
// newStorageIdentQuery constructs a state identifier for a storage slot.
|
||||
// the address denotes the address of the associated account;
|
||||
// the addressHash denotes the address hash of the associated account;
|
||||
// the storageKey denotes the raw storage slot key;
|
||||
// the storageHash denotes the hash of the raw storage slot key;
|
||||
func newStorageIdentQuery(address common.Address, addressHash common.Hash, storageKey common.Hash, storageHash common.Hash) stateIdentQuery {
|
||||
return stateIdentQuery{
|
||||
stateIdent: stateIdent{
|
||||
addressHash: addressHash,
|
||||
storageHash: storageHash,
|
||||
},
|
||||
address: address,
|
||||
storageKey: storageKey,
|
||||
}
|
||||
}
|
||||
|
||||
// indexReaderWithLimitTag is a wrapper around indexReader that includes an
|
||||
// additional index position. This position represents the ID of the last
|
||||
// indexed state history at the time the reader was created, implying that
|
||||
|
|
@ -169,7 +87,7 @@ func (r *indexReaderWithLimitTag) readGreaterThan(id uint64, lastID uint64) (uin
|
|||
// Given that it's very unlikely to occur and users try to perform historical
|
||||
// state queries while reverting the states at the same time. Simply returning
|
||||
// an error should be sufficient for now.
|
||||
metadata := loadIndexMetadata(r.db)
|
||||
metadata := loadIndexMetadata(r.db, toHistoryType(r.reader.state.typ))
|
||||
if metadata == nil || metadata.Last < lastID {
|
||||
return 0, errors.New("state history hasn't been indexed yet")
|
||||
}
|
||||
|
|
@ -331,7 +249,7 @@ func (r *historyReader) read(state stateIdentQuery, stateID uint64, lastID uint6
|
|||
// To serve the request, all state histories from stateID+1 to lastID
|
||||
// must be indexed. It's not supposed to happen unless system is very
|
||||
// wrong.
|
||||
metadata := loadIndexMetadata(r.disk)
|
||||
metadata := loadIndexMetadata(r.disk, toHistoryType(state.typ))
|
||||
if metadata == nil || metadata.Last < lastID {
|
||||
indexed := "null"
|
||||
if metadata != nil {
|
||||
|
|
@ -364,7 +282,7 @@ func (r *historyReader) read(state stateIdentQuery, stateID uint64, lastID uint6
|
|||
// that the associated state histories are no longer available due to a rollback.
|
||||
// Such truncation should be captured by the state resolver below, rather than returning
|
||||
// invalid data.
|
||||
if state.account {
|
||||
if state.typ == typeAccount {
|
||||
return r.readAccount(state.address, historyID)
|
||||
}
|
||||
return r.readStorage(state.address, state.storageKey, state.storageHash, historyID)
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ import (
|
|||
|
||||
func waitIndexing(db *Database) {
|
||||
for {
|
||||
metadata := loadIndexMetadata(db.diskdb)
|
||||
metadata := loadIndexMetadata(db.diskdb, typeStateHistory)
|
||||
if metadata != nil && metadata.Last >= db.tree.bottom().stateID() {
|
||||
return
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import (
|
|||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"iter"
|
||||
"maps"
|
||||
"slices"
|
||||
"time"
|
||||
|
|
@ -275,6 +276,36 @@ func newStateHistory(root common.Hash, parent common.Hash, block uint64, account
|
|||
}
|
||||
}
|
||||
|
||||
// typ implements the history interface, returning the historical data type held.
|
||||
func (h *stateHistory) typ() historyType {
|
||||
return typeStateHistory
|
||||
}
|
||||
|
||||
// forEach implements the history interface, returning an iterator to traverse the
|
||||
// state entries in the history.
|
||||
func (h *stateHistory) forEach() iter.Seq[stateIdent] {
|
||||
return func(yield func(stateIdent) bool) {
|
||||
for _, addr := range h.accountList {
|
||||
addrHash := crypto.Keccak256Hash(addr.Bytes())
|
||||
if !yield(newAccountIdent(addrHash)) {
|
||||
return
|
||||
}
|
||||
for _, slotKey := range h.storageList[addr] {
|
||||
// The hash of the storage slot key is used as the identifier because the
|
||||
// legacy history does not include the raw storage key, therefore, the
|
||||
// conversion from storage key to hash is necessary for non-v0 histories.
|
||||
slotHash := slotKey
|
||||
if h.meta.version != stateHistoryV0 {
|
||||
slotHash = crypto.Keccak256Hash(slotKey.Bytes())
|
||||
}
|
||||
if !yield(newStorageIdent(addrHash, slotHash)) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// stateSet returns the state set, keyed by the hash of the account address
|
||||
// and the hash of the storage slot key.
|
||||
func (h *stateHistory) stateSet() (map[common.Hash][]byte, map[common.Hash]map[common.Hash][]byte) {
|
||||
|
|
@ -536,8 +567,8 @@ func readStateHistory(reader ethdb.AncientReader, id uint64) (*stateHistory, err
|
|||
}
|
||||
|
||||
// readStateHistories reads a list of state history records within the specified range.
|
||||
func readStateHistories(freezer ethdb.AncientReader, start uint64, count uint64) ([]*stateHistory, error) {
|
||||
var histories []*stateHistory
|
||||
func readStateHistories(freezer ethdb.AncientReader, start uint64, count uint64) ([]history, error) {
|
||||
var histories []history
|
||||
metaList, aIndexList, sIndexList, aDataList, sDataList, err := rawdb.ReadStateHistoryList(freezer, start, count)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
|||
|
|
@ -137,7 +137,7 @@ func TestTruncateHeadStateHistory(t *testing.T) {
|
|||
rawdb.WriteStateHistory(freezer, uint64(i+1), hs[i].meta.encode(), accountIndex, storageIndex, accountData, storageData)
|
||||
}
|
||||
for size := len(hs); size > 0; size-- {
|
||||
pruned, err := truncateFromHead(freezer, uint64(size-1))
|
||||
pruned, err := truncateFromHead(freezer, typeStateHistory, uint64(size-1))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to truncate from head %v", err)
|
||||
}
|
||||
|
|
@ -161,7 +161,7 @@ func TestTruncateTailStateHistory(t *testing.T) {
|
|||
rawdb.WriteStateHistory(freezer, uint64(i+1), hs[i].meta.encode(), accountIndex, storageIndex, accountData, storageData)
|
||||
}
|
||||
for newTail := 1; newTail < len(hs); newTail++ {
|
||||
pruned, _ := truncateFromTail(freezer, uint64(newTail))
|
||||
pruned, _ := truncateFromTail(freezer, typeStateHistory, uint64(newTail))
|
||||
if pruned != 1 {
|
||||
t.Error("Unexpected pruned items", "want", 1, "got", pruned)
|
||||
}
|
||||
|
|
@ -209,7 +209,7 @@ func TestTruncateTailStateHistories(t *testing.T) {
|
|||
accountData, storageData, accountIndex, storageIndex := hs[i].encode()
|
||||
rawdb.WriteStateHistory(freezer, uint64(i+1), hs[i].meta.encode(), accountIndex, storageIndex, accountData, storageData)
|
||||
}
|
||||
pruned, _ := truncateFromTail(freezer, uint64(10)-c.limit)
|
||||
pruned, _ := truncateFromTail(freezer, typeStateHistory, uint64(10)-c.limit)
|
||||
if pruned != c.expPruned {
|
||||
t.Error("Unexpected pruned items", "want", c.expPruned, "got", pruned)
|
||||
}
|
||||
|
|
@ -233,7 +233,7 @@ func TestTruncateOutOfRange(t *testing.T) {
|
|||
accountData, storageData, accountIndex, storageIndex := hs[i].encode()
|
||||
rawdb.WriteStateHistory(freezer, uint64(i+1), hs[i].meta.encode(), accountIndex, storageIndex, accountData, storageData)
|
||||
}
|
||||
truncateFromTail(freezer, uint64(len(hs)/2))
|
||||
truncateFromTail(freezer, typeStateHistory, uint64(len(hs)/2))
|
||||
|
||||
// Ensure out-of-range truncations are rejected correctly.
|
||||
head, _ := freezer.Ancients()
|
||||
|
|
@ -254,9 +254,9 @@ func TestTruncateOutOfRange(t *testing.T) {
|
|||
for _, c := range cases {
|
||||
var gotErr error
|
||||
if c.mode == 0 {
|
||||
_, gotErr = truncateFromHead(freezer, c.target)
|
||||
_, gotErr = truncateFromHead(freezer, typeStateHistory, c.target)
|
||||
} else {
|
||||
_, gotErr = truncateFromTail(freezer, c.target)
|
||||
_, gotErr = truncateFromTail(freezer, typeStateHistory, c.target)
|
||||
}
|
||||
if !errors.Is(gotErr, c.expErr) {
|
||||
t.Errorf("Unexpected error, want: %v, got: %v", c.expErr, gotErr)
|
||||
|
|
|
|||
Loading…
Reference in a new issue