mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-05-13 11:36:37 +00:00
triedb/pathdb: refactor state history write (#32497)
This pull request refactors the internal implementation in path database a bit, specifically: - purge the state index data in batch - simplify the logic of state history construction and index, make it more readable
This commit is contained in:
parent
f877183cbb
commit
95ab643bb8
3 changed files with 89 additions and 62 deletions
|
|
@ -318,13 +318,14 @@ func (db *Database) repairHistory() error {
|
||||||
log.Crit("Failed to retrieve head of state history", "err", err)
|
log.Crit("Failed to retrieve head of state history", "err", err)
|
||||||
}
|
}
|
||||||
if frozen != 0 {
|
if frozen != 0 {
|
||||||
// TODO(rjl493456442) would be better to group them into a batch.
|
|
||||||
//
|
|
||||||
// Purge all state history indexing data first
|
// Purge all state history indexing data first
|
||||||
rawdb.DeleteStateHistoryIndexMetadata(db.diskdb)
|
batch := db.diskdb.NewBatch()
|
||||||
rawdb.DeleteStateHistoryIndex(db.diskdb)
|
rawdb.DeleteStateHistoryIndexMetadata(batch)
|
||||||
err := db.stateFreezer.Reset()
|
rawdb.DeleteStateHistoryIndex(batch)
|
||||||
if err != nil {
|
if err := batch.Write(); err != nil {
|
||||||
|
log.Crit("Failed to purge state history index", "err", err)
|
||||||
|
}
|
||||||
|
if err := db.stateFreezer.Reset(); err != nil {
|
||||||
log.Crit("Failed to reset state histories", "err", err)
|
log.Crit("Failed to reset state histories", "err", err)
|
||||||
}
|
}
|
||||||
log.Info("Truncated extraneous state history")
|
log.Info("Truncated extraneous state history")
|
||||||
|
|
@ -510,11 +511,13 @@ func (db *Database) Enable(root common.Hash) error {
|
||||||
// mappings can be huge and might take a while to clear
|
// mappings can be huge and might take a while to clear
|
||||||
// them, just leave them in disk and wait for overwriting.
|
// them, just leave them in disk and wait for overwriting.
|
||||||
if db.stateFreezer != nil {
|
if db.stateFreezer != nil {
|
||||||
// TODO(rjl493456442) would be better to group them into a batch.
|
|
||||||
//
|
|
||||||
// Purge all state history indexing data first
|
// Purge all state history indexing data first
|
||||||
rawdb.DeleteStateHistoryIndexMetadata(db.diskdb)
|
batch.Reset()
|
||||||
rawdb.DeleteStateHistoryIndex(db.diskdb)
|
rawdb.DeleteStateHistoryIndexMetadata(batch)
|
||||||
|
rawdb.DeleteStateHistoryIndex(batch)
|
||||||
|
if err := batch.Write(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
if err := db.stateFreezer.Reset(); err != nil {
|
if err := db.stateFreezer.Reset(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -323,6 +323,69 @@ func (dl *diskLayer) update(root common.Hash, id uint64, block uint64, nodes *no
|
||||||
return newDiffLayer(dl, root, id, block, nodes, states)
|
return newDiffLayer(dl, root, id, block, nodes, states)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// writeStateHistory stores the state history and indexes if indexing is
|
||||||
|
// permitted.
|
||||||
|
//
|
||||||
|
// What's more, this function also returns a flag indicating whether the
|
||||||
|
// buffer flushing is required, ensuring the persistent state ID is always
|
||||||
|
// greater than or equal to the first history ID.
|
||||||
|
func (dl *diskLayer) writeStateHistory(diff *diffLayer) (bool, error) {
|
||||||
|
// Short circuit if state history is not permitted
|
||||||
|
if dl.db.stateFreezer == nil {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
// Bail out with an error if writing the state history fails.
|
||||||
|
// This can happen, for example, if the device is full.
|
||||||
|
err := writeStateHistory(dl.db.stateFreezer, diff)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
// Notify the state history indexer for newly created history
|
||||||
|
if dl.db.stateIndexer != nil {
|
||||||
|
if err := dl.db.stateIndexer.extend(diff.stateID()); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Determine if the persisted history object has exceeded the
|
||||||
|
// configured limitation.
|
||||||
|
limit := dl.db.config.StateHistory
|
||||||
|
if limit == 0 {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
tail, err := dl.db.stateFreezer.Tail()
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
} // firstID = tail+1
|
||||||
|
|
||||||
|
// length = diff.stateID()-firstID+1 = diff.stateID()-tail
|
||||||
|
if diff.stateID()-tail <= limit {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
newFirst := diff.stateID() - limit + 1 // the id of first history **after truncation**
|
||||||
|
|
||||||
|
// In a rare case where the ID of the first history object (after tail
|
||||||
|
// truncation) exceeds the persisted state ID, we must take corrective
|
||||||
|
// steps:
|
||||||
|
//
|
||||||
|
// - Skip tail truncation temporarily, avoid the scenario that associated
|
||||||
|
// history of persistent state is removed
|
||||||
|
//
|
||||||
|
// - Force a commit of the cached dirty states into persistent state
|
||||||
|
//
|
||||||
|
// These measures ensure the persisted state ID always remains greater
|
||||||
|
// than or equal to the first history ID.
|
||||||
|
if persistentID := rawdb.ReadPersistentStateID(dl.db.diskdb); persistentID < newFirst {
|
||||||
|
log.Debug("Skip tail truncation", "persistentID", persistentID, "tailID", tail+1, "headID", diff.stateID(), "limit", limit)
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
pruned, err := truncateFromTail(dl.db.diskdb, dl.db.stateFreezer, newFirst-1)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
log.Debug("Pruned state history", "items", pruned, "tailid", newFirst)
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
// commit merges the given bottom-most diff layer into the node buffer
|
// commit merges the given bottom-most diff layer into the node buffer
|
||||||
// and returns a newly constructed disk layer. Note the current disk
|
// and returns a newly constructed disk layer. Note the current disk
|
||||||
// layer must be tagged as stale first to prevent re-access.
|
// layer must be tagged as stale first to prevent re-access.
|
||||||
|
|
@ -333,34 +396,9 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
|
||||||
// Construct and store the state history first. If crash happens after storing
|
// Construct and store the state history first. If crash happens after storing
|
||||||
// the state history but without flushing the corresponding states(journal),
|
// the state history but without flushing the corresponding states(journal),
|
||||||
// the stored state history will be truncated from head in the next restart.
|
// the stored state history will be truncated from head in the next restart.
|
||||||
var (
|
flush, err := dl.writeStateHistory(bottom)
|
||||||
overflow bool
|
if err != nil {
|
||||||
oldest uint64
|
return nil, err
|
||||||
)
|
|
||||||
if dl.db.stateFreezer != nil {
|
|
||||||
// Bail out with an error if writing the state history fails.
|
|
||||||
// This can happen, for example, if the device is full.
|
|
||||||
err := writeStateHistory(dl.db.stateFreezer, bottom)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
// Determine if the persisted history object has exceeded the configured
|
|
||||||
// limitation, set the overflow as true if so.
|
|
||||||
tail, err := dl.db.stateFreezer.Tail()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
limit := dl.db.config.StateHistory
|
|
||||||
if limit != 0 && bottom.stateID()-tail > limit {
|
|
||||||
overflow = true
|
|
||||||
oldest = bottom.stateID() - limit + 1 // track the id of history **after truncation**
|
|
||||||
}
|
|
||||||
// Notify the state history indexer for newly created history
|
|
||||||
if dl.db.stateIndexer != nil {
|
|
||||||
if err := dl.db.stateIndexer.extend(bottom.stateID()); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// Mark the diskLayer as stale before applying any mutations on top.
|
// Mark the diskLayer as stale before applying any mutations on top.
|
||||||
dl.stale = true
|
dl.stale = true
|
||||||
|
|
@ -373,21 +411,13 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
|
||||||
}
|
}
|
||||||
rawdb.WriteStateID(dl.db.diskdb, bottom.rootHash(), bottom.stateID())
|
rawdb.WriteStateID(dl.db.diskdb, bottom.rootHash(), bottom.stateID())
|
||||||
|
|
||||||
// In a unique scenario where the ID of the oldest history object (after tail
|
|
||||||
// truncation) surpasses the persisted state ID, we take the necessary action
|
|
||||||
// of forcibly committing the cached dirty states to ensure that the persisted
|
|
||||||
// state ID remains higher.
|
|
||||||
persistedID := rawdb.ReadPersistentStateID(dl.db.diskdb)
|
|
||||||
if !force && persistedID < oldest {
|
|
||||||
force = true
|
|
||||||
}
|
|
||||||
// Merge the trie nodes and flat states of the bottom-most diff layer into the
|
// Merge the trie nodes and flat states of the bottom-most diff layer into the
|
||||||
// buffer as the combined layer.
|
// buffer as the combined layer.
|
||||||
combined := dl.buffer.commit(bottom.nodes, bottom.states.stateSet)
|
combined := dl.buffer.commit(bottom.nodes, bottom.states.stateSet)
|
||||||
|
|
||||||
// Terminate the background state snapshot generation before mutating the
|
// Terminate the background state snapshot generation before mutating the
|
||||||
// persistent state.
|
// persistent state.
|
||||||
if combined.full() || force {
|
if combined.full() || force || flush {
|
||||||
// Wait until the previous frozen buffer is fully flushed
|
// Wait until the previous frozen buffer is fully flushed
|
||||||
if dl.frozen != nil {
|
if dl.frozen != nil {
|
||||||
if err := dl.frozen.waitFlush(); err != nil {
|
if err := dl.frozen.waitFlush(); err != nil {
|
||||||
|
|
@ -431,8 +461,8 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
// Block until the frozen buffer is fully flushed out if the async flushing
|
// Block until the frozen buffer is fully flushed out if the async flushing
|
||||||
// is not allowed, or if the oldest history surpasses the persisted state ID.
|
// is not allowed.
|
||||||
if dl.db.config.NoAsyncFlush || persistedID < oldest {
|
if dl.db.config.NoAsyncFlush {
|
||||||
if err := dl.frozen.waitFlush(); err != nil {
|
if err := dl.frozen.waitFlush(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
@ -445,15 +475,6 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
|
||||||
if dl.generator != nil {
|
if dl.generator != nil {
|
||||||
ndl.setGenerator(dl.generator)
|
ndl.setGenerator(dl.generator)
|
||||||
}
|
}
|
||||||
// To remove outdated history objects from the end, we set the 'tail' parameter
|
|
||||||
// to 'oldest-1' due to the offset between the freezer index and the history ID.
|
|
||||||
if overflow {
|
|
||||||
pruned, err := truncateFromTail(ndl.db.diskdb, ndl.db.stateFreezer, oldest-1)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
log.Debug("Pruned state history", "items", pruned, "tailid", oldest)
|
|
||||||
}
|
|
||||||
return ndl, nil
|
return ndl, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -598,14 +598,17 @@ func checkVersion(disk ethdb.KeyValueStore) {
|
||||||
if err == nil && m.Version == stateIndexVersion {
|
if err == nil && m.Version == stateIndexVersion {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// TODO(rjl493456442) would be better to group them into a batch.
|
|
||||||
rawdb.DeleteStateHistoryIndexMetadata(disk)
|
|
||||||
rawdb.DeleteStateHistoryIndex(disk)
|
|
||||||
|
|
||||||
version := "unknown"
|
version := "unknown"
|
||||||
if err == nil {
|
if err == nil {
|
||||||
version = fmt.Sprintf("%d", m.Version)
|
version = fmt.Sprintf("%d", m.Version)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
batch := disk.NewBatch()
|
||||||
|
rawdb.DeleteStateHistoryIndexMetadata(batch)
|
||||||
|
rawdb.DeleteStateHistoryIndex(batch)
|
||||||
|
if err := batch.Write(); err != nil {
|
||||||
|
log.Crit("Failed to purge state history index", "err", err)
|
||||||
|
}
|
||||||
log.Info("Cleaned up obsolete state history index", "version", version, "want", stateIndexVersion)
|
log.Info("Cleaned up obsolete state history index", "version", version, "want", stateIndexVersion)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue