triedb/pathdb: thread flatStateCodec through internals

Route the flatStateCodec from Database through every flat-state call
site so that the trie-specific aspects of persistence and key derivation
live behind a single abstraction. Pure refactor: merkle behavior and
on-disk layout are unchanged because the only codec wired up is
merkleFlatCodec, whose methods are thin wrappers over the existing
rawdb accessors.

Threaded sites:

  disklayer.account/storage    use codec.{ReadAccount,ReadStorage,
                                AccountCacheKey,StorageCacheKey,
                                MarkerCompare} instead of direct rawdb
                                calls, bare hash slicing and
                                bytes.Compare.
  flush.writeStates            takes a codec parameter; persistence
                                goes through codec.{Write,Delete}
                                {Account,Storage}.
  buffer.flush                 carries the codec down into writeStates.
  states.write/dbsize          take the codec: write forwards it to
                                writeStates, dbsize uses it for
                                prefix-size accounting.
  generate.go (g.codec)        the generator owns a codec, used by
                                generateAccounts/generateStorages
                                callbacks; the top-level splitMarker
                                helper is removed now that its callers
                                go through codec.SplitMarker.
  context.go                   the generator context owns the codec
                                and uses codec.{AccountPrefix,
                                StoragePrefix,AccountKeyLength,
                                StorageKeyLength} to construct iterators
                                and codec.SplitMarker to derive their
                                start positions.
  reader.go (HistoricalState)  uses codec.{Account,Storage}Key for
                                caller-side key derivation.

The marker comparisons in writeStates remain merkle-shaped (two-tier
account+storage marker) because the bintrie path will use a separate
writer over single-tier stem markers in a later commit.

All existing pathdb tests pass.
This commit is contained in:
CPerezz 2026-04-07 15:48:28 +02:00
parent eaf5523a5a
commit f1d7143afa
No known key found for this signature in database
GPG key ID: 62045F34B97177DD
9 changed files with 78 additions and 63 deletions

View file

