From eb6c814332f434307a7a0ae58d411a1960098537 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Fri, 13 Mar 2026 14:51:03 +0800 Subject: [PATCH] triedb/pathdb: periodically reopen the iterator to unblock compactor --- triedb/pathdb/history_index_pruner.go | 84 ++++++++++++++++++--------- 1 file changed, 55 insertions(+), 29 deletions(-) diff --git a/triedb/pathdb/history_index_pruner.go b/triedb/pathdb/history_index_pruner.go index 826d83237c..e6dabaf89d 100644 --- a/triedb/pathdb/history_index_pruner.go +++ b/triedb/pathdb/history_index_pruner.go @@ -33,6 +33,12 @@ const ( // accumulate before triggering index pruning. This helps avoid scheduling // index pruning too frequently. indexPruningThreshold = 90000 + + // indexPruneReopenInterval is how long the iterator is kept open before + // being released and re-opened. Long-lived iterators hold a read snapshot + // that blocks LSM compaction; periodically re-opening avoids stalling the + // compactor during a large scan. + indexPruneReopenInterval = 3 * time.Minute ) // indexPruner is responsible for pruning stale index data from the tail side @@ -163,41 +169,61 @@ func (p *indexPruner) process(tail uint64) error { return nil } -// prunePrefix scans up to indexPruneBatchSize metadata entries starting from -// the cursor position and prunes leading index blocks below the tail. The -// cursor advances after each cycle; when the prefix is fully scanned, the -// cursor resets so the next cycle starts from the beginning. -// Returns (prunedBlocks, scannedEntries, error). +// prunePrefix scans all metadata entries under the given prefix and prunes +// leading index blocks below the tail. The iterator is periodically released +// and re-opened to avoid holding a read snapshot that blocks LSM compaction. func (p *indexPruner) prunePrefix(prefix []byte, elemType elementType, tail uint64) (int, error) { var ( pruned int + start []byte // iterator seek position batch = p.disk.NewBatchWithSize(ethdb.IdealBatchSize) ) - it := p.disk.NewIterator(prefix, nil) - defer it.Release() - - for it.Next() { - // Check for shutdown - select { - case <-p.closed: - return pruned, nil - default: - } - key, value := it.Key(), it.Value() - - ident, bsize := p.identFromKey(key, prefix, elemType) - n, err := p.pruneEntry(batch, ident, value, bsize, tail) - if err != nil { - p.log.Warn("Failed to prune index entry", "ident", ident, "err", err) - continue - } - pruned += n - - if batch.ValueSize() >= ethdb.IdealBatchSize { - if err := batch.Write(); err != nil { - return 0, err + for { + var ( + reopen bool + opened = time.Now() + it = p.disk.NewIterator(prefix, start) + ) + for it.Next() { + // Check termination + select { + case <-p.closed: + it.Release() + if batch.ValueSize() > 0 { + return pruned, batch.Write() + } + return pruned, nil + default: } - batch.Reset() + + key, value := it.Key(), it.Value() + ident, bsize := p.identFromKey(key, prefix, elemType) + n, err := p.pruneEntry(batch, ident, value, bsize, tail) + if err != nil { + p.log.Warn("Failed to prune index entry", "ident", ident, "err", err) + continue + } + pruned += n + + if batch.ValueSize() >= ethdb.IdealBatchSize { + if err := batch.Write(); err != nil { + it.Release() + return 0, err + } + batch.Reset() + } + // Periodically release the iterator so the LSM compactor + // is not blocked by the read snapshot we hold. + if time.Since(opened) >= indexPruneReopenInterval { + reopen = true + start = common.CopyBytes(key[len(prefix):]) + break + } + } + it.Release() + + if !reopen { + break } } if batch.ValueSize() > 0 {