triedb/pathdb: implement history index pruner

This commit is contained in:
Gary Rong 2026-03-12 15:38:55 +08:00
parent 92b4cb2663
commit bcce50b037
5 changed files with 561 additions and 4 deletions

View file

@ -408,6 +408,11 @@ func (dl *diskLayer) writeHistory(typ historyType, diff *diffLayer) (bool, error
if err != nil {
return false, err
}
// Notify the index pruner about the new tail so that stale index
// blocks referencing the pruned histories can be cleaned up.
if indexer != nil && pruned > 0 {
indexer.prune(newFirst)
}
log.Debug("Pruned history", "type", typ, "items", pruned, "tailid", newFirst)
return false, nil
}

View file

@ -0,0 +1,296 @@
// Copyright 2025 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package pathdb
import (
"encoding/binary"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
)
const (
// indexPruningThreshold defines the number of pruned histories that must
// accumulate before triggering index pruning. This helps avoid scheduling
// index pruning too frequently.
indexPruningThreshold = 90000
)
// indexPruner is responsible for pruning stale index data from the tail side
// when old history objects are removed. It runs as a background goroutine and
// processes pruning signals whenever the history tail advances.
//
// The pruning operates at the block level: for each state element's index
// metadata, leading index blocks whose maximum history ID falls below the
// new tail are removed entirely. This avoids the need to decode individual
// block contents and is efficient because index blocks store monotonically
// increasing history IDs.
type indexPruner struct {
disk ethdb.KeyValueStore
typ historyType
tail atomic.Uint64 // Tail below which index entries can be pruned
lastRun uint64 // The tail in the last pruning run
trigger chan struct{} // Non-blocking signal that tail has advanced
closed chan struct{}
wg sync.WaitGroup
log log.Logger
}
// newIndexPruner creates and starts a new index pruner for the given history type.
func newIndexPruner(disk ethdb.KeyValueStore, typ historyType) *indexPruner {
p := &indexPruner{
disk: disk,
typ: typ,
trigger: make(chan struct{}, 1),
closed: make(chan struct{}),
log: log.New("type", typ.String()),
}
p.wg.Add(1)
go p.run()
return p
}
// prune signals the pruner that the history tail has advanced to the given ID.
// All index entries referencing history IDs below newTail can be removed.
func (p *indexPruner) prune(newTail uint64) {
// Only update if the tail is actually advancing
for {
old := p.tail.Load()
if newTail <= old {
return
}
if p.tail.CompareAndSwap(old, newTail) {
break
}
}
// Non-blocking signal
select {
case p.trigger <- struct{}{}:
default:
}
}
// close shuts down the pruner and waits for it to finish.
func (p *indexPruner) close() {
select {
case <-p.closed:
return
default:
close(p.closed)
p.wg.Wait()
}
}
// run is the main loop of the pruner. It waits for trigger signals and
// processes a small batch of entries on each trigger, advancing the cursor.
func (p *indexPruner) run() {
defer p.wg.Done()
for {
select {
case <-p.trigger:
tail := p.tail.Load()
if tail < p.lastRun || tail-p.lastRun < indexPruningThreshold {
continue
}
if err := p.process(tail); err != nil {
p.log.Error("Failed to prune index", "tail", tail, "err", err)
} else {
p.lastRun = tail
}
case <-p.closed:
return
}
}
}
// process iterates all index metadata entries for the history type and prunes
// leading blocks whose max history ID is below the given tail.
func (p *indexPruner) process(tail uint64) error {
var (
err error
pruned int
scanned int
start = time.Now()
)
switch p.typ {
case typeStateHistory:
pn, sn, err := p.prunePrefix(rawdb.StateHistoryAccountMetadataPrefix, typeAccount, tail)
if err != nil {
return err
}
pruned += pn
scanned += sn
pn, sn, err = p.prunePrefix(rawdb.StateHistoryStorageMetadataPrefix, typeStorage, tail)
if err != nil {
return err
}
pruned += pn
scanned += sn
case typeTrienodeHistory:
pruned, scanned, err = p.prunePrefix(rawdb.TrienodeHistoryMetadataPrefix, typeTrienode, tail)
if err != nil {
return err
}
default:
panic("unknown history type")
}
if pruned > 0 {
p.log.Debug("Pruned stale index blocks", "pruned", pruned, "scanned", scanned, "tail", tail, "elapsed", common.PrettyDuration(time.Since(start)))
}
if p.typ == typeStateHistory {
statePruneHistoryIndexTimer.UpdateSince(start)
} else {
trienodePruneHistoryIndexTimer.UpdateSince(start)
}
return nil
}
// prunePrefix scans up to indexPruneBatchSize metadata entries starting from
// the cursor position and prunes leading index blocks below the tail. The
// cursor advances after each cycle; when the prefix is fully scanned, the
// cursor resets so the next cycle starts from the beginning.
// Returns (prunedBlocks, scannedEntries, error).
func (p *indexPruner) prunePrefix(prefix []byte, elemType elementType, tail uint64) (int, int, error) {
var (
pruned int
scanned int
batch = p.disk.NewBatchWithSize(ethdb.IdealBatchSize)
)
it := p.disk.NewIterator(prefix, nil)
defer it.Release()
for it.Next() {
// Check for shutdown
select {
case <-p.closed:
return pruned, scanned, nil
default:
}
scanned++
key, value := it.Key(), it.Value()
ident, bsize := p.identFromKey(key, prefix, elemType)
n, err := p.pruneEntry(batch, ident, value, bsize, tail)
if err != nil {
p.log.Warn("Failed to prune index entry", "ident", ident, "err", err)
continue
}
pruned += n
if batch.ValueSize() >= ethdb.IdealBatchSize {
if err := batch.Write(); err != nil {
return 0, 0, err
}
batch.Reset()
}
}
if batch.ValueSize() > 0 {
if err := batch.Write(); err != nil {
return 0, 0, err
}
}
return pruned, scanned, nil
}
// identFromKey reconstructs the stateIdent and bitmapSize from a metadata key.
func (p *indexPruner) identFromKey(key []byte, prefix []byte, elemType elementType) (stateIdent, int) {
rest := key[len(prefix):]
switch elemType {
case typeAccount:
// key = prefix + addressHash(32)
var addrHash common.Hash
copy(addrHash[:], rest[:32])
return newAccountIdent(addrHash), 0
case typeStorage:
// key = prefix + addressHash(32) + storageHash(32)
var addrHash, storHash common.Hash
copy(addrHash[:], rest[:32])
copy(storHash[:], rest[32:64])
return newStorageIdent(addrHash, storHash), 0
case typeTrienode:
// key = prefix + addressHash(32) + path(variable)
var addrHash common.Hash
copy(addrHash[:], rest[:32])
path := string(rest[32:])
ident := newTrienodeIdent(addrHash, path)
return ident, ident.bloomSize()
default:
panic("unknown element type")
}
}
// pruneEntry checks a single metadata entry and removes leading index blocks
// whose max < tail. Returns the number of blocks pruned.
func (p *indexPruner) pruneEntry(batch ethdb.Batch, ident stateIdent, blob []byte, bsize int, tail uint64) (int, error) {
// Fast path: the first 8 bytes of the metadata encode the max history ID
// of the first index block (big-endian uint64). If it is >= tail, no
// blocks can be pruned and we skip the full parse entirely.
if len(blob) >= 8 && binary.BigEndian.Uint64(blob[:8]) >= tail {
return 0, nil
}
descList, err := parseIndex(blob, bsize)
if err != nil {
return 0, err
}
// Find the number of leading blocks that can be entirely pruned.
// A block can be pruned if its max history ID is strictly below
// the tail.
var count int
for _, desc := range descList {
if desc.max < tail {
count++
} else {
break // blocks are ordered, no more to prune
}
}
if count == 0 {
return 0, nil
}
// Delete the pruned index blocks
for i := 0; i < count; i++ {
deleteStateIndexBlock(ident, batch, descList[i].id)
}
// Update or delete the metadata
remaining := descList[count:]
if len(remaining) == 0 {
// All blocks pruned, remove the metadata entry entirely
deleteStateIndex(ident, batch)
} else {
// Rewrite the metadata with the remaining blocks
size := indexBlockDescSize + bsize
buf := make([]byte, 0, size*len(remaining))
for _, desc := range remaining {
buf = append(buf, desc.encode()...)
}
writeStateIndex(ident, batch, buf)
}
return count, nil
}

View file

@ -0,0 +1,245 @@
// Copyright 2025 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package pathdb
import (
"math"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb"
)
func writeMultiBlockIndex(t *testing.T, db ethdb.Database, ident stateIdent, bitmapSize int, startID uint64) []*indexBlockDesc {
t.Helper()
if startID == 0 {
startID = 1
}
iw, _ := newIndexWriter(db, ident, 0, bitmapSize)
for i := 0; i < 10000; i++ {
if err := iw.append(startID+uint64(i), randomExt(bitmapSize, 5)); err != nil {
t.Fatalf("Failed to append element %d: %v", i, err)
}
}
batch := db.NewBatch()
iw.finish(batch)
if err := batch.Write(); err != nil {
t.Fatalf("Failed to write batch: %v", err)
}
blob := readStateIndex(ident, db)
descList, err := parseIndex(blob, bitmapSize)
if err != nil {
t.Fatalf("Failed to parse index: %v", err)
}
return descList
}
// TestPruneEntryBasic verifies that pruneEntry correctly removes leading index
// blocks whose max is below the given tail.
func TestPruneEntryBasic(t *testing.T) {
db := rawdb.NewMemoryDatabase()
ident := newAccountIdent(common.Hash{0xa})
descList := writeMultiBlockIndex(t, db, ident, 0, 1)
// Prune with a tail that is above the first block's max but below the second
firstBlockMax := descList[0].max
pruner := newIndexPruner(db, typeStateHistory)
defer pruner.close()
if err := pruner.process(firstBlockMax + 1); err != nil {
t.Fatalf("Failed to process pruning: %v", err)
}
// Verify the first block was removed
blob := readStateIndex(ident, db)
if len(blob) == 0 {
t.Fatal("Index metadata should not be empty after partial prune")
}
remaining, err := parseIndex(blob, 0)
if err != nil {
t.Fatalf("Failed to parse index after prune: %v", err)
}
if len(remaining) != len(descList)-1 {
t.Fatalf("Expected %d blocks remaining, got %d", len(descList)-1, len(remaining))
}
// The first remaining block should be what was previously the second block
if remaining[0].id != descList[1].id {
t.Fatalf("Expected first remaining block id %d, got %d", descList[1].id, remaining[0].id)
}
// Verify the pruned block data is actually deleted
blockData := readStateIndexBlock(ident, db, descList[0].id)
if len(blockData) != 0 {
t.Fatal("Pruned block data should have been deleted")
}
// Remaining blocks should still have their data
for _, desc := range remaining {
blockData = readStateIndexBlock(ident, db, desc.id)
if len(blockData) == 0 {
t.Fatalf("Block %d data should still exist", desc.id)
}
}
}
// TestPruneEntryBasicTrienode is the same as TestPruneEntryBasic but for
// trienode index entries with a non-zero bitmapSize.
func TestPruneEntryBasicTrienode(t *testing.T) {
db := rawdb.NewMemoryDatabase()
addrHash := common.Hash{0xa}
path := string([]byte{0x0, 0x0, 0x0})
ident := newTrienodeIdent(addrHash, path)
descList := writeMultiBlockIndex(t, db, ident, ident.bloomSize(), 1)
firstBlockMax := descList[0].max
pruner := newIndexPruner(db, typeTrienodeHistory)
defer pruner.close()
if err := pruner.process(firstBlockMax + 1); err != nil {
t.Fatalf("Failed to process pruning: %v", err)
}
blob := readStateIndex(ident, db)
remaining, err := parseIndex(blob, ident.bloomSize())
if err != nil {
t.Fatalf("Failed to parse index after prune: %v", err)
}
if len(remaining) != len(descList)-1 {
t.Fatalf("Expected %d blocks remaining, got %d", len(descList)-1, len(remaining))
}
if remaining[0].id != descList[1].id {
t.Fatalf("Expected first remaining block id %d, got %d", descList[1].id, remaining[0].id)
}
blockData := readStateIndexBlock(ident, db, descList[0].id)
if len(blockData) != 0 {
t.Fatal("Pruned block data should have been deleted")
}
}
// TestPruneEntryComplete verifies that when all blocks are pruned, the metadata
// entry is also deleted.
func TestPruneEntryComplete(t *testing.T) {
db := rawdb.NewMemoryDatabase()
ident := newAccountIdent(common.Hash{0xb})
iw, _ := newIndexWriter(db, ident, 0, 0)
for i := 1; i <= 10; i++ {
if err := iw.append(uint64(i), nil); err != nil {
t.Fatalf("Failed to append: %v", err)
}
}
batch := db.NewBatch()
iw.finish(batch)
if err := batch.Write(); err != nil {
t.Fatalf("Failed to write: %v", err)
}
pruner := newIndexPruner(db, typeStateHistory)
defer pruner.close()
// Prune with tail above all elements
if err := pruner.process(11); err != nil {
t.Fatalf("Failed to process: %v", err)
}
// Metadata entry should be deleted
blob := readStateIndex(ident, db)
if len(blob) != 0 {
t.Fatal("Index metadata should be empty after full prune")
}
}
// TestPruneNoop verifies that pruning does nothing when the tail is below all
// block maximums.
func TestPruneNoop(t *testing.T) {
db := rawdb.NewMemoryDatabase()
ident := newAccountIdent(common.Hash{0xc})
iw, _ := newIndexWriter(db, ident, 0, 0)
for i := 100; i <= 200; i++ {
if err := iw.append(uint64(i), nil); err != nil {
t.Fatalf("Failed to append: %v", err)
}
}
batch := db.NewBatch()
iw.finish(batch)
if err := batch.Write(); err != nil {
t.Fatalf("Failed to write: %v", err)
}
blob := readStateIndex(ident, db)
origLen := len(blob)
pruner := newIndexPruner(db, typeStateHistory)
defer pruner.close()
if err := pruner.process(50); err != nil {
t.Fatalf("Failed to process: %v", err)
}
// Nothing should have changed
blob = readStateIndex(ident, db)
if len(blob) != origLen {
t.Fatalf("Expected no change, original len %d, got %d", origLen, len(blob))
}
}
// TestPrunePreservesReadability verifies that after pruning, the remaining
// index data is still readable and returns correct results.
func TestPrunePreservesReadability(t *testing.T) {
db := rawdb.NewMemoryDatabase()
ident := newAccountIdent(common.Hash{0xe})
descList := writeMultiBlockIndex(t, db, ident, 0, 1)
firstBlockMax := descList[0].max
pruner := newIndexPruner(db, typeStateHistory)
defer pruner.close()
if err := pruner.process(firstBlockMax + 1); err != nil {
t.Fatalf("Failed to process: %v", err)
}
// Read the remaining index and verify lookups still work
ir, err := newIndexReader(db, ident, 0)
if err != nil {
t.Fatalf("Failed to create reader: %v", err)
}
// Looking for something greater than firstBlockMax should still work
result, err := ir.readGreaterThan(firstBlockMax)
if err != nil {
t.Fatalf("Failed to read: %v", err)
}
if result != firstBlockMax+1 {
t.Fatalf("Expected %d, got %d", firstBlockMax+1, result)
}
// Looking for the last element should return MaxUint64
result, err = ir.readGreaterThan(20000)
if err != nil {
t.Fatalf("Failed to read: %v", err)
}
if result != math.MaxUint64 {
t.Fatalf("Expected MaxUint64, got %d", result)
}
}

