mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-04-01 23:55:54 +00:00
triedb/pathdb: implement history index pruner (#33999)
This PR implements the missing functionality for archive nodes by pruning stale index data. The current mechanism is relatively simple but sufficient for now: it periodically iterates over index entries and deletes outdated data on a per-block basis. The pruning process is triggered every 90,000 new blocks (approximately every 12 days), and the iteration typically takes ~30 minutes on a mainnet node. This mechanism is only applied with `gcmode=archive` enabled, having no impact on normal full node.
This commit is contained in:
parent
14a26d9ccc
commit
db6c7d06a2
5 changed files with 771 additions and 4 deletions
|
|
@ -408,6 +408,11 @@ func (dl *diskLayer) writeHistory(typ historyType, diff *diffLayer) (bool, error
|
|||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
// Notify the index pruner about the new tail so that stale index
|
||||
// blocks referencing the pruned histories can be cleaned up.
|
||||
if indexer != nil && pruned > 0 {
|
||||
indexer.prune(newFirst)
|
||||
}
|
||||
log.Debug("Pruned history", "type", typ, "items", pruned, "tailid", newFirst)
|
||||
return false, nil
|
||||
}
|
||||
|
|
|
|||
385
triedb/pathdb/history_index_pruner.go
Normal file
385
triedb/pathdb/history_index_pruner.go
Normal file
|
|
@ -0,0 +1,385 @@
|
|||
// Copyright 2025 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package pathdb
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
)
|
||||
|
||||
const (
|
||||
// indexPruningThreshold defines the number of pruned histories that must
|
||||
// accumulate before triggering index pruning. This helps avoid scheduling
|
||||
// index pruning too frequently.
|
||||
indexPruningThreshold = 90000
|
||||
|
||||
// iteratorReopenInterval is how long the iterator is kept open before
|
||||
// being released and re-opened. Long-lived iterators hold a read snapshot
|
||||
// that blocks LSM compaction; periodically re-opening avoids stalling the
|
||||
// compactor during a large scan.
|
||||
iteratorReopenInterval = 30 * time.Second
|
||||
)
|
||||
|
||||
// indexPruner is responsible for pruning stale index data from the tail side
|
||||
// when old history objects are removed. It runs as a background goroutine and
|
||||
// processes pruning signals whenever the history tail advances.
|
||||
//
|
||||
// The pruning operates at the block level: for each state element's index
|
||||
// metadata, leading index blocks whose maximum history ID falls below the
|
||||
// new tail are removed entirely. This avoids the need to decode individual
|
||||
// block contents and is efficient because index blocks store monotonically
|
||||
// increasing history IDs.
|
||||
type indexPruner struct {
|
||||
disk ethdb.KeyValueStore
|
||||
typ historyType
|
||||
tail atomic.Uint64 // Tail below which index entries can be pruned
|
||||
lastRun uint64 // The tail in the last pruning run
|
||||
trigger chan struct{} // Non-blocking signal that tail has advanced
|
||||
closed chan struct{}
|
||||
wg sync.WaitGroup
|
||||
log log.Logger
|
||||
|
||||
pauseReq chan chan struct{} // Pause request; caller sends ack channel, pruner closes it when paused
|
||||
resumeCh chan struct{} // Resume signal sent by caller after indexSingle/unindexSingle completes
|
||||
}
|
||||
|
||||
// newIndexPruner creates and starts a new index pruner for the given history type.
|
||||
func newIndexPruner(disk ethdb.KeyValueStore, typ historyType) *indexPruner {
|
||||
p := &indexPruner{
|
||||
disk: disk,
|
||||
typ: typ,
|
||||
trigger: make(chan struct{}, 1),
|
||||
closed: make(chan struct{}),
|
||||
log: log.New("type", typ.String()),
|
||||
pauseReq: make(chan chan struct{}),
|
||||
resumeCh: make(chan struct{}),
|
||||
}
|
||||
p.wg.Add(1)
|
||||
go p.run()
|
||||
return p
|
||||
}
|
||||
|
||||
// prune signals the pruner that the history tail has advanced to the given ID.
|
||||
// All index entries referencing history IDs below newTail can be removed.
|
||||
func (p *indexPruner) prune(newTail uint64) {
|
||||
// Only update if the tail is actually advancing
|
||||
for {
|
||||
old := p.tail.Load()
|
||||
if newTail <= old {
|
||||
return
|
||||
}
|
||||
if p.tail.CompareAndSwap(old, newTail) {
|
||||
break
|
||||
}
|
||||
}
|
||||
// Non-blocking signal
|
||||
select {
|
||||
case p.trigger <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// pause requests the pruner to flush all pending writes and pause. It blocks
|
||||
// until the pruner has acknowledged the pause. This must be paired with a
|
||||
// subsequent call to resume.
|
||||
func (p *indexPruner) pause() {
|
||||
ack := make(chan struct{})
|
||||
select {
|
||||
case p.pauseReq <- ack:
|
||||
<-ack // wait for the pruner to flush and acknowledge
|
||||
case <-p.closed:
|
||||
}
|
||||
}
|
||||
|
||||
// resume unblocks a previously paused pruner, allowing it to continue
|
||||
// processing.
|
||||
func (p *indexPruner) resume() {
|
||||
select {
|
||||
case p.resumeCh <- struct{}{}:
|
||||
case <-p.closed:
|
||||
}
|
||||
}
|
||||
|
||||
// close shuts down the pruner and waits for it to finish.
|
||||
func (p *indexPruner) close() {
|
||||
select {
|
||||
case <-p.closed:
|
||||
return
|
||||
default:
|
||||
close(p.closed)
|
||||
p.wg.Wait()
|
||||
}
|
||||
}
|
||||
|
||||
// run is the main loop of the pruner. It waits for trigger signals and
|
||||
// processes a small batch of entries on each trigger, advancing the cursor.
|
||||
func (p *indexPruner) run() {
|
||||
defer p.wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-p.trigger:
|
||||
tail := p.tail.Load()
|
||||
if tail < p.lastRun || tail-p.lastRun < indexPruningThreshold {
|
||||
continue
|
||||
}
|
||||
if err := p.process(tail); err != nil {
|
||||
p.log.Error("Failed to prune index", "tail", tail, "err", err)
|
||||
} else {
|
||||
p.lastRun = tail
|
||||
}
|
||||
|
||||
case ack := <-p.pauseReq:
|
||||
// Pruner is idle, acknowledge immediately and wait for resume.
|
||||
close(ack)
|
||||
select {
|
||||
case <-p.resumeCh:
|
||||
case <-p.closed:
|
||||
return
|
||||
}
|
||||
|
||||
case <-p.closed:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// process iterates all index metadata entries for the history type and prunes
|
||||
// leading blocks whose max history ID is below the given tail.
|
||||
func (p *indexPruner) process(tail uint64) error {
|
||||
var (
|
||||
err error
|
||||
pruned int
|
||||
start = time.Now()
|
||||
)
|
||||
switch p.typ {
|
||||
case typeStateHistory:
|
||||
n, err := p.prunePrefix(rawdb.StateHistoryAccountMetadataPrefix, typeAccount, tail)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pruned += n
|
||||
|
||||
n, err = p.prunePrefix(rawdb.StateHistoryStorageMetadataPrefix, typeStorage, tail)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pruned += n
|
||||
statePruneHistoryIndexTimer.UpdateSince(start)
|
||||
|
||||
case typeTrienodeHistory:
|
||||
pruned, err = p.prunePrefix(rawdb.TrienodeHistoryMetadataPrefix, typeTrienode, tail)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
trienodePruneHistoryIndexTimer.UpdateSince(start)
|
||||
|
||||
default:
|
||||
panic("unknown history type")
|
||||
}
|
||||
if pruned > 0 {
|
||||
p.log.Info("Pruned stale index blocks", "pruned", pruned, "tail", tail, "elapsed", common.PrettyDuration(time.Since(start)))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// prunePrefix scans all metadata entries under the given prefix and prunes
|
||||
// leading index blocks below the tail. The iterator is periodically released
|
||||
// and re-opened to avoid holding a read snapshot that blocks LSM compaction.
|
||||
func (p *indexPruner) prunePrefix(prefix []byte, elemType elementType, tail uint64) (int, error) {
|
||||
var (
|
||||
pruned int
|
||||
opened = time.Now()
|
||||
it = p.disk.NewIterator(prefix, nil)
|
||||
batch = p.disk.NewBatchWithSize(ethdb.IdealBatchSize)
|
||||
)
|
||||
for {
|
||||
// Terminate if iterator is exhausted
|
||||
if !it.Next() {
|
||||
it.Release()
|
||||
break
|
||||
}
|
||||
// Check termination or pause request
|
||||
select {
|
||||
case <-p.closed:
|
||||
// Terminate the process if indexer is closed
|
||||
it.Release()
|
||||
if batch.ValueSize() > 0 {
|
||||
return pruned, batch.Write()
|
||||
}
|
||||
return pruned, nil
|
||||
|
||||
case ack := <-p.pauseReq:
|
||||
// Save the current position so that after resume the
|
||||
// iterator can be re-opened from where it left off.
|
||||
start := common.CopyBytes(it.Key()[len(prefix):])
|
||||
it.Release()
|
||||
|
||||
// Flush all pending writes before acknowledging the pause.
|
||||
var flushErr error
|
||||
if batch.ValueSize() > 0 {
|
||||
if err := batch.Write(); err != nil {
|
||||
flushErr = err
|
||||
}
|
||||
batch.Reset()
|
||||
}
|
||||
close(ack)
|
||||
|
||||
// Block until resumed or closed. Always wait here even if
|
||||
// the flush failed — returning early would cause resume()
|
||||
// to deadlock since nobody would receive on resumeCh.
|
||||
select {
|
||||
case <-p.resumeCh:
|
||||
if flushErr != nil {
|
||||
return 0, flushErr
|
||||
}
|
||||
// Re-open the iterator from the saved position so the
|
||||
// pruner sees the current database state (including any
|
||||
// writes made by indexer during the pause).
|
||||
it = p.disk.NewIterator(prefix, start)
|
||||
opened = time.Now()
|
||||
continue
|
||||
case <-p.closed:
|
||||
return pruned, flushErr
|
||||
}
|
||||
|
||||
default:
|
||||
// Keep processing
|
||||
}
|
||||
|
||||
// Prune the index data block
|
||||
key, value := it.Key(), it.Value()
|
||||
ident, bsize := p.identFromKey(key, prefix, elemType)
|
||||
n, err := p.pruneEntry(batch, ident, value, bsize, tail)
|
||||
if err != nil {
|
||||
p.log.Warn("Failed to prune index entry", "ident", ident, "err", err)
|
||||
continue
|
||||
}
|
||||
pruned += n
|
||||
|
||||
// Flush the batch if there are too many accumulated
|
||||
if batch.ValueSize() >= ethdb.IdealBatchSize {
|
||||
if err := batch.Write(); err != nil {
|
||||
it.Release()
|
||||
return 0, err
|
||||
}
|
||||
batch.Reset()
|
||||
}
|
||||
|
||||
// Periodically release the iterator so the LSM compactor
|
||||
// is not blocked by the read snapshot we hold.
|
||||
if time.Since(opened) >= iteratorReopenInterval {
|
||||
opened = time.Now()
|
||||
|
||||
start := common.CopyBytes(it.Key()[len(prefix):])
|
||||
it.Release()
|
||||
it = p.disk.NewIterator(prefix, start)
|
||||
}
|
||||
}
|
||||
if batch.ValueSize() > 0 {
|
||||
if err := batch.Write(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
return pruned, nil
|
||||
}
|
||||
|
||||
// identFromKey reconstructs the stateIdent and bitmapSize from a metadata key.
|
||||
func (p *indexPruner) identFromKey(key []byte, prefix []byte, elemType elementType) (stateIdent, int) {
|
||||
rest := key[len(prefix):]
|
||||
|
||||
switch elemType {
|
||||
case typeAccount:
|
||||
// key = prefix + addressHash(32)
|
||||
var addrHash common.Hash
|
||||
copy(addrHash[:], rest[:32])
|
||||
return newAccountIdent(addrHash), 0
|
||||
|
||||
case typeStorage:
|
||||
// key = prefix + addressHash(32) + storageHash(32)
|
||||
var addrHash, storHash common.Hash
|
||||
copy(addrHash[:], rest[:32])
|
||||
copy(storHash[:], rest[32:64])
|
||||
return newStorageIdent(addrHash, storHash), 0
|
||||
|
||||
case typeTrienode:
|
||||
// key = prefix + addressHash(32) + path(variable)
|
||||
var addrHash common.Hash
|
||||
copy(addrHash[:], rest[:32])
|
||||
path := string(rest[32:])
|
||||
ident := newTrienodeIdent(addrHash, path)
|
||||
return ident, ident.bloomSize()
|
||||
|
||||
default:
|
||||
panic("unknown element type")
|
||||
}
|
||||
}
|
||||
|
||||
// pruneEntry checks a single metadata entry and removes leading index blocks
|
||||
// whose max < tail. Returns the number of blocks pruned.
|
||||
func (p *indexPruner) pruneEntry(batch ethdb.Batch, ident stateIdent, blob []byte, bsize int, tail uint64) (int, error) {
|
||||
// Fast path: the first 8 bytes of the metadata encode the max history ID
|
||||
// of the first index block (big-endian uint64). If it is >= tail, no
|
||||
// blocks can be pruned and we skip the full parse entirely.
|
||||
if len(blob) >= 8 && binary.BigEndian.Uint64(blob[:8]) >= tail {
|
||||
return 0, nil
|
||||
}
|
||||
descList, err := parseIndex(blob, bsize)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
// Find the number of leading blocks that can be entirely pruned.
|
||||
// A block can be pruned if its max history ID is strictly below
|
||||
// the tail.
|
||||
var count int
|
||||
for _, desc := range descList {
|
||||
if desc.max < tail {
|
||||
count++
|
||||
} else {
|
||||
break // blocks are ordered, no more to prune
|
||||
}
|
||||
}
|
||||
if count == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
// Delete the pruned index blocks
|
||||
for i := 0; i < count; i++ {
|
||||
deleteStateIndexBlock(ident, batch, descList[i].id)
|
||||
}
|
||||
// Update or delete the metadata
|
||||
remaining := descList[count:]
|
||||
if len(remaining) == 0 {
|
||||
// All blocks pruned, remove the metadata entry entirely
|
||||
deleteStateIndex(ident, batch)
|
||||
} else {
|
||||
// Rewrite the metadata with the remaining blocks
|
||||
size := indexBlockDescSize + bsize
|
||||
buf := make([]byte, 0, size*len(remaining))
|
||||
for _, desc := range remaining {
|
||||
buf = append(buf, desc.encode()...)
|
||||
}
|
||||
writeStateIndex(ident, batch, buf)
|
||||
}
|
||||
return count, nil
|
||||
}
|
||||
355
triedb/pathdb/history_index_pruner_test.go
Normal file
355
triedb/pathdb/history_index_pruner_test.go
Normal file
|
|
@ -0,0 +1,355 @@
|
|||
// Copyright 2025 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package pathdb
|
||||
|
||||
import (
|
||||
"math"
|
||||
"testing"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
)
|
||||
|
||||
func writeMultiBlockIndex(t *testing.T, db ethdb.Database, ident stateIdent, bitmapSize int, startID uint64) []*indexBlockDesc {
|
||||
t.Helper()
|
||||
|
||||
if startID == 0 {
|
||||
startID = 1
|
||||
}
|
||||
iw, _ := newIndexWriter(db, ident, 0, bitmapSize)
|
||||
|
||||
for i := 0; i < 10000; i++ {
|
||||
if err := iw.append(startID+uint64(i), randomExt(bitmapSize, 5)); err != nil {
|
||||
t.Fatalf("Failed to append element %d: %v", i, err)
|
||||
}
|
||||
}
|
||||
batch := db.NewBatch()
|
||||
iw.finish(batch)
|
||||
if err := batch.Write(); err != nil {
|
||||
t.Fatalf("Failed to write batch: %v", err)
|
||||
}
|
||||
|
||||
blob := readStateIndex(ident, db)
|
||||
descList, err := parseIndex(blob, bitmapSize)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to parse index: %v", err)
|
||||
}
|
||||
return descList
|
||||
}
|
||||
|
||||
// TestPruneEntryBasic verifies that pruneEntry correctly removes leading index
|
||||
// blocks whose max is below the given tail.
|
||||
func TestPruneEntryBasic(t *testing.T) {
|
||||
db := rawdb.NewMemoryDatabase()
|
||||
ident := newAccountIdent(common.Hash{0xa})
|
||||
descList := writeMultiBlockIndex(t, db, ident, 0, 1)
|
||||
|
||||
// Prune with a tail that is above the first block's max but below the second
|
||||
firstBlockMax := descList[0].max
|
||||
|
||||
pruner := newIndexPruner(db, typeStateHistory)
|
||||
defer pruner.close()
|
||||
|
||||
if err := pruner.process(firstBlockMax + 1); err != nil {
|
||||
t.Fatalf("Failed to process pruning: %v", err)
|
||||
}
|
||||
|
||||
// Verify the first block was removed
|
||||
blob := readStateIndex(ident, db)
|
||||
if len(blob) == 0 {
|
||||
t.Fatal("Index metadata should not be empty after partial prune")
|
||||
}
|
||||
remaining, err := parseIndex(blob, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to parse index after prune: %v", err)
|
||||
}
|
||||
if len(remaining) != len(descList)-1 {
|
||||
t.Fatalf("Expected %d blocks remaining, got %d", len(descList)-1, len(remaining))
|
||||
}
|
||||
// The first remaining block should be what was previously the second block
|
||||
if remaining[0].id != descList[1].id {
|
||||
t.Fatalf("Expected first remaining block id %d, got %d", descList[1].id, remaining[0].id)
|
||||
}
|
||||
|
||||
// Verify the pruned block data is actually deleted
|
||||
blockData := readStateIndexBlock(ident, db, descList[0].id)
|
||||
if len(blockData) != 0 {
|
||||
t.Fatal("Pruned block data should have been deleted")
|
||||
}
|
||||
|
||||
// Remaining blocks should still have their data
|
||||
for _, desc := range remaining {
|
||||
blockData = readStateIndexBlock(ident, db, desc.id)
|
||||
if len(blockData) == 0 {
|
||||
t.Fatalf("Block %d data should still exist", desc.id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestPruneEntryBasicTrienode is the same as TestPruneEntryBasic but for
|
||||
// trienode index entries with a non-zero bitmapSize.
|
||||
func TestPruneEntryBasicTrienode(t *testing.T) {
|
||||
db := rawdb.NewMemoryDatabase()
|
||||
addrHash := common.Hash{0xa}
|
||||
path := string([]byte{0x0, 0x0, 0x0})
|
||||
ident := newTrienodeIdent(addrHash, path)
|
||||
|
||||
descList := writeMultiBlockIndex(t, db, ident, ident.bloomSize(), 1)
|
||||
firstBlockMax := descList[0].max
|
||||
|
||||
pruner := newIndexPruner(db, typeTrienodeHistory)
|
||||
defer pruner.close()
|
||||
|
||||
if err := pruner.process(firstBlockMax + 1); err != nil {
|
||||
t.Fatalf("Failed to process pruning: %v", err)
|
||||
}
|
||||
|
||||
blob := readStateIndex(ident, db)
|
||||
remaining, err := parseIndex(blob, ident.bloomSize())
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to parse index after prune: %v", err)
|
||||
}
|
||||
if len(remaining) != len(descList)-1 {
|
||||
t.Fatalf("Expected %d blocks remaining, got %d", len(descList)-1, len(remaining))
|
||||
}
|
||||
if remaining[0].id != descList[1].id {
|
||||
t.Fatalf("Expected first remaining block id %d, got %d", descList[1].id, remaining[0].id)
|
||||
}
|
||||
blockData := readStateIndexBlock(ident, db, descList[0].id)
|
||||
if len(blockData) != 0 {
|
||||
t.Fatal("Pruned block data should have been deleted")
|
||||
}
|
||||
}
|
||||
|
||||
// TestPruneEntryComplete verifies that when all blocks are pruned, the metadata
|
||||
// entry is also deleted.
|
||||
func TestPruneEntryComplete(t *testing.T) {
|
||||
db := rawdb.NewMemoryDatabase()
|
||||
ident := newAccountIdent(common.Hash{0xb})
|
||||
iw, _ := newIndexWriter(db, ident, 0, 0)
|
||||
|
||||
for i := 1; i <= 10; i++ {
|
||||
if err := iw.append(uint64(i), nil); err != nil {
|
||||
t.Fatalf("Failed to append: %v", err)
|
||||
}
|
||||
}
|
||||
batch := db.NewBatch()
|
||||
iw.finish(batch)
|
||||
if err := batch.Write(); err != nil {
|
||||
t.Fatalf("Failed to write: %v", err)
|
||||
}
|
||||
|
||||
pruner := newIndexPruner(db, typeStateHistory)
|
||||
defer pruner.close()
|
||||
|
||||
// Prune with tail above all elements
|
||||
if err := pruner.process(11); err != nil {
|
||||
t.Fatalf("Failed to process: %v", err)
|
||||
}
|
||||
|
||||
// Metadata entry should be deleted
|
||||
blob := readStateIndex(ident, db)
|
||||
if len(blob) != 0 {
|
||||
t.Fatal("Index metadata should be empty after full prune")
|
||||
}
|
||||
}
|
||||
|
||||
// TestPruneNoop verifies that pruning does nothing when the tail is below all
|
||||
// block maximums.
|
||||
func TestPruneNoop(t *testing.T) {
|
||||
db := rawdb.NewMemoryDatabase()
|
||||
ident := newAccountIdent(common.Hash{0xc})
|
||||
iw, _ := newIndexWriter(db, ident, 0, 0)
|
||||
|
||||
for i := 100; i <= 200; i++ {
|
||||
if err := iw.append(uint64(i), nil); err != nil {
|
||||
t.Fatalf("Failed to append: %v", err)
|
||||
}
|
||||
}
|
||||
batch := db.NewBatch()
|
||||
iw.finish(batch)
|
||||
if err := batch.Write(); err != nil {
|
||||
t.Fatalf("Failed to write: %v", err)
|
||||
}
|
||||
|
||||
blob := readStateIndex(ident, db)
|
||||
origLen := len(blob)
|
||||
|
||||
pruner := newIndexPruner(db, typeStateHistory)
|
||||
defer pruner.close()
|
||||
|
||||
if err := pruner.process(50); err != nil {
|
||||
t.Fatalf("Failed to process: %v", err)
|
||||
}
|
||||
|
||||
// Nothing should have changed
|
||||
blob = readStateIndex(ident, db)
|
||||
if len(blob) != origLen {
|
||||
t.Fatalf("Expected no change, original len %d, got %d", origLen, len(blob))
|
||||
}
|
||||
}
|
||||
|
||||
// TestPrunePreservesReadability verifies that after pruning, the remaining
|
||||
// index data is still readable and returns correct results.
|
||||
func TestPrunePreservesReadability(t *testing.T) {
|
||||
db := rawdb.NewMemoryDatabase()
|
||||
ident := newAccountIdent(common.Hash{0xe})
|
||||
descList := writeMultiBlockIndex(t, db, ident, 0, 1)
|
||||
firstBlockMax := descList[0].max
|
||||
|
||||
pruner := newIndexPruner(db, typeStateHistory)
|
||||
defer pruner.close()
|
||||
|
||||
if err := pruner.process(firstBlockMax + 1); err != nil {
|
||||
t.Fatalf("Failed to process: %v", err)
|
||||
}
|
||||
|
||||
// Read the remaining index and verify lookups still work
|
||||
ir, err := newIndexReader(db, ident, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create reader: %v", err)
|
||||
}
|
||||
|
||||
// Looking for something greater than firstBlockMax should still work
|
||||
result, err := ir.readGreaterThan(firstBlockMax)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to read: %v", err)
|
||||
}
|
||||
if result != firstBlockMax+1 {
|
||||
t.Fatalf("Expected %d, got %d", firstBlockMax+1, result)
|
||||
}
|
||||
|
||||
// Looking for the last element should return MaxUint64
|
||||
result, err = ir.readGreaterThan(20000)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to read: %v", err)
|
||||
}
|
||||
if result != math.MaxUint64 {
|
||||
t.Fatalf("Expected MaxUint64, got %d", result)
|
||||
}
|
||||
}
|
||||
|
||||
// TestPrunePauseResume verifies the pause/resume mechanism:
|
||||
// - The pruner pauses mid-iteration and flushes its batch
|
||||
// - Data written while the pruner is paused (simulating indexSingle) is
|
||||
// visible after resume via a fresh iterator
|
||||
// - Pruning still completes correctly after resume
|
||||
func TestPrunePauseResume(t *testing.T) {
|
||||
db := rawdb.NewMemoryDatabase()
|
||||
|
||||
// Create many accounts with multi-block indexes so the pruner is still
|
||||
// iterating when the pause request arrives.
|
||||
var firstBlockMax uint64
|
||||
for i := 0; i < 200; i++ {
|
||||
hash := common.Hash{byte(i)}
|
||||
ident := newAccountIdent(hash)
|
||||
descList := writeMultiBlockIndex(t, db, ident, 0, 1)
|
||||
if i == 0 {
|
||||
firstBlockMax = descList[0].max
|
||||
}
|
||||
}
|
||||
// Target account at the end of the key space — the pruner should not
|
||||
// have visited it yet when the pause is acknowledged.
|
||||
targetIdent := newAccountIdent(common.Hash{0xff})
|
||||
targetDescList := writeMultiBlockIndex(t, db, targetIdent, 0, 1)
|
||||
|
||||
tail := firstBlockMax + 1
|
||||
|
||||
// Construct the pruner without starting run(). We call process()
|
||||
// directly to exercise the mid-iteration pause path deterministically.
|
||||
pruner := &indexPruner{
|
||||
disk: db,
|
||||
typ: typeStateHistory,
|
||||
log: log.New("type", "account"),
|
||||
closed: make(chan struct{}),
|
||||
pauseReq: make(chan chan struct{}, 1), // buffered so we can pre-deposit
|
||||
resumeCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Pre-deposit a pause request before process() starts. Because
|
||||
// pauseReq is buffered, this succeeds immediately. When prunePrefix's
|
||||
// select checks the channel on an early iteration, it will find the
|
||||
// pending request and pause — no scheduling race is possible.
|
||||
ack := make(chan struct{})
|
||||
pruner.pauseReq <- ack
|
||||
|
||||
// Run process() in the background.
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
errCh <- pruner.process(tail)
|
||||
}()
|
||||
|
||||
// Block until the pruner has flushed pending writes and acknowledged.
|
||||
<-ack
|
||||
|
||||
// While paused, append a new element to the target account's index,
|
||||
// simulating what indexSingle would do during the pause window.
|
||||
lastMax := targetDescList[len(targetDescList)-1].max
|
||||
newID := lastMax + 10000
|
||||
iw, err := newIndexWriter(db, targetIdent, lastMax, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create index writer: %v", err)
|
||||
}
|
||||
if err := iw.append(newID, nil); err != nil {
|
||||
t.Fatalf("Failed to append: %v", err)
|
||||
}
|
||||
batch := db.NewBatch()
|
||||
iw.finish(batch)
|
||||
if err := batch.Write(); err != nil {
|
||||
t.Fatalf("Failed to write batch: %v", err)
|
||||
}
|
||||
|
||||
// Resume the pruner.
|
||||
pruner.resume()
|
||||
|
||||
// Wait for process() to complete.
|
||||
if err := <-errCh; err != nil {
|
||||
t.Fatalf("process() failed: %v", err)
|
||||
}
|
||||
|
||||
// Verify: the entry written during the pause must still be accessible.
|
||||
// If the pruner used a stale iterator snapshot, it would overwrite the
|
||||
// target's metadata and lose the new entry.
|
||||
ir, err := newIndexReader(db, targetIdent, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create index reader: %v", err)
|
||||
}
|
||||
result, err := ir.readGreaterThan(newID - 1)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to read: %v", err)
|
||||
}
|
||||
if result != newID {
|
||||
t.Fatalf("Entry written during pause was lost: want %d, got %d", newID, result)
|
||||
}
|
||||
|
||||
// Verify: pruning actually occurred on an early account.
|
||||
earlyIdent := newAccountIdent(common.Hash{0x00})
|
||||
earlyBlob := readStateIndex(earlyIdent, db)
|
||||
if len(earlyBlob) == 0 {
|
||||
t.Fatal("Early account index should not be completely empty")
|
||||
}
|
||||
earlyRemaining, err := parseIndex(earlyBlob, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to parse early account index: %v", err)
|
||||
}
|
||||
// The first block (id=0) should have been pruned.
|
||||
if earlyRemaining[0].id == 0 {
|
||||
t.Fatal("First block of early account should have been pruned")
|
||||
}
|
||||
}
|
||||
|
|
@ -719,6 +719,7 @@ func (i *indexIniter) recover() bool {
|
|||
// state history.
|
||||
type historyIndexer struct {
|
||||
initer *indexIniter
|
||||
pruner *indexPruner
|
||||
typ historyType
|
||||
disk ethdb.KeyValueStore
|
||||
freezer ethdb.AncientStore
|
||||
|
|
@ -774,6 +775,7 @@ func newHistoryIndexer(disk ethdb.Database, freezer ethdb.AncientStore, lastHist
|
|||
checkVersion(disk, typ)
|
||||
return &historyIndexer{
|
||||
initer: newIndexIniter(disk, freezer, typ, lastHistoryID, noWait),
|
||||
pruner: newIndexPruner(disk, typ),
|
||||
typ: typ,
|
||||
disk: disk,
|
||||
freezer: freezer,
|
||||
|
|
@ -782,6 +784,7 @@ func newHistoryIndexer(disk ethdb.Database, freezer ethdb.AncientStore, lastHist
|
|||
|
||||
func (i *historyIndexer) close() {
|
||||
i.initer.close()
|
||||
i.pruner.close()
|
||||
}
|
||||
|
||||
// inited returns a flag indicating whether the existing state histories
|
||||
|
|
@ -802,6 +805,8 @@ func (i *historyIndexer) extend(historyID uint64) error {
|
|||
case <-i.initer.closed:
|
||||
return errors.New("indexer is closed")
|
||||
case <-i.initer.done:
|
||||
i.pruner.pause()
|
||||
defer i.pruner.resume()
|
||||
return indexSingle(historyID, i.disk, i.freezer, i.typ)
|
||||
case i.initer.interrupt <- signal:
|
||||
return <-signal.result
|
||||
|
|
@ -819,12 +824,27 @@ func (i *historyIndexer) shorten(historyID uint64) error {
|
|||
case <-i.initer.closed:
|
||||
return errors.New("indexer is closed")
|
||||
case <-i.initer.done:
|
||||
i.pruner.pause()
|
||||
defer i.pruner.resume()
|
||||
return unindexSingle(historyID, i.disk, i.freezer, i.typ)
|
||||
case i.initer.interrupt <- signal:
|
||||
return <-signal.result
|
||||
}
|
||||
}
|
||||
|
||||
// prune signals the pruner that the history tail has advanced to the given ID,
|
||||
// so that stale index blocks referencing pruned histories can be removed.
|
||||
func (i *historyIndexer) prune(newTail uint64) {
|
||||
select {
|
||||
case <-i.initer.closed:
|
||||
log.Debug("Ignored the pruning signal", "reason", "closed")
|
||||
case <-i.initer.done:
|
||||
i.pruner.prune(newTail)
|
||||
default:
|
||||
log.Debug("Ignored the pruning signal", "reason", "busy")
|
||||
}
|
||||
}
|
||||
|
||||
// progress returns the indexing progress made so far. It provides the number
|
||||
// of states that remain unindexed.
|
||||
func (i *historyIndexer) progress() (uint64, error) {
|
||||
|
|
|
|||
|
|
@ -77,10 +77,12 @@ var (
|
|||
trienodeHistoryDataBytesMeter = metrics.NewRegisteredMeter("pathdb/history/trienode/bytes/data", nil)
|
||||
trienodeHistoryIndexBytesMeter = metrics.NewRegisteredMeter("pathdb/history/trienode/bytes/index", nil)
|
||||
|
||||
stateIndexHistoryTimer = metrics.NewRegisteredResettingTimer("pathdb/history/state/index/time", nil)
|
||||
stateUnindexHistoryTimer = metrics.NewRegisteredResettingTimer("pathdb/history/state/unindex/time", nil)
|
||||
trienodeIndexHistoryTimer = metrics.NewRegisteredResettingTimer("pathdb/history/trienode/index/time", nil)
|
||||
trienodeUnindexHistoryTimer = metrics.NewRegisteredResettingTimer("pathdb/history/trienode/unindex/time", nil)
|
||||
stateIndexHistoryTimer = metrics.NewRegisteredResettingTimer("pathdb/history/state/index/time", nil)
|
||||
stateUnindexHistoryTimer = metrics.NewRegisteredResettingTimer("pathdb/history/state/unindex/time", nil)
|
||||
statePruneHistoryIndexTimer = metrics.NewRegisteredResettingTimer("pathdb/history/state/prune/time", nil)
|
||||
trienodeIndexHistoryTimer = metrics.NewRegisteredResettingTimer("pathdb/history/trienode/index/time", nil)
|
||||
trienodeUnindexHistoryTimer = metrics.NewRegisteredResettingTimer("pathdb/history/trienode/unindex/time", nil)
|
||||
trienodePruneHistoryIndexTimer = metrics.NewRegisteredResettingTimer("pathdb/history/trienode/prune/time", nil)
|
||||
|
||||
lookupAddLayerTimer = metrics.NewRegisteredResettingTimer("pathdb/lookup/add/time", nil)
|
||||
lookupRemoveLayerTimer = metrics.NewRegisteredResettingTimer("pathdb/lookup/remove/time", nil)
|
||||
|
|
|
|||
Loading…
Reference in a new issue