@ -132,7 +132,10 @@ func (b *buffer) size() uint64 {
// flush persists the in-memory dirty trie node into the disk if the configured
// memory threshold is reached. Note, all data must be written atomically.
func (b *buffer) flush(root common.Hash, db ethdb.KeyValueStore, freezers []ethdb.AncientWriter, progress []byte, nodesCache, statesCache *fastcache.Cache, id uint64, postFlush func()) {
//
// codec is the flat-state codec used for state persistence and cache key
// derivation. It is supplied by the disk layer's owning Database.
func (b *buffer) flush(root common.Hash, db ethdb.KeyValueStore, codec flatStateCodec, freezers []ethdb.AncientWriter, progress []byte, nodesCache, statesCache *fastcache.Cache, id uint64, postFlush func()) {
if b.done != nil {
panic("duplicated flush operation")
}
@ -158,7 +161,7 @@ func (b *buffer) flush(root common.Hash, db ethdb.KeyValueStore, freezers []ethd
// Terminate the state snapshot generation if it's active
var (
start = time.Now()
batch = db.NewBatchWithSize((b.nodes.dbsize() + b.states.dbsize()) * 11 / 10) // extra 10% for potential pebble internal stuff
batch = db.NewBatchWithSize((b.nodes.dbsize() + b.states.dbsize(codec)) * 11 / 10) // extra 10% for potential pebble internal stuff
)
// Explicitly sync the state freezer to ensure all written data is persisted to disk
// before updating the key-value store.
@ -170,7 +173,7 @@ func (b *buffer) flush(root common.Hash, db ethdb.KeyValueStore, freezers []ethd
return
}
nodes := b.nodes.write(batch, nodesCache)
accounts, slots := b.states.write(batch, progress, statesCache)
accounts, slots := b.states.write(batch, codec, progress, statesCache)
rawdb.WritePersistentStateID(batch, id)
rawdb.WriteSnapshotRoot(batch, root)

View file

@ -95,19 +95,21 @@ type generatorContext struct {
account *holdableIterator // Iterator of account snapshot data
storage *holdableIterator // Iterator of storage snapshot data
db ethdb.KeyValueStore // Key-value store containing the snapshot data
codec flatStateCodec // Flat-state codec for prefix/key-length selection
batch ethdb.Batch // Database batch for writing data atomically
logged time.Time // The timestamp when last generation progress was displayed
}
// newGeneratorContext initializes the context for generation.
func newGeneratorContext(root common.Hash, marker []byte, db ethdb.KeyValueStore) *generatorContext {
func newGeneratorContext(root common.Hash, marker []byte, db ethdb.KeyValueStore, codec flatStateCodec) *generatorContext {
ctx := &generatorContext{
root: root,
db: db,
codec: codec,
batch: db.NewBatch(),
logged: time.Now(),
}
accMarker, storageMarker := splitMarker(marker)
accMarker, storageMarker := codec.SplitMarker(marker)
ctx.openIterator(snapAccount, accMarker)
ctx.openIterator(snapStorage, storageMarker)
return ctx
@ -118,12 +120,12 @@ func newGeneratorContext(root common.Hash, marker []byte, db ethdb.KeyValueStore
// to time to avoid blocking leveldb compaction for a long time.
func (ctx *generatorContext) openIterator(kind string, start []byte) {
if kind == snapAccount {
iter := ctx.db.NewIterator(rawdb.SnapshotAccountPrefix, start)
ctx.account = newHoldableIterator(rawdb.NewKeyLengthIterator(iter, 1+common.HashLength))
iter := ctx.db.NewIterator(ctx.codec.AccountPrefix(), start)
ctx.account = newHoldableIterator(rawdb.NewKeyLengthIterator(iter, ctx.codec.AccountKeyLength()))
return
}
iter := ctx.db.NewIterator(rawdb.SnapshotStoragePrefix, start)
ctx.storage = newHoldableIterator(rawdb.NewKeyLengthIterator(iter, 1+2*common.HashLength))
iter := ctx.db.NewIterator(ctx.codec.StoragePrefix(), start)
ctx.storage = newHoldableIterator(rawdb.NewKeyLengthIterator(iter, ctx.codec.StorageKeyLength()))
}
// reopenIterator releases the specified snapshot iterator and re-open it

View file

@ -276,7 +276,7 @@ func (db *Database) setStateGenerator() error {
// Construct the generator and link it to the disk layer, ensuring that the
// generation progress is resolved to prevent accessing uncovered states
// regardless of whether background state snapshot generation is allowed.
dl.setGenerator(newGenerator(db.diskdb, noBuild, generator.Marker, stats))
dl.setGenerator(newGenerator(db.diskdb, db.flatCodec, noBuild, generator.Marker, stats))
// Short circuit if the background generation is not permitted
if noBuild || db.waitSync {

View file

@ -17,7 +17,6 @@
package pathdb
import (
"bytes"
"fmt"
"sync"
"time"
@ -199,13 +198,15 @@ func (dl *diskLayer) account(hash common.Hash, depth int) ([]byte, error) {
// If the layer is being generated, ensure the requested account has
// already been covered by the generator.
codec := dl.db.flatCodec
marker := dl.genMarker()
if marker != nil && bytes.Compare(hash.Bytes(), marker) > 0 {
if marker != nil && codec.MarkerCompare(hash.Bytes(), marker) > 0 {
return nil, errNotCoveredYet
}
// Try to retrieve the account from the memory cache
cacheKey := codec.AccountCacheKey(hash)
if dl.states != nil {
if blob, found := dl.states.HasGet(nil, hash[:]); found {
if blob, found := dl.states.HasGet(nil, cacheKey); found {
cleanStateHitMeter.Mark(1)
cleanStateReadMeter.Mark(int64(len(blob)))
@ -219,7 +220,7 @@ func (dl *diskLayer) account(hash common.Hash, depth int) ([]byte, error) {
cleanStateMissMeter.Mark(1)
}
// Try to retrieve the account from the disk.
blob := rawdb.ReadAccountSnapshot(dl.db.diskdb, hash)
blob := codec.ReadAccount(dl.db.diskdb, hash)
// Store the resolved data in the clean cache. The background buffer flusher
// may also write to the clean cache concurrently, but two writers cannot
@ -227,7 +228,7 @@ func (dl *diskLayer) account(hash common.Hash, depth int) ([]byte, error) {
// it will be found in the frozen buffer, eliminating the need to check the
// database.
if dl.states != nil {
dl.states.Set(hash[:], blob)
dl.states.Set(cacheKey, blob)
cleanStateWriteMeter.Mark(int64(len(blob)))
}
if len(blob) == 0 {
@ -276,14 +277,19 @@ func (dl *diskLayer) storage(accountHash, storageHash common.Hash, depth int) ([
// If the layer is being generated, ensure the requested storage slot
// has already been covered by the generator.
key := storageKeySlice(accountHash, storageHash)
codec := dl.db.flatCodec
combinedKey := storageKeySlice(accountHash, storageHash) // marker comparison key (merkle layout)
marker := dl.genMarker()
if marker != nil && bytes.Compare(key, marker) > 0 {
if marker != nil && codec.MarkerCompare(combinedKey, marker) > 0 {
return nil, errNotCoveredYet
}
// Try to retrieve the storage slot from the memory cache
// Try to retrieve the storage slot from the memory cache. The codec
// decides the cache key shape so it can avoid colliding with account
// keys (relevant once the bintrie codec lands; for merkle this remains
// the historical 64-byte combined key).
cacheKey := codec.StorageCacheKey(accountHash, storageHash)
if dl.states != nil {
if blob, found := dl.states.HasGet(nil, key); found {
if blob, found := dl.states.HasGet(nil, cacheKey); found {
cleanStateHitMeter.Mark(1)
cleanStateReadMeter.Mark(int64(len(blob)))
@ -296,8 +302,8 @@ func (dl *diskLayer) storage(accountHash, storageHash common.Hash, depth int) ([
}
cleanStateMissMeter.Mark(1)
}
// Try to retrieve the account from the disk
blob := rawdb.ReadStorageSnapshot(dl.db.diskdb, accountHash, storageHash)
// Try to retrieve the storage slot from the disk
blob := codec.ReadStorage(dl.db.diskdb, accountHash, storageHash)
// Store the resolved data in the clean cache. The background buffer flusher
// may also write to the clean cache concurrently, but two writers cannot
@ -305,7 +311,7 @@ func (dl *diskLayer) storage(accountHash, storageHash common.Hash, depth int) ([
// it will be found in the frozen buffer, eliminating the need to check the
// database.
if dl.states != nil {
dl.states.Set(key, blob)
dl.states.Set(cacheKey, blob)
cleanStateWriteMeter.Mark(int64(len(blob)))
}
if len(blob) == 0 {
@ -491,7 +497,7 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
// Freeze the live buffer and schedule background flushing
dl.frozen = combined
dl.frozen.flush(bottom.root, dl.db.diskdb, []ethdb.AncientWriter{dl.db.stateFreezer, dl.db.trienodeFreezer}, progress, dl.nodes, dl.states, bottom.stateID(), func() {
dl.frozen.flush(bottom.root, dl.db.diskdb, dl.db.flatCodec, []ethdb.AncientWriter{dl.db.stateFreezer, dl.db.trienodeFreezer}, progress, dl.nodes, dl.states, bottom.stateID(), func() {
// Resume the background generation if it's not completed yet.
// The generator is assumed to be available if the progress is
// not nil.
@ -599,7 +605,7 @@ func (dl *diskLayer) revert(h *stateHistory) (*diskLayer, error) {
writeNodes(batch, nodes, dl.nodes)
// Provide the original values of modified accounts and storages for revert
writeStates(batch, progress, accounts, storages, dl.states)
writeStates(batch, dl.db.flatCodec, progress, accounts, storages, dl.states)
rawdb.WritePersistentStateID(batch, dl.id-1)
rawdb.WriteSnapshotRoot(batch, h.meta.parent)
if err := batch.Write(); err != nil {

View file

@ -71,10 +71,16 @@ func writeNodes(batch ethdb.Batch, nodes map[common.Hash]map[string]*trienode.No
// This function assumes the background generator is already terminated and states
// before the supplied marker has been correctly generated.
//
// The codec parameter abstracts the trie-specific persistence and cache key
// derivation. The marker comparisons retain merkle-specific shape (two-tier
// account+storage marker) because the bintrie path uses a separate writer
// (writeStems, added in a later commit) that operates on a single-tier
// marker over stems rather than (account, storage) pairs.
//
// TODO(rjl493456442) do we really need this generation marker? The state updates
// after the marker can also be written and will be fixed by generator later if
// it's outdated.
func writeStates(batch ethdb.Batch, genMarker []byte, accountData map[common.Hash][]byte, storageData map[common.Hash]map[common.Hash][]byte, clean *fastcache.Cache) (int, int) {
func writeStates(batch ethdb.Batch, codec flatStateCodec, genMarker []byte, accountData map[common.Hash][]byte, storageData map[common.Hash]map[common.Hash][]byte, clean *fastcache.Cache) (int, int) {
var (
accounts int
slots int
@ -88,15 +94,16 @@ func writeStates(batch ethdb.Batch, genMarker []byte, accountData map[common.Has
continue
}
accounts += 1
cacheKey := codec.AccountCacheKey(addrHash)
if len(blob) == 0 {
rawdb.DeleteAccountSnapshot(batch, addrHash)
codec.DeleteAccount(batch, addrHash)
if clean != nil {
clean.Set(addrHash[:], nil)
clean.Set(cacheKey, nil)
}
} else {
rawdb.WriteAccountSnapshot(batch, addrHash, blob)
codec.WriteAccount(batch, addrHash, blob)
if clean != nil {
clean.Set(addrHash[:], blob)
clean.Set(cacheKey, blob)
}
}
}
@ -116,16 +123,16 @@ func writeStates(batch ethdb.Batch, genMarker []byte, accountData map[common.Has
continue
}
slots += 1
key := storageKeySlice(addrHash, storageHash)
cacheKey := codec.StorageCacheKey(addrHash, storageHash)
if len(blob) == 0 {
rawdb.DeleteStorageSnapshot(batch, addrHash, storageHash)
codec.DeleteStorage(batch, addrHash, storageHash)
if clean != nil {
clean.Set(key, nil)
clean.Set(cacheKey, nil)
}
} else {
rawdb.WriteStorageSnapshot(batch, addrHash, storageHash, blob)
codec.WriteStorage(batch, addrHash, storageHash, blob)
if clean != nil {
clean.Set(key, blob)
clean.Set(cacheKey, blob)
}
}
}

View file

@ -93,6 +93,7 @@ type generator struct {
running bool // Flag indicating whether the background generation is running
db ethdb.KeyValueStore // Key-value store containing the snapshot data
codec flatStateCodec // Flat-state codec for key derivation, persistence, iterators
stats *generatorStats // Generation statistics used throughout the entire life cycle
abort chan chan struct{} // Notification channel to abort generating the snapshot in this layer
done chan struct{} // Notification channel when generation is done
@ -109,7 +110,11 @@ type generator struct {
// progress indicates the starting position for resuming snapshot generation.
// It must be provided even if generation is not allowed; otherwise, uncovered
// states may be exposed for serving.
func newGenerator(db ethdb.KeyValueStore, noBuild bool, progress []byte, stats *generatorStats) *generator {
//
// codec is the flat-state codec used for marker handling, prefix selection,
// persistence, and iterator construction. It must match the codec configured
// on the owning Database.
func newGenerator(db ethdb.KeyValueStore, codec flatStateCodec, noBuild bool, progress []byte, stats *generatorStats) *generator {
if stats == nil {
stats = &generatorStats{start: time.Now()}
}
@ -117,6 +122,7 @@ func newGenerator(db ethdb.KeyValueStore, noBuild bool, progress []byte, stats *
noBuild: noBuild,
progress: progress,
db: db,
codec: codec,
stats: stats,
abort: make(chan chan struct{}),
done: make(chan struct{}),
@ -134,7 +140,7 @@ func (g *generator) run(root common.Hash) {
log.Warn("Paused the leftover generation cycle")
}
g.running = true
go g.generate(newGeneratorContext(root, g.progress, g.db))
go g.generate(newGeneratorContext(root, g.progress, g.db, g.codec))
}
// stop terminates the background generation if it's actively running.
@ -168,15 +174,6 @@ func (g *generator) progressMarker() []byte {
return g.progress
}
// splitMarker is an internal helper which splits the generation progress marker
// into two parts: the account marker, i.e. the first common.HashLength bytes of
// the marker (nil when the marker is empty), and the full marker itself, which
// is returned unchanged as the second value.
//
// NOTE(review): the slice expression assumes a non-empty marker holds at least
// common.HashLength bytes; a shorter one would presumably panic — confirm that
// callers never pass such a marker.
func splitMarker(marker []byte) ([]byte, []byte) {
var accMarker []byte
if len(marker) > 0 {
// Only a non-empty marker carries an account part.
accMarker = marker[:common.HashLength]
}
return accMarker, marker
}
// generateSnapshot regenerates a brand-new snapshot based on an existing state
// database and head block asynchronously. The snapshot is returned immediately
@ -188,7 +185,7 @@ func generateSnapshot(triedb *Database, root common.Hash, noBuild bool) *diskLay
genMarker = []byte{} // Initialized but empty!
)
dl := newDiskLayer(root, 0, triedb, nil, nil, newBuffer(triedb.config.WriteBufferSize, nil, nil, 0), nil)
dl.setGenerator(newGenerator(triedb.diskdb, noBuild, genMarker, stats))
dl.setGenerator(newGenerator(triedb.diskdb, triedb.flatCodec, noBuild, genMarker, stats))
if !noBuild {
dl.generator.run(root)
@ -633,12 +630,12 @@ func (g *generator) generateStorages(ctx *generatorContext, account common.Hash,
}(time.Now())
if delete {
rawdb.DeleteStorageSnapshot(ctx.batch, account, common.BytesToHash(key))
g.codec.DeleteStorage(ctx.batch, account, common.BytesToHash(key))
wipedStorageMeter.Mark(1)
return nil
}
if write {
rawdb.WriteStorageSnapshot(ctx.batch, account, common.BytesToHash(key), val)
g.codec.WriteStorage(ctx.batch, account, common.BytesToHash(key), val)
generatedStorageMeter.Mark(1)
} else {
recoveredStorageMeter.Mark(1)
@ -682,7 +679,7 @@ func (g *generator) generateAccounts(ctx *generatorContext, accMarker []byte) er
start := time.Now()
if delete {
rawdb.DeleteAccountSnapshot(ctx.batch, account)
g.codec.DeleteAccount(ctx.batch, account)
wipedAccountMeter.Mark(1)
accountWriteCounter.Inc(time.Since(start).Nanoseconds())
@ -708,7 +705,7 @@ func (g *generator) generateAccounts(ctx *generatorContext, accMarker []byte) er
} else {
data := types.SlimAccountRLP(acc)
dataLen = len(data)
rawdb.WriteAccountSnapshot(ctx.batch, account, data)
g.codec.WriteAccount(ctx.batch, account, data)
generatedAccountMeter.Mark(1)
}
g.stats.storage += common.StorageSize(1 + common.HashLength + dataLen)
@ -788,7 +785,7 @@ func (g *generator) generate(ctx *generatorContext) {
// processed twice by the generator(they are already processed in the
// last run) but it's fine.
var (
accMarker, _ = splitMarker(g.progress)
accMarker, _ = g.codec.SplitMarker(g.progress)
abort chan struct{}
)
if err := g.generateAccounts(ctx, accMarker); err != nil {

View file

@ -137,7 +137,7 @@ func TestAccountIteratorBasics(t *testing.T) {
db := rawdb.NewMemoryDatabase()
batch := db.NewBatch()
states.write(batch, nil, nil)
states.write(batch, &merkleFlatCodec{}, nil, nil)
batch.Write()
it = newDiskAccountIterator(db, common.Hash{})
verifyIterator(t, 100, it, verifyNothing) // Nil is allowed for single layer iterator
@ -176,7 +176,7 @@ func TestStorageIteratorBasics(t *testing.T) {
db := rawdb.NewMemoryDatabase()
batch := db.NewBatch()
states.write(batch, nil, nil)
states.write(batch, &merkleFlatCodec{}, nil, nil)
batch.Write()
for account := range accounts {
it := newDiskStorageIterator(db, account, common.Hash{})

View file

@ -260,7 +260,7 @@ func (r *HistoricalStateReader) AccountRLP(address common.Address) ([]byte, erro
// and try to define a low granularity lock if the current approach doesn't
// work later.
dl := r.db.tree.bottom()
hash := crypto.Keccak256Hash(address.Bytes())
hash := r.db.flatCodec.AccountKey(address)
latest, err := dl.account(hash, 0)
if err != nil {
return nil, err
@ -310,8 +310,7 @@ func (r *HistoricalStateReader) Storage(address common.Address, key common.Hash)
// and try to define a low granularity lock if the current approach doesn't
// work later.
dl := r.db.tree.bottom()
addrHash := crypto.Keccak256Hash(address.Bytes())
keyHash := crypto.Keccak256Hash(key.Bytes())
addrHash, keyHash := r.db.flatCodec.StorageKey(address, key)
latest, err := dl.storage(addrHash, keyHash, 0)
if err != nil {
return nil, err

View file

@ -25,7 +25,6 @@ import (
"github.com/VictoriaMetrics/fastcache"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
@ -424,8 +423,8 @@ func (s *stateSet) decode(r *rlp.Stream) error {
}
// write flushes state mutations into the provided database batch as a whole.
func (s *stateSet) write(batch ethdb.Batch, genMarker []byte, clean *fastcache.Cache) (int, int) {
return writeStates(batch, genMarker, s.accountData, s.storageData, clean)
func (s *stateSet) write(batch ethdb.Batch, codec flatStateCodec, genMarker []byte, clean *fastcache.Cache) (int, int) {
return writeStates(batch, codec, genMarker, s.accountData, s.storageData, clean)
}
// reset clears all cached state data, including any optional sorted lists that
@ -438,11 +437,13 @@ func (s *stateSet) reset() {
s.storageListSorted = make(map[common.Hash][]common.Hash)
}
// dbsize returns the approximate size for db write.
func (s *stateSet) dbsize() int {
m := len(s.accountData) * len(rawdb.SnapshotAccountPrefix)
// dbsize returns the approximate size for db write. The codec supplies
// the per-entry on-disk overhead so this calculation tracks the actual
// schema in use (merkle vs. bintrie).
func (s *stateSet) dbsize(codec flatStateCodec) int {
m := len(s.accountData) * codec.AccountPrefixSize()
for _, slots := range s.storageData {
m += len(slots) * len(rawdb.SnapshotStoragePrefix)
m += len(slots) * codec.StoragePrefixSize()
}
return m + int(s.size)
}