View file

@ -719,6 +719,7 @@ func (i *indexIniter) recover() bool {
// state history.
type historyIndexer struct {
initer *indexIniter
pruner *indexPruner
typ historyType
disk ethdb.KeyValueStore
freezer ethdb.AncientStore
@ -774,6 +775,7 @@ func newHistoryIndexer(disk ethdb.Database, freezer ethdb.AncientStore, lastHist
checkVersion(disk, typ)
return &historyIndexer{
initer: newIndexIniter(disk, freezer, typ, lastHistoryID, noWait),
pruner: newIndexPruner(disk, typ),
typ: typ,
disk: disk,
freezer: freezer,
@ -782,6 +784,7 @@ func newHistoryIndexer(disk ethdb.Database, freezer ethdb.AncientStore, lastHist
func (i *historyIndexer) close() {
i.initer.close()
i.pruner.close()
}
// inited returns a flag indicating whether the existing state histories
@ -825,6 +828,12 @@ func (i *historyIndexer) shorten(historyID uint64) error {
}
}
// prune signals the pruner that the history tail has advanced to the given ID,
// so that stale index blocks referencing pruned histories can be removed.
func (i *historyIndexer) prune(newTail uint64) {
i.pruner.prune(newTail)
}
// progress returns the indexing progress made so far. It provides the number
// of states that remain unindexed.
func (i *historyIndexer) progress() (uint64, error) {

View file

@ -77,10 +77,12 @@ var (
trienodeHistoryDataBytesMeter = metrics.NewRegisteredMeter("pathdb/history/trienode/bytes/data", nil)
trienodeHistoryIndexBytesMeter = metrics.NewRegisteredMeter("pathdb/history/trienode/bytes/index", nil)
stateIndexHistoryTimer = metrics.NewRegisteredResettingTimer("pathdb/history/state/index/time", nil)
stateUnindexHistoryTimer = metrics.NewRegisteredResettingTimer("pathdb/history/state/unindex/time", nil)
trienodeIndexHistoryTimer = metrics.NewRegisteredResettingTimer("pathdb/history/trienode/index/time", nil)
trienodeUnindexHistoryTimer = metrics.NewRegisteredResettingTimer("pathdb/history/trienode/unindex/time", nil)
stateIndexHistoryTimer = metrics.NewRegisteredResettingTimer("pathdb/history/state/index/time", nil)
stateUnindexHistoryTimer = metrics.NewRegisteredResettingTimer("pathdb/history/state/unindex/time", nil)
statePruneHistoryIndexTimer = metrics.NewRegisteredResettingTimer("pathdb/history/state/prune/time", nil)
trienodeIndexHistoryTimer = metrics.NewRegisteredResettingTimer("pathdb/history/trienode/index/time", nil)
trienodeUnindexHistoryTimer = metrics.NewRegisteredResettingTimer("pathdb/history/trienode/unindex/time", nil)
trienodePruneHistoryIndexTimer = metrics.NewRegisteredResettingTimer("pathdb/history/trienode/prune/time", nil)
lookupAddLayerTimer = metrics.NewRegisteredResettingTimer("pathdb/lookup/add/time", nil)
lookupRemoveLayerTimer = metrics.NewRegisteredResettingTimer("pathdb/lookup/remove/time", nil)