diff --git a/triedb/pathdb/disklayer.go b/triedb/pathdb/disklayer.go index 5bad19b4f5..50c7279d0e 100644 --- a/triedb/pathdb/disklayer.go +++ b/triedb/pathdb/disklayer.go @@ -408,6 +408,11 @@ func (dl *diskLayer) writeHistory(typ historyType, diff *diffLayer) (bool, error if err != nil { return false, err } + // Notify the index pruner about the new tail so that stale index + // blocks referencing the pruned histories can be cleaned up. + if indexer != nil && pruned > 0 { + indexer.prune(newFirst) + } log.Debug("Pruned history", "type", typ, "items", pruned, "tailid", newFirst) return false, nil } diff --git a/triedb/pathdb/history_index_pruner.go b/triedb/pathdb/history_index_pruner.go new file mode 100644 index 0000000000..c9be3618e8 --- /dev/null +++ b/triedb/pathdb/history_index_pruner.go @@ -0,0 +1,385 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package pathdb + +import ( + "encoding/binary" + "sync" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" +) + +const ( + // indexPruningThreshold defines the number of pruned histories that must + // accumulate before triggering index pruning. 
This helps avoid scheduling + // index pruning too frequently. + indexPruningThreshold = 90000 + + // iteratorReopenInterval is how long the iterator is kept open before + // being released and re-opened. Long-lived iterators hold a read snapshot + // that blocks LSM compaction; periodically re-opening avoids stalling the + // compactor during a large scan. + iteratorReopenInterval = 30 * time.Second +) + +// indexPruner is responsible for pruning stale index data from the tail side +// when old history objects are removed. It runs as a background goroutine and +// processes pruning signals whenever the history tail advances. +// +// The pruning operates at the block level: for each state element's index +// metadata, leading index blocks whose maximum history ID falls below the +// new tail are removed entirely. This avoids the need to decode individual +// block contents and is efficient because index blocks store monotonically +// increasing history IDs. +type indexPruner struct { + disk ethdb.KeyValueStore + typ historyType + tail atomic.Uint64 // Tail below which index entries can be pruned + lastRun uint64 // The tail in the last pruning run + trigger chan struct{} // Non-blocking signal that tail has advanced + closed chan struct{} + wg sync.WaitGroup + log log.Logger + + pauseReq chan chan struct{} // Pause request; caller sends ack channel, pruner closes it when paused + resumeCh chan struct{} // Resume signal sent by caller after indexSingle/unindexSingle completes +} + +// newIndexPruner creates and starts a new index pruner for the given history type. 
+func newIndexPruner(disk ethdb.KeyValueStore, typ historyType) *indexPruner { + p := &indexPruner{ + disk: disk, + typ: typ, + trigger: make(chan struct{}, 1), + closed: make(chan struct{}), + log: log.New("type", typ.String()), + pauseReq: make(chan chan struct{}), + resumeCh: make(chan struct{}), + } + p.wg.Add(1) + go p.run() + return p +} + +// prune signals the pruner that the history tail has advanced to the given ID. +// All index entries referencing history IDs below newTail can be removed. +func (p *indexPruner) prune(newTail uint64) { + // Only update if the tail is actually advancing + for { + old := p.tail.Load() + if newTail <= old { + return + } + if p.tail.CompareAndSwap(old, newTail) { + break + } + } + // Non-blocking signal + select { + case p.trigger <- struct{}{}: + default: + } +} + +// pause requests the pruner to flush all pending writes and pause. It blocks +// until the pruner has acknowledged the pause. This must be paired with a +// subsequent call to resume. +func (p *indexPruner) pause() { + ack := make(chan struct{}) + select { + case p.pauseReq <- ack: + <-ack // wait for the pruner to flush and acknowledge + case <-p.closed: + } +} + +// resume unblocks a previously paused pruner, allowing it to continue +// processing. +func (p *indexPruner) resume() { + select { + case p.resumeCh <- struct{}{}: + case <-p.closed: + } +} + +// close shuts down the pruner and waits for it to finish. +func (p *indexPruner) close() { + select { + case <-p.closed: + return + default: + close(p.closed) + p.wg.Wait() + } +} + +// run is the main loop of the pruner. It waits for trigger signals and +// processes a small batch of entries on each trigger, advancing the cursor. 
+func (p *indexPruner) run() { + defer p.wg.Done() + + for { + select { + case <-p.trigger: + tail := p.tail.Load() + if tail < p.lastRun || tail-p.lastRun < indexPruningThreshold { + continue + } + if err := p.process(tail); err != nil { + p.log.Error("Failed to prune index", "tail", tail, "err", err) + } else { + p.lastRun = tail + } + + case ack := <-p.pauseReq: + // Pruner is idle, acknowledge immediately and wait for resume. + close(ack) + select { + case <-p.resumeCh: + case <-p.closed: + return + } + + case <-p.closed: + return + } + } +} + +// process iterates all index metadata entries for the history type and prunes +// leading blocks whose max history ID is below the given tail. +func (p *indexPruner) process(tail uint64) error { + var ( + err error + pruned int + start = time.Now() + ) + switch p.typ { + case typeStateHistory: + n, err := p.prunePrefix(rawdb.StateHistoryAccountMetadataPrefix, typeAccount, tail) + if err != nil { + return err + } + pruned += n + + n, err = p.prunePrefix(rawdb.StateHistoryStorageMetadataPrefix, typeStorage, tail) + if err != nil { + return err + } + pruned += n + statePruneHistoryIndexTimer.UpdateSince(start) + + case typeTrienodeHistory: + pruned, err = p.prunePrefix(rawdb.TrienodeHistoryMetadataPrefix, typeTrienode, tail) + if err != nil { + return err + } + trienodePruneHistoryIndexTimer.UpdateSince(start) + + default: + panic("unknown history type") + } + if pruned > 0 { + p.log.Info("Pruned stale index blocks", "pruned", pruned, "tail", tail, "elapsed", common.PrettyDuration(time.Since(start))) + } + return nil +} + +// prunePrefix scans all metadata entries under the given prefix and prunes +// leading index blocks below the tail. The iterator is periodically released +// and re-opened to avoid holding a read snapshot that blocks LSM compaction. 
+func (p *indexPruner) prunePrefix(prefix []byte, elemType elementType, tail uint64) (int, error) { + var ( + pruned int + opened = time.Now() + it = p.disk.NewIterator(prefix, nil) + batch = p.disk.NewBatchWithSize(ethdb.IdealBatchSize) + ) + for { + // Terminate if iterator is exhausted + if !it.Next() { + it.Release() + break + } + // Check termination or pause request + select { + case <-p.closed: + // Terminate the process if indexer is closed + it.Release() + if batch.ValueSize() > 0 { + return pruned, batch.Write() + } + return pruned, nil + + case ack := <-p.pauseReq: + // Save the current position so that after resume the + // iterator can be re-opened from where it left off. + start := common.CopyBytes(it.Key()[len(prefix):]) + it.Release() + + // Flush all pending writes before acknowledging the pause. + var flushErr error + if batch.ValueSize() > 0 { + if err := batch.Write(); err != nil { + flushErr = err + } + batch.Reset() + } + close(ack) + + // Block until resumed or closed. Always wait here even if + // the flush failed — returning early would cause resume() + // to deadlock since nobody would receive on resumeCh. + select { + case <-p.resumeCh: + if flushErr != nil { + return 0, flushErr + } + // Re-open the iterator from the saved position so the + // pruner sees the current database state (including any + // writes made by indexer during the pause). 
+ it = p.disk.NewIterator(prefix, start) + opened = time.Now() + continue + case <-p.closed: + return pruned, flushErr + } + + default: + // Keep processing + } + + // Prune the index data block + key, value := it.Key(), it.Value() + ident, bsize := p.identFromKey(key, prefix, elemType) + n, err := p.pruneEntry(batch, ident, value, bsize, tail) + if err != nil { + p.log.Warn("Failed to prune index entry", "ident", ident, "err", err) + continue + } + pruned += n + + // Flush the batch if there are too many accumulated + if batch.ValueSize() >= ethdb.IdealBatchSize { + if err := batch.Write(); err != nil { + it.Release() + return 0, err + } + batch.Reset() + } + + // Periodically release the iterator so the LSM compactor + // is not blocked by the read snapshot we hold. + if time.Since(opened) >= iteratorReopenInterval { + opened = time.Now() + + start := common.CopyBytes(it.Key()[len(prefix):]) + it.Release() + it = p.disk.NewIterator(prefix, start) + } + } + if batch.ValueSize() > 0 { + if err := batch.Write(); err != nil { + return 0, err + } + } + return pruned, nil +} + +// identFromKey reconstructs the stateIdent and bitmapSize from a metadata key. 
+func (p *indexPruner) identFromKey(key []byte, prefix []byte, elemType elementType) (stateIdent, int) { + rest := key[len(prefix):] + + switch elemType { + case typeAccount: + // key = prefix + addressHash(32) + var addrHash common.Hash + copy(addrHash[:], rest[:32]) + return newAccountIdent(addrHash), 0 + + case typeStorage: + // key = prefix + addressHash(32) + storageHash(32) + var addrHash, storHash common.Hash + copy(addrHash[:], rest[:32]) + copy(storHash[:], rest[32:64]) + return newStorageIdent(addrHash, storHash), 0 + + case typeTrienode: + // key = prefix + addressHash(32) + path(variable) + var addrHash common.Hash + copy(addrHash[:], rest[:32]) + path := string(rest[32:]) + ident := newTrienodeIdent(addrHash, path) + return ident, ident.bloomSize() + + default: + panic("unknown element type") + } +} + +// pruneEntry checks a single metadata entry and removes leading index blocks +// whose max < tail. Returns the number of blocks pruned. +func (p *indexPruner) pruneEntry(batch ethdb.Batch, ident stateIdent, blob []byte, bsize int, tail uint64) (int, error) { + // Fast path: the first 8 bytes of the metadata encode the max history ID + // of the first index block (big-endian uint64). If it is >= tail, no + // blocks can be pruned and we skip the full parse entirely. + if len(blob) >= 8 && binary.BigEndian.Uint64(blob[:8]) >= tail { + return 0, nil + } + descList, err := parseIndex(blob, bsize) + if err != nil { + return 0, err + } + // Find the number of leading blocks that can be entirely pruned. + // A block can be pruned if its max history ID is strictly below + // the tail. 
+ var count int + for _, desc := range descList { + if desc.max < tail { + count++ + } else { + break // blocks are ordered, no more to prune + } + } + if count == 0 { + return 0, nil + } + // Delete the pruned index blocks + for i := 0; i < count; i++ { + deleteStateIndexBlock(ident, batch, descList[i].id) + } + // Update or delete the metadata + remaining := descList[count:] + if len(remaining) == 0 { + // All blocks pruned, remove the metadata entry entirely + deleteStateIndex(ident, batch) + } else { + // Rewrite the metadata with the remaining blocks + size := indexBlockDescSize + bsize + buf := make([]byte, 0, size*len(remaining)) + for _, desc := range remaining { + buf = append(buf, desc.encode()...) + } + writeStateIndex(ident, batch, buf) + } + return count, nil +} diff --git a/triedb/pathdb/history_index_pruner_test.go b/triedb/pathdb/history_index_pruner_test.go new file mode 100644 index 0000000000..b3094de3e6 --- /dev/null +++ b/triedb/pathdb/history_index_pruner_test.go @@ -0,0 +1,355 @@ +// Copyright 2025 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. 
+ +package pathdb + +import ( + "math" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" +) + +func writeMultiBlockIndex(t *testing.T, db ethdb.Database, ident stateIdent, bitmapSize int, startID uint64) []*indexBlockDesc { + t.Helper() + + if startID == 0 { + startID = 1 + } + iw, _ := newIndexWriter(db, ident, 0, bitmapSize) + + for i := 0; i < 10000; i++ { + if err := iw.append(startID+uint64(i), randomExt(bitmapSize, 5)); err != nil { + t.Fatalf("Failed to append element %d: %v", i, err) + } + } + batch := db.NewBatch() + iw.finish(batch) + if err := batch.Write(); err != nil { + t.Fatalf("Failed to write batch: %v", err) + } + + blob := readStateIndex(ident, db) + descList, err := parseIndex(blob, bitmapSize) + if err != nil { + t.Fatalf("Failed to parse index: %v", err) + } + return descList +} + +// TestPruneEntryBasic verifies that pruneEntry correctly removes leading index +// blocks whose max is below the given tail. 
+func TestPruneEntryBasic(t *testing.T) { + db := rawdb.NewMemoryDatabase() + ident := newAccountIdent(common.Hash{0xa}) + descList := writeMultiBlockIndex(t, db, ident, 0, 1) + + // Prune with a tail that is above the first block's max but below the second + firstBlockMax := descList[0].max + + pruner := newIndexPruner(db, typeStateHistory) + defer pruner.close() + + if err := pruner.process(firstBlockMax + 1); err != nil { + t.Fatalf("Failed to process pruning: %v", err) + } + + // Verify the first block was removed + blob := readStateIndex(ident, db) + if len(blob) == 0 { + t.Fatal("Index metadata should not be empty after partial prune") + } + remaining, err := parseIndex(blob, 0) + if err != nil { + t.Fatalf("Failed to parse index after prune: %v", err) + } + if len(remaining) != len(descList)-1 { + t.Fatalf("Expected %d blocks remaining, got %d", len(descList)-1, len(remaining)) + } + // The first remaining block should be what was previously the second block + if remaining[0].id != descList[1].id { + t.Fatalf("Expected first remaining block id %d, got %d", descList[1].id, remaining[0].id) + } + + // Verify the pruned block data is actually deleted + blockData := readStateIndexBlock(ident, db, descList[0].id) + if len(blockData) != 0 { + t.Fatal("Pruned block data should have been deleted") + } + + // Remaining blocks should still have their data + for _, desc := range remaining { + blockData = readStateIndexBlock(ident, db, desc.id) + if len(blockData) == 0 { + t.Fatalf("Block %d data should still exist", desc.id) + } + } +} + +// TestPruneEntryBasicTrienode is the same as TestPruneEntryBasic but for +// trienode index entries with a non-zero bitmapSize. 
+func TestPruneEntryBasicTrienode(t *testing.T) { + db := rawdb.NewMemoryDatabase() + addrHash := common.Hash{0xa} + path := string([]byte{0x0, 0x0, 0x0}) + ident := newTrienodeIdent(addrHash, path) + + descList := writeMultiBlockIndex(t, db, ident, ident.bloomSize(), 1) + firstBlockMax := descList[0].max + + pruner := newIndexPruner(db, typeTrienodeHistory) + defer pruner.close() + + if err := pruner.process(firstBlockMax + 1); err != nil { + t.Fatalf("Failed to process pruning: %v", err) + } + + blob := readStateIndex(ident, db) + remaining, err := parseIndex(blob, ident.bloomSize()) + if err != nil { + t.Fatalf("Failed to parse index after prune: %v", err) + } + if len(remaining) != len(descList)-1 { + t.Fatalf("Expected %d blocks remaining, got %d", len(descList)-1, len(remaining)) + } + if remaining[0].id != descList[1].id { + t.Fatalf("Expected first remaining block id %d, got %d", descList[1].id, remaining[0].id) + } + blockData := readStateIndexBlock(ident, db, descList[0].id) + if len(blockData) != 0 { + t.Fatal("Pruned block data should have been deleted") + } +} + +// TestPruneEntryComplete verifies that when all blocks are pruned, the metadata +// entry is also deleted. 
+func TestPruneEntryComplete(t *testing.T) { + db := rawdb.NewMemoryDatabase() + ident := newAccountIdent(common.Hash{0xb}) + iw, _ := newIndexWriter(db, ident, 0, 0) + + for i := 1; i <= 10; i++ { + if err := iw.append(uint64(i), nil); err != nil { + t.Fatalf("Failed to append: %v", err) + } + } + batch := db.NewBatch() + iw.finish(batch) + if err := batch.Write(); err != nil { + t.Fatalf("Failed to write: %v", err) + } + + pruner := newIndexPruner(db, typeStateHistory) + defer pruner.close() + + // Prune with tail above all elements + if err := pruner.process(11); err != nil { + t.Fatalf("Failed to process: %v", err) + } + + // Metadata entry should be deleted + blob := readStateIndex(ident, db) + if len(blob) != 0 { + t.Fatal("Index metadata should be empty after full prune") + } +} + +// TestPruneNoop verifies that pruning does nothing when the tail is below all +// block maximums. +func TestPruneNoop(t *testing.T) { + db := rawdb.NewMemoryDatabase() + ident := newAccountIdent(common.Hash{0xc}) + iw, _ := newIndexWriter(db, ident, 0, 0) + + for i := 100; i <= 200; i++ { + if err := iw.append(uint64(i), nil); err != nil { + t.Fatalf("Failed to append: %v", err) + } + } + batch := db.NewBatch() + iw.finish(batch) + if err := batch.Write(); err != nil { + t.Fatalf("Failed to write: %v", err) + } + + blob := readStateIndex(ident, db) + origLen := len(blob) + + pruner := newIndexPruner(db, typeStateHistory) + defer pruner.close() + + if err := pruner.process(50); err != nil { + t.Fatalf("Failed to process: %v", err) + } + + // Nothing should have changed + blob = readStateIndex(ident, db) + if len(blob) != origLen { + t.Fatalf("Expected no change, original len %d, got %d", origLen, len(blob)) + } +} + +// TestPrunePreservesReadability verifies that after pruning, the remaining +// index data is still readable and returns correct results. 
+func TestPrunePreservesReadability(t *testing.T) { + db := rawdb.NewMemoryDatabase() + ident := newAccountIdent(common.Hash{0xe}) + descList := writeMultiBlockIndex(t, db, ident, 0, 1) + firstBlockMax := descList[0].max + + pruner := newIndexPruner(db, typeStateHistory) + defer pruner.close() + + if err := pruner.process(firstBlockMax + 1); err != nil { + t.Fatalf("Failed to process: %v", err) + } + + // Read the remaining index and verify lookups still work + ir, err := newIndexReader(db, ident, 0) + if err != nil { + t.Fatalf("Failed to create reader: %v", err) + } + + // Looking for something greater than firstBlockMax should still work + result, err := ir.readGreaterThan(firstBlockMax) + if err != nil { + t.Fatalf("Failed to read: %v", err) + } + if result != firstBlockMax+1 { + t.Fatalf("Expected %d, got %d", firstBlockMax+1, result) + } + + // Looking for the last element should return MaxUint64 + result, err = ir.readGreaterThan(20000) + if err != nil { + t.Fatalf("Failed to read: %v", err) + } + if result != math.MaxUint64 { + t.Fatalf("Expected MaxUint64, got %d", result) + } +} + +// TestPrunePauseResume verifies the pause/resume mechanism: +// - The pruner pauses mid-iteration and flushes its batch +// - Data written while the pruner is paused (simulating indexSingle) is +// visible after resume via a fresh iterator +// - Pruning still completes correctly after resume +func TestPrunePauseResume(t *testing.T) { + db := rawdb.NewMemoryDatabase() + + // Create many accounts with multi-block indexes so the pruner is still + // iterating when the pause request arrives. + var firstBlockMax uint64 + for i := 0; i < 200; i++ { + hash := common.Hash{byte(i)} + ident := newAccountIdent(hash) + descList := writeMultiBlockIndex(t, db, ident, 0, 1) + if i == 0 { + firstBlockMax = descList[0].max + } + } + // Target account at the end of the key space — the pruner should not + // have visited it yet when the pause is acknowledged. 
+ targetIdent := newAccountIdent(common.Hash{0xff}) + targetDescList := writeMultiBlockIndex(t, db, targetIdent, 0, 1) + + tail := firstBlockMax + 1 + + // Construct the pruner without starting run(). We call process() + // directly to exercise the mid-iteration pause path deterministically. + pruner := &indexPruner{ + disk: db, + typ: typeStateHistory, + log: log.New("type", "account"), + closed: make(chan struct{}), + pauseReq: make(chan chan struct{}, 1), // buffered so we can pre-deposit + resumeCh: make(chan struct{}), + } + + // Pre-deposit a pause request before process() starts. Because + // pauseReq is buffered, this succeeds immediately. When prunePrefix's + // select checks the channel on an early iteration, it will find the + // pending request and pause — no scheduling race is possible. + ack := make(chan struct{}) + pruner.pauseReq <- ack + + // Run process() in the background. + errCh := make(chan error, 1) + go func() { + errCh <- pruner.process(tail) + }() + + // Block until the pruner has flushed pending writes and acknowledged. + <-ack + + // While paused, append a new element to the target account's index, + // simulating what indexSingle would do during the pause window. + lastMax := targetDescList[len(targetDescList)-1].max + newID := lastMax + 10000 + iw, err := newIndexWriter(db, targetIdent, lastMax, 0) + if err != nil { + t.Fatalf("Failed to create index writer: %v", err) + } + if err := iw.append(newID, nil); err != nil { + t.Fatalf("Failed to append: %v", err) + } + batch := db.NewBatch() + iw.finish(batch) + if err := batch.Write(); err != nil { + t.Fatalf("Failed to write batch: %v", err) + } + + // Resume the pruner. + pruner.resume() + + // Wait for process() to complete. + if err := <-errCh; err != nil { + t.Fatalf("process() failed: %v", err) + } + + // Verify: the entry written during the pause must still be accessible. 
+ // If the pruner used a stale iterator snapshot, it would overwrite the + // target's metadata and lose the new entry. + ir, err := newIndexReader(db, targetIdent, 0) + if err != nil { + t.Fatalf("Failed to create index reader: %v", err) + } + result, err := ir.readGreaterThan(newID - 1) + if err != nil { + t.Fatalf("Failed to read: %v", err) + } + if result != newID { + t.Fatalf("Entry written during pause was lost: want %d, got %d", newID, result) + } + + // Verify: pruning actually occurred on an early account. + earlyIdent := newAccountIdent(common.Hash{0x00}) + earlyBlob := readStateIndex(earlyIdent, db) + if len(earlyBlob) == 0 { + t.Fatal("Early account index should not be completely empty") + } + earlyRemaining, err := parseIndex(earlyBlob, 0) + if err != nil { + t.Fatalf("Failed to parse early account index: %v", err) + } + // The first block (id=0) should have been pruned. + if earlyRemaining[0].id == 0 { + t.Fatal("First block of early account should have been pruned") + } +} diff --git a/triedb/pathdb/history_indexer.go b/triedb/pathdb/history_indexer.go index c9bf3e87f1..9b215b917f 100644 --- a/triedb/pathdb/history_indexer.go +++ b/triedb/pathdb/history_indexer.go @@ -719,6 +719,7 @@ func (i *indexIniter) recover() bool { // state history. 
type historyIndexer struct { initer *indexIniter + pruner *indexPruner typ historyType disk ethdb.KeyValueStore freezer ethdb.AncientStore @@ -774,6 +775,7 @@ func newHistoryIndexer(disk ethdb.Database, freezer ethdb.AncientStore, lastHist checkVersion(disk, typ) return &historyIndexer{ initer: newIndexIniter(disk, freezer, typ, lastHistoryID, noWait), + pruner: newIndexPruner(disk, typ), typ: typ, disk: disk, freezer: freezer, @@ -782,6 +784,7 @@ func newHistoryIndexer(disk ethdb.Database, freezer ethdb.AncientStore, lastHist func (i *historyIndexer) close() { i.initer.close() + i.pruner.close() } // inited returns a flag indicating whether the existing state histories @@ -802,6 +805,8 @@ func (i *historyIndexer) extend(historyID uint64) error { case <-i.initer.closed: return errors.New("indexer is closed") case <-i.initer.done: + i.pruner.pause() + defer i.pruner.resume() return indexSingle(historyID, i.disk, i.freezer, i.typ) case i.initer.interrupt <- signal: return <-signal.result @@ -819,12 +824,27 @@ func (i *historyIndexer) shorten(historyID uint64) error { case <-i.initer.closed: return errors.New("indexer is closed") case <-i.initer.done: + i.pruner.pause() + defer i.pruner.resume() return unindexSingle(historyID, i.disk, i.freezer, i.typ) case i.initer.interrupt <- signal: return <-signal.result } } +// prune signals the pruner that the history tail has advanced to the given ID, +// so that stale index blocks referencing pruned histories can be removed. +func (i *historyIndexer) prune(newTail uint64) { + select { + case <-i.initer.closed: + log.Debug("Ignored the pruning signal", "reason", "closed") + case <-i.initer.done: + i.pruner.prune(newTail) + default: + log.Debug("Ignored the pruning signal", "reason", "busy") + } +} + // progress returns the indexing progress made so far. It provides the number // of states that remain unindexed. 
func (i *historyIndexer) progress() (uint64, error) { diff --git a/triedb/pathdb/metrics.go b/triedb/pathdb/metrics.go index a0a626f9b5..e01dfdfb86 100644 --- a/triedb/pathdb/metrics.go +++ b/triedb/pathdb/metrics.go @@ -77,10 +77,12 @@ var ( trienodeHistoryDataBytesMeter = metrics.NewRegisteredMeter("pathdb/history/trienode/bytes/data", nil) trienodeHistoryIndexBytesMeter = metrics.NewRegisteredMeter("pathdb/history/trienode/bytes/index", nil) - stateIndexHistoryTimer = metrics.NewRegisteredResettingTimer("pathdb/history/state/index/time", nil) - stateUnindexHistoryTimer = metrics.NewRegisteredResettingTimer("pathdb/history/state/unindex/time", nil) - trienodeIndexHistoryTimer = metrics.NewRegisteredResettingTimer("pathdb/history/trienode/index/time", nil) - trienodeUnindexHistoryTimer = metrics.NewRegisteredResettingTimer("pathdb/history/trienode/unindex/time", nil) + stateIndexHistoryTimer = metrics.NewRegisteredResettingTimer("pathdb/history/state/index/time", nil) + stateUnindexHistoryTimer = metrics.NewRegisteredResettingTimer("pathdb/history/state/unindex/time", nil) + statePruneHistoryIndexTimer = metrics.NewRegisteredResettingTimer("pathdb/history/state/prune/time", nil) + trienodeIndexHistoryTimer = metrics.NewRegisteredResettingTimer("pathdb/history/trienode/index/time", nil) + trienodeUnindexHistoryTimer = metrics.NewRegisteredResettingTimer("pathdb/history/trienode/unindex/time", nil) + trienodePruneHistoryIndexTimer = metrics.NewRegisteredResettingTimer("pathdb/history/trienode/prune/time", nil) lookupAddLayerTimer = metrics.NewRegisteredResettingTimer("pathdb/lookup/add/time", nil) lookupRemoveLayerTimer = metrics.NewRegisteredResettingTimer("pathdb/lookup/remove/time", nil)