mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-05-27 10:19:28 +00:00
triedb, core/rawdb: flush + cancel-check inside storage-slot loop. remove unused progress marker
This commit is contained in:
parent
4935e0259d
commit
331edf8917
3 changed files with 19 additions and 31 deletions
|
|
@ -209,20 +209,6 @@ func WriteSnapshotSyncStatus(db ethdb.KeyValueWriter, status []byte) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteGenerateTrieProgress records a partition's current progress.
|
|
||||||
func WriteGenerateTrieProgress(db ethdb.KeyValueWriter, partition byte, hash common.Hash) {
|
|
||||||
if err := db.Put(generateTrieProgressKey(partition), hash[:]); err != nil {
|
|
||||||
log.Crit("Failed to store generate-trie progress marker", "err", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// DeleteGenerateTrieProgress removes a partition's progress marker.
|
|
||||||
func DeleteGenerateTrieProgress(db ethdb.KeyValueWriter, partition byte) {
|
|
||||||
if err := db.Delete(generateTrieProgressKey(partition)); err != nil {
|
|
||||||
log.Crit("Failed to remove generate-trie progress marker", "err", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ReadGenerateTriePartitionDone returns the raw subtree root blob for a
|
// ReadGenerateTriePartitionDone returns the raw subtree root blob for a
|
||||||
// partition that has previously completed.
|
// partition that has previously completed.
|
||||||
func ReadGenerateTriePartitionDone(db ethdb.KeyValueReader, partition byte) ([]byte, bool) {
|
func ReadGenerateTriePartitionDone(db ethdb.KeyValueReader, partition byte) ([]byte, bool) {
|
||||||
|
|
|
||||||
|
|
@ -104,10 +104,6 @@ var (
|
||||||
// snapSyncStatusFlagKey flags that status of snap sync.
|
// snapSyncStatusFlagKey flags that status of snap sync.
|
||||||
snapSyncStatusFlagKey = []byte("SnapSyncStatus")
|
snapSyncStatusFlagKey = []byte("SnapSyncStatus")
|
||||||
|
|
||||||
// generateTrieProgressPrefix tracks the last account hash processed by each
|
|
||||||
// of triedb.GenerateTrie's 16 partitions.
|
|
||||||
generateTrieProgressPrefix = []byte("gtp") // generateTrieProgressPrefix + partition byte -> last account hash
|
|
||||||
|
|
||||||
// generateTriePartitionDonePrefix stores the subtree root hash of each
|
// generateTriePartitionDonePrefix stores the subtree root hash of each
|
||||||
// triedb.GenerateTrie partition once it finishes.
|
// triedb.GenerateTrie partition once it finishes.
|
||||||
generateTriePartitionDonePrefix = []byte("gtd") // generateTriePartitionDonePrefix + partition byte -> subtree root hash
|
generateTriePartitionDonePrefix = []byte("gtd") // generateTriePartitionDonePrefix + partition byte -> subtree root hash
|
||||||
|
|
@ -474,11 +470,6 @@ func transitionStateKey(hash common.Hash) []byte {
|
||||||
return append(VerkleTransitionStatePrefix, hash.Bytes()...)
|
return append(VerkleTransitionStatePrefix, hash.Bytes()...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// generateTrieProgressKey = generateTrieProgressPrefix + partition (single byte).
|
|
||||||
func generateTrieProgressKey(partition byte) []byte {
|
|
||||||
return append(generateTrieProgressPrefix, partition)
|
|
||||||
}
|
|
||||||
|
|
||||||
// generateTriePartitionDoneKey = generateTriePartitionDonePrefix + partition (single byte).
|
// generateTriePartitionDoneKey = generateTriePartitionDonePrefix + partition (single byte).
|
||||||
func generateTriePartitionDoneKey(partition byte) []byte {
|
func generateTriePartitionDoneKey(partition byte) []byte {
|
||||||
return append(generateTriePartitionDonePrefix, partition)
|
return append(generateTriePartitionDonePrefix, partition)
|
||||||
|
|
|
||||||
|
|
@ -146,6 +146,14 @@ func generatePartition(ctx context.Context, cancel <-chan struct{}, db ethdb.Dat
|
||||||
// shared storage iterator. The inner loop terminates on Hold()
|
// shared storage iterator. The inner loop terminates on Hold()
|
||||||
// (slot belongs to a later account) or exhaustion.
|
// (slot belongs to a later account) or exhaustion.
|
||||||
for iters.stor.Next() {
|
for iters.stor.Next() {
|
||||||
|
// Re-check cancel.
|
||||||
|
select {
|
||||||
|
case <-cancel:
|
||||||
|
return nil, ErrCancelled
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
sk := iters.stor.Key()
|
sk := iters.stor.Key()
|
||||||
storAcc := sk[len(rawdb.SnapshotStoragePrefix) : len(rawdb.SnapshotStoragePrefix)+common.HashLength]
|
storAcc := sk[len(rawdb.SnapshotStoragePrefix) : len(rawdb.SnapshotStoragePrefix)+common.HashLength]
|
||||||
cmp := bytes.Compare(storAcc, accountHash[:])
|
cmp := bytes.Compare(storAcc, accountHash[:])
|
||||||
|
|
@ -172,6 +180,15 @@ func generatePartition(ctx context.Context, cancel <-chan struct{}, db ethdb.Dat
|
||||||
if err := stoTrie.Update(slotHash, iters.stor.Value()); err != nil {
|
if err := stoTrie.Update(slotHash, iters.stor.Value()); err != nil {
|
||||||
return nil, fmt.Errorf("storage stack trie update for %x: %w", accountHash, err)
|
return nil, fmt.Errorf("storage stack trie update for %x: %w", accountHash, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Cap batch size and reopen iterators so pebble compactions don't stall.
|
||||||
|
if batch.ValueSize() > ethdb.IdealBatchSize {
|
||||||
|
if err := batch.Write(); err != nil {
|
||||||
|
return nil, fmt.Errorf("flush batch (storage): %w", err)
|
||||||
|
}
|
||||||
|
batch.Reset()
|
||||||
|
iters.reopen()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if err := iters.stor.Error(); err != nil {
|
if err := iters.stor.Error(); err != nil {
|
||||||
return nil, fmt.Errorf("storage iterator: %w", err)
|
return nil, fmt.Errorf("storage iterator: %w", err)
|
||||||
|
|
@ -194,10 +211,7 @@ func generatePartition(ctx context.Context, cancel <-chan struct{}, db ethdb.Dat
|
||||||
return nil, fmt.Errorf("account stack trie update for %x: %w", accountHash, err)
|
return nil, fmt.Errorf("account stack trie update for %x: %w", accountHash, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Progress marker keeps the batch growing on a predictable
|
// Cap batch size and reopen iterators so pebble compactions don't stall.
|
||||||
// rate. The size check drives flush + iterator reopen so
|
|
||||||
// pebble compactions aren't blocked by long-lived iterators.
|
|
||||||
rawdb.WriteGenerateTrieProgress(batch, partition, accountHash)
|
|
||||||
if batch.ValueSize() > ethdb.IdealBatchSize {
|
if batch.ValueSize() > ethdb.IdealBatchSize {
|
||||||
if err := batch.Write(); err != nil {
|
if err := batch.Write(); err != nil {
|
||||||
return nil, fmt.Errorf("flush batch: %w", err)
|
return nil, fmt.Errorf("flush batch: %w", err)
|
||||||
|
|
@ -216,9 +230,6 @@ func generatePartition(ctx context.Context, cancel <-chan struct{}, db ethdb.Dat
|
||||||
// rootBlob at nil.
|
// rootBlob at nil.
|
||||||
acctTrie.Hash()
|
acctTrie.Hash()
|
||||||
|
|
||||||
// Clear the progress marker since it's no longer needed once the
|
|
||||||
// partition's batch is flushed.
|
|
||||||
rawdb.DeleteGenerateTrieProgress(batch, partition)
|
|
||||||
if err := batch.Write(); err != nil {
|
if err := batch.Write(); err != nil {
|
||||||
return nil, fmt.Errorf("final partition batch write: %w", err)
|
return nil, fmt.Errorf("final partition batch write: %w", err)
|
||||||
}
|
}
|
||||||
|
|
@ -378,7 +389,7 @@ func assembleRoot(db ethdb.Database, scheme string, partitionBlobs [numPartition
|
||||||
|
|
||||||
// Remember that strip returns nil for the common case 1.
|
// Remember that strip returns nil for the common case 1.
|
||||||
if strippedBlob != nil {
|
if strippedBlob != nil {
|
||||||
// Strip constructed a new node that is alonger extension or leaf
|
// Strip constructed a new node that is a longer extension or leaf
|
||||||
// partition (case 2/3). Persist it at path=[i] so path-scheme readers
|
// partition (case 2/3). Persist it at path=[i] so path-scheme readers
|
||||||
// traversing slot i of the top branch can find it.
|
// traversing slot i of the top branch can find it.
|
||||||
rawdb.WriteTrieNode(batch, common.Hash{}, []byte{byte(i)}, stripped, strippedBlob, scheme)
|
rawdb.WriteTrieNode(batch, common.Hash{}, []byte{byte(i)}, stripped, strippedBlob, scheme)
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue