diff --git a/triedb/pathdb/history_index_pruner.go b/triedb/pathdb/history_index_pruner.go
index c966cefae1..045ab0ce7f 100644
--- a/triedb/pathdb/history_index_pruner.go
+++ b/triedb/pathdb/history_index_pruner.go
@@ -59,16 +59,21 @@ type indexPruner struct {
 	closed  chan struct{}
 	wg      sync.WaitGroup
 	log     log.Logger
+
+	pauseReq chan chan struct{} // Pause request; caller sends ack channel, pruner closes it when paused
+	resumeCh chan struct{}      // Resume signal sent by caller after indexSingle/unindexSingle completes
 }
 
 // newIndexPruner creates and starts a new index pruner for the given history type.
 func newIndexPruner(disk ethdb.KeyValueStore, typ historyType) *indexPruner {
 	p := &indexPruner{
-		disk:    disk,
-		typ:     typ,
-		trigger: make(chan struct{}, 1),
-		closed:  make(chan struct{}),
-		log:     log.New("type", typ.String()),
+		disk:     disk,
+		typ:      typ,
+		trigger:  make(chan struct{}, 1),
+		closed:   make(chan struct{}),
+		log:      log.New("type", typ.String()),
+		pauseReq: make(chan chan struct{}),
+		resumeCh: make(chan struct{}),
 	}
 	p.wg.Add(1)
 	go p.run()
@@ -95,6 +100,27 @@ func (p *indexPruner) prune(newTail uint64) {
 	}
 }
 
