triedb/pathdb, eth: use double-buffer mechanism in pathdb (#30464)
Some checks are pending
/ Linux Build (push) Waiting to run
/ Linux Build (arm) (push) Waiting to run
/ Docker Image (push) Waiting to run

Previously, PathDB used a single buffer to aggregate database writes,
which needed to be flushed atomically. However, flushing large amounts
of data (e.g., 256MB) caused significant overhead, often blocking the
system for around 3 seconds during the flush.

To mitigate this overhead and reduce performance spikes, a double-buffer
mechanism is introduced. When the active buffer fills up, it is marked
as frozen and a background flushing process is triggered. Meanwhile, a
new buffer is allocated for incoming writes, allowing operations to
continue uninterrupted.

This approach reduces system blocking times and provides flexibility in
adjusting buffer parameters for improved performance.
This commit is contained in:
rjl493456442 2025-06-22 20:40:54 +08:00 committed by GitHub
parent 338d754ed0
commit 21920207e4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 382 additions and 176 deletions

View file

@ -165,6 +165,7 @@ type BlockChainConfig struct {
TrieCleanLimit int // Memory allowance (MB) to use for caching trie nodes in memory
TrieDirtyLimit int // Memory limit (MB) at which to start flushing dirty trie nodes to disk
TrieTimeLimit time.Duration // Time limit after which to flush the current in-memory trie to disk
TrieNoAsyncFlush bool // Whether the asynchronous buffer flushing is disallowed
Preimages bool // Whether to store preimage of trie key to the disk
StateHistory uint64 // Number of blocks from head whose state histories are reserved.
@ -210,7 +211,7 @@ func DefaultConfig() *BlockChainConfig {
}
}
// WithArchive enabled/disables archive mode on the config.
// WithArchive enables/disables archive mode on the config.
func (cfg BlockChainConfig) WithArchive(on bool) *BlockChainConfig {
cfg.ArchiveMode = on
return &cfg
@ -222,6 +223,12 @@ func (cfg BlockChainConfig) WithStateScheme(scheme string) *BlockChainConfig {
return &cfg
}
// WithNoAsyncFlush configures whether asynchronous buffer flushing is
// disallowed on the config: passing true forces the buffer flushes to run
// synchronously (primarily a testing knob). The receiver is taken by value,
// so the original config is left untouched; a pointer to the modified copy
// is returned.
func (cfg BlockChainConfig) WithNoAsyncFlush(on bool) *BlockChainConfig {
	out := cfg
	out.TrieNoAsyncFlush = on
	return &out
}
// triedbConfig derives the configures for trie database.
func (cfg *BlockChainConfig) triedbConfig(isVerkle bool) *triedb.Config {
config := &triedb.Config{
@ -243,6 +250,7 @@ func (cfg *BlockChainConfig) triedbConfig(isVerkle bool) *triedb.Config {
// for flushing both trie data and state data to disk. The config name
// should be updated to eliminate the confusion.
WriteBufferSize: cfg.TrieDirtyLimit * 1024 * 1024,
NoAsyncFlush: cfg.TrieNoAsyncFlush,
}
}
return config

View file

@ -81,7 +81,7 @@ func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Blo
}
engine = ethash.NewFullFaker()
)
chain, err := NewBlockChain(db, gspec, engine, DefaultConfig().WithStateScheme(basic.scheme))
chain, err := NewBlockChain(db, gspec, engine, DefaultConfig().WithStateScheme(basic.scheme).WithNoAsyncFlush(true))
if err != nil {
t.Fatalf("Failed to create chain: %v", err)
}
@ -572,7 +572,7 @@ func TestHighCommitCrashWithNewSnapshot(t *testing.T) {
//
// Expected head header : C8
// Expected head fast block: C8
// Expected head block : G (Hash mode), C6 (Hash mode)
// Expected head block : G (Hash mode), C6 (Path mode)
// Expected snapshot disk : C4 (Hash mode)
for _, scheme := range []string{rawdb.HashScheme, rawdb.PathScheme} {
expHead := uint64(0)

View file

@ -256,7 +256,9 @@ func newDbConfig(scheme string) *triedb.Config {
if scheme == rawdb.HashScheme {
return triedb.HashDefaults
}
return &triedb.Config{PathDB: pathdb.Defaults}
config := *pathdb.Defaults
config.NoAsyncFlush = true
return &triedb.Config{PathDB: &config}
}
func TestVerkleGenesisCommit(t *testing.T) {
@ -313,7 +315,14 @@ func TestVerkleGenesisCommit(t *testing.T) {
}
db := rawdb.NewMemoryDatabase()
triedb := triedb.NewDatabase(db, triedb.VerkleDefaults)
config := *pathdb.Defaults
config.NoAsyncFlush = true
triedb := triedb.NewDatabase(db, &triedb.Config{
IsVerkle: true,
PathDB: &config,
})
block := genesis.MustCommit(db, triedb)
if !bytes.Equal(block.Root().Bytes(), expected) {
t.Fatalf("invalid genesis state root, expected %x, got %x", expected, block.Root())

View file

@ -168,6 +168,7 @@ func newHelper(scheme string) *testHelper {
if scheme == rawdb.PathScheme {
config.PathDB = &pathdb.Config{
SnapshotNoBuild: true,
NoAsyncFlush: true,
} // disable caching
} else {
config.HashDB = &hashdb.Config{} // disable caching

View file

@ -982,6 +982,7 @@ func testMissingTrieNodes(t *testing.T, scheme string) {
TrieCleanSize: 0,
StateCleanSize: 0,
WriteBufferSize: 0,
NoAsyncFlush: true,
}}) // disable caching
} else {
tdb = triedb.NewDatabase(memDb, &triedb.Config{HashDB: &hashdb.Config{
@ -1004,18 +1005,25 @@ func testMissingTrieNodes(t *testing.T, scheme string) {
// force-flush
tdb.Commit(root, false)
}
// Create a new state on the old root
state, _ = New(root, db)
// Now we clear out the memdb
it := memDb.NewIterator(nil, nil)
for it.Next() {
k := it.Key()
// Leave the root intact
if scheme == rawdb.HashScheme {
if !bytes.Equal(k, root[:]) {
t.Logf("key: %x", k)
memDb.Delete(k)
}
}
if scheme == rawdb.PathScheme {
rk := k[len(rawdb.TrieNodeAccountPrefix):]
if len(rk) != 0 {
t.Logf("key: %x", k)
memDb.Delete(k)
}
}
}
state, _ = New(root, db)
balance := state.GetBalance(addr)
// The removed elem should lead to it returning zero balance
if exp, got := uint64(0), balance.Uint64(); got != exp {

View file

@ -46,7 +46,9 @@ func makeTestState(scheme string) (ethdb.Database, Database, *triedb.Database, c
// Create an empty state
config := &triedb.Config{Preimages: true}
if scheme == rawdb.PathScheme {
config.PathDB = pathdb.Defaults
pconfig := *pathdb.Defaults
pconfig.NoAsyncFlush = true
config.PathDB = &pconfig
} else {
config.HashDB = hashdb.Defaults
}

View file

@ -17,6 +17,7 @@
package pathdb
import (
"errors"
"fmt"
"time"
@ -37,6 +38,13 @@ type buffer struct {
limit uint64 // The maximum memory allowance in bytes
nodes *nodeSet // Aggregated trie node set
states *stateSet // Aggregated state set
// done is closed once the content of the buffer has been flushed to disk.
// This channel is nil if the buffer is not frozen.
done chan struct{}
// flushErr memorizes the error if any exception occurs during flushing
flushErr error
}
// newBuffer initializes the buffer with the provided states and trie nodes.
@ -61,7 +69,7 @@ func (b *buffer) account(hash common.Hash) ([]byte, bool) {
return b.states.account(hash)
}
// storage retrieves the storage slot with account address hash and slot key.
// storage retrieves the storage slot with account address hash and slot key hash.
func (b *buffer) storage(addrHash common.Hash, storageHash common.Hash) ([]byte, bool) {
return b.states.storage(addrHash, storageHash)
}
@ -124,12 +132,29 @@ func (b *buffer) size() uint64 {
// flush persists the in-memory dirty trie nodes and states to disk. The heavy
// lifting runs in a background goroutine; completion (and any error) must be
// collected via waitFlush.
func (b *buffer) flush(root common.Hash, db ethdb.KeyValueStore, freezer ethdb.AncientWriter, progress []byte, nodesCache, statesCache *fastcache.Cache, id uint64) error {
func (b *buffer) flush(root common.Hash, db ethdb.KeyValueStore, freezer ethdb.AncientWriter, progress []byte, nodesCache, statesCache *fastcache.Cache, id uint64, postFlush func()) {
if b.done != nil {
panic("duplicated flush operation")
}
b.done = make(chan struct{}) // allocate the channel for notification
// Schedule the background thread to construct and write the batch, which
// usually takes a few seconds.
go func() {
defer func() {
if postFlush != nil {
postFlush()
}
close(b.done)
}()
// Ensure the target state id is aligned with the internal counter.
head := rawdb.ReadPersistentStateID(db)
if head+b.layers != id {
return fmt.Errorf("buffer layers (%d) cannot be applied on top of persisted state id (%d) to reach requested state id (%d)", b.layers, head, id)
b.flushErr = fmt.Errorf("buffer layers (%d) cannot be applied on top of persisted state id (%d) to reach requested state id (%d)", b.layers, head, id)
return
}
// Terminate the state snapshot generation if it's active
var (
start = time.Now()
@ -142,7 +167,8 @@ func (b *buffer) flush(root common.Hash, db ethdb.KeyValueStore, freezer ethdb.A
// available for state rollback.
if freezer != nil {
if err := freezer.SyncAncient(); err != nil {
return err
b.flushErr = err
return
}
}
nodes := b.nodes.write(batch, nodesCache)
@ -153,14 +179,31 @@ func (b *buffer) flush(root common.Hash, db ethdb.KeyValueStore, freezer ethdb.A
// Flush all mutations in a single batch
size := batch.ValueSize()
if err := batch.Write(); err != nil {
return err
b.flushErr = err
return
}
commitBytesMeter.Mark(int64(size))
commitNodesMeter.Mark(int64(nodes))
commitAccountsMeter.Mark(int64(accounts))
commitStoragesMeter.Mark(int64(slots))
commitTimeTimer.UpdateSince(start)
b.reset()
// The content in the frozen buffer is kept for consequent state access,
// TODO (rjl493456442) measure the gc overhead for holding this struct.
// TODO (rjl493456442) can we somehow get rid of it after flushing??
// TODO (rjl493456442) buffer itself is not thread-safe, add the lock
// protection if try to reset the buffer here.
// b.reset()
log.Debug("Persisted buffer content", "nodes", nodes, "accounts", accounts, "slots", slots, "bytes", common.StorageSize(size), "elapsed", common.PrettyDuration(time.Since(start)))
return nil
}()
}
// waitFlush blocks until the background flush of the buffer content has
// completed and returns the error recorded during that process, if any.
// Calling it on a buffer that was never frozen is an error.
func (b *buffer) waitFlush() error {
	done := b.done
	if done == nil {
		return errors.New("the buffer is not frozen")
	}
	<-done
	return b.flushErr
}

View file

@ -119,7 +119,11 @@ type Config struct {
StateCleanSize int // Maximum memory allowance (in bytes) for caching clean state data
WriteBufferSize int // Maximum memory allowance (in bytes) for write buffer
ReadOnly bool // Flag whether the database is opened in read only mode
SnapshotNoBuild bool // Flag Whether the background generation is allowed
// Testing configurations
SnapshotNoBuild bool // Flag whether the state snapshot generation is disallowed
NoAsyncFlush bool // Flag whether the background buffer flushing is disallowed
NoAsyncGeneration bool // Flag whether to block until the state generation completes
}
// sanitize checks the provided user configurations and changes anything that's
@ -366,6 +370,12 @@ func (db *Database) setStateGenerator() error {
}
stats.log("Starting snapshot generation", root, generator.Marker)
dl.generator.run(root)
// Block until the generation completes; this mode is only used in
// unit tests.
if db.config.NoAsyncGeneration {
<-dl.generator.done
}
return nil
}
@ -434,8 +444,8 @@ func (db *Database) Disable() error {
// Terminate the state generator if it's active and mark the disk layer
// as stale to prevent access to persistent state.
disk := db.tree.bottom()
if disk.generator != nil {
disk.generator.stop()
if err := disk.terminate(); err != nil {
return err
}
disk.markStale()
@ -592,12 +602,14 @@ func (db *Database) Close() error {
// following mutations.
db.readOnly = true
// Terminate the background generation if it's active
disk := db.tree.bottom()
if disk.generator != nil {
disk.generator.stop()
// Block until the background flushing is finished. It must
// be done before terminating the potential background snapshot
// generator.
dl := db.tree.bottom()
if err := dl.terminate(); err != nil {
return err
}
disk.resetCache() // release the memory held by clean cache
dl.resetCache() // release the memory held by clean cache
// Close the attached state history freezer.
if db.freezer == nil {
@ -662,16 +674,6 @@ func (db *Database) HistoryRange() (uint64, uint64, error) {
return historyRange(db.freezer)
}
// waitGeneration waits until the background generation is finished. It assumes
// that the generation is permitted; otherwise, it will block indefinitely.
func (db *Database) waitGeneration() {
gen := db.tree.bottom().generator
if gen == nil || gen.completed() {
return
}
<-gen.done
}
// AccountIterator creates a new account iterator for the specified root hash and
// seeks to a starting account hash.
func (db *Database) AccountIterator(root common.Hash, seek common.Hash) (AccountIterator, error) {
@ -681,7 +683,7 @@ func (db *Database) AccountIterator(root common.Hash, seek common.Hash) (Account
if wait {
return nil, errDatabaseWaitSync
}
if gen := db.tree.bottom().generator; gen != nil && !gen.completed() {
if !db.tree.bottom().genComplete() {
return nil, errNotConstructed
}
return newFastAccountIterator(db, root, seek)
@ -696,7 +698,7 @@ func (db *Database) StorageIterator(root common.Hash, account common.Hash, seek
if wait {
return nil, errDatabaseWaitSync
}
if gen := db.tree.bottom().generator; gen != nil && !gen.completed() {
if !db.tree.bottom().genComplete() {
return nil, errNotConstructed
}
return newFastStorageIterator(db, root, account, seek)

View file

@ -129,6 +129,7 @@ func newTester(t *testing.T, historyLimit uint64, isVerkle bool, layers int) *te
TrieCleanSize: 256 * 1024,
StateCleanSize: 256 * 1024,
WriteBufferSize: 256 * 1024,
NoAsyncFlush: true,
}, isVerkle)
obj = &tester{

View file

@ -41,17 +41,20 @@ type diskLayer struct {
nodes *fastcache.Cache // GC friendly memory cache of clean nodes
states *fastcache.Cache // GC friendly memory cache of clean states
buffer *buffer // Dirty buffer to aggregate writes of nodes and states
buffer *buffer // Live buffer to aggregate writes
frozen *buffer // Frozen node buffer waiting for flushing
stale bool // Signals that the layer became stale (state progressed)
lock sync.RWMutex // Lock used to protect stale flag and genMarker
// The generator is set if the state snapshot was not fully completed,
// regardless of whether the background generation is running or not.
// It should only be unset if the generation completes.
generator *generator
}
// newDiskLayer creates a new disk layer based on the passing arguments.
func newDiskLayer(root common.Hash, id uint64, db *Database, nodes *fastcache.Cache, states *fastcache.Cache, buffer *buffer) *diskLayer {
func newDiskLayer(root common.Hash, id uint64, db *Database, nodes *fastcache.Cache, states *fastcache.Cache, buffer *buffer, frozen *buffer) *diskLayer {
// Initialize the clean caches if the memory allowance is not zero
// or reuse the provided caches if they are not nil (inherited from
// the original disk layer).
@ -68,6 +71,7 @@ func newDiskLayer(root common.Hash, id uint64, db *Database, nodes *fastcache.Ca
nodes: nodes,
states: states,
buffer: buffer,
frozen: frozen,
}
}
@ -114,17 +118,20 @@ func (dl *diskLayer) node(owner common.Hash, path []byte, depth int) ([]byte, co
if dl.stale {
return nil, common.Hash{}, nil, errSnapshotStale
}
// Try to retrieve the trie node from the not-yet-written
// node buffer first. Note the buffer is lock free since
// it's impossible to mutate the buffer before tagging the
// layer as stale.
n, found := dl.buffer.node(owner, path)
// Try to retrieve the trie node from the not-yet-written node buffer first
// (both the live one and the frozen one). Note the buffer is lock free since
// it's impossible to mutate the buffer before tagging the layer as stale.
for _, buffer := range []*buffer{dl.buffer, dl.frozen} {
if buffer != nil {
n, found := buffer.node(owner, path)
if found {
dirtyNodeHitMeter.Mark(1)
dirtyNodeReadMeter.Mark(int64(len(n.Blob)))
dirtyNodeHitDepthHist.Update(int64(depth))
return n.Blob, n.Hash, &nodeLoc{loc: locDirtyCache, depth: depth}, nil
}
}
}
dirtyNodeMissMeter.Mark(1)
// Try to retrieve the trie node from the clean memory cache
@ -144,6 +151,11 @@ func (dl *diskLayer) node(owner common.Hash, path []byte, depth int) ([]byte, co
} else {
blob = rawdb.ReadStorageTrieNode(dl.db.diskdb, owner, path)
}
// Store the resolved data in the clean cache. The background buffer flusher
// may also write to the clean cache concurrently, but two writers cannot
// write the same item with different content. If the item already exists,
// it will be found in the frozen buffer, eliminating the need to check the
// database.
if dl.nodes != nil && len(blob) > 0 {
dl.nodes.Set(key, blob)
cleanNodeWriteMeter.Mark(int64(len(blob)))
@ -162,11 +174,12 @@ func (dl *diskLayer) account(hash common.Hash, depth int) ([]byte, error) {
if dl.stale {
return nil, errSnapshotStale
}
// Try to retrieve the account from the not-yet-written
// node buffer first. Note the buffer is lock free since
// it's impossible to mutate the buffer before tagging the
// layer as stale.
blob, found := dl.buffer.account(hash)
// Try to retrieve the trie node from the not-yet-written node buffer first
// (both the live one and the frozen one). Note the buffer is lock free since
// it's impossible to mutate the buffer before tagging the layer as stale.
for _, buffer := range []*buffer{dl.buffer, dl.frozen} {
if buffer != nil {
blob, found := buffer.account(hash)
if found {
dirtyStateHitMeter.Mark(1)
dirtyStateReadMeter.Mark(int64(len(blob)))
@ -179,6 +192,8 @@ func (dl *diskLayer) account(hash common.Hash, depth int) ([]byte, error) {
}
return blob, nil
}
}
}
dirtyStateMissMeter.Mark(1)
// If the layer is being generated, ensure the requested account has
@ -203,7 +218,13 @@ func (dl *diskLayer) account(hash common.Hash, depth int) ([]byte, error) {
cleanStateMissMeter.Mark(1)
}
// Try to retrieve the account from the disk.
blob = rawdb.ReadAccountSnapshot(dl.db.diskdb, hash)
blob := rawdb.ReadAccountSnapshot(dl.db.diskdb, hash)
// Store the resolved data in the clean cache. The background buffer flusher
// may also write to the clean cache concurrently, but two writers cannot
// write the same item with different content. If the item already exists,
// it will be found in the frozen buffer, eliminating the need to check the
// database.
if dl.states != nil {
dl.states.Set(hash[:], blob)
cleanStateWriteMeter.Mark(int64(len(blob)))
@ -231,11 +252,12 @@ func (dl *diskLayer) storage(accountHash, storageHash common.Hash, depth int) ([
if dl.stale {
return nil, errSnapshotStale
}
// Try to retrieve the storage slot from the not-yet-written
// node buffer first. Note the buffer is lock free since
// it's impossible to mutate the buffer before tagging the
// layer as stale.
if blob, found := dl.buffer.storage(accountHash, storageHash); found {
// Try to retrieve the trie node from the not-yet-written node buffer first
// (both the live one and the frozen one). Note the buffer is lock free since
// it's impossible to mutate the buffer before tagging the layer as stale.
for _, buffer := range []*buffer{dl.buffer, dl.frozen} {
if buffer != nil {
if blob, found := buffer.storage(accountHash, storageHash); found {
dirtyStateHitMeter.Mark(1)
dirtyStateReadMeter.Mark(int64(len(blob)))
dirtyStateHitDepthHist.Update(int64(depth))
@ -247,6 +269,8 @@ func (dl *diskLayer) storage(accountHash, storageHash common.Hash, depth int) ([
}
return blob, nil
}
}
}
dirtyStateMissMeter.Mark(1)
// If the layer is being generated, ensure the requested storage slot
@ -273,6 +297,12 @@ func (dl *diskLayer) storage(accountHash, storageHash common.Hash, depth int) ([
}
// Try to retrieve the account from the disk
blob := rawdb.ReadStorageSnapshot(dl.db.diskdb, accountHash, storageHash)
// Store the resolved data in the clean cache. The background buffer flusher
// may also write to the clean cache concurrently, but two writers cannot
// write the same item with different content. If the item already exists,
// it will be found in the frozen buffer, eliminating the need to check the
// database.
if dl.states != nil {
dl.states.Set(key, blob)
cleanStateWriteMeter.Mark(int64(len(blob)))
@ -341,7 +371,8 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
// truncation) surpasses the persisted state ID, we take the necessary action
// of forcibly committing the cached dirty states to ensure that the persisted
// state ID remains higher.
if !force && rawdb.ReadPersistentStateID(dl.db.diskdb) < oldest {
persistedID := rawdb.ReadPersistentStateID(dl.db.diskdb)
if !force && persistedID < oldest {
force = true
}
// Merge the trie nodes and flat states of the bottom-most diff layer into the
@ -351,12 +382,25 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
// Terminate the background state snapshot generation before mutating the
// persistent state.
if combined.full() || force {
// Wait until the previous frozen buffer is fully flushed
if dl.frozen != nil {
if err := dl.frozen.waitFlush(); err != nil {
return nil, err
}
}
// Release the frozen buffer and the internally referenced maps will
// be reclaimed by GC.
dl.frozen = nil
// Terminate the background state snapshot generator before flushing
// to prevent data race.
var progress []byte
if dl.generator != nil {
dl.generator.stop()
progress = dl.generator.progressMarker()
var (
progress []byte
gen = dl.generator
)
if gen != nil {
gen.stop()
progress = gen.progressMarker()
// If the snapshot has been fully generated, unset the generator
if progress == nil {
@ -365,18 +409,33 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
log.Info("Paused snapshot generation")
}
}
// Flush the content in combined buffer. Any state data after the progress
// marker will be ignored, as the generator will pick it up later.
if err := combined.flush(bottom.root, dl.db.diskdb, dl.db.freezer, progress, dl.nodes, dl.states, bottom.stateID()); err != nil {
// Freeze the live buffer and schedule background flushing
dl.frozen = combined
dl.frozen.flush(bottom.root, dl.db.diskdb, dl.db.freezer, progress, dl.nodes, dl.states, bottom.stateID(), func() {
// Resume the background generation if it's not completed yet.
// The generator is assumed to be available if the progress is
// not nil.
//
// Notably, the generator will be shared and linked by all the
// disk layer instances, regardless of the generation is terminated
// or not.
if progress != nil {
gen.run(bottom.root)
}
})
// Block until the frozen buffer is fully flushed out if the async flushing
// is not allowed, or if the oldest history surpasses the persisted state ID.
if dl.db.config.NoAsyncFlush || persistedID < oldest {
if err := dl.frozen.waitFlush(); err != nil {
return nil, err
}
// Resume the background generation if it's not completed yet
if progress != nil {
dl.generator.run(bottom.root)
dl.frozen = nil
}
combined = newBuffer(dl.db.config.WriteBufferSize, nil, nil, 0)
}
// Link the generator if snapshot is not yet completed
ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.nodes, dl.states, combined)
ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.nodes, dl.states, combined, dl.frozen)
if dl.generator != nil {
ndl.setGenerator(dl.generator)
}
@ -428,7 +487,7 @@ func (dl *diskLayer) revert(h *history) (*diskLayer, error) {
if err != nil {
return nil, err
}
ndl := newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.nodes, dl.states, dl.buffer)
ndl := newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.nodes, dl.states, dl.buffer, dl.frozen)
// Link the generator if it exists
if dl.generator != nil {
@ -437,7 +496,19 @@ func (dl *diskLayer) revert(h *history) (*diskLayer, error) {
log.Debug("Reverted data in write buffer", "oldroot", h.meta.root, "newroot", h.meta.parent, "elapsed", common.PrettyDuration(time.Since(start)))
return ndl, nil
}
// Terminate the generation before writing any data into database
// Block until the frozen buffer is fully flushed
if dl.frozen != nil {
if err := dl.frozen.waitFlush(); err != nil {
return nil, err
}
// Unset the frozen buffer if it exists, otherwise these "reverted"
// states will still be accessible after revert in frozen buffer.
dl.frozen = nil
}
// Terminate the generator before writing any data to the database.
// This must be done after flushing the frozen buffer, as the generator
// may be restarted at the end of the flush process.
var progress []byte
if dl.generator != nil {
dl.generator.stop()
@ -455,7 +526,7 @@ func (dl *diskLayer) revert(h *history) (*diskLayer, error) {
}
// Link the generator and resume generation if the snapshot is not yet
// fully completed.
ndl := newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.nodes, dl.states, dl.buffer)
ndl := newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.nodes, dl.states, dl.buffer, dl.frozen)
if dl.generator != nil && !dl.generator.completed() {
ndl.generator = dl.generator
ndl.generator.run(h.meta.parent)
@ -500,3 +571,41 @@ func (dl *diskLayer) genMarker() []byte {
}
return dl.generator.progressMarker()
}
// genComplete reports whether the state snapshot has been fully generated.
func (dl *diskLayer) genComplete() bool {
	dl.lock.RLock()
	defer dl.lock.RUnlock()

	// A nil progress marker indicates the generation has finished.
	complete := dl.genMarker() == nil
	return complete
}
// waitFlush blocks until any in-flight background flush of the frozen buffer
// has finished, returning the error encountered during flushing, if any.
// It is a no-op when no frozen buffer is present.
func (dl *diskLayer) waitFlush() error {
	dl.lock.RLock()
	defer dl.lock.RUnlock()

	if frozen := dl.frozen; frozen != nil {
		return frozen.waitFlush()
	}
	return nil
}
// terminate releases the frozen buffer if it's not nil and terminates the
// background state generator.
//
// The frozen buffer must be drained BEFORE stopping the generator: the
// flush-completion callback may restart the generator once the flush is
// done (see the postFlush hook passed to buffer.flush), so stopping first
// could be undone by a still-running flush.
func (dl *diskLayer) terminate() error {
	dl.lock.Lock()
	defer dl.lock.Unlock()
	if dl.frozen != nil {
		// Block until the background flush completes and surface any
		// error it recorded.
		if err := dl.frozen.waitFlush(); err != nil {
			return err
		}
		// Drop the reference so the maps held by the frozen buffer become
		// eligible for garbage collection.
		dl.frozen = nil
	}
	if dl.generator != nil {
		// Pause the background snapshot generation, if it's running.
		dl.generator.stop()
	}
	return nil
}

View file

@ -186,7 +186,7 @@ func generateSnapshot(triedb *Database, root common.Hash, noBuild bool) *diskLay
stats = &generatorStats{start: time.Now()}
genMarker = []byte{} // Initialized but empty!
)
dl := newDiskLayer(root, 0, triedb, nil, nil, newBuffer(triedb.config.WriteBufferSize, nil, nil, 0))
dl := newDiskLayer(root, 0, triedb, nil, nil, newBuffer(triedb.config.WriteBufferSize, nil, nil, 0), nil)
dl.setGenerator(newGenerator(triedb.diskdb, noBuild, genMarker, stats))
if !noBuild {

View file

@ -49,6 +49,7 @@ func newGenTester() *genTester {
disk := rawdb.NewMemoryDatabase()
config := *Defaults
config.SnapshotNoBuild = true // no background generation
config.NoAsyncFlush = true // no async flush
db := New(disk, &config, false)
tr, _ := trie.New(trie.StateTrieID(types.EmptyRootHash), db)
return &genTester{

View file

@ -45,6 +45,10 @@ type binaryIterator struct {
// accounts in a slow, but easily verifiable way. Note this function is used
// for initialization, use `newBinaryAccountIterator` as the API.
func (dl *diskLayer) initBinaryAccountIterator(seek common.Hash) *binaryIterator {
// Block until the frozen buffer flushing is completed.
if err := dl.waitFlush(); err != nil {
panic(err)
}
// The state set in the disk layer is mutable, hold the lock before obtaining
// the account list to prevent concurrent map iteration and write.
dl.lock.RLock()
@ -113,6 +117,10 @@ func (dl *diffLayer) initBinaryAccountIterator(seek common.Hash) *binaryIterator
// storage slots in a slow, but easily verifiable way. Note this function is used
// for initialization, use `newBinaryStorageIterator` as the API.
func (dl *diskLayer) initBinaryStorageIterator(account common.Hash, seek common.Hash) *binaryIterator {
// Block until the frozen buffer flushing is completed.
if err := dl.waitFlush(); err != nil {
panic(err)
}
// The state set in the disk layer is mutable, hold the lock before obtaining
// the storage list to prevent concurrent map iteration and write.
dl.lock.RLock()

View file

@ -76,6 +76,11 @@ func newFastIterator(db *Database, root common.Hash, account common.Hash, seek c
if accountIterator {
switch dl := current.(type) {
case *diskLayer:
// Ensure no active background buffer flush is in progress, otherwise,
// part of the state data may become invisible.
if err := dl.waitFlush(); err != nil {
return nil, err
}
// The state set in the disk layer is mutable, hold the lock before obtaining
// the account list to prevent concurrent map iteration and write.
dl.lock.RLock()
@ -113,6 +118,11 @@ func newFastIterator(db *Database, root common.Hash, account common.Hash, seek c
} else {
switch dl := current.(type) {
case *diskLayer:
// Ensure no active background buffer flush is in progress, otherwise,
// part of the state data may become invisible.
if err := dl.waitFlush(); err != nil {
return nil, err
}
// The state set in the disk layer is mutable, hold the lock before obtaining
// the storage list to prevent concurrent map iteration and write.
dl.lock.RLock()

View file

@ -254,10 +254,9 @@ func TestFastIteratorBasics(t *testing.T) {
// TestAccountIteratorTraversal tests some simple multi-layer iteration.
func TestAccountIteratorTraversal(t *testing.T) {
config := &Config{
WriteBufferSize: 0,
NoAsyncGeneration: true,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
db.waitGeneration()
// Stack three diff layers on top with various overlaps
db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 0, trienode.NewMergedNodeSet(),
@ -298,10 +297,9 @@ func TestAccountIteratorTraversal(t *testing.T) {
func TestStorageIteratorTraversal(t *testing.T) {
config := &Config{
WriteBufferSize: 0,
NoAsyncGeneration: true,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
db.waitGeneration()
// Stack three diff layers on top with various overlaps
db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 0, trienode.NewMergedNodeSet(),
@ -311,14 +309,14 @@ func TestStorageIteratorTraversal(t *testing.T) {
NewStateSetWithOrigin(randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x04", "0x05", "0x06"}}, nil), nil, nil, false))
db.Update(common.HexToHash("0x04"), common.HexToHash("0x03"), 0, trienode.NewMergedNodeSet(),
NewStateSetWithOrigin(randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x02", "0x03"}}, nil), nil, nil, false))
NewStateSetWithOrigin(randomAccountSet("0xaa"), randomStorageSet([]string{"0xaa"}, [][]string{{"0x01", "0x02"}}, nil), nil, nil, false))
// Verify the single and multi-layer iterators
head := db.tree.get(common.HexToHash("0x04"))
// singleLayer: 0x1, 0x2, 0x3
diffIter := newDiffStorageIterator(common.HexToHash("0xaa"), common.Hash{}, head.(*diffLayer).states.stateSet.storageList(common.HexToHash("0xaa")), nil)
verifyIterator(t, 3, diffIter, verifyNothing)
verifyIterator(t, 2, diffIter, verifyNothing)
// binaryIterator: 0x1, 0x2, 0x3, 0x4, 0x5, 0x6
verifyIterator(t, 6, head.(*diffLayer).newBinaryStorageIterator(common.HexToHash("0xaa"), common.Hash{}), verifyStorage)
@ -342,10 +340,9 @@ func TestStorageIteratorTraversal(t *testing.T) {
// also expect the correct values to show up.
func TestAccountIteratorTraversalValues(t *testing.T) {
config := &Config{
WriteBufferSize: 0,
NoAsyncGeneration: true,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
db.waitGeneration()
// Create a batch of account sets to seed subsequent layers with
var (
@ -458,10 +455,9 @@ func TestAccountIteratorTraversalValues(t *testing.T) {
func TestStorageIteratorTraversalValues(t *testing.T) {
config := &Config{
WriteBufferSize: 0,
NoAsyncGeneration: true,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
db.waitGeneration()
wrapStorage := func(storage map[common.Hash][]byte) map[common.Hash]map[common.Hash][]byte {
return map[common.Hash]map[common.Hash][]byte{
@ -591,10 +587,9 @@ func TestAccountIteratorLargeTraversal(t *testing.T) {
}
// Build up a large stack of snapshots
config := &Config{
WriteBufferSize: 0,
NoAsyncGeneration: true,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
db.waitGeneration()
for i := 1; i < 128; i++ {
parent := types.EmptyRootHash
@ -631,9 +626,9 @@ func TestAccountIteratorLargeTraversal(t *testing.T) {
func TestAccountIteratorFlattening(t *testing.T) {
config := &Config{
WriteBufferSize: 10 * 1024,
NoAsyncGeneration: true,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
db.waitGeneration()
// Create a stack of diffs on top
db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 1, trienode.NewMergedNodeSet(),
@ -662,11 +657,24 @@ func TestAccountIteratorFlattening(t *testing.T) {
}
func TestAccountIteratorSeek(t *testing.T) {
t.Run("fast", func(t *testing.T) {
testAccountIteratorSeek(t, func(db *Database, root, seek common.Hash) AccountIterator {
it, _ := db.AccountIterator(root, seek)
return it
})
})
t.Run("binary", func(t *testing.T) {
testAccountIteratorSeek(t, func(db *Database, root, seek common.Hash) AccountIterator {
return db.tree.get(root).(*diffLayer).newBinaryAccountIterator(seek)
})
})
}
func testAccountIteratorSeek(t *testing.T, newIterator func(db *Database, root, seek common.Hash) AccountIterator) {
config := &Config{
WriteBufferSize: 0,
NoAsyncGeneration: true,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
db.waitGeneration()
db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 1, trienode.NewMergedNodeSet(),
NewStateSetWithOrigin(randomAccountSet("0xaa", "0xee", "0xff", "0xf0"), nil, nil, nil, false))
@ -682,39 +690,39 @@ func TestAccountIteratorSeek(t *testing.T) {
// 03: aa, bb, dd, ee, f0 (, f0), ff
// 04: aa, bb, cc, dd, ee, f0 (, f0), ff (, ff)
// Construct various iterators and ensure their traversal is correct
it, _ := db.AccountIterator(common.HexToHash("0x02"), common.HexToHash("0xdd"))
it := newIterator(db, common.HexToHash("0x02"), common.HexToHash("0xdd"))
defer it.Release()
verifyIterator(t, 3, it, verifyAccount) // expected: ee, f0, ff
it, _ = db.AccountIterator(common.HexToHash("0x02"), common.HexToHash("0xaa"))
it = newIterator(db, common.HexToHash("0x02"), common.HexToHash("0xaa"))
defer it.Release()
verifyIterator(t, 4, it, verifyAccount) // expected: aa, ee, f0, ff
it, _ = db.AccountIterator(common.HexToHash("0x02"), common.HexToHash("0xff"))
it = newIterator(db, common.HexToHash("0x02"), common.HexToHash("0xff"))
defer it.Release()
verifyIterator(t, 1, it, verifyAccount) // expected: ff
it, _ = db.AccountIterator(common.HexToHash("0x02"), common.HexToHash("0xff1"))
it = newIterator(db, common.HexToHash("0x02"), common.HexToHash("0xff1"))
defer it.Release()
verifyIterator(t, 0, it, verifyAccount) // expected: nothing
it, _ = db.AccountIterator(common.HexToHash("0x04"), common.HexToHash("0xbb"))
it = newIterator(db, common.HexToHash("0x04"), common.HexToHash("0xbb"))
defer it.Release()
verifyIterator(t, 6, it, verifyAccount) // expected: bb, cc, dd, ee, f0, ff
it, _ = db.AccountIterator(common.HexToHash("0x04"), common.HexToHash("0xef"))
it = newIterator(db, common.HexToHash("0x04"), common.HexToHash("0xef"))
defer it.Release()
verifyIterator(t, 2, it, verifyAccount) // expected: f0, ff
it, _ = db.AccountIterator(common.HexToHash("0x04"), common.HexToHash("0xf0"))
it = newIterator(db, common.HexToHash("0x04"), common.HexToHash("0xf0"))
defer it.Release()
verifyIterator(t, 2, it, verifyAccount) // expected: f0, ff
it, _ = db.AccountIterator(common.HexToHash("0x04"), common.HexToHash("0xff"))
it = newIterator(db, common.HexToHash("0x04"), common.HexToHash("0xff"))
defer it.Release()
verifyIterator(t, 1, it, verifyAccount) // expected: ff
it, _ = db.AccountIterator(common.HexToHash("0x04"), common.HexToHash("0xff1"))
it = newIterator(db, common.HexToHash("0x04"), common.HexToHash("0xff1"))
defer it.Release()
verifyIterator(t, 0, it, verifyAccount) // expected: nothing
}
@ -735,10 +743,9 @@ func TestStorageIteratorSeek(t *testing.T) {
func testStorageIteratorSeek(t *testing.T, newIterator func(db *Database, root, account, seek common.Hash) StorageIterator) {
config := &Config{
WriteBufferSize: 0,
NoAsyncGeneration: true,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
db.waitGeneration()
// Stack three diff layers on top with various overlaps
db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 1, trienode.NewMergedNodeSet(),
@ -807,10 +814,9 @@ func TestAccountIteratorDeletions(t *testing.T) {
func testAccountIteratorDeletions(t *testing.T, newIterator func(db *Database, root, seek common.Hash) AccountIterator) {
config := &Config{
WriteBufferSize: 0,
NoAsyncGeneration: true,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
db.waitGeneration()
// Stack three diff layers on top with various overlaps
db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 1, trienode.NewMergedNodeSet(),
@ -847,10 +853,9 @@ func testAccountIteratorDeletions(t *testing.T, newIterator func(db *Database, r
func TestStorageIteratorDeletions(t *testing.T) {
config := &Config{
WriteBufferSize: 0,
NoAsyncGeneration: true,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
db.waitGeneration()
// Stack three diff layers on top with various overlaps
db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 1, trienode.NewMergedNodeSet(),
@ -916,9 +921,9 @@ func TestStaleIterator(t *testing.T) {
func testStaleIterator(t *testing.T, newIter func(db *Database, hash common.Hash) Iterator) {
config := &Config{
WriteBufferSize: 16 * 1024 * 1024,
NoAsyncGeneration: true,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
db.waitGeneration()
// [02 (disk), 03]
db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 1, trienode.NewMergedNodeSet(),
@ -970,10 +975,9 @@ func BenchmarkAccountIteratorTraversal(b *testing.B) {
return accounts
}
config := &Config{
WriteBufferSize: 0,
NoAsyncGeneration: true,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
db.waitGeneration()
for i := 1; i <= 100; i++ {
parent := types.EmptyRootHash
@ -1065,10 +1069,9 @@ func BenchmarkAccountIteratorLargeBaselayer(b *testing.B) {
return accounts
}
config := &Config{
WriteBufferSize: 0,
NoAsyncGeneration: true,
}
db := New(rawdb.NewMemoryDatabase(), config, false)
db.waitGeneration()
db.Update(common.HexToHash("0x02"), types.EmptyRootHash, 1, trienode.NewMergedNodeSet(), NewStateSetWithOrigin(makeAccounts(2000), nil, nil, nil, false))
for i := 2; i <= 100; i++ {

View file

@ -160,7 +160,7 @@ func (db *Database) loadLayers() layer {
log.Info("Failed to load journal, discard it", "err", err)
}
// Return single layer with persistent state.
return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, nil, newBuffer(db.config.WriteBufferSize, nil, nil, 0))
return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, nil, newBuffer(db.config.WriteBufferSize, nil, nil, 0), nil)
}
// loadDiskLayer reads the binary blob from the layer journal, reconstructing
@ -192,7 +192,7 @@ func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) {
if err := states.decode(r); err != nil {
return nil, err
}
return newDiskLayer(root, id, db, nil, nil, newBuffer(db.config.WriteBufferSize, &nodes, &states, id-stored)), nil
return newDiskLayer(root, id, db, nil, nil, newBuffer(db.config.WriteBufferSize, &nodes, &states, id-stored), nil), nil
}
// loadDiffLayer reads the next sections of a layer journal, reconstructing a new
@ -301,9 +301,10 @@ func (db *Database) Journal(root common.Hash) error {
} else { // disk layer only on noop runs (likely) or deep reorgs (unlikely)
log.Info("Persisting dirty state to disk", "root", root, "layers", disk.buffer.layers)
}
// Terminate the background state generation if it's active
if disk.generator != nil {
disk.generator.stop()
// Block until the background flushing is finished and terminate
// the potential active state generator.
if err := disk.terminate(); err != nil {
return err
}
start := time.Now()

View file

@ -27,7 +27,7 @@ import (
func newTestLayerTree() *layerTree {
db := New(rawdb.NewMemoryDatabase(), nil, false)
l := newDiskLayer(common.Hash{0x1}, 0, db, nil, nil, newBuffer(0, nil, nil, 0))
l := newDiskLayer(common.Hash{0x1}, 0, db, nil, nil, newBuffer(0, nil, nil, 0), nil)
t := newLayerTree(l)
return t
}