triedb/pathdb: periodically reopen the iterator to unblock compactor

This commit is contained in:
Gary Rong 2026-03-13 14:51:03 +08:00
parent d1b5c4101f
commit eb6c814332

View file

@ -33,6 +33,12 @@ const (
// accumulate before triggering index pruning. This helps avoid scheduling // accumulate before triggering index pruning. This helps avoid scheduling
// index pruning too frequently. // index pruning too frequently.
indexPruningThreshold = 90000 indexPruningThreshold = 90000
// indexPruneReopenInterval is how long the iterator is kept open before
// being released and re-opened. Long-lived iterators hold a read snapshot
// that blocks LSM compaction; periodically re-opening avoids stalling the
// compactor during a large scan.
indexPruneReopenInterval = 3 * time.Minute
) )
// indexPruner is responsible for pruning stale index data from the tail side // indexPruner is responsible for pruning stale index data from the tail side
@ -163,41 +169,61 @@ func (p *indexPruner) process(tail uint64) error {
return nil return nil
} }
// prunePrefix scans up to indexPruneBatchSize metadata entries starting from // prunePrefix scans all metadata entries under the given prefix and prunes
// the cursor position and prunes leading index blocks below the tail. The // leading index blocks below the tail. The iterator is periodically released
// cursor advances after each cycle; when the prefix is fully scanned, the // and re-opened to avoid holding a read snapshot that blocks LSM compaction.
// cursor resets so the next cycle starts from the beginning.
// Returns (prunedBlocks, scannedEntries, error).
func (p *indexPruner) prunePrefix(prefix []byte, elemType elementType, tail uint64) (int, error) { func (p *indexPruner) prunePrefix(prefix []byte, elemType elementType, tail uint64) (int, error) {
var ( var (
pruned int pruned int
start []byte // iterator seek position
batch = p.disk.NewBatchWithSize(ethdb.IdealBatchSize) batch = p.disk.NewBatchWithSize(ethdb.IdealBatchSize)
) )
it := p.disk.NewIterator(prefix, nil) for {
defer it.Release() var (
reopen bool
for it.Next() { opened = time.Now()
// Check for shutdown it = p.disk.NewIterator(prefix, start)
select { )
case <-p.closed: for it.Next() {
return pruned, nil // Check termination
default: select {
} case <-p.closed:
key, value := it.Key(), it.Value() it.Release()
if batch.ValueSize() > 0 {
ident, bsize := p.identFromKey(key, prefix, elemType) return pruned, batch.Write()
n, err := p.pruneEntry(batch, ident, value, bsize, tail) }
if err != nil { return pruned, nil
p.log.Warn("Failed to prune index entry", "ident", ident, "err", err) default:
continue
}
pruned += n
if batch.ValueSize() >= ethdb.IdealBatchSize {
if err := batch.Write(); err != nil {
return 0, err
} }
batch.Reset()
key, value := it.Key(), it.Value()
ident, bsize := p.identFromKey(key, prefix, elemType)
n, err := p.pruneEntry(batch, ident, value, bsize, tail)
if err != nil {
p.log.Warn("Failed to prune index entry", "ident", ident, "err", err)
continue
}
pruned += n
if batch.ValueSize() >= ethdb.IdealBatchSize {
if err := batch.Write(); err != nil {
it.Release()
return 0, err
}
batch.Reset()
}
// Periodically release the iterator so the LSM compactor
// is not blocked by the read snapshot we hold.
if time.Since(opened) >= indexPruneReopenInterval {
reopen = true
start = common.CopyBytes(key[len(prefix):])
break
}
}
it.Release()
if !reopen {
break
} }
} }
if batch.ValueSize() > 0 { if batch.ValueSize() > 0 {