triedb/pathdb: pause the pruning if indexer is active

This commit is contained in:
Gary Rong 2026-03-20 10:18:42 +08:00
parent 0fd7a61f7f
commit f7bff33820
2 changed files with 105 additions and 42 deletions

View file

@ -59,16 +59,21 @@ type indexPruner struct {
closed chan struct{}
wg sync.WaitGroup
log log.Logger
pauseReq chan chan struct{} // Pause request; caller sends ack channel, pruner closes it when paused
resumeCh chan struct{} // Resume signal sent by caller after indexSingle/unindexSingle completes
}
// newIndexPruner creates and starts a new index pruner for the given history type.
func newIndexPruner(disk ethdb.KeyValueStore, typ historyType) *indexPruner {
p := &indexPruner{
disk: disk,
typ: typ,
trigger: make(chan struct{}, 1),
closed: make(chan struct{}),
log: log.New("type", typ.String()),
disk: disk,
typ: typ,
trigger: make(chan struct{}, 1),
closed: make(chan struct{}),
log: log.New("type", typ.String()),
pauseReq: make(chan chan struct{}),
resumeCh: make(chan struct{}),
}
p.wg.Add(1)
go p.run()
@ -95,6 +100,27 @@ func (p *indexPruner) prune(newTail uint64) {
}
}
// pause requests the pruner to flush all pending writes and pause. It blocks
// until the pruner has acknowledged the pause. This must be paired with a
// subsequent call to resume.
func (p *indexPruner) pause() {
ack := make(chan struct{})
select {
case p.pauseReq <- ack:
<-ack // wait for the pruner to flush and acknowledge
case <-p.closed:
}
}
// resume unblocks a previously paused pruner, allowing it to continue
// processing.
func (p *indexPruner) resume() {
select {
case p.resumeCh <- struct{}{}:
case <-p.closed:
}
}
// close shuts down the pruner and waits for it to finish.
func (p *indexPruner) close() {
select {
@ -124,6 +150,15 @@ func (p *indexPruner) run() {
p.lastRun = tail
}
case ack := <-p.pauseReq:
// Pruner is idle, acknowledge immediately and wait for resume.
close(ack)
select {
case <-p.resumeCh:
case <-p.closed:
return
}
case <-p.closed:
return
}
@ -175,55 +210,79 @@ func (p *indexPruner) process(tail uint64) error {
func (p *indexPruner) prunePrefix(prefix []byte, elemType elementType, tail uint64) (int, error) {
var (
pruned int
start []byte // iterator seek position
opened = time.Now()
it = p.disk.NewIterator(prefix, nil)
batch = p.disk.NewBatchWithSize(ethdb.IdealBatchSize)
)
for {
var (
reopen bool
opened = time.Now()
it = p.disk.NewIterator(prefix, start)
)
for it.Next() {
// Check termination
select {
case <-p.closed:
it.Release()
if batch.ValueSize() > 0 {
return pruned, batch.Write()
}
return pruned, nil
default:
// Terminate if iterator is exhausted
if !it.Next() {
it.Release()
break
}
// Periodically release the iterator so the LSM compactor
// is not blocked by the read snapshot we hold.
if time.Since(opened) >= iteratorReopenInterval {
start := common.CopyBytes(it.Key()[len(prefix):])
it.Release()
it = p.disk.NewIterator(prefix, start)
}
// Check termination or pause request
select {
case <-p.closed:
// Terminate the process if indexer is closed
it.Release()
if batch.ValueSize() > 0 {
return pruned, batch.Write()
}
return pruned, nil
key, value := it.Key(), it.Value()
ident, bsize := p.identFromKey(key, prefix, elemType)
n, err := p.pruneEntry(batch, ident, value, bsize, tail)
if err != nil {
p.log.Warn("Failed to prune index entry", "ident", ident, "err", err)
continue
}
pruned += n
case ack := <-p.pauseReq:
// Save the current position so that after resume the
// iterator can be re-opened from where it left off.
key := it.Key()
start := common.CopyBytes(key[len(prefix):])
it.Release()
if batch.ValueSize() >= ethdb.IdealBatchSize {
// Flush all pending writes before acknowledging the pause.
if batch.ValueSize() > 0 {
if err := batch.Write(); err != nil {
it.Release()
close(ack)
return 0, err
}
batch.Reset()
}
// Periodically release the iterator so the LSM compactor
// is not blocked by the read snapshot we hold.
if time.Since(opened) >= iteratorReopenInterval {
reopen = true
start = common.CopyBytes(key[len(prefix):])
break
}
}
it.Release()
close(ack)
if !reopen {
break
// Block until resumed or closed
select {
case <-p.resumeCh:
it = p.disk.NewIterator(prefix, start)
case <-p.closed:
return pruned, nil
}
default:
// Keep processing
}
// Prune the index data block
key, value := it.Key(), it.Value()
ident, bsize := p.identFromKey(key, prefix, elemType)
n, err := p.pruneEntry(batch, ident, value, bsize, tail)
if err != nil {
p.log.Warn("Failed to prune index entry", "ident", ident, "err", err)
continue
}
pruned += n
// Flush the batch if there are too many accumulated
if batch.ValueSize() >= ethdb.IdealBatchSize {
if err := batch.Write(); err != nil {
it.Release()
return 0, err
}
batch.Reset()
}
}
if batch.ValueSize() > 0 {

View file

@ -805,6 +805,8 @@ func (i *historyIndexer) extend(historyID uint64) error {
case <-i.initer.closed:
return errors.New("indexer is closed")
case <-i.initer.done:
i.pruner.pause()
defer i.pruner.resume()
return indexSingle(historyID, i.disk, i.freezer, i.typ)
case i.initer.interrupt <- signal:
return <-signal.result
@ -822,6 +824,8 @@ func (i *historyIndexer) shorten(historyID uint64) error {
case <-i.initer.closed:
return errors.New("indexer is closed")
case <-i.initer.done:
i.pruner.pause()
defer i.pruner.resume()
return unindexSingle(historyID, i.disk, i.freezer, i.typ)
case i.initer.interrupt <- signal:
return <-signal.result