+// pause requests the pruner to flush all pending writes and pause. It blocks
+// until the pruner has acknowledged the pause. This must be paired with a
+// subsequent call to resume.
+func (p *indexPruner) pause() {
+	ack := make(chan struct{})
+	select {
+	case p.pauseReq <- ack:
+		<-ack // wait for the pruner to flush and acknowledge
+	case <-p.closed:
+	}
+}
+
+// resume unblocks a previously paused pruner, allowing it to continue
+// processing.
+func (p *indexPruner) resume() {
+	select {
+	case p.resumeCh <- struct{}{}:
+	case <-p.closed:
+	}
+}
+
 // close shuts down the pruner and waits for it to finish.
 func (p *indexPruner) close() {
 	select {
@@ -124,6 +150,15 @@ func (p *indexPruner) run() {
 				p.lastRun = tail
 			}
 
+		case ack := <-p.pauseReq:
+			// Pruner is idle, acknowledge immediately and wait for resume.
+			close(ack)
+			select {
+			case <-p.resumeCh:
+			case <-p.closed:
+				return
+			}
+
 		case <-p.closed:
 			return
 		}
@@ -175,55 +210,102 @@ func (p *indexPruner) process(tail uint64) error {
 func (p *indexPruner) prunePrefix(prefix []byte, elemType elementType, tail uint64) (int, error) {
 	var (
 		pruned int
-		start  []byte // iterator seek position
+		opened = time.Now()
+		it     = p.disk.NewIterator(prefix, nil)
 		batch  = p.disk.NewBatchWithSize(ethdb.IdealBatchSize)
 	)
+loop:
 	for {
-		var (
-			reopen bool
-			opened = time.Now()
-			it     = p.disk.NewIterator(prefix, start)
-		)
-		for it.Next() {
-			// Check termination
-			select {
-			case <-p.closed:
-				it.Release()
-				if batch.ValueSize() > 0 {
-					return pruned, batch.Write()
-				}
-				return pruned, nil
-			default:
+		// Terminate if iterator is exhausted
+		if !it.Next() {
+			it.Release()
+			break
+		}
+		// Periodically release the iterator so the LSM compactor
+		// is not blocked by the read snapshot we hold.
+		if time.Since(opened) >= iteratorReopenInterval {
+			start := common.CopyBytes(it.Key()[len(prefix):])
+			it.Release()
+
+			// Restart the interval and step onto the first entry; a
+			// freshly created iterator is not positioned until Next
+			// is called, so Key and Value cannot be read before that.
+			opened = time.Now()
+			it = p.disk.NewIterator(prefix, start)
+			if !it.Next() {
+				it.Release()
+				break
+			}
+		}
+		// Check termination or pause request
+		select {
+		case <-p.closed:
+			// Terminate the process if the pruner is closed
+			it.Release()
+			if batch.ValueSize() > 0 {
+				return pruned, batch.Write()
 			}
+			return pruned, nil
 
-			key, value := it.Key(), it.Value()
-			ident, bsize := p.identFromKey(key, prefix, elemType)
-			n, err := p.pruneEntry(batch, ident, value, bsize, tail)
-			if err != nil {
-				p.log.Warn("Failed to prune index entry", "ident", ident, "err", err)
-				continue
-			}
-			pruned += n
+		case ack := <-p.pauseReq:
+			// Save the current position so that after resume the
+			// iterator can be re-opened from where it left off.
+			key := it.Key()
+			start := common.CopyBytes(key[len(prefix):])
+			it.Release()
 
-			if batch.ValueSize() >= ethdb.IdealBatchSize {
+			// Flush all pending writes before acknowledging the pause.
+			// A failure is reported only after the pause cycle is over:
+			// returning right away would leave nobody receiving on
+			// resumeCh and block the caller's resume until close.
+			var flushErr error
+			if batch.ValueSize() > 0 {
 				if err := batch.Write(); err != nil {
-					it.Release()
-					return 0, err
+					flushErr = err
 				}
 				batch.Reset()
 			}
-			// Periodically release the iterator so the LSM compactor
-			// is not blocked by the read snapshot we hold.
-			if time.Since(opened) >= iteratorReopenInterval {
-				reopen = true
-				start = common.CopyBytes(key[len(prefix):])
-				break
-			}
-		}
-		it.Release()
+			close(ack)
 
-		if !reopen {
-			break
+			// Block until resumed or closed
+			select {
+			case <-p.resumeCh:
+			case <-p.closed:
+				return pruned, flushErr
+			}
+			if flushErr != nil {
+				return 0, flushErr
+			}
+			// Re-open the iterator at the saved position and step
+			// onto its first entry, restarting the reopen interval.
+			opened = time.Now()
+			it = p.disk.NewIterator(prefix, start)
+			if !it.Next() {
+				it.Release()
+				break loop
+			}
+
+		default:
+			// Keep processing
+		}
+
+		// Prune the index data block
+		key, value := it.Key(), it.Value()
+		ident, bsize := p.identFromKey(key, prefix, elemType)
+		n, err := p.pruneEntry(batch, ident, value, bsize, tail)
+		if err != nil {
+			p.log.Warn("Failed to prune index entry", "ident", ident, "err", err)
+			continue
+		}
+		pruned += n
+
+		// Flush the batch if there are too many accumulated
+		if batch.ValueSize() >= ethdb.IdealBatchSize {
+			if err := batch.Write(); err != nil {
+				it.Release()
+				return 0, err
+			}
+			batch.Reset()
 		}
 	}
 	if batch.ValueSize() > 0 {
diff --git a/triedb/pathdb/history_indexer.go b/triedb/pathdb/history_indexer.go
index ea03f82edb..9b215b917f 100644
--- a/triedb/pathdb/history_indexer.go
+++ b/triedb/pathdb/history_indexer.go
@@ -805,6 +805,8 @@ func (i *historyIndexer) extend(historyID uint64) error {
 	case <-i.initer.closed:
 		return errors.New("indexer is closed")
 	case <-i.initer.done:
+		i.pruner.pause()
+		defer i.pruner.resume()
 		return indexSingle(historyID, i.disk, i.freezer, i.typ)
 	case i.initer.interrupt <- signal:
 		return <-signal.result
@@ -822,6 +824,8 @@ func (i *historyIndexer) shorten(historyID uint64) error {
 	case <-i.initer.closed:
 		return errors.New("indexer is closed")
 	case <-i.initer.done:
+		i.pruner.pause()
+		defer i.pruner.resume()
 		return unindexSingle(historyID, i.disk, i.freezer, i.typ)
 	case i.initer.interrupt <- signal:
 		return <-signal.result