triedb/pathdb: improve pruner

This commit is contained in:
Gary Rong 2026-03-20 10:32:03 +08:00
parent f7bff33820
commit d9e6c9ca56

View file

@ -220,13 +220,6 @@ func (p *indexPruner) prunePrefix(prefix []byte, elemType elementType, tail uint
it.Release() it.Release()
break break
} }
// Periodically release the iterator so the LSM compactor
// is not blocked by the read snapshot we hold.
if time.Since(opened) >= iteratorReopenInterval {
start := common.CopyBytes(it.Key()[len(prefix):])
it.Release()
it = p.disk.NewIterator(prefix, start)
}
// Check termination or pause request // Check termination or pause request
select { select {
case <-p.closed: case <-p.closed:
@ -240,26 +233,35 @@ func (p *indexPruner) prunePrefix(prefix []byte, elemType elementType, tail uint
case ack := <-p.pauseReq: case ack := <-p.pauseReq:
// Save the current position so that after resume the // Save the current position so that after resume the
// iterator can be re-opened from where it left off. // iterator can be re-opened from where it left off.
key := it.Key() start := common.CopyBytes(it.Key()[len(prefix):])
start := common.CopyBytes(key[len(prefix):])
it.Release() it.Release()
// Flush all pending writes before acknowledging the pause. // Flush all pending writes before acknowledging the pause.
var flushErr error
if batch.ValueSize() > 0 { if batch.ValueSize() > 0 {
if err := batch.Write(); err != nil { if err := batch.Write(); err != nil {
close(ack) flushErr = err
return 0, err
} }
batch.Reset() batch.Reset()
} }
close(ack) close(ack)
// Block until resumed or closed // Block until resumed or closed. Always wait here even if
// the flush failed — returning early would cause resume()
// to deadlock since nobody would receive on resumeCh.
select { select {
case <-p.resumeCh: case <-p.resumeCh:
if flushErr != nil {
return 0, flushErr
}
// Re-open the iterator from the saved position so the
// pruner sees the current database state (including any
// writes made by indexer during the pause).
it = p.disk.NewIterator(prefix, start) it = p.disk.NewIterator(prefix, start)
opened = time.Now()
continue
case <-p.closed: case <-p.closed:
return pruned, nil return pruned, flushErr
} }
default: default:
@ -284,6 +286,16 @@ func (p *indexPruner) prunePrefix(prefix []byte, elemType elementType, tail uint
} }
batch.Reset() batch.Reset()
} }
// Periodically release the iterator so the LSM compactor
// is not blocked by the read snapshot we hold.
if time.Since(opened) >= iteratorReopenInterval {
opened = time.Now()
start := common.CopyBytes(it.Key()[len(prefix):])
it.Release()
it = p.disk.NewIterator(prefix, start)
}
} }
if batch.ValueSize() > 0 { if batch.ValueSize() > 0 {
if err := batch.Write(); err != nil { if err := batch.Write(); err != nil {