core, triedb/pathdb: final integration (snapshot integration pt 5) (#30661)

In this pull request, snapshot generation in pathdb has been ported from 
the legacy state snapshot implementation. Additionally, when running in 
path mode, legacy state snapshot data is now managed by the pathdb-based
snapshot logic.

Note: Existing snapshot data will be re-generated, regardless of whether 
it was previously fully constructed.
rjl493456442 2025-05-16 18:29:38 +08:00 committed by GitHub
parent 57e985ecab
commit 892a661ee2
29 changed files with 2700 additions and 247 deletions
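
As a quick orientation before the diff, here is a minimal sketch of the reworked pathdb configuration surface introduced by this change. The field names are the ones touched below; the concrete values and the in-memory backing store are purely illustrative and not part of this commit.

package main

import (
    "github.com/ethereum/go-ethereum/core/rawdb"
    "github.com/ethereum/go-ethereum/triedb"
    "github.com/ethereum/go-ethereum/triedb/pathdb"
)

func main() {
    diskdb := rawdb.NewMemoryDatabase() // illustrative backing store
    config := &triedb.Config{
        PathDB: &pathdb.Config{
            StateHistory:    90_000,            // recent blocks to retain state history for
            TrieCleanSize:   256 * 1024 * 1024, // clean cache for trie nodes (was CleanCacheSize)
            StateCleanSize:  256 * 1024 * 1024, // new: clean cache for flat state data
            WriteBufferSize: 256 * 1024 * 1024, // dirty buffer for both trie and state data
            SnapshotNoBuild: false,             // permit background snapshot generation
        },
    }
    db := triedb.NewDatabase(diskdb, config)
    defer db.Close()
}

Note the rename from CleanCacheSize to TrieCleanSize and the new StateCleanSize and SnapshotNoBuild knobs, all visible in the triedb/pathdb hunks below.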


@ -183,8 +183,13 @@ func (c *CacheConfig) triedbConfig(isVerkle bool) *triedb.Config {
}
if c.StateScheme == rawdb.PathScheme {
config.PathDB = &pathdb.Config{
StateHistory: c.StateHistory,
CleanCacheSize: c.TrieCleanLimit * 1024 * 1024,
StateHistory: c.StateHistory,
TrieCleanSize: c.TrieCleanLimit * 1024 * 1024,
StateCleanSize: c.SnapshotLimit * 1024 * 1024,
// TODO(rjl493456442): The write buffer represents the memory limit used
// for flushing both trie data and state data to disk. The config name
// should be updated to eliminate the confusion.
WriteBufferSize: c.TrieDirtyLimit * 1024 * 1024,
}
}
@ -380,11 +385,14 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
// Do nothing here until the state syncer picks it up.
log.Info("Genesis state is missing, wait state sync")
} else {
// Head state is missing, before the state recovery, find out the
// disk layer point of snapshot(if it's enabled). Make sure the
// rewound point is lower than disk layer.
// Head state is missing, before the state recovery, find out the disk
// layer point of snapshot(if it's enabled). Make sure the rewound point
// is lower than disk layer.
//
// Note it's unnecessary in path mode, which always keeps trie data and
// state data consistent.
var diskRoot common.Hash
if bc.cacheConfig.SnapshotLimit > 0 {
if bc.cacheConfig.SnapshotLimit > 0 && bc.cacheConfig.StateScheme == rawdb.HashScheme {
diskRoot = rawdb.ReadSnapshotRoot(bc.db)
}
if diskRoot != (common.Hash{}) {
@ -457,7 +465,32 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
bc.logger.OnGenesisBlock(bc.genesisBlock, alloc)
}
}
bc.setupSnapshot()
// Rewind the chain in case of an incompatible config upgrade.
if compatErr != nil {
log.Warn("Rewinding chain to upgrade configuration", "err", compatErr)
if compatErr.RewindToTime > 0 {
bc.SetHeadWithTimestamp(compatErr.RewindToTime)
} else {
bc.SetHead(compatErr.RewindToBlock)
}
rawdb.WriteChainConfig(db, genesisHash, chainConfig)
}
// Start tx indexer if it's enabled.
if txLookupLimit != nil {
bc.txIndexer = newTxIndexer(*txLookupLimit, bc)
}
return bc, nil
}
func (bc *BlockChain) setupSnapshot() {
// Short circuit if the chain is established with path scheme, as the
// state snapshot has been integrated into path database natively.
if bc.cacheConfig.StateScheme == rawdb.PathScheme {
return
}
// Load any existing snapshot, regenerating it if loading failed
if bc.cacheConfig.SnapshotLimit > 0 {
// If the chain was rewound past the snapshot persistent layer (causing
@ -465,7 +498,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
// in recovery mode and in that case, don't invalidate the snapshot on a
// head mismatch.
var recover bool
head := bc.CurrentBlock()
if layer := rawdb.ReadSnapshotRecoveryNumber(bc.db); layer != nil && *layer >= head.Number.Uint64() {
log.Warn("Enabling snapshot recovery", "chainhead", head.Number, "diskbase", *layer)
@ -482,22 +514,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
// Re-initialize the state database with snapshot
bc.statedb = state.NewDatabase(bc.triedb, bc.snaps)
}
// Rewind the chain in case of an incompatible config upgrade.
if compatErr != nil {
log.Warn("Rewinding chain to upgrade configuration", "err", compatErr)
if compatErr.RewindToTime > 0 {
bc.SetHeadWithTimestamp(compatErr.RewindToTime)
} else {
bc.SetHead(compatErr.RewindToBlock)
}
rawdb.WriteChainConfig(db, genesisHash, chainConfig)
}
// Start tx indexer if it's enabled.
if txLookupLimit != nil {
bc.txIndexer = newTxIndexer(*txLookupLimit, bc)
}
return bc, nil
}
// empty returns an indicator whether the blockchain is empty.


@ -1791,7 +1791,7 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s
}
)
defer engine.Close()
if snapshots {
if snapshots && scheme == rawdb.HashScheme {
config.SnapshotLimit = 256
config.SnapshotWait = true
}
@ -1820,7 +1820,7 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s
if err := chain.triedb.Commit(canonblocks[tt.commitBlock-1].Root(), false); err != nil {
t.Fatalf("Failed to flush trie state: %v", err)
}
if snapshots {
if snapshots && scheme == rawdb.HashScheme {
if err := chain.snaps.Cap(canonblocks[tt.commitBlock-1].Root(), 0); err != nil {
t.Fatalf("Failed to flatten snapshots: %v", err)
}
@ -1952,8 +1952,10 @@ func testIssue23496(t *testing.T, scheme string) {
if _, err := chain.InsertChain(blocks[1:2]); err != nil {
t.Fatalf("Failed to import canonical chain start: %v", err)
}
if err := chain.snaps.Cap(blocks[1].Root(), 0); err != nil {
t.Fatalf("Failed to flatten snapshots: %v", err)
if scheme == rawdb.HashScheme {
if err := chain.snaps.Cap(blocks[1].Root(), 0); err != nil {
t.Fatalf("Failed to flatten snapshots: %v", err)
}
}
// Insert block B3 and commit the state into disk
@ -1997,15 +1999,23 @@ func testIssue23496(t *testing.T, scheme string) {
}
expHead := uint64(1)
if scheme == rawdb.PathScheme {
expHead = uint64(2)
// The pathdb database makes sure that snapshot and trie are consistent,
// so only the last block is reverted in case of a crash.
expHead = uint64(3)
}
if head := chain.CurrentBlock(); head.Number.Uint64() != expHead {
t.Errorf("Head block mismatch: have %d, want %d", head.Number, expHead)
}
// Reinsert B2-B4
if _, err := chain.InsertChain(blocks[1:]); err != nil {
t.Fatalf("Failed to import canonical chain tail: %v", err)
if scheme == rawdb.PathScheme {
// Reinsert B4
if _, err := chain.InsertChain(blocks[3:]); err != nil {
t.Fatalf("Failed to import canonical chain tail: %v", err)
}
} else {
// Reinsert B2-B4
if _, err := chain.InsertChain(blocks[1:]); err != nil {
t.Fatalf("Failed to import canonical chain tail: %v", err)
}
}
if head := chain.CurrentHeader(); head.Number.Uint64() != uint64(4) {
t.Errorf("Head header mismatch: have %d, want %d", head.Number, 4)
@ -2016,7 +2026,9 @@ func testIssue23496(t *testing.T, scheme string) {
if head := chain.CurrentBlock(); head.Number.Uint64() != uint64(4) {
t.Errorf("Head block mismatch: have %d, want %d", head.Number, uint64(4))
}
if layer := chain.Snapshots().Snapshot(blocks[2].Root()); layer == nil {
t.Error("Failed to regenerate the snapshot of known state")
if scheme == rawdb.HashScheme {
if layer := chain.Snapshots().Snapshot(blocks[2].Root()); layer == nil {
t.Error("Failed to regenerate the snapshot of known state")
}
}
}


@ -2023,7 +2023,7 @@ func testSetHeadWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme
}
if tt.commitBlock > 0 {
chain.triedb.Commit(canonblocks[tt.commitBlock-1].Root(), false)
if snapshots {
if snapshots && scheme == rawdb.HashScheme {
if err := chain.snaps.Cap(canonblocks[tt.commitBlock-1].Root(), 0); err != nil {
t.Fatalf("Failed to flatten snapshots: %v", err)
}


@ -105,7 +105,7 @@ func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Blo
if basic.commitBlock > 0 && basic.commitBlock == point {
chain.TrieDB().Commit(blocks[point-1].Root(), false)
}
if basic.snapshotBlock > 0 && basic.snapshotBlock == point {
if basic.snapshotBlock > 0 && basic.snapshotBlock == point && basic.scheme == rawdb.HashScheme {
// Flushing the entire snap tree into the disk, the
// relevant (a) snapshot root and (b) snapshot generator
// will be persisted atomically.
@ -149,13 +149,17 @@ func (basic *snapshotTestBasic) verify(t *testing.T, chain *BlockChain, blocks [
block := chain.GetBlockByNumber(basic.expSnapshotBottom)
if block == nil {
t.Errorf("The corresponding block[%d] of snapshot disk layer is missing", basic.expSnapshotBottom)
} else if !bytes.Equal(chain.snaps.DiskRoot().Bytes(), block.Root().Bytes()) {
t.Errorf("The snapshot disk layer root is incorrect, want %x, get %x", block.Root(), chain.snaps.DiskRoot())
} else if basic.scheme == rawdb.HashScheme {
if !bytes.Equal(chain.snaps.DiskRoot().Bytes(), block.Root().Bytes()) {
t.Errorf("The snapshot disk layer root is incorrect, want %x, get %x", block.Root(), chain.snaps.DiskRoot())
}
}
// Check the snapshot, ensure it's integrated
if err := chain.snaps.Verify(block.Root()); err != nil {
t.Errorf("The disk layer is not integrated %v", err)
if basic.scheme == rawdb.HashScheme {
if err := chain.snaps.Verify(block.Root()); err != nil {
t.Errorf("The disk layer is not integrated %v", err)
}
}
}
@ -565,12 +569,14 @@ func TestHighCommitCrashWithNewSnapshot(t *testing.T) {
//
// Expected head header : C8
// Expected head fast block: C8
// Expected head block : G
// Expected snapshot disk : C4
// Expected head block : G (Hash mode), C6 (Path mode)
// Expected snapshot disk : C4 (Hash mode)
for _, scheme := range []string{rawdb.HashScheme, rawdb.PathScheme} {
expHead := uint64(0)
if scheme == rawdb.PathScheme {
expHead = uint64(4)
// The pathdb database makes sure that snapshot and trie are consistent,
// so only the last two blocks are reverted in case of a crash.
expHead = uint64(6)
}
test := &crashSnapshotTest{
snapshotTestBasic{


@ -175,26 +175,27 @@ func NewDatabaseForTesting() *CachingDB {
func (db *CachingDB) Reader(stateRoot common.Hash) (Reader, error) {
var readers []StateReader
// Set up the state snapshot reader if available. This feature
// is optional and may be partially useful if it's not fully
// generated.
if db.snap != nil {
// If standalone state snapshot is available (hash scheme),
// then construct the legacy snap reader.
// Configure the state reader using the standalone snapshot in hash mode.
// This reader offers improved performance but is optional and only
// partially useful if the snapshot is not fully generated.
if db.TrieDB().Scheme() == rawdb.HashScheme && db.snap != nil {
snap := db.snap.Snapshot(stateRoot)
if snap != nil {
readers = append(readers, newFlatReader(snap))
}
} else {
// If standalone state snapshot is not available, try to construct
// the state reader with database.
}
// Configure the state reader using the path database in path mode.
// This reader offers improved performance but is optional and only
// partially useful if the snapshot data in path database is not
// fully generated.
if db.TrieDB().Scheme() == rawdb.PathScheme {
reader, err := db.triedb.StateReader(stateRoot)
if err == nil {
readers = append(readers, newFlatReader(reader)) // state reader is optional
readers = append(readers, newFlatReader(reader))
}
}
// Set up the trie reader, which is expected to always be available
// as the gatekeeper unless the state is corrupted.
// Configure the trie reader, which is expected to be available as the
// gatekeeper unless the state is corrupted.
tr, err := newTrieReader(stateRoot, db.triedb, db.pointCache)
if err != nil {
return nil, err


@ -166,7 +166,9 @@ func newHelper(scheme string) *testHelper {
diskdb := rawdb.NewMemoryDatabase()
config := &triedb.Config{}
if scheme == rawdb.PathScheme {
config.PathDB = &pathdb.Config{} // disable caching
config.PathDB = &pathdb.Config{
SnapshotNoBuild: true,
} // disable caching
} else {
config.HashDB = &hashdb.Config{} // disable caching
}


@ -979,7 +979,8 @@ func testMissingTrieNodes(t *testing.T, scheme string) {
)
if scheme == rawdb.PathScheme {
tdb = triedb.NewDatabase(memDb, &triedb.Config{PathDB: &pathdb.Config{
CleanCacheSize: 0,
TrieCleanSize: 0,
StateCleanSize: 0,
WriteBufferSize: 0,
}}) // disable caching
} else {


@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/forkid"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
@ -183,7 +184,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
}
}
// If snap sync is requested but snapshots are disabled, fail loudly
if h.snapSync.Load() && config.Chain.Snapshots() == nil {
if h.snapSync.Load() && (config.Chain.Snapshots() == nil && config.Chain.TrieDB().Scheme() == rawdb.HashScheme) {
return nil, errors.New("snap sync not supported with snapshots disabled")
}
// Construct the downloader (long sync)


@ -23,6 +23,8 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state/snapshot"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
@ -31,6 +33,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/trie"
"github.com/ethereum/go-ethereum/trie/trienode"
"github.com/ethereum/go-ethereum/triedb/database"
)
const (
@ -279,7 +282,16 @@ func ServiceGetAccountRangeQuery(chain *core.BlockChain, req *GetAccountRangePac
if err != nil {
return nil, nil
}
it, err := chain.Snapshots().AccountIterator(req.Root, req.Origin)
// Temporary solution: using the snapshot interface for both cases.
// This can be removed once the hash scheme is deprecated.
var it snapshot.AccountIterator
if chain.TrieDB().Scheme() == rawdb.HashScheme {
// The snapshot is assumed to be available in hash mode if
// the SNAP protocol is enabled.
it, err = chain.Snapshots().AccountIterator(req.Root, req.Origin)
} else {
it, err = chain.TrieDB().AccountIterator(req.Root, req.Origin)
}
if err != nil {
return nil, nil
}
@ -359,7 +371,19 @@ func ServiceGetStorageRangesQuery(chain *core.BlockChain, req *GetStorageRangesP
limit, req.Limit = common.BytesToHash(req.Limit), nil
}
// Retrieve the requested state and bail out if non existent
it, err := chain.Snapshots().StorageIterator(req.Root, account, origin)
var (
err error
it snapshot.StorageIterator
)
// Temporary solution: using the snapshot interface for both cases.
// This can be removed once the hash scheme is deprecated.
if chain.TrieDB().Scheme() == rawdb.HashScheme {
// The snapshot is assumed to be available in hash mode if
// the SNAP protocol is enabled.
it, err = chain.Snapshots().StorageIterator(req.Root, account, origin)
} else {
it, err = chain.TrieDB().StorageIterator(req.Root, account, origin)
}
if err != nil {
return nil, nil
}
@ -479,8 +503,15 @@ func ServiceGetTrieNodesQuery(chain *core.BlockChain, req *GetTrieNodesPacket, s
// We don't have the requested state available, bail out
return nil, nil
}
// The 'snap' might be nil, in which case we cannot serve storage slots.
snap := chain.Snapshots().Snapshot(req.Root)
// The 'reader' might be nil, in which case we cannot serve storage slots
// via snapshot.
var reader database.StateReader
if chain.Snapshots() != nil {
reader = chain.Snapshots().Snapshot(req.Root)
}
if reader == nil {
reader, _ = triedb.StateReader(req.Root)
}
// Retrieve trie nodes until the packet size limit is reached
var (
nodes [][]byte
@ -505,8 +536,9 @@ func ServiceGetTrieNodesQuery(chain *core.BlockChain, req *GetTrieNodesPacket, s
default:
var stRoot common.Hash
// Storage slots requested, open the storage trie and retrieve from there
if snap == nil {
if reader == nil {
// We don't have the requested state snapshotted yet (or it is stale),
// but can look up the account via the trie instead.
account, err := accTrie.GetAccountByHash(common.BytesToHash(pathset[0]))
@ -516,7 +548,7 @@ func ServiceGetTrieNodesQuery(chain *core.BlockChain, req *GetTrieNodesPacket, s
}
stRoot = account.Root
} else {
account, err := snap.Account(common.BytesToHash(pathset[0]))
account, err := reader.Account(common.BytesToHash(pathset[0]))
loads++ // always account database reads, even for failures
if err != nil || account == nil {
break
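
The account- and storage-range handlers above now pick their data source with the same scheme switch. A condensed sketch of that selection follows; the helper and its package are hypothetical, while the individual calls are exactly the ones used in this diff.

package snapserve // hypothetical package, for illustration only

import (
    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/core"
    "github.com/ethereum/go-ethereum/core/rawdb"
    "github.com/ethereum/go-ethereum/core/state/snapshot"
)

// accountIterator mirrors ServiceGetAccountRangeQuery: in hash mode the
// standalone snapshot tree serves the range, in path mode pathdb does.
func accountIterator(chain *core.BlockChain, root, origin common.Hash) (snapshot.AccountIterator, error) {
    var (
        it  snapshot.AccountIterator
        err error
    )
    if chain.TrieDB().Scheme() == rawdb.HashScheme {
        // The snapshot tree is assumed to be available when SNAP is enabled.
        it, err = chain.Snapshots().AccountIterator(root, origin)
    } else {
        it, err = chain.TrieDB().AccountIterator(root, origin)
    }
    return it, err
}

The storage variant is identical with StorageIterator, as shown in ServiceGetStorageRangesQuery above.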


@ -1962,5 +1962,5 @@ func newDbConfig(scheme string) *triedb.Config {
if scheme == rawdb.HashScheme {
return &triedb.Config{}
}
return &triedb.Config{PathDB: pathdb.Defaults}
return &triedb.Config{PathDB: &pathdb.Config{SnapshotNoBuild: true}}
}


@ -187,8 +187,10 @@ func (t *BlockTest) Run(snapshotter bool, scheme string, witness bool, tracer *t
}
// Cross-check the snapshot-to-hash against the trie hash
if snapshotter {
if err := chain.Snapshots().Verify(chain.CurrentBlock().Root); err != nil {
return err
if chain.Snapshots() != nil {
if err := chain.Snapshots().Verify(chain.CurrentBlock().Root); err != nil {
return err
}
}
}
return t.validateImportedHeaders(chain, validBlocks)


@ -25,7 +25,7 @@ import (
"github.com/ethereum/go-ethereum/triedb/database"
)
// testReader implements database.Reader interface, providing function to
// testReader implements database.NodeReader interface, providing function to
// access trie nodes.
type testReader struct {
db ethdb.Database
@ -33,7 +33,7 @@ type testReader struct {
nodes []*trienode.MergedNodeSet // sorted from new to old
}
// Node implements database.Reader interface, retrieving trie node with
// Node implements database.NodeReader interface, retrieving trie node with
// all available cached layers.
func (r *testReader) Node(owner common.Hash, path []byte, hash common.Hash) ([]byte, error) {
// Check the node presence with the cached layer, from latest to oldest.
@ -54,7 +54,7 @@ func (r *testReader) Node(owner common.Hash, path []byte, hash common.Hash) ([]b
return rawdb.ReadTrieNode(r.db, owner, path, hash, r.scheme), nil
}
// testDb implements database.Database interface, using for testing purpose.
// testDb implements database.NodeDatabase interface, using for testing purpose.
type testDb struct {
disk ethdb.Database
root common.Hash


@ -312,6 +312,26 @@ func (db *Database) Journal(root common.Hash) error {
return pdb.Journal(root)
}
// AccountIterator creates a new account iterator for the specified root hash and
// seeks to a starting account hash.
func (db *Database) AccountIterator(root common.Hash, seek common.Hash) (pathdb.AccountIterator, error) {
pdb, ok := db.backend.(*pathdb.Database)
if !ok {
return nil, errors.New("not supported")
}
return pdb.AccountIterator(root, seek)
}
// StorageIterator creates a new storage iterator for the specified root hash and
// account. The iterator will be moved to the specific start position.
func (db *Database) StorageIterator(root common.Hash, account common.Hash, seek common.Hash) (pathdb.StorageIterator, error) {
pdb, ok := db.backend.(*pathdb.Database)
if !ok {
return nil, errors.New("not supported")
}
return pdb.StorageIterator(root, account, seek)
}
// IsVerkle returns the indicator if the database is holding a verkle tree.
func (db *Database) IsVerkle() bool {
return db.config.IsVerkle
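
The two forwarding methods above only succeed when the backend really is a pathdb.Database; with the hash backend they return the "not supported" error. Below is a hedged usage sketch, assuming the pathdb iterators expose the same Next/Hash/Account/Error/Release surface as the legacy snapshot iterators (which the snap handler changes above rely on); the package and function names are illustrative.

package iterexample // hypothetical package, for illustration only

import (
    "fmt"

    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/triedb"
)

// dumpAccounts walks the flat account data of the given state root via the
// new triedb forwarding method. The call fails for hash-scheme backends, or
// while the snapshot is still being generated.
func dumpAccounts(db *triedb.Database, root common.Hash, limit int) error {
    it, err := db.AccountIterator(root, common.Hash{})
    if err != nil {
        return err
    }
    defer it.Release()

    for i := 0; i < limit && it.Next(); i++ {
        fmt.Printf("account %x: %d bytes\n", it.Hash(), len(it.Account()))
    }
    return it.Error()
}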


@ -124,7 +124,7 @@ func (b *buffer) size() uint64 {
// flush persists the in-memory dirty trie node into the disk if the configured
// memory threshold is reached. Note, all data must be written atomically.
func (b *buffer) flush(db ethdb.KeyValueStore, freezer ethdb.AncientWriter, nodesCache *fastcache.Cache, id uint64) error {
func (b *buffer) flush(root common.Hash, db ethdb.KeyValueStore, freezer ethdb.AncientWriter, progress []byte, nodesCache, statesCache *fastcache.Cache, id uint64) error {
// Ensure the target state id is aligned with the internal counter.
head := rawdb.ReadPersistentStateID(db)
if head+b.layers != id {
@ -133,7 +133,7 @@ func (b *buffer) flush(db ethdb.KeyValueStore, freezer ethdb.AncientWriter, node
// Terminate the state snapshot generation if it's active
var (
start = time.Now()
batch = db.NewBatchWithSize(b.nodes.dbsize() * 11 / 10) // extra 10% for potential pebble internal stuff
batch = db.NewBatchWithSize((b.nodes.dbsize() + b.states.dbsize()) * 11 / 10) // extra 10% for potential pebble internal stuff
)
// Explicitly sync the state freezer to ensure all written data is persisted to disk
// before updating the key-value store.
@ -146,7 +146,9 @@ func (b *buffer) flush(db ethdb.KeyValueStore, freezer ethdb.AncientWriter, node
}
}
nodes := b.nodes.write(batch, nodesCache)
accounts, slots := b.states.write(batch, progress, statesCache)
rawdb.WritePersistentStateID(batch, id)
rawdb.WriteSnapshotRoot(batch, root)
// Flush all mutations in a single batch
size := batch.ValueSize()
@ -155,8 +157,10 @@ func (b *buffer) flush(db ethdb.KeyValueStore, freezer ethdb.AncientWriter, node
}
commitBytesMeter.Mark(int64(size))
commitNodesMeter.Mark(int64(nodes))
commitAccountsMeter.Mark(int64(accounts))
commitStoragesMeter.Mark(int64(slots))
commitTimeTimer.UpdateSince(start)
b.reset()
log.Debug("Persisted buffer content", "nodes", nodes, "bytes", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start)))
log.Debug("Persisted buffer content", "nodes", nodes, "accounts", accounts, "slots", slots, "bytes", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start)))
return nil
}

triedb/pathdb/context.go (new file, 246 lines)

@ -0,0 +1,246 @@
// Copyright 2024 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package pathdb
import (
"bytes"
"encoding/binary"
"errors"
"math"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/memorydb"
"github.com/ethereum/go-ethereum/log"
)
const (
snapAccount = "account" // Identifier of account snapshot generation
snapStorage = "storage" // Identifier of storage snapshot generation
)
// generatorStats is a collection of statistics gathered by the snapshot generator
// for logging purposes. This data structure is used throughout the entire
// lifecycle of the snapshot generation process and is shared across multiple
// generation cycles.
type generatorStats struct {
origin uint64 // Origin prefix where generation started
start time.Time // Timestamp when generation started
accounts uint64 // Number of accounts indexed(generated or recovered)
slots uint64 // Number of storage slots indexed(generated or recovered)
dangling uint64 // Number of dangling storage slots
storage common.StorageSize // Total account and storage slot size(generation or recovery)
}
// log creates a contextual log with the given message and the context pulled
// from the internally maintained statistics.
func (gs *generatorStats) log(msg string, root common.Hash, marker []byte) {
var ctx []interface{}
if root != (common.Hash{}) {
ctx = append(ctx, []interface{}{"root", root}...)
}
// Figure out whether we're after or within an account
switch len(marker) {
case common.HashLength:
ctx = append(ctx, []interface{}{"at", common.BytesToHash(marker)}...)
case 2 * common.HashLength:
ctx = append(ctx, []interface{}{
"in", common.BytesToHash(marker[:common.HashLength]),
"at", common.BytesToHash(marker[common.HashLength:]),
}...)
}
// Add the usual measurements
ctx = append(ctx, []interface{}{
"accounts", gs.accounts,
"slots", gs.slots,
"storage", gs.storage,
"dangling", gs.dangling,
"elapsed", common.PrettyDuration(time.Since(gs.start)),
}...)
// Calculate the estimated indexing time based on current stats
if len(marker) > 0 {
if done := binary.BigEndian.Uint64(marker[:8]) - gs.origin; done > 0 {
left := math.MaxUint64 - binary.BigEndian.Uint64(marker[:8])
speed := done/uint64(time.Since(gs.start)/time.Millisecond+1) + 1 // +1s to avoid division by zero
ctx = append(ctx, []interface{}{
"eta", common.PrettyDuration(time.Duration(left/speed) * time.Millisecond),
}...)
}
}
log.Info(msg, ctx...)
}
// generatorContext holds several global fields that are used throughout the
// current generation cycle. It must be recreated if the generation cycle is
// restarted.
type generatorContext struct {
root common.Hash // State root of the generation target
account *holdableIterator // Iterator of account snapshot data
storage *holdableIterator // Iterator of storage snapshot data
db ethdb.KeyValueStore // Key-value store containing the snapshot data
batch ethdb.Batch // Database batch for writing data atomically
logged time.Time // The timestamp when last generation progress was displayed
}
// newGeneratorContext initializes the context for generation.
func newGeneratorContext(root common.Hash, marker []byte, db ethdb.KeyValueStore) *generatorContext {
ctx := &generatorContext{
root: root,
db: db,
batch: db.NewBatch(),
logged: time.Now(),
}
accMarker, storageMarker := splitMarker(marker)
ctx.openIterator(snapAccount, accMarker)
ctx.openIterator(snapStorage, storageMarker)
return ctx
}
// openIterator constructs global account and storage snapshot iterators
// at the interrupted position. These iterators should be reopened from time
// to time to avoid blocking leveldb compaction for a long time.
func (ctx *generatorContext) openIterator(kind string, start []byte) {
if kind == snapAccount {
iter := ctx.db.NewIterator(rawdb.SnapshotAccountPrefix, start)
ctx.account = newHoldableIterator(rawdb.NewKeyLengthIterator(iter, 1+common.HashLength))
return
}
iter := ctx.db.NewIterator(rawdb.SnapshotStoragePrefix, start)
ctx.storage = newHoldableIterator(rawdb.NewKeyLengthIterator(iter, 1+2*common.HashLength))
}
// reopenIterator releases the specified snapshot iterator and re-opens it
// at the next position. It's aimed at not blocking leveldb compaction for too long.
func (ctx *generatorContext) reopenIterator(kind string) {
// Shift iterator one more step, so that we can reopen
// the iterator at the right position.
var iter = ctx.account
if kind == snapStorage {
iter = ctx.storage
}
hasNext := iter.Next()
if !hasNext {
// Iterator exhausted, release forever and create an already exhausted virtual iterator
iter.Release()
if kind == snapAccount {
ctx.account = newHoldableIterator(memorydb.New().NewIterator(nil, nil))
return
}
ctx.storage = newHoldableIterator(memorydb.New().NewIterator(nil, nil))
return
}
next := iter.Key()
iter.Release()
ctx.openIterator(kind, next[1:])
}
// close releases all the held resources.
func (ctx *generatorContext) close() {
ctx.account.Release()
ctx.storage.Release()
}
// iterator returns the corresponding iterator specified by the kind.
func (ctx *generatorContext) iterator(kind string) *holdableIterator {
if kind == snapAccount {
return ctx.account
}
return ctx.storage
}
// removeStorageBefore deletes all storage entries which are located before
// the specified account. When the iterator touches the storage entry which
// is located in or outside the given account, it stops and holds the current
// iterated element locally.
func (ctx *generatorContext) removeStorageBefore(account common.Hash) uint64 {
var (
count uint64
start = time.Now()
iter = ctx.storage
)
for iter.Next() {
key := iter.Key()
if bytes.Compare(key[1:1+common.HashLength], account.Bytes()) >= 0 {
iter.Hold()
break
}
count++
ctx.batch.Delete(key)
if ctx.batch.ValueSize() > ethdb.IdealBatchSize {
ctx.batch.Write()
ctx.batch.Reset()
}
}
storageCleanCounter.Inc(time.Since(start).Nanoseconds())
return count
}
// removeStorageAt deletes all storage entries which are located in the specified
// account. When the iterator touches the storage entry which is outside the given
// account, it stops and holds the current iterated element locally. An error will
// be returned if the initial position of iterator is not in the given account.
func (ctx *generatorContext) removeStorageAt(account common.Hash) error {
var (
count int64
start = time.Now()
iter = ctx.storage
)
for iter.Next() {
key := iter.Key()
cmp := bytes.Compare(key[1:1+common.HashLength], account.Bytes())
if cmp < 0 {
return errors.New("invalid iterator position")
}
if cmp > 0 {
iter.Hold()
break
}
count++
ctx.batch.Delete(key)
if ctx.batch.ValueSize() > ethdb.IdealBatchSize {
ctx.batch.Write()
ctx.batch.Reset()
}
}
wipedStorageMeter.Mark(count)
storageCleanCounter.Inc(time.Since(start).Nanoseconds())
return nil
}
// removeRemainingStorage deletes all storage entries which are located after
// the current iterator position.
func (ctx *generatorContext) removeRemainingStorage() uint64 {
var (
count uint64
start = time.Now()
iter = ctx.storage
)
for iter.Next() {
count++
ctx.batch.Delete(iter.Key())
if ctx.batch.ValueSize() > ethdb.IdealBatchSize {
ctx.batch.Write()
ctx.batch.Reset()
}
}
danglingStorageMeter.Mark(int64(count))
storageCleanCounter.Inc(time.Since(start).Nanoseconds())
return count
}
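
The ETA printed by generatorStats.log above treats the first eight bytes of the progress marker as a big-endian position within the full 2^64 account keyspace. A back-of-the-envelope restatement of that arithmetic, with made-up numbers:

package main

import (
    "fmt"
    "math"
    "time"
)

func main() {
    var (
        origin  uint64 = 0                  // keyspace position where this cycle started
        current uint64 = math.MaxUint64 / 4 // example: roughly a quarter of the keyspace covered
        elapsed        = 2 * time.Hour      // example wall-clock time spent so far
    )
    done := current - origin
    left := math.MaxUint64 - current
    speed := done/uint64(elapsed/time.Millisecond+1) + 1 // keyspace units per millisecond; +1 avoids division by zero
    eta := time.Duration(left/speed) * time.Millisecond
    fmt.Println("estimated time left:", eta) // roughly 6h for this example
}

Because account keys are keccak hashes, coverage is effectively uniform over the keyspace, which is what makes this linear extrapolation a reasonable estimate.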


@ -17,6 +17,7 @@
package pathdb
import (
"encoding/binary"
"errors"
"fmt"
"io"
@ -35,8 +36,11 @@ import (
)
const (
// defaultCleanSize is the default memory allowance of clean cache.
defaultCleanSize = 16 * 1024 * 1024
// defaultTrieCleanSize is the default memory allowance of clean trie cache.
defaultTrieCleanSize = 16 * 1024 * 1024
// defaultStateCleanSize is the default memory allowance of clean state cache.
defaultStateCleanSize = 16 * 1024 * 1024
// maxBufferSize is the maximum memory allowance of node buffer.
// Too large buffer will cause the system to pause for a long
@ -111,9 +115,11 @@ type layer interface {
// Config contains the settings for database.
type Config struct {
StateHistory uint64 // Number of recent blocks to maintain state history for
CleanCacheSize int // Maximum memory allowance (in bytes) for caching clean nodes
TrieCleanSize int // Maximum memory allowance (in bytes) for caching clean trie nodes
StateCleanSize int // Maximum memory allowance (in bytes) for caching clean state data
WriteBufferSize int // Maximum memory allowance (in bytes) for write buffer
ReadOnly bool // Flag whether the database is opened in read only mode.
ReadOnly bool // Flag whether the database is opened in read only mode
SnapshotNoBuild bool // Flag whether background snapshot generation is disabled
}
// sanitize checks the provided user configurations and changes anything that's
@ -133,7 +139,11 @@ func (c *Config) fields() []interface{} {
if c.ReadOnly {
list = append(list, "readonly", true)
}
list = append(list, "cache", common.StorageSize(c.CleanCacheSize))
if c.SnapshotNoBuild {
list = append(list, "snapshot", false)
}
list = append(list, "triecache", common.StorageSize(c.TrieCleanSize))
list = append(list, "statecache", common.StorageSize(c.StateCleanSize))
list = append(list, "buffer", common.StorageSize(c.WriteBufferSize))
list = append(list, "history", c.StateHistory)
return list
@ -142,7 +152,8 @@ func (c *Config) fields() []interface{} {
// Defaults contains default settings for Ethereum mainnet.
var Defaults = &Config{
StateHistory: params.FullImmutabilityThreshold,
CleanCacheSize: defaultCleanSize,
TrieCleanSize: defaultTrieCleanSize,
StateCleanSize: defaultStateCleanSize,
WriteBufferSize: defaultBufferSize,
}
@ -240,6 +251,13 @@ func New(diskdb ethdb.Database, config *Config, isVerkle bool) *Database {
log.Crit("Failed to disable database", "err", err) // impossible to happen
}
}
// Resolving the state snapshot generation progress from the database is
// mandatory. This ensures that uncovered flat states are not accessed,
// even if background generation is not allowed. If permitted, the generation
// might be scheduled.
if err := db.setStateGenerator(); err != nil {
log.Crit("Failed to setup the generator", "err", err)
}
fields := config.fields()
if db.isVerkle {
fields = append(fields, "verkle", true)
@ -297,6 +315,60 @@ func (db *Database) repairHistory() error {
return nil
}
// setStateGenerator loads the state generation progress marker and potentially
// resumes the state generation if it's permitted.
func (db *Database) setStateGenerator() error {
// Load the state snapshot generation progress marker to prevent access
// to uncovered states.
generator, root, err := loadGenerator(db.diskdb, db.hasher)
if err != nil {
return err
}
if generator == nil {
// Initialize an empty generator to rebuild the state snapshot from scratch
generator = &journalGenerator{
Marker: []byte{},
}
}
// Short circuit if the whole state snapshot has already been fully generated.
// The generator will be left as nil in the disk layer, indicating that the
// whole state snapshot is available for access.
if generator.Done {
return nil
}
var origin uint64
if len(generator.Marker) >= 8 {
origin = binary.BigEndian.Uint64(generator.Marker)
}
stats := &generatorStats{
origin: origin,
start: time.Now(),
accounts: generator.Accounts,
slots: generator.Slots,
storage: common.StorageSize(generator.Storage),
}
dl := db.tree.bottom()
// Disable the background snapshot building in these circumstances:
// - the database is opened in read only mode
// - the snapshot build is explicitly disabled
// - the database is opened in verkle tree mode
noBuild := db.readOnly || db.config.SnapshotNoBuild || db.isVerkle
// Construct the generator and link it to the disk layer, ensuring that the
// generation progress is resolved to prevent accessing uncovered states
// regardless of whether background state snapshot generation is allowed.
dl.setGenerator(newGenerator(db.diskdb, noBuild, generator.Marker, stats))
// Short circuit if the background generation is not permitted
if noBuild || db.waitSync {
return nil
}
stats.log("Starting snapshot generation", root, generator.Marker)
dl.generator.run(root)
return nil
}
// Update adds a new layer into the tree, if that can be linked to an existing
// old parent. It is disallowed to insert a disk layer (the origin of all). Apart
// from that this function will flatten the extra diff layers at bottom into disk
@ -359,8 +431,13 @@ func (db *Database) Disable() error {
}
db.waitSync = true
// Mark the disk layer as stale to prevent access to persistent state.
db.tree.bottom().markStale()
// Terminate the state generator if it's active and mark the disk layer
// as stale to prevent access to persistent state.
disk := db.tree.bottom()
if disk.generator != nil {
disk.generator.stop()
}
disk.markStale()
// Write the initial sync flag to persist it across restarts.
rawdb.WriteSnapSyncStatusFlag(db.diskdb, rawdb.StateSyncRunning)
@ -390,6 +467,7 @@ func (db *Database) Enable(root common.Hash) error {
// reset the persistent state id back to zero.
batch := db.diskdb.NewBatch()
rawdb.DeleteTrieJournal(batch)
rawdb.DeleteSnapshotRoot(batch)
rawdb.WritePersistentStateID(batch, 0)
if err := batch.Write(); err != nil {
return err
@ -403,13 +481,13 @@ func (db *Database) Enable(root common.Hash) error {
return err
}
}
// Re-construct a new disk layer backed by persistent state
// with **empty clean cache and node buffer**.
db.tree.reset(newDiskLayer(root, 0, db, nil, newBuffer(db.config.WriteBufferSize, nil, nil, 0)))
// Re-enable the database as the final step.
db.waitSync = false
rawdb.WriteSnapSyncStatusFlag(db.diskdb, rawdb.StateSyncFinished)
// Re-construct a new disk layer backed by persistent state
// and schedule the state snapshot generation if it's permitted.
db.tree.reset(generateSnapshot(db, root, db.isVerkle || db.config.SnapshotNoBuild))
log.Info("Rebuilt trie database", "root", root)
return nil
}
@ -514,8 +592,12 @@ func (db *Database) Close() error {
// following mutations.
db.readOnly = true
// Release the memory held by clean cache.
db.tree.bottom().resetCache()
// Terminate the background generation if it's active
disk := db.tree.bottom()
if disk.generator != nil {
disk.generator.stop()
}
disk.resetCache() // release the memory held by clean cache
// Close the attached state history freezer.
if db.freezer == nil {
@ -580,14 +662,42 @@ func (db *Database) HistoryRange() (uint64, uint64, error) {
return historyRange(db.freezer)
}
// waitGeneration waits until the background generation is finished. It assumes
// that the generation is permitted; otherwise, it will block indefinitely.
func (db *Database) waitGeneration() {
gen := db.tree.bottom().generator
if gen == nil || gen.completed() {
return
}
<-gen.done
}
// AccountIterator creates a new account iterator for the specified root hash and
// seeks to a starting account hash.
func (db *Database) AccountIterator(root common.Hash, seek common.Hash) (AccountIterator, error) {
db.lock.RLock()
wait := db.waitSync
db.lock.RUnlock()
if wait {
return nil, errDatabaseWaitSync
}
if gen := db.tree.bottom().generator; gen != nil && !gen.completed() {
return nil, errNotConstructed
}
return newFastAccountIterator(db, root, seek)
}
// StorageIterator creates a new storage iterator for the specified root hash and
// account. The iterator will be moved to the specific start position.
func (db *Database) StorageIterator(root common.Hash, account common.Hash, seek common.Hash) (StorageIterator, error) {
db.lock.RLock()
wait := db.waitSync
db.lock.RUnlock()
if wait {
return nil, errDatabaseWaitSync
}
if gen := db.tree.bottom().generator; gen != nil && !gen.completed() {
return nil, errNotConstructed
}
return newFastStorageIterator(db, root, account, seek)
}
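
Both iterator constructors above share the same guard. A package-internal restatement follows; the helper name iterable is hypothetical, but every identifier it touches appears in this diff.

// iterable condenses the guard used by AccountIterator and StorageIterator
// above: iteration is refused while a state sync is waiting and while the
// snapshot is still being generated.
func (db *Database) iterable() error {
    db.lock.RLock()
    wait := db.waitSync
    db.lock.RUnlock()

    if wait {
        return errDatabaseWaitSync
    }
    if gen := db.tree.bottom().generator; gen != nil && !gen.completed() {
        return errNotConstructed
    }
    return nil
}

Callers outside the package only see opaque errors, since both sentinel values are unexported; code inside pathdb can instead block on the generator's done channel via waitGeneration shown above.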


@ -126,7 +126,8 @@ func newTester(t *testing.T, historyLimit uint64, isVerkle bool, layers int) *te
disk, _ = rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), t.TempDir(), "", false)
db = New(disk, &Config{
StateHistory: historyLimit,
CleanCacheSize: 256 * 1024,
TrieCleanSize: 256 * 1024,
StateCleanSize: 256 * 1024,
WriteBufferSize: 256 * 1024,
}, isVerkle)


@ -17,9 +17,10 @@
package pathdb
import (
"errors"
"bytes"
"fmt"
"sync"
"time"
"github.com/VictoriaMetrics/fastcache"
"github.com/ethereum/go-ethereum/common"
@ -30,28 +31,42 @@ import (
// diskLayer is a low level persistent layer built on top of a key-value store.
type diskLayer struct {
root common.Hash // Immutable, root hash to which this layer was made for
id uint64 // Immutable, corresponding state id
db *Database // Path-based trie database
root common.Hash // Immutable, root hash to which this layer was made for
id uint64 // Immutable, corresponding state id
db *Database // Path-based trie database
// These two caches must be maintained separately, because the key
// for the root node of the storage trie (accountHash) is identical
// to the key for the account data.
nodes *fastcache.Cache // GC friendly memory cache of clean nodes
buffer *buffer // Dirty buffer to aggregate writes of nodes and states
stale bool // Signals that the layer became stale (state progressed)
lock sync.RWMutex // Lock used to protect stale flag
states *fastcache.Cache // GC friendly memory cache of clean states
buffer *buffer // Dirty buffer to aggregate writes of nodes and states
stale bool // Signals that the layer became stale (state progressed)
lock sync.RWMutex // Lock used to protect stale flag and genMarker
// The generator is set if the state snapshot was not fully completed,
// regardless of whether the background generation is running or not.
generator *generator
}
// newDiskLayer creates a new disk layer based on the passing arguments.
func newDiskLayer(root common.Hash, id uint64, db *Database, nodes *fastcache.Cache, buffer *buffer) *diskLayer {
// Initialize a clean cache if the memory allowance is not zero
// or reuse the provided cache if it is not nil (inherited from
func newDiskLayer(root common.Hash, id uint64, db *Database, nodes *fastcache.Cache, states *fastcache.Cache, buffer *buffer) *diskLayer {
// Initialize the clean caches if the memory allowance is not zero
// or reuse the provided caches if they are not nil (inherited from
// the original disk layer).
if nodes == nil && db.config.CleanCacheSize != 0 {
nodes = fastcache.New(db.config.CleanCacheSize)
if nodes == nil && db.config.TrieCleanSize != 0 {
nodes = fastcache.New(db.config.TrieCleanSize)
}
if states == nil && db.config.StateCleanSize != 0 {
states = fastcache.New(db.config.StateCleanSize)
}
return &diskLayer{
root: root,
id: id,
db: db,
nodes: nodes,
states: states,
buffer: buffer,
}
}
@ -72,6 +87,13 @@ func (dl *diskLayer) parentLayer() layer {
return nil
}
// setGenerator links the given generator to the disk layer, indicating that
// the associated state snapshot is not yet fully generated and that the
// generation is potentially running in the background.
func (dl *diskLayer) setGenerator(generator *generator) {
dl.generator = generator
}
// isStale return whether this layer has become stale (was flattened across) or if
// it's still live.
func (dl *diskLayer) isStale() bool {
@ -168,8 +190,41 @@ func (dl *diskLayer) account(hash common.Hash, depth int) ([]byte, error) {
}
dirtyStateMissMeter.Mark(1)
// TODO(rjl493456442) support persistent state retrieval
return nil, errors.New("not supported")
// If the layer is being generated, ensure the requested account has
// already been covered by the generator.
marker := dl.genMarker()
if marker != nil && bytes.Compare(hash.Bytes(), marker) > 0 {
return nil, errNotCoveredYet
}
// Try to retrieve the account from the memory cache
if dl.states != nil {
if blob, found := dl.states.HasGet(nil, hash[:]); found {
cleanStateHitMeter.Mark(1)
cleanStateReadMeter.Mark(int64(len(blob)))
if len(blob) == 0 {
stateAccountInexMeter.Mark(1)
} else {
stateAccountExistMeter.Mark(1)
}
return blob, nil
}
cleanStateMissMeter.Mark(1)
}
// Try to retrieve the account from the disk.
blob = rawdb.ReadAccountSnapshot(dl.db.diskdb, hash)
if dl.states != nil {
dl.states.Set(hash[:], blob)
cleanStateWriteMeter.Mark(int64(len(blob)))
}
if len(blob) == 0 {
stateAccountInexMeter.Mark(1)
stateAccountInexDiskMeter.Mark(1)
} else {
stateAccountExistMeter.Mark(1)
stateAccountExistDiskMeter.Mark(1)
}
return blob, nil
}
// storage directly retrieves the storage data associated with a particular hash,
@ -203,8 +258,42 @@ func (dl *diskLayer) storage(accountHash, storageHash common.Hash, depth int) ([
}
dirtyStateMissMeter.Mark(1)
// TODO(rjl493456442) support persistent state retrieval
return nil, errors.New("not supported")
// If the layer is being generated, ensure the requested storage slot
// has already been covered by the generator.
key := append(accountHash[:], storageHash[:]...)
marker := dl.genMarker()
if marker != nil && bytes.Compare(key, marker) > 0 {
return nil, errNotCoveredYet
}
// Try to retrieve the storage slot from the memory cache
if dl.states != nil {
if blob, found := dl.states.HasGet(nil, key); found {
cleanStateHitMeter.Mark(1)
cleanStateReadMeter.Mark(int64(len(blob)))
if len(blob) == 0 {
stateStorageInexMeter.Mark(1)
} else {
stateStorageExistMeter.Mark(1)
}
return blob, nil
}
cleanStateMissMeter.Mark(1)
}
// Try to retrieve the storage slot from the disk
blob := rawdb.ReadStorageSnapshot(dl.db.diskdb, accountHash, storageHash)
if dl.states != nil {
dl.states.Set(key, blob)
cleanStateWriteMeter.Mark(int64(len(blob)))
}
if len(blob) == 0 {
stateStorageInexMeter.Mark(1)
stateStorageInexDiskMeter.Mark(1)
} else {
stateStorageExistMeter.Mark(1)
stateStorageExistDiskMeter.Mark(1)
}
return blob, nil
}
// update implements the layer interface, returning a new diff layer on top
@ -267,13 +356,39 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
// Merge the trie nodes and flat states of the bottom-most diff layer into the
// buffer as the combined layer.
combined := dl.buffer.commit(bottom.nodes, bottom.states.stateSet)
// Terminate the background state snapshot generation before mutating the
// persistent state.
if combined.full() || force {
if err := combined.flush(dl.db.diskdb, dl.db.freezer, dl.nodes, bottom.stateID()); err != nil {
// Terminate the background state snapshot generator before flushing
// to prevent data race.
var progress []byte
if dl.generator != nil {
dl.generator.stop()
progress = dl.generator.progressMarker()
// If the snapshot has been fully generated, unset the generator
if progress == nil {
dl.setGenerator(nil)
} else {
log.Info("Paused snapshot generation")
}
}
// Flush the content in combined buffer. Any state data after the progress
// marker will be ignored, as the generator will pick it up later.
if err := combined.flush(bottom.root, dl.db.diskdb, dl.db.freezer, progress, dl.nodes, dl.states, bottom.stateID()); err != nil {
return nil, err
}
// Resume the background generation if it's not completed yet
if progress != nil {
dl.generator.run(bottom.root)
}
}
// Link the generator if snapshot is not yet completed
ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.nodes, dl.states, combined)
if dl.generator != nil {
ndl.setGenerator(dl.generator)
}
ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.nodes, combined)
// To remove outdated history objects from the end, we set the 'tail' parameter
// to 'oldest-1' due to the offset between the freezer index and the history ID.
if overflow {
@ -288,6 +403,7 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
// revert applies the given state history and return a reverted disk layer.
func (dl *diskLayer) revert(h *history) (*diskLayer, error) {
start := time.Now()
if h.meta.root != dl.rootHash() {
return nil, errUnexpectedHistory
}
@ -321,15 +437,40 @@ func (dl *diskLayer) revert(h *history) (*diskLayer, error) {
if err != nil {
return nil, err
}
} else {
batch := dl.db.diskdb.NewBatch()
writeNodes(batch, nodes, dl.nodes)
rawdb.WritePersistentStateID(batch, dl.id-1)
if err := batch.Write(); err != nil {
log.Crit("Failed to write states", "err", err)
ndl := newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.nodes, dl.states, dl.buffer)
// Link the generator if it exists
if dl.generator != nil {
ndl.setGenerator(dl.generator)
}
log.Debug("Reverted data in write buffer", "oldroot", h.meta.root, "newroot", h.meta.parent, "elapsed", common.PrettyDuration(time.Since(start)))
return ndl, nil
}
return newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.nodes, dl.buffer), nil
// Terminate the generation before writing any data into database
var progress []byte
if dl.generator != nil {
dl.generator.stop()
progress = dl.generator.progressMarker()
}
batch := dl.db.diskdb.NewBatch()
writeNodes(batch, nodes, dl.nodes)
// Provide the original values of modified accounts and storages for revert
writeStates(batch, progress, accounts, storages, dl.states)
rawdb.WritePersistentStateID(batch, dl.id-1)
rawdb.WriteSnapshotRoot(batch, h.meta.parent)
if err := batch.Write(); err != nil {
log.Crit("Failed to write states", "err", err)
}
// Link the generator and resume generation if the snapshot is not yet
// fully completed.
ndl := newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.nodes, dl.states, dl.buffer)
if dl.generator != nil && !dl.generator.completed() {
ndl.generator = dl.generator
ndl.generator.run(h.meta.parent)
}
log.Debug("Reverted data in persistent state", "oldroot", h.meta.root, "newroot", h.meta.parent, "elapsed", common.PrettyDuration(time.Since(start)))
return ndl, nil
}
// size returns the approximate size of cached nodes in the disk layer.
@ -355,4 +496,16 @@ func (dl *diskLayer) resetCache() {
if dl.nodes != nil {
dl.nodes.Reset()
}
if dl.states != nil {
dl.states.Reset()
}
}
// genMarker returns the current state snapshot generation progress marker. If
// the state snapshot has already been fully generated, nil is returned.
func (dl *diskLayer) genMarker() []byte {
if dl.generator == nil {
return nil
}
return dl.generator.progressMarker()
}
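
To make the coverage gate in account() and storage() above concrete: account lookups compare the 32-byte account hash against the progress marker, storage lookups compare the 64-byte accountHash||storageHash key, and anything sorting after the marker is answered with errNotCoveredYet so the caller falls back to the trie reader. A small illustration with made-up hashes:

package main

import (
    "bytes"
    "fmt"

    "github.com/ethereum/go-ethereum/common"
)

func main() {
    // Example: the generator has progressed up to account 0x00...80.
    marker := common.HexToHash("0x80").Bytes()

    account := common.HexToHash("0x7f").Bytes()                          // sorts before the marker
    fmt.Println("account covered:", bytes.Compare(account, marker) <= 0) // true: served from flat state

    slotKey := append(common.HexToHash("0x81").Bytes(), common.HexToHash("0x01").Bytes()...)
    fmt.Println("slot covered:", bytes.Compare(slotKey, marker) <= 0) // false: errNotCoveredYet, use the trie
}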


@ -39,4 +39,13 @@ var (
// errStateUnrecoverable is returned if state is required to be reverted to
// a destination without associated state history available.
errStateUnrecoverable = errors.New("state is unrecoverable")
// errNotCoveredYet is returned from data accessors if the underlying snapshot
// is being generated currently and the requested data item is not yet in the
// range of accounts covered.
errNotCoveredYet = errors.New("not covered yet")
// errNotConstructed is returned if the callers want to iterate the snapshot
// while the generation is not finished yet.
errNotConstructed = errors.New("snapshot is not constructed")
)


@ -17,6 +17,8 @@
package pathdb
import (
"bytes"
"github.com/VictoriaMetrics/fastcache"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
@ -63,3 +65,69 @@ func writeNodes(batch ethdb.Batch, nodes map[common.Hash]map[string]*trienode.No
}
return total
}
// writeStates flushes state mutations into the provided database batch as a whole.
//
// This function assumes the background generator is already terminated and states
// before the supplied marker have been correctly generated.
//
// TODO(rjl493456442) do we really need this generation marker? The state updates
// after the marker can also be written and will be fixed by generator later if
// it's outdated.
func writeStates(batch ethdb.Batch, genMarker []byte, accountData map[common.Hash][]byte, storageData map[common.Hash]map[common.Hash][]byte, clean *fastcache.Cache) (int, int) {
var (
accounts int
slots int
)
for addrHash, blob := range accountData {
// Skip any account not yet covered by the snapshot. The account
// at the generation marker position (addrHash == genMarker[:common.HashLength])
// should still be updated, as it would be skipped in the next
// generation cycle.
if genMarker != nil && bytes.Compare(addrHash[:], genMarker) > 0 {
continue
}
accounts += 1
if len(blob) == 0 {
rawdb.DeleteAccountSnapshot(batch, addrHash)
if clean != nil {
clean.Set(addrHash[:], nil)
}
} else {
rawdb.WriteAccountSnapshot(batch, addrHash, blob)
if clean != nil {
clean.Set(addrHash[:], blob)
}
}
}
for addrHash, storages := range storageData {
// Skip any account not covered yet by the snapshot
if genMarker != nil && bytes.Compare(addrHash[:], genMarker) > 0 {
continue
}
midAccount := genMarker != nil && bytes.Equal(addrHash[:], genMarker[:common.HashLength])
for storageHash, blob := range storages {
// Skip any storage slot not yet covered by the snapshot. The storage slot
// at the generation marker position (addrHash == genMarker[:common.HashLength]
// and storageHash == genMarker[common.HashLength:]) should still be updated,
// as it would be skipped in the next generation cycle.
if midAccount && bytes.Compare(storageHash[:], genMarker[common.HashLength:]) > 0 {
continue
}
slots += 1
if len(blob) == 0 {
rawdb.DeleteStorageSnapshot(batch, addrHash, storageHash)
if clean != nil {
clean.Set(append(addrHash[:], storageHash[:]...), nil)
}
} else {
rawdb.WriteStorageSnapshot(batch, addrHash, storageHash, blob)
if clean != nil {
clean.Set(append(addrHash[:], storageHash[:]...), blob)
}
}
}
}
return accounts, slots
}
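
writeStates applies the mirror-image filter on the write path: mutations sorting strictly after the marker are dropped (the generator will reproduce them from the trie later), while the entry sitting exactly at the marker is still persisted, because generation resumes after the marker and would otherwise skip it. A tiny illustration with made-up account hashes:

package main

import (
    "bytes"
    "fmt"

    "github.com/ethereum/go-ethereum/common"
)

func main() {
    marker := common.HexToHash("0x80").Bytes()
    for _, addrHash := range []common.Hash{
        common.HexToHash("0x10"), // before the marker: persisted
        common.HexToHash("0x80"), // exactly at the marker: still persisted
        common.HexToHash("0xff"), // beyond the marker: skipped, regenerated later
    } {
        skip := bytes.Compare(addrHash[:], marker) > 0
        fmt.Printf("%x skip=%v\n", addrHash, skip)
    }
}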

triedb/pathdb/generate.go (new file, 856 lines)

@ -0,0 +1,856 @@
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package pathdb
import (
"bytes"
"errors"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
"github.com/ethereum/go-ethereum/triedb/database"
)
var (
// accountCheckRange is the upper limit of the number of accounts involved in
// each range check. This is a value estimated based on experience. If this
// range is too large, the failure rate of range proof will increase. Otherwise,
// if the range is too small, the efficiency of the state recovery will decrease.
accountCheckRange = 128
// storageCheckRange is the upper limit of the number of storage slots involved
// in each range check. This is a value estimated based on experience. If this
// range is too large, the failure rate of range proof will increase. Otherwise,
// if the range is too small, the efficiency of the state recovery will decrease.
storageCheckRange = 1024
// errMissingTrie is returned if the target trie is missing while the generation
// is running. In this case the generation is aborted and wait the new signal.
errMissingTrie = errors.New("missing trie")
)
// diskReader is a wrapper of key-value store and implements database.NodeReader,
// providing a function for accessing persistent trie nodes in the disk
type diskReader struct{ db ethdb.KeyValueStore }
// Node retrieves the trie node blob with the provided trie identifier,
// node path and the corresponding node hash. No error will be returned
// if the node is not found.
func (r *diskReader) Node(owner common.Hash, path []byte, hash common.Hash) ([]byte, error) {
if owner == (common.Hash{}) {
return rawdb.ReadAccountTrieNode(r.db, path), nil
}
return rawdb.ReadStorageTrieNode(r.db, owner, path), nil
}
// diskStore is a wrapper of key-value store and implements database.NodeDatabase.
// It's meant to be used for generating state snapshot from the trie data.
type diskStore struct {
db ethdb.KeyValueStore
}
// NodeReader returns a node reader associated with the specific state.
// An error will be returned if the specified state is not available.
func (s *diskStore) NodeReader(stateRoot common.Hash) (database.NodeReader, error) {
root := types.EmptyRootHash
if blob := rawdb.ReadAccountTrieNode(s.db, nil); len(blob) > 0 {
root = crypto.Keccak256Hash(blob)
}
if root != stateRoot {
return nil, fmt.Errorf("state %x is not available", stateRoot)
}
return &diskReader{s.db}, nil
}
// generator is the struct for initial state snapshot generation. It is not thread-safe;
// the caller must manage concurrency issues themselves.
type generator struct {
noBuild bool // Flag indicating whether snapshot generation is permitted
running bool // Flag indicating whether the background generation is running
db ethdb.KeyValueStore // Key-value store containing the snapshot data
stats *generatorStats // Generation statistics used throughout the entire life cycle
abort chan chan struct{} // Notification channel to abort generating the snapshot in this layer
done chan struct{} // Notification channel when generation is done
progress []byte // Progress marker of the state generation, nil means it's completed
lock sync.RWMutex // Lock which protects the progress, only generator can mutate the progress
}
// newGenerator constructs the state snapshot generator.
//
// noBuild will be true if the background snapshot generation is not allowed,
// usually used in read-only mode.
//
// progress indicates the starting position for resuming snapshot generation.
// It must be provided even if generation is not allowed; otherwise, uncovered
// states may be exposed for serving.
func newGenerator(db ethdb.KeyValueStore, noBuild bool, progress []byte, stats *generatorStats) *generator {
if stats == nil {
stats = &generatorStats{start: time.Now()}
}
return &generator{
noBuild: noBuild,
progress: progress,
db: db,
stats: stats,
abort: make(chan chan struct{}),
done: make(chan struct{}),
}
}
// run starts the state snapshot generation in the background.
func (g *generator) run(root common.Hash) {
if g.noBuild {
log.Warn("Snapshot generation is not permitted")
return
}
if g.running {
g.stop()
log.Warn("Paused the leftover generation cycle")
}
g.running = true
go g.generate(newGeneratorContext(root, g.progress, g.db))
}
// stop terminates the background generation if it's actively running.
// Recent generation progress will be saved before returning.
func (g *generator) stop() {
if !g.running {
log.Debug("Snapshot generation is not running")
return
}
ch := make(chan struct{})
g.abort <- ch
<-ch
g.running = false
}
// completed returns the flag indicating if the whole generation is done.
func (g *generator) completed() bool {
progress := g.progressMarker()
return progress == nil
}
// progressMarker returns the current generation progress marker. It may slightly
// lag behind the actual generation position, as the progress field is only updated
// when checkAndFlush is called. The only effect is that some generated states
// may be refused for serving.
func (g *generator) progressMarker() []byte {
g.lock.RLock()
defer g.lock.RUnlock()
return g.progress
}
// splitMarker is an internal helper which splits the generation progress marker
// into the account-level marker and the full marker.
func splitMarker(marker []byte) ([]byte, []byte) {
var accMarker []byte
if len(marker) > 0 {
accMarker = marker[:common.HashLength]
}
return accMarker, marker
}
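// For illustration, given the marker layouts journalled below, splitMarker
// behaves roughly as follows (assuming 32-byte hashes):
//
//	splitMarker(nil)                        -> (nil, nil)                 // generation finished
//	splitMarker(accountHash)                -> (accountHash, accountHash) // account phase
//	splitMarker(accountHash ++ storageHash) -> (accountHash, full marker) // storage phase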
// generateSnapshot regenerates a brand-new snapshot based on an existing state
// database and head block asynchronously. The snapshot is returned immediately
// and generation is continued in the background until done.
func generateSnapshot(triedb *Database, root common.Hash, noBuild bool) *diskLayer {
// Create a new disk layer with an initialized state marker at zero
var (
stats = &generatorStats{start: time.Now()}
genMarker = []byte{} // Initialized but empty!
)
dl := newDiskLayer(root, 0, triedb, nil, nil, newBuffer(triedb.config.WriteBufferSize, nil, nil, 0))
dl.setGenerator(newGenerator(triedb.diskdb, noBuild, genMarker, stats))
if !noBuild {
dl.generator.run(root)
log.Info("Started snapshot generation", "root", root)
}
return dl
}
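// A minimal usage sketch, mirroring the tests accompanying this change (the
// timeout value here is arbitrary): the disk layer is returned immediately,
// while callers may wait on the generator's done channel or interrupt it via
// stop, which persists the current progress marker.
//
//	dl := generateSnapshot(db, root, false)
//	select {
//	case <-dl.generator.done:
//		// snapshot fully generated
//	case <-time.After(time.Minute):
//		dl.generator.stop() // journal the progress and halt generation
//	}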
// journalProgress persists the generator stats into the database to resume later.
func journalProgress(db ethdb.KeyValueWriter, marker []byte, stats *generatorStats) {
// Write out the generator marker. Note it's a standalone disk layer generator
// entry which is not mixed with the journal. It's fine if the generator is
// persisted while the journal is not.
entry := journalGenerator{
Done: marker == nil,
Marker: marker,
}
if stats != nil {
entry.Accounts = stats.accounts
entry.Slots = stats.slots
entry.Storage = uint64(stats.storage)
}
blob, err := rlp.EncodeToBytes(entry)
if err != nil {
panic(err) // Cannot happen, here to catch dev errors
}
var logstr string
switch {
case marker == nil:
logstr = "done"
case bytes.Equal(marker, []byte{}):
logstr = "empty"
case len(marker) == common.HashLength:
logstr = fmt.Sprintf("%#x", marker)
default:
logstr = fmt.Sprintf("%#x:%#x", marker[:common.HashLength], marker[common.HashLength:])
}
log.Debug("Journalled generator progress", "progress", logstr)
rawdb.WriteSnapshotGenerator(db, blob)
}
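// For illustration, a journalled entry for a generation interrupted during the
// account phase would look roughly like
//
//	journalGenerator{Done: false, Marker: accountHash, Accounts: n, Slots: m, Storage: s}
//
// RLP-encoded and stored via rawdb.WriteSnapshotGenerator.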
// proofResult contains the output of range proving which can be used
// for further processing regardless of whether it was successful or not.
type proofResult struct {
keys [][]byte // The key set of all elements being iterated, even if the proving failed
vals [][]byte // The val set of all elements being iterated, even if the proving failed
diskMore bool // Set when the database has extra snapshot states since the last iteration
trieMore bool // Set when the trie has extra snapshot states (only meaningful for successful proving)
proofErr error // Indicator whether the given state range is valid or not
tr *trie.Trie // The trie, in case the trie was resolved by the prover (may be nil)
}
// valid reports whether the range proof was successful.
func (result *proofResult) valid() bool {
return result.proofErr == nil
}
// last returns the last verified element key regardless of whether the range proof
// was successful. Nil is returned if nothing was involved in the proving.
func (result *proofResult) last() []byte {
var last []byte
if len(result.keys) > 0 {
last = result.keys[len(result.keys)-1]
}
return last
}
// forEach iterates over all the visited elements and applies the given callback on them.
// The iteration is aborted if the callback returns a non-nil error.
func (result *proofResult) forEach(callback func(key []byte, val []byte) error) error {
for i := 0; i < len(result.keys); i++ {
key, val := result.keys[i], result.vals[i]
if err := callback(key, val); err != nil {
return err
}
}
return nil
}
// proveRange proves that the snapshot segment with a particular prefix is "valid".
// The iteration start point will be assigned if the iterator is restored from
// the last interruption. Max will be assigned to limit the maximum amount of
// data involved in each iteration.
//
// The proof result will be returned if the range proving is finished, otherwise
// the error will be returned to abort the entire procedure.
func (g *generator) proveRange(ctx *generatorContext, trieId *trie.ID, prefix []byte, kind string, origin []byte, max int, valueConvertFn func([]byte) ([]byte, error)) (*proofResult, error) {
var (
keys [][]byte
vals [][]byte
proof = rawdb.NewMemoryDatabase()
diskMore = false
iter = ctx.iterator(kind)
start = time.Now()
min = append(prefix, origin...)
)
for iter.Next() {
// Ensure the iterated item is always equal to or larger than the given origin.
key := iter.Key()
if bytes.Compare(key, min) < 0 {
return nil, errors.New("invalid iteration position")
}
// Ensure the iterated item still falls within the specified prefix. If
// not, all the items in the specified area have been visited. Move the
// iterator a step back since we iterated one extra element out.
if !bytes.Equal(key[:len(prefix)], prefix) {
iter.Hold()
break
}
// Break if we've reached the max size, and signal that we're not
// done yet. Move the iterator a step back since we iterated one
// extra element out.
if len(keys) == max {
iter.Hold()
diskMore = true
break
}
keys = append(keys, common.CopyBytes(key[len(prefix):]))
if valueConvertFn == nil {
vals = append(vals, common.CopyBytes(iter.Value()))
} else {
val, err := valueConvertFn(iter.Value())
if err != nil {
// Special case, the state data is corrupted (invalid slim-format account);
// don't abort the entire procedure directly. Instead, let the fallback
// generation heal the invalid data.
//
// Append the original value here to keep the number of keys and
// values aligned.
vals = append(vals, common.CopyBytes(iter.Value()))
log.Error("Failed to convert account state data", "err", err)
} else {
vals = append(vals, val)
}
}
}
// Update metrics for database iteration and merkle proving
if kind == snapStorage {
storageSnapReadCounter.Inc(time.Since(start).Nanoseconds())
} else {
accountSnapReadCounter.Inc(time.Since(start).Nanoseconds())
}
defer func(start time.Time) {
if kind == snapStorage {
storageProveCounter.Inc(time.Since(start).Nanoseconds())
} else {
accountProveCounter.Inc(time.Since(start).Nanoseconds())
}
}(time.Now())
// The snap state is exhausted, pass the entire key/val set for verification
root := trieId.Root
if origin == nil && !diskMore {
stackTr := trie.NewStackTrie(nil)
for i, key := range keys {
if err := stackTr.Update(key, vals[i]); err != nil {
return nil, err
}
}
if gotRoot := stackTr.Hash(); gotRoot != root {
return &proofResult{
keys: keys,
vals: vals,
proofErr: fmt.Errorf("wrong root: have %#x want %#x", gotRoot, root),
}, nil
}
return &proofResult{keys: keys, vals: vals}, nil
}
// Snap state is chunked, generate edge proofs for verification.
tr, err := trie.New(trieId, &diskStore{db: g.db})
if err != nil {
log.Info("Trie missing, snapshotting paused", "state", ctx.root, "kind", kind, "root", trieId.Root)
return nil, errMissingTrie
}
// Generate the Merkle proofs for the first and last element
if origin == nil {
origin = common.Hash{}.Bytes()
}
if err := tr.Prove(origin, proof); err != nil {
log.Debug("Failed to prove range", "kind", kind, "origin", origin, "err", err)
return &proofResult{
keys: keys,
vals: vals,
diskMore: diskMore,
proofErr: err,
tr: tr,
}, nil
}
if len(keys) > 0 {
if err := tr.Prove(keys[len(keys)-1], proof); err != nil {
log.Debug("Failed to prove range", "kind", kind, "last", keys[len(keys)-1], "err", err)
return &proofResult{
keys: keys,
vals: vals,
diskMore: diskMore,
proofErr: err,
tr: tr,
}, nil
}
}
// Verify the snapshot segment with the range prover, ensuring that all flat
// states in this range correspond to the merkle trie.
cont, err := trie.VerifyRangeProof(root, origin, keys, vals, proof)
return &proofResult{
keys: keys,
vals: vals,
diskMore: diskMore,
trieMore: cont,
proofErr: err,
tr: tr,
}, nil
}
// onStateCallback is a function that is called by generateRange when processing
// a range of accounts or storage slots. For each element, the callback is invoked.
//
// - If 'delete' is true, then this element (and potential slots) needs to be deleted from the snapshot.
// - If 'write' is true, then this element needs to be updated with the 'val'.
// - If 'write' is false, then this element is already correct, and needs no update.
// The 'val' is the canonical encoding of the value (not the slim format for accounts).
//
// However, for accounts, the storage trie of the account needs to be checked. Also,
// dangling storages (storage exists but the corresponding account is missing) need
// to be cleaned up.
type onStateCallback func(key []byte, val []byte, write bool, delete bool) error
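// A minimal sketch of a callback honouring the contract above (hypothetical and
// simplified; the real account callback additionally descends into storage):
//
//	onState := func(key []byte, val []byte, write bool, delete bool) error {
//		switch {
//		case delete:
//			// stale entry, remove the flat state for key
//		case write:
//			// missing or outdated entry, (re)write the flat state with val
//		default:
//			// entry already correct, nothing to do
//		}
//		return nil
//	}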
// generateRange generates the state segment with particular prefix. Generation can
// either verify the correctness of existing state through range-proof and skip
// generation, or iterate trie to regenerate state on demand.
func (g *generator) generateRange(ctx *generatorContext, trieId *trie.ID, prefix []byte, kind string, origin []byte, max int, onState onStateCallback, valueConvertFn func([]byte) ([]byte, error)) (bool, []byte, error) {
// Use range prover to check the validity of the flat state in the range
result, err := g.proveRange(ctx, trieId, prefix, kind, origin, max, valueConvertFn)
if err != nil {
return false, nil, err
}
last := result.last()
// Construct contextual logger
logCtx := []interface{}{"kind", kind, "prefix", hexutil.Encode(prefix)}
if len(origin) > 0 {
logCtx = append(logCtx, "origin", hexutil.Encode(origin))
}
logger := log.New(logCtx...)
// The range prover says the range is correct, skip trie iteration
if result.valid() {
successfulRangeProofMeter.Mark(1)
logger.Trace("Proved state range", "last", hexutil.Encode(last))
// The verification is passed, process each state with the given
// callback function. If this state represents a contract, the
// corresponding storage check will be performed in the callback
if err := result.forEach(func(key []byte, val []byte) error { return onState(key, val, false, false) }); err != nil {
return false, nil, err
}
// Only abort the iteration when both database and trie are exhausted
return !result.diskMore && !result.trieMore, last, nil
}
logger.Trace("Detected outdated state range", "last", hexutil.Encode(last), "err", result.proofErr)
failedRangeProofMeter.Mark(1)
// Special case, the entire trie is missing. In the original trie scheme,
// all the duplicated subtries are filtered out (only one copy of the data
// is stored). In the snapshot model, however, the storage tries belonging
// to different contracts are kept even if they are duplicated. Track this
// case separately to remove the noise from the statistics.
if origin == nil && last == nil {
meter := missallAccountMeter
if kind == snapStorage {
meter = missallStorageMeter
}
meter.Mark(1)
}
// We use the snap data to build up a cache which can be used by the
// main account trie as a primary lookup when resolving hashes
var resolver trie.NodeResolver
if len(result.keys) > 0 {
tr := trie.NewEmpty(nil)
for i, key := range result.keys {
tr.Update(key, result.vals[i])
}
_, nodes := tr.Commit(false)
hashSet := nodes.HashSet()
resolver = func(owner common.Hash, path []byte, hash common.Hash) []byte {
return hashSet[hash]
}
}
// Construct the trie for state iteration, reuse the trie
// if it's already opened with some nodes resolved.
tr := result.tr
if tr == nil {
tr, err = trie.New(trieId, &diskStore{db: g.db})
if err != nil {
log.Info("Trie missing, snapshotting paused", "state", ctx.root, "kind", kind, "root", trieId.Root)
return false, nil, errMissingTrie
}
}
var (
trieMore bool
kvkeys, kvvals = result.keys, result.vals
// counters
count = 0 // number of states delivered by iterator
created = 0 // states created from the trie
updated = 0 // states updated from the trie
deleted = 0 // states not in trie, but were in snapshot
untouched = 0 // states already correct
// timers
start = time.Now()
internal time.Duration
)
nodeIt, err := tr.NodeIterator(origin)
if err != nil {
return false, nil, err
}
nodeIt.AddResolver(resolver)
iter := trie.NewIterator(nodeIt)
for iter.Next() {
if last != nil && bytes.Compare(iter.Key, last) > 0 {
trieMore = true
break
}
count++
write := true
created++
for len(kvkeys) > 0 {
if cmp := bytes.Compare(kvkeys[0], iter.Key); cmp < 0 {
// delete the key
istart := time.Now()
if err := onState(kvkeys[0], nil, false, true); err != nil {
return false, nil, err
}
kvkeys = kvkeys[1:]
kvvals = kvvals[1:]
deleted++
internal += time.Since(istart)
continue
} else if cmp == 0 {
// the snapshot key can be overwritten
created--
if write = !bytes.Equal(kvvals[0], iter.Value); write {
updated++
} else {
untouched++
}
kvkeys = kvkeys[1:]
kvvals = kvvals[1:]
}
break
}
istart := time.Now()
if err := onState(iter.Key, iter.Value, write, false); err != nil {
return false, nil, err
}
internal += time.Since(istart)
}
if iter.Err != nil {
// Trie errors should never happen. Still, in case of a bug, expose the
// error here, as the outer code will presume errors are interrupts, not
// some deeper issues.
log.Error("State snapshotter failed to iterate trie", "err", iter.Err)
return false, nil, iter.Err
}
// Delete all stale snapshot states remaining
istart := time.Now()
for _, key := range kvkeys {
if err := onState(key, nil, false, true); err != nil {
return false, nil, err
}
deleted += 1
}
internal += time.Since(istart)
// Update metrics for counting trie iteration
if kind == snapStorage {
storageTrieReadCounter.Inc((time.Since(start) - internal).Nanoseconds())
} else {
accountTrieReadCounter.Inc((time.Since(start) - internal).Nanoseconds())
}
logger.Trace("Regenerated state range", "root", trieId.Root, "last", hexutil.Encode(last),
"count", count, "created", created, "updated", updated, "untouched", untouched, "deleted", deleted)
// If there are either more trie items, or there are more snap items
// (in the next segment), then we need to keep working
return !trieMore && !result.diskMore, last, nil
}
// checkAndFlush checks whether an interruption signal has been received or the
// batch size has exceeded the allowance, and flushes the accumulated writes if so.
func (g *generator) checkAndFlush(ctx *generatorContext, current []byte) error {
var abort chan struct{}
select {
case abort = <-g.abort:
default:
}
if ctx.batch.ValueSize() > ethdb.IdealBatchSize || abort != nil {
if bytes.Compare(current, g.progress) < 0 {
log.Error("Snapshot generator went backwards", "current", fmt.Sprintf("%x", current), "genMarker", fmt.Sprintf("%x", g.progress))
}
// Persist the progress marker regardless of whether the batch is empty or not.
// It may happen that all the flat states in the database are correct, so the
// generator indeed makes progress even if there is nothing to commit.
journalProgress(ctx.batch, current, g.stats)
// Flush out the database writes atomically
if err := ctx.batch.Write(); err != nil {
return err
}
ctx.batch.Reset()
// Update the generation progress marker
g.lock.Lock()
g.progress = current
g.lock.Unlock()
// Abort the generation if it's required
if abort != nil {
g.stats.log("Aborting snapshot generation", ctx.root, g.progress)
return newAbortErr(abort) // bubble up an error for interruption
}
// Don't hold the iterators for too long, release them to let the compactor work
ctx.reopenIterator(snapAccount)
ctx.reopenIterator(snapStorage)
}
if time.Since(ctx.logged) > 8*time.Second {
g.stats.log("Generating snapshot", ctx.root, g.progress)
ctx.logged = time.Now()
}
return nil
}
// generateStorages generates the missing storage slots of the specific contract.
// It's supposed to restart the generation from the given origin position.
func (g *generator) generateStorages(ctx *generatorContext, account common.Hash, storageRoot common.Hash, storeMarker []byte) error {
onStorage := func(key []byte, val []byte, write bool, delete bool) error {
defer func(start time.Time) {
storageWriteCounter.Inc(time.Since(start).Nanoseconds())
}(time.Now())
if delete {
rawdb.DeleteStorageSnapshot(ctx.batch, account, common.BytesToHash(key))
wipedStorageMeter.Mark(1)
return nil
}
if write {
rawdb.WriteStorageSnapshot(ctx.batch, account, common.BytesToHash(key), val)
generatedStorageMeter.Mark(1)
} else {
recoveredStorageMeter.Mark(1)
}
g.stats.storage += common.StorageSize(1 + 2*common.HashLength + len(val))
g.stats.slots++
// If we've exceeded our batch allowance or termination was requested, flush to disk
if err := g.checkAndFlush(ctx, append(account[:], key...)); err != nil {
return err
}
return nil
}
// Loop for re-generating the missing storage slots.
var origin = common.CopyBytes(storeMarker)
for {
id := trie.StorageTrieID(ctx.root, account, storageRoot)
exhausted, last, err := g.generateRange(ctx, id, append(rawdb.SnapshotStoragePrefix, account.Bytes()...), snapStorage, origin, storageCheckRange, onStorage, nil)
if err != nil {
return err // The procedure is aborted, either by an external signal or an internal error.
}
// Abort the procedure if the entire contract storage is generated
if exhausted {
break
}
if origin = increaseKey(last); origin == nil {
break // special case, the last is 0xffffffff...fff
}
}
return nil
}
// generateAccounts generates the missing snapshot accounts as well as their
// storage slots in the main trie. It's supposed to restart the generation
// from the given origin position.
func (g *generator) generateAccounts(ctx *generatorContext, accMarker []byte) error {
onAccount := func(key []byte, val []byte, write bool, delete bool) error {
// Make sure to clear all dangling storages before this account
account := common.BytesToHash(key)
g.stats.dangling += ctx.removeStorageBefore(account)
start := time.Now()
if delete {
rawdb.DeleteAccountSnapshot(ctx.batch, account)
wipedAccountMeter.Mark(1)
accountWriteCounter.Inc(time.Since(start).Nanoseconds())
ctx.removeStorageAt(account)
return nil
}
// Retrieve the current account and flatten it into the internal format
var acc types.StateAccount
if err := rlp.DecodeBytes(val, &acc); err != nil {
log.Crit("Invalid account encountered during snapshot creation", "err", err)
}
// If the account is not yet in-progress, write it out
if accMarker == nil || !bytes.Equal(account[:], accMarker) {
dataLen := len(val) // Approximate size, saves us a round of RLP-encoding
if !write {
if bytes.Equal(acc.CodeHash, types.EmptyCodeHash[:]) {
dataLen -= 32
}
if acc.Root == types.EmptyRootHash {
dataLen -= 32
}
recoveredAccountMeter.Mark(1)
} else {
data := types.SlimAccountRLP(acc)
dataLen = len(data)
rawdb.WriteAccountSnapshot(ctx.batch, account, data)
generatedAccountMeter.Mark(1)
}
g.stats.storage += common.StorageSize(1 + common.HashLength + dataLen)
g.stats.accounts++
}
// If the snap generation reaches this point after being interrupted, genMarker may
// go backward when the last genMarker consisted of both accountHash and storageHash.
marker := account[:]
if accMarker != nil && bytes.Equal(marker, accMarker) && len(g.progress) > common.HashLength {
marker = g.progress
}
// If we've exceeded our batch allowance or termination was requested, flush to disk
if err := g.checkAndFlush(ctx, marker); err != nil {
return err
}
accountWriteCounter.Inc(time.Since(start).Nanoseconds()) // let's count flush time as well
// If the iterated account is a contract, create a further loop to
// verify or regenerate the contract storage.
if acc.Root == types.EmptyRootHash {
ctx.removeStorageAt(account)
} else {
var storeMarker []byte
if accMarker != nil && bytes.Equal(account[:], accMarker) && len(g.progress) > common.HashLength {
storeMarker = g.progress[common.HashLength:]
}
if err := g.generateStorages(ctx, account, acc.Root, storeMarker); err != nil {
return err
}
}
// Some account processed, unmark the marker
accMarker = nil
return nil
}
origin := common.CopyBytes(accMarker)
for {
id := trie.StateTrieID(ctx.root)
exhausted, last, err := g.generateRange(ctx, id, rawdb.SnapshotAccountPrefix, snapAccount, origin, accountCheckRange, onAccount, types.FullAccountRLP)
if err != nil {
return err // The procedure is aborted, either by an external signal or an internal error.
}
origin = increaseKey(last)
// Last step, clean up the storages after the last account.
// All the remaining storages should be treated as dangling.
if origin == nil || exhausted {
g.stats.dangling += ctx.removeRemainingStorage()
break
}
}
return nil
}
// generate is a background thread that iterates over the state and storage tries,
// constructing the state snapshot. All the arguments are purely for statistics
// gathering and logging, since the method surfs the blocks as they arrive, often
// being restarted.
func (g *generator) generate(ctx *generatorContext) {
g.stats.log("Resuming snapshot generation", ctx.root, g.progress)
defer ctx.close()
// Persist the initial marker and state snapshot root if progress is none
if len(g.progress) == 0 {
batch := g.db.NewBatch()
rawdb.WriteSnapshotRoot(batch, ctx.root)
journalProgress(batch, g.progress, g.stats)
if err := batch.Write(); err != nil {
log.Crit("Failed to write initialized state marker", "err", err)
}
}
// Initialize the global generator context. The snapshot iterators are
// opened at the interrupted position because the assumption is held
// that all the snapshot data before the marker has been generated correctly.
// Even if the snapshot data is updated during the interruption (before
// or at the marker), the assumption still holds.
// The account or storage slot at the interruption point will be processed
// twice by the generator (it was already processed in the last run), but
// that's fine.
var (
accMarker, _ = splitMarker(g.progress)
abort chan struct{}
)
if err := g.generateAccounts(ctx, accMarker); err != nil {
// Extract the received interruption signal if it exists
var aerr *abortErr
if errors.As(err, &aerr) {
abort = aerr.abort
}
// Aborted by an internal error, wait for the signal
if abort == nil {
abort = <-g.abort
}
close(abort)
return
}
// Snapshot fully generated, set the marker to nil.
// Note that even if there is nothing to commit, the generator
// is persisted anyway to mark the snapshot as complete.
journalProgress(ctx.batch, nil, g.stats)
if err := ctx.batch.Write(); err != nil {
log.Error("Failed to flush batch", "err", err)
abort = <-g.abort
close(abort)
return
}
ctx.batch.Reset()
log.Info("Generated snapshot", "accounts", g.stats.accounts, "slots", g.stats.slots,
"storage", g.stats.storage, "dangling", g.stats.dangling, "elapsed", common.PrettyDuration(time.Since(g.stats.start)))
// Update the generation progress marker
g.lock.Lock()
g.progress = nil
g.lock.Unlock()
close(g.done)
// Someone will be looking for us, wait it out
abort = <-g.abort
close(abort)
}
// increaseKey increases the input key by one. Nil is returned if the
// entire addition operation overflows.
func increaseKey(key []byte) []byte {
for i := len(key) - 1; i >= 0; i-- {
key[i]++
if key[i] != 0x0 {
return key
}
}
return nil
}
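// For illustration, increaseKey treats the key as a big-endian integer and
// mutates it in place, e.g.
//
//	increaseKey([]byte{0x00, 0xff}) -> []byte{0x01, 0x00}
//	increaseKey([]byte{0xff, 0xff}) -> nil (overflow, the slice ends up all zeros)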
// abortErr wraps the received interruption signal to indicate that the
// generation was aborted by an external process.
type abortErr struct {
abort chan struct{}
}
func newAbortErr(abort chan struct{}) error {
return &abortErr{abort: abort}
}
func (err *abortErr) Error() string {
return "aborted"
}
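// For illustration, the abort handshake between stop and the generator is:
//
//	stop():     ch := make(chan struct{}); g.abort <- ch; <-ch
//	generator:  abort := <-g.abort; ...journal the progress...; close(abort)
//
// i.e. the caller blocks until the generator has persisted its progress and
// acknowledged the interruption by closing the supplied channel.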

View file

@ -0,0 +1,766 @@
// Copyright 2024 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package pathdb
import (
"fmt"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/internal/testrand"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
"github.com/ethereum/go-ethereum/trie/trienode"
"github.com/holiman/uint256"
)
func hashData(input []byte) common.Hash {
return crypto.Keccak256Hash(input)
}
type genTester struct {
diskdb ethdb.Database
db *Database
acctTrie *trie.Trie
nodes *trienode.MergedNodeSet
states *StateSetWithOrigin
}
func newGenTester() *genTester {
disk := rawdb.NewMemoryDatabase()
config := *Defaults
config.SnapshotNoBuild = true // no background generation
db := New(disk, &config, false)
tr, _ := trie.New(trie.StateTrieID(types.EmptyRootHash), db)
return &genTester{
diskdb: disk,
db: db,
acctTrie: tr,
nodes: trienode.NewMergedNodeSet(),
states: NewStateSetWithOrigin(nil, nil, nil, nil, false),
}
}
func (t *genTester) addTrieAccount(acckey string, acc *types.StateAccount) {
var (
addr = common.BytesToAddress([]byte(acckey))
key = hashData([]byte(acckey))
val, _ = rlp.EncodeToBytes(acc)
)
t.acctTrie.MustUpdate(key.Bytes(), val)
t.states.accountData[key] = val
t.states.accountOrigin[addr] = nil
}
func (t *genTester) addSnapAccount(acckey string, acc *types.StateAccount) {
key := hashData([]byte(acckey))
rawdb.WriteAccountSnapshot(t.diskdb, key, types.SlimAccountRLP(*acc))
}
func (t *genTester) addAccount(acckey string, acc *types.StateAccount) {
t.addTrieAccount(acckey, acc)
t.addSnapAccount(acckey, acc)
}
func (t *genTester) addSnapStorage(accKey string, keys []string, vals []string) {
accHash := hashData([]byte(accKey))
for i, key := range keys {
rawdb.WriteStorageSnapshot(t.diskdb, accHash, hashData([]byte(key)), []byte(vals[i]))
}
}
func (t *genTester) makeStorageTrie(accKey string, keys []string, vals []string, commit bool) common.Hash {
var (
owner = hashData([]byte(accKey))
addr = common.BytesToAddress([]byte(accKey))
id = trie.StorageTrieID(types.EmptyRootHash, owner, types.EmptyRootHash)
tr, _ = trie.New(id, t.db)
storages = make(map[common.Hash][]byte)
storageOrigins = make(map[common.Hash][]byte)
)
for i, k := range keys {
key := hashData([]byte(k))
tr.MustUpdate(key.Bytes(), []byte(vals[i]))
storages[key] = []byte(vals[i])
storageOrigins[key] = nil
}
if !commit {
return tr.Hash()
}
root, nodes := tr.Commit(false)
if nodes != nil {
t.nodes.Merge(nodes)
}
t.states.storageData[owner] = storages
t.states.storageOrigin[addr] = storageOrigins
return root
}
func (t *genTester) Commit() common.Hash {
root, nodes := t.acctTrie.Commit(true)
if nodes != nil {
t.nodes.Merge(nodes)
}
t.db.Update(root, types.EmptyRootHash, 0, t.nodes, t.states)
t.db.Commit(root, false)
return root
}
func (t *genTester) CommitAndGenerate() (common.Hash, *diskLayer) {
root := t.Commit()
dl := generateSnapshot(t.db, root, false)
return root, dl
}
// Tests that snapshot generation works from an empty database.
func TestGeneration(t *testing.T) {
helper := newGenTester()
stRoot := helper.makeStorageTrie("", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, false)
helper.addTrieAccount("acc-1", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addTrieAccount("acc-2", &types.StateAccount{Balance: uint256.NewInt(2), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addTrieAccount("acc-3", &types.StateAccount{Balance: uint256.NewInt(3), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.makeStorageTrie("acc-1", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.makeStorageTrie("acc-3", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
root, dl := helper.CommitAndGenerate()
if have, want := root, common.HexToHash("0xe3712f1a226f3782caca78ca770ccc19ee000552813a9f59d479f8611db9b1fd"); have != want {
t.Fatalf("have %#x want %#x", have, want)
}
select {
case <-dl.generator.done:
// Snapshot generation succeeded
case <-time.After(3 * time.Second):
t.Errorf("Snapshot generation failed")
}
// TODO(rjl493456442) enable the snapshot tests
// checkSnapRoot(t, snap, root)
// Signal abortion to the generator and wait for it to tear down
dl.generator.stop()
}
// Tests that snapshot generation with existing flat state works, where the flat
// state contains some errors:
// - the contract with an empty storage root but storage entries on disk
// - the contract with a non-empty storage root but empty storage slots
// - the contract (non-empty storage) missing some storage slots
//   - missing in the beginning
//   - missing in the middle
//   - missing in the end
//
// - the contract (non-empty storage) with wrong storage slots
//   - wrong slots in the beginning
//   - wrong slots in the middle
//   - wrong slots in the end
//
// - the contract (non-empty storage) with extra storage slots
//   - extra slots in the beginning
//   - extra slots in the middle
//   - extra slots in the end
func TestGenerateExistentStateWithWrongStorage(t *testing.T) {
helper := newGenTester()
// Account one, empty storage trie root but non-empty flat states
helper.addAccount("acc-1", &types.StateAccount{Balance: uint256.NewInt(1), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addSnapStorage("acc-1", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
// Account two, non-empty storage trie root but empty flat states
stRoot := helper.makeStorageTrie("acc-2", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addAccount("acc-2", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
// Miss slots
{
// Account three, non-empty root but misses slots in the beginning
helper.makeStorageTrie("acc-3", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addAccount("acc-3", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addSnapStorage("acc-3", []string{"key-2", "key-3"}, []string{"val-2", "val-3"})
// Account four, non-empty root but misses slots in the middle
helper.makeStorageTrie("acc-4", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addAccount("acc-4", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addSnapStorage("acc-4", []string{"key-1", "key-3"}, []string{"val-1", "val-3"})
// Account five, non-empty root but misses slots in the end
helper.makeStorageTrie("acc-5", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addAccount("acc-5", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addSnapStorage("acc-5", []string{"key-1", "key-2"}, []string{"val-1", "val-2"})
}
// Wrong storage slots
{
// Account six, non-empty root but wrong slots in the beginning
helper.makeStorageTrie("acc-6", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addAccount("acc-6", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addSnapStorage("acc-6", []string{"key-1", "key-2", "key-3"}, []string{"badval-1", "val-2", "val-3"})
// Account seven, non-empty root but wrong slots in the middle
helper.makeStorageTrie("acc-7", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addAccount("acc-7", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addSnapStorage("acc-7", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "badval-2", "val-3"})
// Account eight, non-empty root but wrong slots in the end
helper.makeStorageTrie("acc-8", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addAccount("acc-8", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addSnapStorage("acc-8", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "badval-3"})
// Account 9, non-empty root but rotated slots
helper.makeStorageTrie("acc-9", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addAccount("acc-9", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addSnapStorage("acc-9", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-3", "val-2"})
}
// Extra storage slots
{
// Account 10, non-empty root but extra slots in the beginning
helper.makeStorageTrie("acc-10", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addAccount("acc-10", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addSnapStorage("acc-10", []string{"key-0", "key-1", "key-2", "key-3"}, []string{"val-0", "val-1", "val-2", "val-3"})
// Account 11, non-empty root but extra slots in the middle
helper.makeStorageTrie("acc-11", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addAccount("acc-11", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addSnapStorage("acc-11", []string{"key-1", "key-2", "key-2-1", "key-3"}, []string{"val-1", "val-2", "val-2-1", "val-3"})
// Account 12, non-empty root but extra slots in the end
helper.makeStorageTrie("acc-12", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addAccount("acc-12", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addSnapStorage("acc-12", []string{"key-1", "key-2", "key-3", "key-4"}, []string{"val-1", "val-2", "val-3", "val-4"})
}
root, dl := helper.CommitAndGenerate()
t.Logf("Root: %#x\n", root) // Root = 0x8746cce9fd9c658b2cfd639878ed6584b7a2b3e73bb40f607fcfa156002429a0
select {
case <-dl.generator.done:
// Snapshot generation succeeded
case <-time.After(3 * time.Second):
t.Errorf("Snapshot generation failed")
}
// TODO(rjl493456442) enable the snapshot tests
// checkSnapRoot(t, snap, root)
// Signal abortion to the generator and wait for it to tear down
dl.generator.stop()
}
// Tests that snapshot generation with existing flat state works, where the flat
// state contains some errors:
// - missing accounts
// - wrong accounts
// - extra accounts
func TestGenerateExistentStateWithWrongAccounts(t *testing.T) {
helper := newGenTester()
// Trie accounts [acc-1, acc-2, acc-3, acc-4, acc-6]
helper.makeStorageTrie("acc-1", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.makeStorageTrie("acc-2", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.makeStorageTrie("acc-3", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.makeStorageTrie("acc-4", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
stRoot := helper.makeStorageTrie("acc-6", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
// Missing accounts, only in the trie
{
helper.addTrieAccount("acc-1", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) // Beginning
helper.addTrieAccount("acc-4", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) // Middle
helper.addTrieAccount("acc-6", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) // End
}
// Wrong accounts
{
helper.addTrieAccount("acc-2", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addSnapAccount("acc-2", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: common.Hex2Bytes("0x1234")})
helper.addTrieAccount("acc-3", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addSnapAccount("acc-3", &types.StateAccount{Balance: uint256.NewInt(1), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()})
}
// Extra accounts, only in the snap
{
helper.addSnapAccount("acc-0", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) // before the beginning
helper.addSnapAccount("acc-5", &types.StateAccount{Balance: uint256.NewInt(1), Root: types.EmptyRootHash, CodeHash: common.Hex2Bytes("0x1234")}) // Middle
helper.addSnapAccount("acc-7", &types.StateAccount{Balance: uint256.NewInt(1), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()}) // after the end
}
root, dl := helper.CommitAndGenerate()
t.Logf("Root: %#x\n", root) // Root = 0x825891472281463511e7ebcc7f109e4f9200c20fa384754e11fd605cd98464e8
select {
case <-dl.generator.done:
// Snapshot generation succeeded
case <-time.After(3 * time.Second):
t.Errorf("Snapshot generation failed")
}
// TODO(rjl493456442) enable the snapshot tests
// checkSnapRoot(t, snap, root)
// Signal abortion to the generator and wait for it to tear down
dl.generator.stop()
}
// Tests that snapshot generation errors out correctly in case of a missing
// trie node in the account trie.
func TestGenerateCorruptAccountTrie(t *testing.T) {
helper := newGenTester()
helper.addTrieAccount("acc-1", &types.StateAccount{Balance: uint256.NewInt(1), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()}) // 0xc7a30f39aff471c95d8a837497ad0e49b65be475cc0953540f80cfcdbdcd9074
helper.addTrieAccount("acc-2", &types.StateAccount{Balance: uint256.NewInt(2), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()}) // 0x65145f923027566669a1ae5ccac66f945b55ff6eaeb17d2ea8e048b7d381f2d7
helper.addTrieAccount("acc-3", &types.StateAccount{Balance: uint256.NewInt(3), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()}) // 0x19ead688e907b0fab07176120dceec244a72aff2f0aa51e8b827584e378772f4
root := helper.Commit() // Root: 0xa04693ea110a31037fb5ee814308a6f1d76bdab0b11676bdf4541d2de55ba978
// Delete an account trie node and ensure the generator chokes
path := []byte{0xc}
if !rawdb.HasAccountTrieNode(helper.diskdb, path) {
t.Logf("Invalid node path to delete, %v", path)
}
rawdb.DeleteAccountTrieNode(helper.diskdb, path)
helper.db.tree.bottom().resetCache()
dl := generateSnapshot(helper.db, root, false)
select {
case <-dl.generator.done:
// Snapshot generation succeeded
t.Errorf("Snapshot generated against corrupt account trie")
case <-time.After(time.Second):
// Not generated fast enough, hopefully blocked inside on missing trie node fail
}
// Signal abortion to the generator and wait for it to tear down
dl.generator.stop()
}
// Tests that snapshot generation errors out correctly in case of a missing
// root trie node in a storage trie.
func TestGenerateMissingStorageTrie(t *testing.T) {
var (
acc1 = hashData([]byte("acc-1"))
acc3 = hashData([]byte("acc-3"))
helper = newGenTester()
)
stRoot := helper.makeStorageTrie("acc-1", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true) // 0xddefcd9376dd029653ef384bd2f0a126bb755fe84fdcc9e7cf421ba454f2bc67
helper.addTrieAccount("acc-1", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) // 0x9250573b9c18c664139f3b6a7a8081b7d8f8916a8fcc5d94feec6c29f5fd4e9e
helper.addTrieAccount("acc-2", &types.StateAccount{Balance: uint256.NewInt(2), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()}) // 0x65145f923027566669a1ae5ccac66f945b55ff6eaeb17d2ea8e048b7d381f2d7
stRoot = helper.makeStorageTrie("acc-3", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addTrieAccount("acc-3", &types.StateAccount{Balance: uint256.NewInt(3), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) // 0x50815097425d000edfc8b3a4a13e175fc2bdcfee8bdfbf2d1ff61041d3c235b2
root := helper.Commit()
// Delete the storage trie roots of accounts one and three.
rawdb.DeleteStorageTrieNode(helper.diskdb, acc1, nil)
rawdb.DeleteStorageTrieNode(helper.diskdb, acc3, nil)
helper.db.tree.bottom().resetCache()
dl := generateSnapshot(helper.db, root, false)
select {
case <-dl.generator.done:
// Snapshot generation succeeded
t.Errorf("Snapshot generated against corrupt storage trie")
case <-time.After(time.Second):
// Not generated fast enough, hopefully blocked inside on missing trie node fail
}
// Signal abortion to the generator and wait for it to tear down
dl.generator.stop()
}
// Tests that snapshot generation errors out correctly in case of a missing
// trie node in a storage trie.
func TestGenerateCorruptStorageTrie(t *testing.T) {
helper := newGenTester()
stRoot := helper.makeStorageTrie("acc-1", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true) // 0xddefcd9376dd029653ef384bd2f0a126bb755fe84fdcc9e7cf421ba454f2bc67
helper.addTrieAccount("acc-1", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) // 0x9250573b9c18c664139f3b6a7a8081b7d8f8916a8fcc5d94feec6c29f5fd4e9e
helper.addTrieAccount("acc-2", &types.StateAccount{Balance: uint256.NewInt(2), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()}) // 0x65145f923027566669a1ae5ccac66f945b55ff6eaeb17d2ea8e048b7d381f2d7
stRoot = helper.makeStorageTrie("acc-3", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addTrieAccount("acc-3", &types.StateAccount{Balance: uint256.NewInt(3), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}) // 0x50815097425d000edfc8b3a4a13e175fc2bdcfee8bdfbf2d1ff61041d3c235b2
root := helper.Commit()
// Delete a node in the storage trie.
path := []byte{0x4}
if !rawdb.HasStorageTrieNode(helper.diskdb, hashData([]byte("acc-1")), path) {
t.Logf("Invalid node path to delete, %v", path)
}
rawdb.DeleteStorageTrieNode(helper.diskdb, hashData([]byte("acc-1")), []byte{0x4})
if !rawdb.HasStorageTrieNode(helper.diskdb, hashData([]byte("acc-3")), path) {
t.Logf("Invalid node path to delete, %v", path)
}
rawdb.DeleteStorageTrieNode(helper.diskdb, hashData([]byte("acc-3")), []byte{0x4})
helper.db.tree.bottom().resetCache()
dl := generateSnapshot(helper.db, root, false)
select {
case <-dl.generator.done:
// Snapshot generation succeeded
t.Errorf("Snapshot generated against corrupt storage trie")
case <-time.After(time.Second):
// Not generated fast enough, hopefully blocked inside on missing trie node fail
}
// Signal abortion to the generator and wait for it to tear down
dl.generator.stop()
}
func TestGenerateWithExtraAccounts(t *testing.T) {
helper := newGenTester()
// Account one in the trie
stRoot := helper.makeStorageTrie("acc-1",
[]string{"key-1", "key-2", "key-3", "key-4", "key-5"},
[]string{"val-1", "val-2", "val-3", "val-4", "val-5"},
true,
)
acc := &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}
val, _ := rlp.EncodeToBytes(acc)
helper.acctTrie.MustUpdate(hashData([]byte("acc-1")).Bytes(), val) // 0x9250573b9c18c664139f3b6a7a8081b7d8f8916a8fcc5d94feec6c29f5fd4e9e
// Identical in the snap
key := hashData([]byte("acc-1"))
rawdb.WriteAccountSnapshot(helper.diskdb, key, val)
rawdb.WriteStorageSnapshot(helper.diskdb, key, hashData([]byte("key-1")), []byte("val-1"))
rawdb.WriteStorageSnapshot(helper.diskdb, key, hashData([]byte("key-2")), []byte("val-2"))
rawdb.WriteStorageSnapshot(helper.diskdb, key, hashData([]byte("key-3")), []byte("val-3"))
rawdb.WriteStorageSnapshot(helper.diskdb, key, hashData([]byte("key-4")), []byte("val-4"))
rawdb.WriteStorageSnapshot(helper.diskdb, key, hashData([]byte("key-5")), []byte("val-5"))
// Account two exists only in the snapshot
stRoot = helper.makeStorageTrie("acc-2",
[]string{"key-1", "key-2", "key-3", "key-4", "key-5"},
[]string{"val-1", "val-2", "val-3", "val-4", "val-5"},
true,
)
acc = &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}
val, _ = rlp.EncodeToBytes(acc)
key = hashData([]byte("acc-2"))
rawdb.WriteAccountSnapshot(helper.diskdb, key, val)
rawdb.WriteStorageSnapshot(helper.diskdb, key, hashData([]byte("b-key-1")), []byte("b-val-1"))
rawdb.WriteStorageSnapshot(helper.diskdb, key, hashData([]byte("b-key-2")), []byte("b-val-2"))
rawdb.WriteStorageSnapshot(helper.diskdb, key, hashData([]byte("b-key-3")), []byte("b-val-3"))
root := helper.Commit()
// To verify the test: If we now inspect the snap db, there should exist extraneous storage items
if data := rawdb.ReadStorageSnapshot(helper.diskdb, hashData([]byte("acc-2")), hashData([]byte("b-key-1"))); data == nil {
t.Fatalf("expected snap storage to exist")
}
dl := generateSnapshot(helper.db, root, false)
select {
case <-dl.generator.done:
// Snapshot generation succeeded
case <-time.After(3 * time.Second):
t.Errorf("Snapshot generation failed")
}
// TODO(rjl493456442) enable the snapshot tests
// checkSnapRoot(t, snap, root)
// Signal abortion to the generator and wait for it to tear down
dl.generator.stop()
// If we now inspect the snap db, there should exist no extraneous storage items
if data := rawdb.ReadStorageSnapshot(helper.diskdb, hashData([]byte("acc-2")), hashData([]byte("b-key-1"))); data != nil {
t.Fatalf("expected slot to be removed, got %v", string(data))
}
}
func TestGenerateWithManyExtraAccounts(t *testing.T) {
helper := newGenTester()
// Account one in the trie
stRoot := helper.makeStorageTrie("acc-1",
[]string{"key-1", "key-2", "key-3"},
[]string{"val-1", "val-2", "val-3"},
true,
)
acc := &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()}
val, _ := rlp.EncodeToBytes(acc)
helper.acctTrie.MustUpdate(hashData([]byte("acc-1")).Bytes(), val) // 0x9250573b9c18c664139f3b6a7a8081b7d8f8916a8fcc5d94feec6c29f5fd4e9e
// Identical in the snap
key := hashData([]byte("acc-1"))
rawdb.WriteAccountSnapshot(helper.diskdb, key, val)
rawdb.WriteStorageSnapshot(helper.diskdb, key, hashData([]byte("key-1")), []byte("val-1"))
rawdb.WriteStorageSnapshot(helper.diskdb, key, hashData([]byte("key-2")), []byte("val-2"))
rawdb.WriteStorageSnapshot(helper.diskdb, key, hashData([]byte("key-3")), []byte("val-3"))
// 1000 accounts exist only in the snapshot
for i := 0; i < 1000; i++ {
acc := &types.StateAccount{Balance: uint256.NewInt(uint64(i)), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()}
val, _ := rlp.EncodeToBytes(acc)
key := hashData([]byte(fmt.Sprintf("acc-%d", i)))
rawdb.WriteAccountSnapshot(helper.diskdb, key, val)
}
_, dl := helper.CommitAndGenerate()
select {
case <-dl.generator.done:
// Snapshot generation succeeded
case <-time.After(3 * time.Second):
t.Errorf("Snapshot generation failed")
}
// TODO(rjl493456442) enable the snapshot tests
// checkSnapRoot(t, snap, root)
// Signal abortion to the generator and wait for it to tear down
dl.generator.stop()
}
func TestGenerateWithExtraBeforeAndAfter(t *testing.T) {
helper := newGenTester()
acc := &types.StateAccount{Balance: uint256.NewInt(1), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()}
val, _ := rlp.EncodeToBytes(acc)
acctHashA := hashData([]byte("acc-1"))
acctHashB := hashData([]byte("acc-2"))
helper.acctTrie.MustUpdate(acctHashA.Bytes(), val)
helper.acctTrie.MustUpdate(acctHashB.Bytes(), val)
rawdb.WriteAccountSnapshot(helper.diskdb, acctHashA, val)
rawdb.WriteAccountSnapshot(helper.diskdb, acctHashB, val)
for i := 0; i < 16; i++ {
rawdb.WriteAccountSnapshot(helper.diskdb, common.Hash{byte(i)}, val)
}
_, dl := helper.CommitAndGenerate()
select {
case <-dl.generator.done:
// Snapshot generation succeeded
case <-time.After(3 * time.Second):
t.Errorf("Snapshot generation failed")
}
// TODO(rjl493456442) enable the snapshot tests
// checkSnapRoot(t, snap, root)
// Signal abortion to the generator and wait for it to tear down
dl.generator.stop()
}
func TestGenerateWithMalformedStateData(t *testing.T) {
helper := newGenTester()
acctHash := hashData([]byte("acc"))
acc := &types.StateAccount{Balance: uint256.NewInt(1), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()}
val, _ := rlp.EncodeToBytes(acc)
helper.acctTrie.MustUpdate(acctHash.Bytes(), val)
junk := make([]byte, 100)
copy(junk, []byte{0xde, 0xad})
rawdb.WriteAccountSnapshot(helper.diskdb, acctHash, junk)
for i := 0; i < 16; i++ {
rawdb.WriteAccountSnapshot(helper.diskdb, common.Hash{byte(i)}, junk)
}
_, dl := helper.CommitAndGenerate()
select {
case <-dl.generator.done:
// Snapshot generation succeeded
case <-time.After(3 * time.Second):
t.Errorf("Snapshot generation failed")
}
// TODO(rjl493456442) enable the snapshot tests
// checkSnapRoot(t, snap, root)
// Signal abortion to the generator and wait for it to tear down
dl.generator.stop()
}
func TestGenerateFromEmptySnap(t *testing.T) {
helper := newGenTester()
for i := 0; i < 400; i++ {
stRoot := helper.makeStorageTrie(fmt.Sprintf("acc-%d", i), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addTrieAccount(fmt.Sprintf("acc-%d", i), &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
}
root, snap := helper.CommitAndGenerate()
t.Logf("Root: %#x\n", root) // Root: 0x6f7af6d2e1a1bf2b84a3beb3f8b64388465fbc1e274ca5d5d3fc787ca78f59e4
select {
case <-snap.generator.done:
// Snapshot generation succeeded
case <-time.After(3 * time.Second):
t.Errorf("Snapshot generation failed")
}
// TODO(rjl493456442) enable the snapshot tests
// checkSnapRoot(t, snap, root)
// Signal abortion to the generator and wait for it to tear down
snap.generator.stop()
}
func TestGenerateWithIncompleteStorage(t *testing.T) {
helper := newGenTester()
stKeys := []string{"1", "2", "3", "4", "5", "6", "7", "8"}
stVals := []string{"v1", "v2", "v3", "v4", "v5", "v6", "v7", "v8"}
// We add 8 accounts, each one is missing exactly one of the storage slots. This means
// we don't have to order the keys and figure out exactly which hash-key winds up
// on the sensitive spots at the boundaries
for i := 0; i < 8; i++ {
accKey := fmt.Sprintf("acc-%d", i)
stRoot := helper.makeStorageTrie(accKey, stKeys, stVals, true)
helper.addAccount(accKey, &types.StateAccount{Balance: uint256.NewInt(uint64(i)), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
var moddedKeys []string
var moddedVals []string
for ii := 0; ii < 8; ii++ {
if ii != i {
moddedKeys = append(moddedKeys, stKeys[ii])
moddedVals = append(moddedVals, stVals[ii])
}
}
helper.addSnapStorage(accKey, moddedKeys, moddedVals)
}
root, dl := helper.CommitAndGenerate()
t.Logf("Root: %#x\n", root) // Root: 0xca73f6f05ba4ca3024ef340ef3dfca8fdabc1b677ff13f5a9571fd49c16e67ff
select {
case <-dl.generator.done:
// Snapshot generation succeeded
case <-time.After(3 * time.Second):
t.Errorf("Snapshot generation failed")
}
// TODO(rjl493456442) enable the snapshot tests
// checkSnapRoot(t, snap, root)
// Signal abortion to the generator and wait for it to tear down
dl.generator.stop()
}
func incKey(key []byte) []byte {
for i := len(key) - 1; i >= 0; i-- {
key[i]++
if key[i] != 0x0 {
break
}
}
return key
}
func decKey(key []byte) []byte {
for i := len(key) - 1; i >= 0; i-- {
key[i]--
if key[i] != 0xff {
break
}
}
return key
}
// populateDangling writes dangling storage slots, i.e. slots whose owning
// account is absent, into the given database.
func populateDangling(disk ethdb.KeyValueStore) {
populate := func(accountHash common.Hash, keys []string, vals []string) {
for i, key := range keys {
rawdb.WriteStorageSnapshot(disk, accountHash, hashData([]byte(key)), []byte(vals[i]))
}
}
// Dangling storages of the "first" account
populate(common.Hash{}, []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
// Dangling storages of the "last" account
populate(common.HexToHash("ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
// Dangling storages around the account 1
hash := decKey(hashData([]byte("acc-1")).Bytes())
populate(common.BytesToHash(hash), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
hash = incKey(hashData([]byte("acc-1")).Bytes())
populate(common.BytesToHash(hash), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
// Dangling storages around the account 2
hash = decKey(hashData([]byte("acc-2")).Bytes())
populate(common.BytesToHash(hash), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
hash = incKey(hashData([]byte("acc-2")).Bytes())
populate(common.BytesToHash(hash), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
// Dangling storages around the account 3
hash = decKey(hashData([]byte("acc-3")).Bytes())
populate(common.BytesToHash(hash), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
hash = incKey(hashData([]byte("acc-3")).Bytes())
populate(common.BytesToHash(hash), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
// Dangling storages of the random account
populate(testrand.Hash(), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
populate(testrand.Hash(), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
populate(testrand.Hash(), []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
}
func TestGenerateCompleteSnapshotWithDanglingStorage(t *testing.T) {
var helper = newGenTester()
stRoot := helper.makeStorageTrie("acc-1", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addAccount("acc-1", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addAccount("acc-2", &types.StateAccount{Balance: uint256.NewInt(1), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()})
helper.makeStorageTrie("acc-3", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addAccount("acc-3", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addSnapStorage("acc-1", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
helper.addSnapStorage("acc-3", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"})
populateDangling(helper.diskdb)
_, dl := helper.CommitAndGenerate()
select {
case <-dl.generator.done:
// Snapshot generation succeeded
case <-time.After(3 * time.Second):
t.Errorf("Snapshot generation failed")
}
// TODO(rjl493456442) enable the snapshot tests
// checkSnapRoot(t, snap, root)
// Signal abortion to the generator and wait for it to tear down
dl.generator.stop()
}
func TestGenerateBrokenSnapshotWithDanglingStorage(t *testing.T) {
var helper = newGenTester()
stRoot := helper.makeStorageTrie("acc-1", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addTrieAccount("acc-1", &types.StateAccount{Balance: uint256.NewInt(1), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
helper.addTrieAccount("acc-2", &types.StateAccount{Balance: uint256.NewInt(2), Root: types.EmptyRootHash, CodeHash: types.EmptyCodeHash.Bytes()})
helper.makeStorageTrie("acc-3", []string{"key-1", "key-2", "key-3"}, []string{"val-1", "val-2", "val-3"}, true)
helper.addTrieAccount("acc-3", &types.StateAccount{Balance: uint256.NewInt(3), Root: stRoot, CodeHash: types.EmptyCodeHash.Bytes()})
populateDangling(helper.diskdb)
_, dl := helper.CommitAndGenerate()
select {
case <-dl.generator.done:
// Snapshot generation succeeded
case <-time.After(3 * time.Second):
t.Errorf("Snapshot generation failed")
}
// TODO(rjl493456442) enable the snapshot tests
// checkSnapRoot(t, snap, root)
// Signal abortion to the generator and wait for it to tear down
dl.generator.stop()
}

View file

@ -91,15 +91,14 @@ type diffAccountIterator struct {
}
// newDiffAccountIterator creates an account iterator over the given state set.
func newDiffAccountIterator(seek common.Hash, states *stateSet, fn loadAccount) AccountIterator {
func newDiffAccountIterator(seek common.Hash, accountList []common.Hash, fn loadAccount) AccountIterator {
// Seek out the requested starting account
hashes := states.accountList()
index := sort.Search(len(hashes), func(i int) bool {
return bytes.Compare(seek[:], hashes[i][:]) <= 0
index := sort.Search(len(accountList), func(i int) bool {
return bytes.Compare(seek[:], accountList[i][:]) <= 0
})
// Assemble and return the already-seeked iterator
return &diffAccountIterator{
keys: hashes[index:],
keys: accountList[index:],
loadFn: fn,
}
}
@ -236,15 +235,14 @@ type diffStorageIterator struct {
}
// newDiffStorageIterator creates a storage iterator over a single diff layer.
func newDiffStorageIterator(account common.Hash, seek common.Hash, states *stateSet, fn loadStorage) StorageIterator {
hashes := states.storageList(account)
index := sort.Search(len(hashes), func(i int) bool {
return bytes.Compare(seek[:], hashes[i][:]) <= 0
func newDiffStorageIterator(account common.Hash, seek common.Hash, storageList []common.Hash, fn loadStorage) StorageIterator {
index := sort.Search(len(storageList), func(i int) bool {
return bytes.Compare(seek[:], storageList[i][:]) <= 0
})
// Assemble and return the already-seeked iterator
return &diffStorageIterator{
account: account,
keys: hashes[index:],
keys: storageList[index:],
loadFn: fn,
}
}
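Both constructors share the same seek logic: sort.Search finds the first key that is not less than seek, and the iterator keeps only the suffix from that index onwards. A self-contained sketch of that behaviour follows; seekKeys is a hypothetical helper used purely for illustration.

package main

import (
	"bytes"
	"fmt"
	"sort"

	"github.com/ethereum/go-ethereum/common"
)

// seekKeys returns the suffix of a sorted key list starting at the first
// element that is >= seek, mirroring the sort.Search calls above.
func seekKeys(seek common.Hash, keys []common.Hash) []common.Hash {
	index := sort.Search(len(keys), func(i int) bool {
		return bytes.Compare(seek[:], keys[i][:]) <= 0
	})
	return keys[index:]
}

func main() {
	keys := []common.Hash{
		common.HexToHash("0x11"),
		common.HexToHash("0x22"),
		common.HexToHash("0x33"),
	}
	fmt.Println(len(seekKeys(common.HexToHash("0x22"), keys))) // 2: keeps 0x22 and 0x33
	fmt.Println(len(seekKeys(common.HexToHash("0xff"), keys))) // 0: seeking past the end yields nothing
}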

View file

@ -45,6 +45,12 @@ type binaryIterator struct {
// accounts in a slow, but easily verifiable way. Note this function is used
// for initialization, use `newBinaryAccountIterator` as the API.
func (dl *diskLayer) initBinaryAccountIterator(seek common.Hash) *binaryIterator {
// The state set in the disk layer is mutable, hold the lock before obtaining
// the account list to prevent concurrent map iteration and write.
dl.lock.RLock()
accountList := dl.buffer.states.accountList()
dl.lock.RUnlock()
// Create two iterators for state buffer and the persistent state in disk
// respectively and combine them as a binary iterator.
l := &binaryIterator{
@ -54,7 +60,7 @@ func (dl *diskLayer) initBinaryAccountIterator(seek common.Hash) *binaryIterator
// The account key list for iteration is deterministic once the iterator
// is constructed, no matter whether the referenced disk layer becomes
// stale later.
a: newDiffAccountIterator(seek, dl.buffer.states, nil),
a: newDiffAccountIterator(seek, accountList, nil),
b: newDiskAccountIterator(dl.db.diskdb, seek),
}
l.aDone = !l.a.Next()
@ -68,6 +74,9 @@ func (dl *diskLayer) initBinaryAccountIterator(seek common.Hash) *binaryIterator
func (dl *diffLayer) initBinaryAccountIterator(seek common.Hash) *binaryIterator {
parent, ok := dl.parent.(*diffLayer)
if !ok {
// The state set in diff layer is immutable and will never be stale,
// so the read lock protection is unnecessary.
accountList := dl.states.stateSet.accountList()
l := &binaryIterator{
// The account loader function is unnecessary; the account key list
// produced by the supplied state set alone is sufficient for iteration.
@ -75,13 +84,16 @@ func (dl *diffLayer) initBinaryAccountIterator(seek common.Hash) *binaryIterator
// The account key list for iteration is deterministic once the iterator
// is constructed, no matter whether the referenced disk layer becomes
// stale later.
a: newDiffAccountIterator(seek, dl.states.stateSet, nil),
a: newDiffAccountIterator(seek, accountList, nil),
b: dl.parent.(*diskLayer).initBinaryAccountIterator(seek),
}
l.aDone = !l.a.Next()
l.bDone = !l.b.Next()
return l
}
// The state set in diff layer is immutable and will never be stale,
// so the read lock protection is unnecessary.
accountList := dl.states.stateSet.accountList()
l := &binaryIterator{
// The account loader function is unnecessary; the account key list
// produced by the supplied state set alone is sufficient for iteration.
@ -89,7 +101,7 @@ func (dl *diffLayer) initBinaryAccountIterator(seek common.Hash) *binaryIterator
// The account key list for iteration is deterministic once the iterator
// is constructed, no matter whether the referenced disk layer becomes
// stale later.
a: newDiffAccountIterator(seek, dl.states.stateSet, nil),
a: newDiffAccountIterator(seek, accountList, nil),
b: parent.initBinaryAccountIterator(seek),
}
l.aDone = !l.a.Next()
@ -101,6 +113,12 @@ func (dl *diffLayer) initBinaryAccountIterator(seek common.Hash) *binaryIterator
// storage slots in a slow, but easily verifiable way. Note this function is used
// for initialization, use `newBinaryStorageIterator` as the API.
func (dl *diskLayer) initBinaryStorageIterator(account common.Hash, seek common.Hash) *binaryIterator {
// The state set in the disk layer is mutable, hold the lock before obtaining
// the storage list to prevent concurrent map iteration and write.
dl.lock.RLock()
storageList := dl.buffer.states.storageList(account)
dl.lock.RUnlock()
// Create two iterators for state buffer and the persistent state in disk
// respectively and combine them as a binary iterator.
l := &binaryIterator{
@ -110,7 +128,7 @@ func (dl *diskLayer) initBinaryStorageIterator(account common.Hash, seek common.
// The storage key list for iteration is deterministic once the iterator
// is constructed, no matter whether the referenced disk layer becomes
// stale later.
a: newDiffStorageIterator(account, seek, dl.buffer.states, nil),
a: newDiffStorageIterator(account, seek, storageList, nil),
b: newDiskStorageIterator(dl.db.diskdb, account, seek),
}
l.aDone = !l.a.Next()
@ -124,6 +142,9 @@ func (dl *diskLayer) initBinaryStorageIterator(account common.Hash, seek common.
func (dl *diffLayer) initBinaryStorageIterator(account common.Hash, seek common.Hash) *binaryIterator {
parent, ok := dl.parent.(*diffLayer)
if !ok {
// The state set in diff layer is immutable and will never be stale,
// so the read lock protection is unnecessary.
storageList := dl.states.stateSet.storageList(account)
l := &binaryIterator{
// The storage loader function is unnecessary; the storage key list
// produced by the supplied state set alone is sufficient for iteration.
@ -131,13 +152,16 @@ func (dl *diffLayer) initBinaryStorageIterator(account common.Hash, seek common.
// The storage key list for iteration is deterministic once the iterator
// is constructed, no matter whether the referenced disk layer becomes
// stale later.
a: newDiffStorageIterator(account, seek, dl.states.stateSet, nil),
a: newDiffStorageIterator(account, seek, storageList, nil),
b: dl.parent.(*diskLayer).initBinaryStorageIterator(account, seek),
}
l.aDone = !l.a.Next()
l.bDone = !l.b.Next()
return l
}
// The state set in diff layer is immutable and will never be stale,
// so the read lock protection is unnecessary.
storageList := dl.states.stateSet.storageList(account)
l := &binaryIterator{
// The storage loader function is unnecessary; the storage key list
// produced by the supplied state set alone is sufficient for iteration.
@ -145,7 +169,7 @@ func (dl *diffLayer) initBinaryStorageIterator(account common.Hash, seek common.
// The storage key list for iteration is deterministic once the iterator
// is constructed, no matter whether the referenced disk layer becomes
// stale later.
a: newDiffStorageIterator(account, seek, dl.states.stateSet, nil),
a: newDiffStorageIterator(account, seek, storageList, nil),
b: parent.initBinaryStorageIterator(account, seek),
}
l.aDone = !l.a.Next()
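The binary iterators built above always pair an in-memory key list (a) with a persistent-state iterator (b) and walk the two in tandem. Below is a rough, self-contained sketch of that merge discipline, where the in-memory side shadows the disk side on equal keys; the types are toy stand-ins, not the real iterator implementation.

package main

import (
	"fmt"
	"strings"
)

// sliceIter is a toy ordered iterator over a pre-sorted string slice.
type sliceIter struct {
	keys []string
	pos  int
}

func newSliceIter(keys []string) *sliceIter { return &sliceIter{keys: keys, pos: -1} }
func (it *sliceIter) Next() bool            { it.pos++; return it.pos < len(it.keys) }
func (it *sliceIter) Key() string           { return it.keys[it.pos] }

// mergeOrdered emits every key exactly once in ascending order; when both
// sides hold the same key, the first (in-memory) iterator wins.
func mergeOrdered(a, b *sliceIter) []string {
	var out []string
	aDone, bDone := !a.Next(), !b.Next()
	for !aDone || !bDone {
		switch {
		case bDone:
			out = append(out, a.Key())
			aDone = !a.Next()
		case aDone:
			out = append(out, b.Key())
			bDone = !b.Next()
		default:
			if cmp := strings.Compare(a.Key(), b.Key()); cmp < 0 {
				out = append(out, a.Key())
				aDone = !a.Next()
			} else if cmp > 0 {
				out = append(out, b.Key())
				bDone = !b.Next()
			} else {
				out = append(out, a.Key()) // buffered entry shadows the persisted one
				aDone, bDone = !a.Next(), !b.Next()
			}
		}
	}
	return out
}

func main() {
	buffer := newSliceIter([]string{"0xaa", "0xcc", "0xff"})
	disk := newSliceIter([]string{"0xbb", "0xcc", "0xdd"})
	fmt.Println(mergeOrdered(buffer, disk)) // [0xaa 0xbb 0xcc 0xdd 0xff]
}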

View file

@ -76,11 +76,17 @@ func newFastIterator(db *Database, root common.Hash, account common.Hash, seek c
if accountIterator {
switch dl := current.(type) {
case *diskLayer:
// The state set in the disk layer is mutable, hold the lock before obtaining
// the account list to prevent concurrent map iteration and write.
dl.lock.RLock()
accountList := dl.buffer.states.accountList()
dl.lock.RUnlock()
fi.iterators = append(fi.iterators, &weightedIterator{
// The state set in the disk layer is mutable, and the entire state becomes stale
// if a diff layer above is merged into it. Therefore, staleness must be checked,
// and the account data should be retrieved with read lock protection.
it: newDiffAccountIterator(seek, dl.buffer.states, func(hash common.Hash) ([]byte, error) {
it: newDiffAccountIterator(seek, accountList, func(hash common.Hash) ([]byte, error) {
dl.lock.RLock()
defer dl.lock.RUnlock()
@ -98,19 +104,26 @@ func newFastIterator(db *Database, root common.Hash, account common.Hash, seek c
case *diffLayer:
// The state set in diff layer is immutable and will never be stale,
// so the read lock protection is unnecessary.
accountList := dl.states.accountList()
fi.iterators = append(fi.iterators, &weightedIterator{
it: newDiffAccountIterator(seek, dl.states.stateSet, dl.states.mustAccount),
it: newDiffAccountIterator(seek, accountList, dl.states.mustAccount),
priority: depth,
})
}
} else {
switch dl := current.(type) {
case *diskLayer:
// The state set in the disk layer is mutable, hold the lock before obtaining
// the storage list to prevent concurrent map iteration and write.
dl.lock.RLock()
storageList := dl.buffer.states.storageList(account)
dl.lock.RUnlock()
fi.iterators = append(fi.iterators, &weightedIterator{
// The state set in the disk layer is mutable, and the entire state becomes stale
// if a diff layer above is merged into it. Therefore, staleness must be checked,
// and the storage slot should be retrieved with read lock protection.
it: newDiffStorageIterator(account, seek, dl.buffer.states, func(addrHash common.Hash, slotHash common.Hash) ([]byte, error) {
it: newDiffStorageIterator(account, seek, storageList, func(addrHash common.Hash, slotHash common.Hash) ([]byte, error) {
dl.lock.RLock()
defer dl.lock.RUnlock()
@ -126,10 +139,14 @@ func newFastIterator(db *Database, root common.Hash, account common.Hash, seek c
priority: depth + 1,
})
case *diffLayer:
// The state set in diff layer is immutable and will never be stale,
// so the read lock protection is unnecessary.
storageList := dl.states.storageList(account)
// The state set in diff layer is immutable and will never be stale,
// so the read lock protection is unnecessary.
fi.iterators = append(fi.iterators, &weightedIterator{
it: newDiffStorageIterator(account, seek, dl.states.stateSet, dl.states.mustStorage),
it: newDiffStorageIterator(account, seek, storageList, dl.states.mustStorage),
priority: depth,
})
}
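The pattern in the diskLayer branches above is worth calling out: the key list is copied once under the read lock, while every later value load re-acquires the lock (and, in the real code, re-checks staleness). The following toy, self-contained sketch illustrates that discipline; all names are invented for illustration.

package main

import (
	"errors"
	"fmt"
	"sort"
	"sync"
)

// mutableLayer is a toy stand-in for the disk layer: its content can be
// mutated (or invalidated) concurrently, so every read needs lock protection.
type mutableLayer struct {
	lock  sync.RWMutex
	stale bool
	data  map[string]string
}

// keys copies the key list once under the read lock, so the later iteration
// never races with a concurrent map write.
func (l *mutableLayer) keys() []string {
	l.lock.RLock()
	defer l.lock.RUnlock()
	out := make([]string, 0, len(l.data))
	for k := range l.data {
		out = append(out, k)
	}
	sort.Strings(out)
	return out
}

// load fetches a single value, re-checking staleness under the lock, in the
// spirit of the loader callbacks passed to the diff iterators above.
func (l *mutableLayer) load(key string) (string, error) {
	l.lock.RLock()
	defer l.lock.RUnlock()
	if l.stale {
		return "", errors.New("layer is stale")
	}
	return l.data[key], nil
}

func main() {
	layer := &mutableLayer{data: map[string]string{"acc-1": "val-1", "acc-2": "val-2"}}
	for _, k := range layer.keys() {
		v, err := layer.load(k)
		fmt.Println(k, v, err)
	}
}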

View file

@ -132,18 +132,15 @@ func TestAccountIteratorBasics(t *testing.T) {
}
}
states := newStates(accounts, storage, false)
it := newDiffAccountIterator(common.Hash{}, states, nil)
it := newDiffAccountIterator(common.Hash{}, states.accountList(), nil)
verifyIterator(t, 100, it, verifyNothing) // Nil is allowed for single layer iterator
// TODO reenable these tests once the persistent state iteration
// is implemented.
//db := rawdb.NewMemoryDatabase()
//batch := db.NewBatch()
//states.write(db, batch, nil, nil)
//batch.Write()
//it = newDiskAccountIterator(db, common.Hash{})
//verifyIterator(t, 100, it, verifyNothing) // Nil is allowed for single layer iterator
db := rawdb.NewMemoryDatabase()
batch := db.NewBatch()
states.write(batch, nil, nil)
batch.Write()
it = newDiskAccountIterator(db, common.Hash{})
verifyIterator(t, 100, it, verifyNothing) // Nil is allowed for single layer iterator
}
// TestStorageIteratorBasics tests some simple single-layer(diff and disk) iteration for storage
@ -173,21 +170,18 @@ func TestStorageIteratorBasics(t *testing.T) {
}
states := newStates(accounts, storage, false)
for account := range accounts {
it := newDiffStorageIterator(account, common.Hash{}, states, nil)
it := newDiffStorageIterator(account, common.Hash{}, states.storageList(account), nil)
verifyIterator(t, 100, it, verifyNothing) // Nil is allowed for single layer iterator
}
// TODO reenable these tests once the persistent state iteration
// is implemented.
//db := rawdb.NewMemoryDatabase()
//batch := db.NewBatch()
//states.write(db, batch, nil, nil)
//batch.Write()
//for account := range accounts {
// it := newDiskStorageIterator(db, account, common.Hash{})
// verifyIterator(t, 100-nilStorage[account], it, verifyNothing) // Nil is allowed for single layer iterator
//}
db := rawdb.NewMemoryDatabase()
batch := db.NewBatch()
states.write(batch, nil, nil)
batch.Write()
for account := range accounts {
it := newDiskStorageIterator(db, account, common.Hash{})
verifyIterator(t, 100-nilStorage[account], it, verifyNothing) // Nil is allowed for single layer iterator
}
}
type testIterator struct {
@ -263,7 +257,7 @@ func TestAccountIteratorTraversal(t *testing.T) {
WriteBufferSize: 0,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
// db.WaitGeneration()
db.waitGeneration()
// Stack three diff layers on top with various overlaps
db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 0, trienode.NewMergedNodeSet(),
@ -279,7 +273,7 @@ func TestAccountIteratorTraversal(t *testing.T) {
head := db.tree.get(common.HexToHash("0x04"))
// singleLayer: 0xcc, 0xf0, 0xff
it := newDiffAccountIterator(common.Hash{}, head.(*diffLayer).states.stateSet, nil)
it := newDiffAccountIterator(common.Hash{}, head.(*diffLayer).states.stateSet.accountList(), nil)
verifyIterator(t, 3, it, verifyNothing)
// binaryIterator: 0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xf0, 0xff
@ -290,19 +284,16 @@ func TestAccountIteratorTraversal(t *testing.T) {
verifyIterator(t, 7, it, verifyAccount)
it.Release()
// TODO reenable these tests once the persistent state iteration
// is implemented.
// Test that the functionality still works after persisting some
// bottom-most layers into the disk.
//db.tree.cap(common.HexToHash("0x04"), 2)
db.tree.cap(common.HexToHash("0x04"), 2)
//head = db.tree.get(common.HexToHash("0x04"))
//verifyIterator(t, 7, head.(*diffLayer).newBinaryAccountIterator(), verifyAccount)
//
//it, _ = db.AccountIterator(common.HexToHash("0x04"), common.Hash{})
//verifyIterator(t, 7, it, verifyAccount)
//it.Release()
head = db.tree.get(common.HexToHash("0x04"))
verifyIterator(t, 7, head.(*diffLayer).newBinaryAccountIterator(common.Hash{}), verifyAccount)
it, _ = db.AccountIterator(common.HexToHash("0x04"), common.Hash{})
verifyIterator(t, 7, it, verifyAccount)
it.Release()
}
func TestStorageIteratorTraversal(t *testing.T) {
@ -310,7 +301,7 @@ func TestStorageIteratorTraversal(t *testing.T) {
WriteBufferSize: 0,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
// db.WaitGeneration()
db.waitGeneration()
// Stack three diff layers on top with various overlaps
db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 0, trienode.NewMergedNodeSet(),
@ -326,7 +317,7 @@ func TestStorageIteratorTraversal(t *testing.T) {
head := db.tree.get(common.HexToHash("0x04"))
// singleLayer: 0x1, 0x2, 0x3
diffIter := newDiffStorageIterator(common.HexToHash("0xaa"), common.Hash{}, head.(*diffLayer).states.stateSet, nil)
diffIter := newDiffStorageIterator(common.HexToHash("0xaa"), common.Hash{}, head.(*diffLayer).states.stateSet.storageList(common.HexToHash("0xaa")), nil)
verifyIterator(t, 3, diffIter, verifyNothing)
// binaryIterator: 0x1, 0x2, 0x3, 0x4, 0x5, 0x6
@ -337,17 +328,14 @@ func TestStorageIteratorTraversal(t *testing.T) {
verifyIterator(t, 6, it, verifyStorage)
it.Release()
// TODO reenable these tests once the persistent state iteration
// is implemented.
// Test that the functionality still works after persisting some
// bottom-most layers into the disk.
//db.tree.cap(common.HexToHash("0x04"), 2)
//verifyIterator(t, 6, head.(*diffLayer).newBinaryStorageIterator(common.HexToHash("0xaa")), verifyStorage)
//
//it, _ = db.StorageIterator(common.HexToHash("0x04"), common.HexToHash("0xaa"), common.Hash{})
//verifyIterator(t, 6, it, verifyStorage)
//it.Release()
db.tree.cap(common.HexToHash("0x04"), 2)
verifyIterator(t, 6, head.(*diffLayer).newBinaryStorageIterator(common.HexToHash("0xaa"), common.Hash{}), verifyStorage)
it, _ = db.StorageIterator(common.HexToHash("0x04"), common.HexToHash("0xaa"), common.Hash{})
verifyIterator(t, 6, it, verifyStorage)
it.Release()
}
// TestAccountIteratorTraversalValues tests some multi-layer iteration, where we
@ -357,7 +345,7 @@ func TestAccountIteratorTraversalValues(t *testing.T) {
WriteBufferSize: 0,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
// db.WaitGeneration()
db.waitGeneration()
// Create a batch of account sets to seed subsequent layers with
var (
@ -434,26 +422,38 @@ func TestAccountIteratorTraversalValues(t *testing.T) {
}
it.Release()
// TODO reenable these tests once the persistent state iteration
// is implemented.
// Test that the functionality still works after persisting some
// bottom-most layers into the disk.
//db.tree.cap(common.HexToHash("0x09"), 2)
//
//it, _ = db.AccountIterator(common.HexToHash("0x09"), common.Hash{})
//for it.Next() {
// hash := it.Hash()
// account, err := head.Account(hash)
// if err != nil {
// t.Fatalf("failed to retrieve expected account: %v", err)
// }
// want, _ := rlp.EncodeToBytes(account)
// if have := it.Account(); !bytes.Equal(want, have) {
// t.Fatalf("hash %x: account mismatch: have %x, want %x", hash, have, want)
// }
//}
//it.Release()
db.tree.cap(common.HexToHash("0x09"), 2)
// binaryIterator
head = db.tree.get(common.HexToHash("0x09"))
it = head.(*diffLayer).newBinaryAccountIterator(common.Hash{})
for it.Next() {
hash := it.Hash()
want, err := r.(*reader).AccountRLP(hash)
if err != nil {
t.Fatalf("failed to retrieve expected account: %v", err)
}
if have := it.Account(); !bytes.Equal(want, have) {
t.Fatalf("hash %x: account mismatch: have %x, want %x", hash, have, want)
}
}
it.Release()
// fastIterator
it, _ = db.AccountIterator(common.HexToHash("0x09"), common.Hash{})
for it.Next() {
hash := it.Hash()
want, err := r.(*reader).AccountRLP(hash)
if err != nil {
t.Fatalf("failed to retrieve expected account: %v", err)
}
if have := it.Account(); !bytes.Equal(want, have) {
t.Fatalf("hash %x: account mismatch: have %x, want %x", hash, have, want)
}
}
it.Release()
}
func TestStorageIteratorTraversalValues(t *testing.T) {
@ -461,7 +461,7 @@ func TestStorageIteratorTraversalValues(t *testing.T) {
WriteBufferSize: 0,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
// db.WaitGeneration()
db.waitGeneration()
wrapStorage := func(storage map[common.Hash][]byte) map[common.Hash]map[common.Hash][]byte {
return map[common.Hash]map[common.Hash][]byte{
@ -543,25 +543,38 @@ func TestStorageIteratorTraversalValues(t *testing.T) {
}
it.Release()
// TODO reenable these tests once the persistent state iteration
// is implemented.
// Test that the functionality still works after persisting some
// bottom-most layers into the disk.
//db.tree.cap(common.HexToHash("0x09"), 2)
//
//it, _ = db.StorageIterator(common.HexToHash("0x09"), common.HexToHash("0xaa"), common.Hash{})
//for it.Next() {
// hash := it.Hash()
// want, err := head.Storage(common.HexToHash("0xaa"), hash)
// if err != nil {
// t.Fatalf("failed to retrieve expected slot: %v", err)
// }
// if have := it.Slot(); !bytes.Equal(want, have) {
// t.Fatalf("hash %x: slot mismatch: have %x, want %x", hash, have, want)
// }
//}
//it.Release()
db.tree.cap(common.HexToHash("0x09"), 2)
// binaryIterator
head = db.tree.get(common.HexToHash("0x09"))
it = head.(*diffLayer).newBinaryStorageIterator(common.HexToHash("0xaa"), common.Hash{})
for it.Next() {
hash := it.Hash()
want, err := r.Storage(common.HexToHash("0xaa"), hash)
if err != nil {
t.Fatalf("failed to retrieve expected account: %v", err)
}
if have := it.Slot(); !bytes.Equal(want, have) {
t.Fatalf("hash %x: account mismatch: have %x, want %x", hash, have, want)
}
}
it.Release()
// fastIterator
it, _ = db.StorageIterator(common.HexToHash("0x09"), common.HexToHash("0xaa"), common.Hash{})
for it.Next() {
hash := it.Hash()
want, err := r.Storage(common.HexToHash("0xaa"), hash)
if err != nil {
t.Fatalf("failed to retrieve expected storage slot: %v", err)
}
if have := it.Slot(); !bytes.Equal(want, have) {
t.Fatalf("hash %x: slot mismatch: have %x, want %x", hash, have, want)
}
}
it.Release()
}
// This test case is notorious: all layers contain the exact same 200 accounts.
@ -581,7 +594,8 @@ func TestAccountIteratorLargeTraversal(t *testing.T) {
WriteBufferSize: 0,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
// db.WaitGeneration()
db.waitGeneration()
for i := 1; i < 128; i++ {
parent := types.EmptyRootHash
if i == 1 {
@ -592,25 +606,22 @@ func TestAccountIteratorLargeTraversal(t *testing.T) {
}
// Iterate the entire stack and ensure everything is hit only once
head := db.tree.get(common.HexToHash("0x80"))
verifyIterator(t, 200, newDiffAccountIterator(common.Hash{}, head.(*diffLayer).states.stateSet, nil), verifyNothing)
verifyIterator(t, 200, newDiffAccountIterator(common.Hash{}, head.(*diffLayer).states.stateSet.accountList(), nil), verifyNothing)
verifyIterator(t, 200, head.(*diffLayer).newBinaryAccountIterator(common.Hash{}), verifyAccount)
it, _ := db.AccountIterator(common.HexToHash("0x80"), common.Hash{})
verifyIterator(t, 200, it, verifyAccount)
it.Release()
// TODO reenable these tests once the persistent state iteration
// is implemented.
// Test that the functionality still works after persisting some
// bottom-most layers into the disk.
//db.tree.cap(common.HexToHash("0x80"), 2)
//
//verifyIterator(t, 200, head.(*diffLayer).newBinaryAccountIterator(), verifyAccount)
//
//it, _ = db.AccountIterator(common.HexToHash("0x80"), common.Hash{})
//verifyIterator(t, 200, it, verifyAccount)
//it.Release()
db.tree.cap(common.HexToHash("0x80"), 2)
verifyIterator(t, 200, head.(*diffLayer).newBinaryAccountIterator(common.Hash{}), verifyAccount)
it, _ = db.AccountIterator(common.HexToHash("0x80"), common.Hash{})
verifyIterator(t, 200, it, verifyAccount)
it.Release()
}
// TestAccountIteratorFlattening tests what happens when we
@ -622,7 +633,7 @@ func TestAccountIteratorFlattening(t *testing.T) {
WriteBufferSize: 10 * 1024,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
// db.WaitGeneration()
db.waitGeneration()
// Create a stack of diffs on top
db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 1, trienode.NewMergedNodeSet(),
@ -655,7 +666,7 @@ func TestAccountIteratorSeek(t *testing.T) {
WriteBufferSize: 0,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
// db.WaitGeneration()
db.waitGeneration()
db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 1, trienode.NewMergedNodeSet(),
NewStateSetWithOrigin(randomAccountSet("0xaa", "0xee", "0xff", "0xf0"), nil, nil, nil, false))
@ -727,7 +738,7 @@ func testStorageIteratorSeek(t *testing.T, newIterator func(db *Database, root,
WriteBufferSize: 0,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
// db.WaitGeneration()
db.waitGeneration()
// Stack three diff layers on top with various overlaps
db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 1, trienode.NewMergedNodeSet(),
@ -799,7 +810,7 @@ func testAccountIteratorDeletions(t *testing.T, newIterator func(db *Database, r
WriteBufferSize: 0,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
// db.WaitGeneration()
db.waitGeneration()
// Stack three diff layers on top with various overlaps
db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 1, trienode.NewMergedNodeSet(),
@ -839,7 +850,7 @@ func TestStorageIteratorDeletions(t *testing.T) {
WriteBufferSize: 0,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
// db.WaitGeneration()
db.waitGeneration()
// Stack three diff layers on top with various overlaps
db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 1, trienode.NewMergedNodeSet(),
@ -907,7 +918,7 @@ func testStaleIterator(t *testing.T, newIter func(db *Database, hash common.Hash
WriteBufferSize: 16 * 1024 * 1024,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
// db.WaitGeneration()
db.waitGeneration()
// [02 (disk), 03]
db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 1, trienode.NewMergedNodeSet(),
@ -962,7 +973,7 @@ func BenchmarkAccountIteratorTraversal(b *testing.B) {
WriteBufferSize: 0,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
// db.WaitGeneration()
db.waitGeneration()
for i := 1; i <= 100; i++ {
parent := types.EmptyRootHash
@ -1057,7 +1068,7 @@ func BenchmarkAccountIteratorLargeBaselayer(b *testing.B) {
WriteBufferSize: 0,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
// db.WaitGeneration()
db.waitGeneration()
db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 1, trienode.NewMergedNodeSet(), NewStateSetWithOrigin(makeAccounts(2000), nil, nil, nil, false))
for i := 2; i <= 100; i++ {

View file

@ -26,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
)
@ -90,6 +91,56 @@ func (db *Database) loadJournal(diskRoot common.Hash) (layer, error) {
return head, nil
}
// journalGenerator is a disk layer entry containing the generator progress marker.
type journalGenerator struct {
// Indicates whether the database was in the process of being wiped.
// It's deprecated but kept here for backward compatibility.
Wiping bool
Done bool // Whether the generator finished creating the snapshot
Marker []byte
Accounts uint64
Slots uint64
Storage uint64
}
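As a rough illustration, the progress marker round-trips through RLP like any other journal entry. The snippet below assumes the surrounding package's rlp import and only sketches the encode/decode pair, not the actual persistence path; encodeDecodeGenerator is a hypothetical helper.

// encodeDecodeGenerator is an illustrative round trip of the progress marker;
// it is not part of the journal code itself.
func encodeDecodeGenerator() (journalGenerator, error) {
	blob, err := rlp.EncodeToBytes(journalGenerator{
		Done:     false,
		Marker:   []byte{0xaa}, // resume point of an unfinished generation run
		Accounts: 1,
	})
	if err != nil {
		return journalGenerator{}, err
	}
	var restored journalGenerator
	if err := rlp.DecodeBytes(blob, &restored); err != nil {
		return journalGenerator{}, err
	}
	return restored, nil
}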
// loadGenerator loads the state generation progress marker from the database.
func loadGenerator(db ethdb.KeyValueReader, hash nodeHasher) (*journalGenerator, common.Hash, error) {
trieRoot, err := hash(rawdb.ReadAccountTrieNode(db, nil))
if err != nil {
return nil, common.Hash{}, err
}
// State generation progress marker is lost, rebuild it
blob := rawdb.ReadSnapshotGenerator(db)
if len(blob) == 0 {
log.Info("State snapshot generator is not found")
return nil, trieRoot, nil
}
// State generation progress marker is not compatible, rebuild it
var generator journalGenerator
if err := rlp.DecodeBytes(blob, &generator); err != nil {
log.Info("State snapshot generator is not compatible")
return nil, trieRoot, nil
}
// The state snapshot is inconsistent with the trie data and must
// be rebuilt.
//
// Note: The SnapshotRoot and SnapshotGenerator are always consistent
// with each other, both in the legacy state snapshot and the path database.
// Therefore, if the SnapshotRoot does not match the trie root,
// the entire generator is considered stale and must be discarded.
stateRoot := rawdb.ReadSnapshotRoot(db)
if trieRoot != stateRoot {
log.Info("State snapshot is not consistent", "trie", trieRoot, "state", stateRoot)
return nil, trieRoot, nil
}
// Slice null-ness is lost after rlp decoding, reset it back to empty
if !generator.Done && generator.Marker == nil {
generator.Marker = []byte{}
}
return &generator, trieRoot, nil
}
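A sketch of how a caller might interpret loadGenerator's three-way result is shown below; the resumeOrRebuild helper and its branch bodies are illustrative only and assume the package's existing ethdb and log imports.

// resumeOrRebuild is a hypothetical helper showing the decision implied by
// loadGenerator: a nil generator means regenerate, Done means nothing to do,
// otherwise resume from the persisted marker.
func resumeOrRebuild(db ethdb.KeyValueReader, hash nodeHasher) error {
	generator, trieRoot, err := loadGenerator(db, hash)
	if err != nil {
		return err
	}
	switch {
	case generator == nil:
		// Marker missing, undecodable, or inconsistent with the trie root:
		// the snapshot must be rebuilt from scratch.
		log.Info("Rebuilding state snapshot", "root", trieRoot)
	case generator.Done:
		// A complete snapshot for trieRoot is already on disk.
		log.Info("State snapshot is complete", "root", trieRoot)
	default:
		// Continue the interrupted run from the persisted position.
		log.Info("Resuming snapshot generation", "root", trieRoot, "marker", generator.Marker)
	}
	return nil
}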
// loadLayers loads a pre-existing state layer backed by a key-value store.
func (db *Database) loadLayers() layer {
// Retrieve the root node of persistent state.
@ -109,7 +160,7 @@ func (db *Database) loadLayers() layer {
log.Info("Failed to load journal, discard it", "err", err)
}
// Return single layer with persistent state.
return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, newBuffer(db.config.WriteBufferSize, nil, nil, 0))
return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, nil, newBuffer(db.config.WriteBufferSize, nil, nil, 0))
}
// loadDiskLayer reads the binary blob from the layer journal, reconstructing
@ -141,7 +192,7 @@ func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) {
if err := states.decode(r); err != nil {
return nil, err
}
return newDiskLayer(root, id, db, nil, newBuffer(db.config.WriteBufferSize, &nodes, &states, id-stored)), nil
return newDiskLayer(root, id, db, nil, nil, newBuffer(db.config.WriteBufferSize, &nodes, &states, id-stored)), nil
}
// loadDiffLayer reads the next sections of a layer journal, reconstructing a new
@ -250,6 +301,10 @@ func (db *Database) Journal(root common.Hash) error {
} else { // disk layer only on noop runs (likely) or deep reorgs (unlikely)
log.Info("Persisting dirty state to disk", "root", root, "layers", disk.buffer.layers)
}
// Terminate the background state generation if it's active
if disk.generator != nil {
disk.generator.stop()
}
start := time.Now()
// Run the journaling

View file

@ -24,16 +24,26 @@ var (
cleanNodeReadMeter = metrics.NewRegisteredMeter("pathdb/clean/node/read", nil)
cleanNodeWriteMeter = metrics.NewRegisteredMeter("pathdb/clean/node/write", nil)
cleanStateHitMeter = metrics.NewRegisteredMeter("pathdb/clean/state/hit", nil)
cleanStateMissMeter = metrics.NewRegisteredMeter("pathdb/clean/state/miss", nil)
cleanStateReadMeter = metrics.NewRegisteredMeter("pathdb/clean/state/read", nil)
cleanStateWriteMeter = metrics.NewRegisteredMeter("pathdb/clean/state/write", nil)
dirtyNodeHitMeter = metrics.NewRegisteredMeter("pathdb/dirty/node/hit", nil)
dirtyNodeMissMeter = metrics.NewRegisteredMeter("pathdb/dirty/node/miss", nil)
dirtyNodeReadMeter = metrics.NewRegisteredMeter("pathdb/dirty/node/read", nil)
dirtyNodeWriteMeter = metrics.NewRegisteredMeter("pathdb/dirty/node/write", nil)
dirtyNodeHitDepthHist = metrics.NewRegisteredHistogram("pathdb/dirty/node/depth", nil, metrics.NewExpDecaySample(1028, 0.015))
stateAccountInexMeter = metrics.NewRegisteredMeter("pathdb/state/account/inex/total", nil)
stateStorageInexMeter = metrics.NewRegisteredMeter("pathdb/state/storage/inex/total", nil)
stateAccountExistMeter = metrics.NewRegisteredMeter("pathdb/state/account/exist/total", nil)
stateStorageExistMeter = metrics.NewRegisteredMeter("pathdb/state/storage/exist/total", nil)
stateAccountInexMeter = metrics.NewRegisteredMeter("pathdb/state/account/inex/total", nil)
stateStorageInexMeter = metrics.NewRegisteredMeter("pathdb/state/storage/inex/total", nil)
stateAccountInexDiskMeter = metrics.NewRegisteredMeter("pathdb/state/account/inex/disk", nil)
stateStorageInexDiskMeter = metrics.NewRegisteredMeter("pathdb/state/storage/inex/disk", nil)
stateAccountExistMeter = metrics.NewRegisteredMeter("pathdb/state/account/exist/total", nil)
stateStorageExistMeter = metrics.NewRegisteredMeter("pathdb/state/storage/exist/total", nil)
stateAccountExistDiskMeter = metrics.NewRegisteredMeter("pathdb/state/account/exist/disk", nil)
stateStorageExistDiskMeter = metrics.NewRegisteredMeter("pathdb/state/storage/exist/disk", nil)
dirtyStateHitMeter = metrics.NewRegisteredMeter("pathdb/dirty/state/hit", nil)
dirtyStateMissMeter = metrics.NewRegisteredMeter("pathdb/dirty/state/miss", nil)
@ -46,9 +56,11 @@ var (
nodeDiskFalseMeter = metrics.NewRegisteredMeter("pathdb/disk/false", nil)
nodeDiffFalseMeter = metrics.NewRegisteredMeter("pathdb/diff/false", nil)
commitTimeTimer = metrics.NewRegisteredResettingTimer("pathdb/commit/time", nil)
commitNodesMeter = metrics.NewRegisteredMeter("pathdb/commit/nodes", nil)
commitBytesMeter = metrics.NewRegisteredMeter("pathdb/commit/bytes", nil)
commitTimeTimer = metrics.NewRegisteredResettingTimer("pathdb/commit/time", nil)
commitNodesMeter = metrics.NewRegisteredMeter("pathdb/commit/nodes", nil)
commitAccountsMeter = metrics.NewRegisteredMeter("pathdb/commit/accounts", nil)
commitStoragesMeter = metrics.NewRegisteredMeter("pathdb/commit/slots", nil)
commitBytesMeter = metrics.NewRegisteredMeter("pathdb/commit/bytes", nil)
gcTrieNodeMeter = metrics.NewRegisteredMeter("pathdb/gc/node/count", nil)
gcTrieNodeBytesMeter = metrics.NewRegisteredMeter("pathdb/gc/node/bytes", nil)
@ -61,3 +73,28 @@ var (
historyDataBytesMeter = metrics.NewRegisteredMeter("pathdb/history/bytes/data", nil)
historyIndexBytesMeter = metrics.NewRegisteredMeter("pathdb/history/bytes/index", nil)
)
// Metrics related to state snapshot generation
var (
generatedAccountMeter = metrics.NewRegisteredMeter("pathdb/generation/account/generated", nil)
recoveredAccountMeter = metrics.NewRegisteredMeter("pathdb/generation/account/recovered", nil)
wipedAccountMeter = metrics.NewRegisteredMeter("pathdb/generation/account/wiped", nil)
missallAccountMeter = metrics.NewRegisteredMeter("pathdb/generation/account/missall", nil)
generatedStorageMeter = metrics.NewRegisteredMeter("pathdb/generation/storage/generated", nil)
recoveredStorageMeter = metrics.NewRegisteredMeter("pathdb/generation/storage/recovered", nil)
wipedStorageMeter = metrics.NewRegisteredMeter("pathdb/generation/storage/wiped", nil)
missallStorageMeter = metrics.NewRegisteredMeter("pathdb/generation/storage/missall", nil)
danglingStorageMeter = metrics.NewRegisteredMeter("pathdb/generation/storage/dangling", nil)
successfulRangeProofMeter = metrics.NewRegisteredMeter("pathdb/generation/proof/success", nil)
failedRangeProofMeter = metrics.NewRegisteredMeter("pathdb/generation/proof/failure", nil)
accountProveCounter = metrics.NewRegisteredCounter("pathdb/generation/duration/account/prove", nil)
accountTrieReadCounter = metrics.NewRegisteredCounter("pathdb/generation/duration/account/trieread", nil)
accountSnapReadCounter = metrics.NewRegisteredCounter("pathdb/generation/duration/account/snapread", nil)
accountWriteCounter = metrics.NewRegisteredCounter("pathdb/generation/duration/account/write", nil)
storageProveCounter = metrics.NewRegisteredCounter("pathdb/generation/duration/storage/prove", nil)
storageTrieReadCounter = metrics.NewRegisteredCounter("pathdb/generation/duration/storage/trieread", nil)
storageSnapReadCounter = metrics.NewRegisteredCounter("pathdb/generation/duration/storage/snapread", nil)
storageWriteCounter = metrics.NewRegisteredCounter("pathdb/generation/duration/storage/write", nil)
storageCleanCounter = metrics.NewRegisteredCounter("pathdb/generation/duration/storage/clean", nil)
)
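These meters and counters follow the usual go-ethereum metrics API: Meter.Mark records event counts, Counter.Inc accumulates values such as durations. A tiny, purely illustrative sketch of how a generation loop might bump them follows; onAccountGenerated is hypothetical and assumes a time import.

// onAccountGenerated is a hypothetical hook, not part of the generator: it
// records one freshly written account and the time spent proving its range.
func onAccountGenerated(proveStart time.Time) {
	generatedAccountMeter.Mark(1)
	accountProveCounter.Inc(int64(time.Since(proveStart)))
}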

View file

@ -23,8 +23,10 @@ import (
"slices"
"sync"
"github.com/VictoriaMetrics/fastcache"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rlp"
@ -416,6 +418,11 @@ func (s *stateSet) decode(r *rlp.Stream) error {
return nil
}
// write flushes state mutations into the provided database batch as a whole.
func (s *stateSet) write(batch ethdb.Batch, genMarker []byte, clean *fastcache.Cache) (int, int) {
return writeStates(batch, genMarker, s.accountData, s.storageData, clean)
}
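The write helper leans on the ethdb batch abstraction: mutations are staged in memory and pushed to the underlying store in a single Write call. A self-contained sketch of that flush pattern using the in-memory database is shown below; the keys and values are placeholders.

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/core/rawdb"
)

func main() {
	// Stage a few writes in a batch and flush them as a whole, the same
	// pattern stateSet.write relies on when pushing state data to disk.
	db := rawdb.NewMemoryDatabase()
	batch := db.NewBatch()
	if err := batch.Put([]byte("acc-1"), []byte("val-1")); err != nil {
		panic(err)
	}
	if err := batch.Put([]byte("acc-2"), []byte("val-2")); err != nil {
		panic(err)
	}
	if err := batch.Write(); err != nil {
		panic(err)
	}
	val, _ := db.Get([]byte("acc-1"))
	fmt.Printf("flushed: %s\n", val)
}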
// reset clears all cached state data, including any optional sorted lists that
// may have been generated.
func (s *stateSet) reset() {
@ -427,8 +434,6 @@ func (s *stateSet) reset() {
}
// dbsize returns the approximate size for db write.
//
// nolint:unused
func (s *stateSet) dbsize() int {
m := len(s.accountData) * len(rawdb.SnapshotAccountPrefix)
for _, slots := range s.storageData {