diff --git a/triedb/pathdb/history_index_pruner.go b/triedb/pathdb/history_index_pruner.go index 045ab0ce7f..25407b9805 100644 --- a/triedb/pathdb/history_index_pruner.go +++ b/triedb/pathdb/history_index_pruner.go @@ -220,13 +220,6 @@ func (p *indexPruner) prunePrefix(prefix []byte, elemType elementType, tail uint it.Release() break } - // Periodically release the iterator so the LSM compactor - // is not blocked by the read snapshot we hold. - if time.Since(opened) >= iteratorReopenInterval { - start := common.CopyBytes(it.Key()[len(prefix):]) - it.Release() - it = p.disk.NewIterator(prefix, start) - } // Check termination or pause request select { case <-p.closed: @@ -240,26 +233,35 @@ func (p *indexPruner) prunePrefix(prefix []byte, elemType elementType, tail uint case ack := <-p.pauseReq: // Save the current position so that after resume the // iterator can be re-opened from where it left off. - key := it.Key() - start := common.CopyBytes(key[len(prefix):]) + start := common.CopyBytes(it.Key()[len(prefix):]) it.Release() // Flush all pending writes before acknowledging the pause. + var flushErr error if batch.ValueSize() > 0 { if err := batch.Write(); err != nil { - close(ack) - return 0, err + flushErr = err } batch.Reset() } close(ack) - // Block until resumed or closed + // Block until resumed or closed. Always wait here even if + // the flush failed — returning early would cause resume() + // to deadlock since nobody would receive on resumeCh. select { case <-p.resumeCh: + if flushErr != nil { + return 0, flushErr + } + // Re-open the iterator from the saved position so the + // pruner sees the current database state (including any + // writes made by indexer during the pause). it = p.disk.NewIterator(prefix, start) + opened = time.Now() + continue case <-p.closed: - return pruned, nil + return pruned, flushErr } default: @@ -284,6 +286,16 @@ func (p *indexPruner) prunePrefix(prefix []byte, elemType elementType, tail uint } batch.Reset() } + + // Periodically release the iterator so the LSM compactor + // is not blocked by the read snapshot we hold. + if time.Since(opened) >= iteratorReopenInterval { + opened = time.Now() + + start := common.CopyBytes(it.Key()[len(prefix):]) + it.Release() + it = p.disk.NewIterator(prefix, start) + } } if batch.ValueSize() > 0 { if err := batch.Write(); err != nil {