core/rawdb: parallel pruning of txindex

This commit is contained in:
Sina Mahmoodi 2026-03-19 14:28:49 +00:00
parent fd859638bd
commit 48b1127c66
2 changed files with 111 additions and 65 deletions

View file

@ -19,6 +19,7 @@ package rawdb
import ( import (
"encoding/binary" "encoding/binary"
"runtime" "runtime"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -383,31 +384,90 @@ func PruneTransactionIndex(db ethdb.Database, pruneBlock uint64) {
if tail == nil || *tail > pruneBlock { if tail == nil || *tail > pruneBlock {
return // no index, or index ends above pruneBlock return // no index, or index ends above pruneBlock
} }
// There are blocks below pruneBlock in the index. Iterate the entire index to remove
// their entries. Note if this fails, the index is messed up, but tail still points to workers := runtime.NumCPU()
// the old tail. if workers > 16 {
var count, removed int workers = 16
DeleteAllTxLookupEntries(db, func(txhash common.Hash, v []byte) bool { }
count++ start := time.Now()
if count%10000000 == 0 { var (
log.Info("Pruning tx index", "count", count, "removed", removed) wg sync.WaitGroup
removed atomic.Int64
scanned atomic.Int64
)
// Periodically log progress from the main goroutine.
done := make(chan struct{})
go func() {
ticker := time.NewTicker(8 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
log.Info("Pruning tx index", "scanned", scanned.Load(), "removed", removed.Load(), "elapsed", common.PrettyDuration(time.Since(start)))
case <-done:
return
}
} }
if len(v) > 8 { }()
log.Error("Skipping legacy tx index entry", "hash", txhash) for i := range workers {
return false wg.Add(1)
}
bn := decodeNumber(v) // Split the keyspace by the first byte of the tx hash.
if bn < pruneBlock { // Tx hashes are uniformly distributed so each worker gets
removed++ // roughly equal work.
return true rangeStart := byte(i * 256 / workers)
} rangeEnd := byte((i + 1) * 256 / workers)
return false isLast := i == workers-1
})
go func() {
defer wg.Done()
var it ethdb.Iterator
if rangeStart == 0 {
it = NewKeyLengthIterator(db.NewIterator(txLookupPrefix, nil), common.HashLength+len(txLookupPrefix))
} else {
it = NewKeyLengthIterator(db.NewIterator(txLookupPrefix, []byte{rangeStart}), common.HashLength+len(txLookupPrefix))
}
defer it.Release()
batch := db.NewBatch()
for it.Next() {
// Stop if we've passed this worker's range.
if !isLast && it.Key()[len(txLookupPrefix)] >= rangeEnd {
break
}
scanned.Add(1)
v := it.Value()
if len(v) > 8 {
continue // skip legacy format entries
}
if decodeBlockNumber(v) < pruneBlock {
batch.Delete(it.Key())
removed.Add(1)
}
if batch.ValueSize() >= ethdb.IdealBatchSize {
if err := batch.Write(); err != nil {
log.Crit("Failed to delete tx index entries", "err", err)
}
batch.Reset()
}
}
if batch.ValueSize() > 0 {
if err := batch.Write(); err != nil {
log.Crit("Failed to delete tx index entries", "err", err)
}
batch.Reset()
}
}()
}
wg.Wait()
close(done)
WriteTxIndexTail(db, pruneBlock) WriteTxIndexTail(db, pruneBlock)
log.Info("Pruned transaction index", "removed", removed.Load(), "scanned", scanned.Load(), "elapsed", common.PrettyDuration(time.Since(start)))
} }
func decodeNumber(b []byte) uint64 { func decodeBlockNumber(b []byte) uint64 {
var numBuffer [8]byte var buf [8]byte
copy(numBuffer[8-len(b):], b) copy(buf[8-len(b):], b)
return binary.BigEndian.Uint64(numBuffer[:]) return binary.BigEndian.Uint64(buf[:])
} }

View file

@ -103,69 +103,52 @@ func TestChainIterator(t *testing.T) {
} }
} }
func initDatabaseWithTransactions(db ethdb.Database) ([]*types.Block, []*types.Transaction) { func initDatabaseWithTransactions(db ethdb.Database, numBlocks, txsPerBlock int) []*types.Block {
var blocks []*types.Block
var txs []*types.Transaction
to := common.BytesToAddress([]byte{0x11}) to := common.BytesToAddress([]byte{0x11})
// Write empty genesis block var blocks []*types.Block
block := types.NewBlock(&types.Header{Number: big.NewInt(int64(0))}, nil, nil, newTestHasher()) block := types.NewBlock(&types.Header{Number: big.NewInt(0)}, nil, nil, newTestHasher())
WriteBlock(db, block) WriteBlock(db, block)
WriteCanonicalHash(db, block.Hash(), block.NumberU64()) WriteCanonicalHash(db, block.Hash(), block.NumberU64())
blocks = append(blocks, block) blocks = append(blocks, block)
// Create transactions. for i := 1; i <= numBlocks; i++ {
for i := uint64(1); i <= 10; i++ { txs := make([]*types.Transaction, txsPerBlock)
var tx *types.Transaction for j := range txs {
if i%2 == 0 { txs[j] = types.NewTx(&types.LegacyTx{
tx = types.NewTx(&types.LegacyTx{ Nonce: uint64(i*txsPerBlock + j),
Nonce: i,
GasPrice: big.NewInt(11111), GasPrice: big.NewInt(11111),
Gas: 1111, Gas: 1111,
To: &to, To: &to,
Value: big.NewInt(111), Value: big.NewInt(111),
Data: []byte{0x11, 0x11, 0x11},
})
} else {
tx = types.NewTx(&types.AccessListTx{
ChainID: big.NewInt(1337),
Nonce: i,
GasPrice: big.NewInt(11111),
Gas: 1111,
To: &to,
Value: big.NewInt(111),
Data: []byte{0x11, 0x11, 0x11},
}) })
} }
txs = append(txs, tx) block := types.NewBlock(&types.Header{Number: big.NewInt(int64(i))}, &types.Body{Transactions: types.Transactions(txs)}, nil, newTestHasher())
block := types.NewBlock(&types.Header{Number: big.NewInt(int64(i))}, &types.Body{Transactions: types.Transactions{tx}}, nil, newTestHasher())
WriteBlock(db, block) WriteBlock(db, block)
WriteCanonicalHash(db, block.Hash(), block.NumberU64()) WriteCanonicalHash(db, block.Hash(), block.NumberU64())
blocks = append(blocks, block) blocks = append(blocks, block)
} }
return blocks
return blocks, txs
} }
func TestIndexTransactions(t *testing.T) { func TestIndexTransactions(t *testing.T) {
// Construct test chain db // Construct test chain db
chainDB := NewMemoryDatabase() chainDB := NewMemoryDatabase()
_, txs := initDatabaseWithTransactions(chainDB) blocks := initDatabaseWithTransactions(chainDB, 10, 1)
// verify checks whether the tx indices in the range [from, to) // verify checks whether the tx indices in the block range [from, to)
// is expected. // are present or absent.
verify := func(from, to int, exist bool, tail uint64) { verify := func(from, to int, exist bool, tail uint64) {
for i := from; i < to; i++ { for _, block := range blocks[from:to] {
if i == 0 { for _, tx := range block.Transactions() {
continue number := ReadTxLookupEntry(chainDB, tx.Hash())
} if exist && number == nil {
number := ReadTxLookupEntry(chainDB, txs[i-1].Hash()) t.Fatalf("Transaction index missing for block %d", block.NumberU64())
if exist && number == nil { }
t.Fatalf("Transaction index %d missing", i) if !exist && number != nil {
} t.Fatalf("Transaction index not deleted for block %d", block.NumberU64())
if !exist && number != nil { }
t.Fatalf("Transaction index %d is not deleted", i)
} }
} }
number := ReadTxIndexTail(chainDB) number := ReadTxIndexTail(chainDB)
@ -221,7 +204,7 @@ func TestIndexTransactions(t *testing.T) {
func TestUnindexTransactionsMissingBody(t *testing.T) { func TestUnindexTransactionsMissingBody(t *testing.T) {
// Construct test chain db // Construct test chain db
chainDB := NewMemoryDatabase() chainDB := NewMemoryDatabase()
blocks, _ := initDatabaseWithTransactions(chainDB) blocks := initDatabaseWithTransactions(chainDB, 10, 1)
// Index the entire chain. // Index the entire chain.
lastBlock := blocks[len(blocks)-1].NumberU64() lastBlock := blocks[len(blocks)-1].NumberU64()
@ -250,9 +233,12 @@ func TestUnindexTransactionsMissingBody(t *testing.T) {
func TestPruneTransactionIndex(t *testing.T) { func TestPruneTransactionIndex(t *testing.T) {
chainDB := NewMemoryDatabase() chainDB := NewMemoryDatabase()
blocks, _ := initDatabaseWithTransactions(chainDB)
// Create 100 blocks with 8 txs each (800 total tx index entries) so that
// parallel workers each get a meaningful share of the keyspace.
blocks := initDatabaseWithTransactions(chainDB, 100, 8)
lastBlock := blocks[len(blocks)-1].NumberU64() lastBlock := blocks[len(blocks)-1].NumberU64()
pruneBlock := lastBlock - 3 pruneBlock := lastBlock / 2 // prune the first half
IndexTransactions(chainDB, 0, lastBlock+1, nil, false) IndexTransactions(chainDB, 0, lastBlock+1, nil, false)