From 48b1127c6661a1afe2849eff5d5c699c4ac7a0cf Mon Sep 17 00:00:00 2001 From: Sina Mahmoodi Date: Thu, 19 Mar 2026 14:28:49 +0000 Subject: [PATCH 1/4] core/rawdb: parallel pruning of txindex --- core/rawdb/chain_iterator.go | 106 +++++++++++++++++++++++------- core/rawdb/chain_iterator_test.go | 70 ++++++++------------ 2 files changed, 111 insertions(+), 65 deletions(-) diff --git a/core/rawdb/chain_iterator.go b/core/rawdb/chain_iterator.go index afa1aa7a4c..3232c7b0b4 100644 --- a/core/rawdb/chain_iterator.go +++ b/core/rawdb/chain_iterator.go @@ -19,6 +19,7 @@ package rawdb import ( "encoding/binary" "runtime" + "sync" "sync/atomic" "time" @@ -383,31 +384,90 @@ func PruneTransactionIndex(db ethdb.Database, pruneBlock uint64) { if tail == nil || *tail > pruneBlock { return // no index, or index ends above pruneBlock } - // There are blocks below pruneBlock in the index. Iterate the entire index to remove - // their entries. Note if this fails, the index is messed up, but tail still points to - // the old tail. - var count, removed int - DeleteAllTxLookupEntries(db, func(txhash common.Hash, v []byte) bool { - count++ - if count%10000000 == 0 { - log.Info("Pruning tx index", "count", count, "removed", removed) + + workers := runtime.NumCPU() + if workers > 16 { + workers = 16 + } + start := time.Now() + var ( + wg sync.WaitGroup + removed atomic.Int64 + scanned atomic.Int64 + ) + // Periodically log progress from the main goroutine. + done := make(chan struct{}) + go func() { + ticker := time.NewTicker(8 * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + log.Info("Pruning tx index", "scanned", scanned.Load(), "removed", removed.Load(), "elapsed", common.PrettyDuration(time.Since(start))) + case <-done: + return + } } - if len(v) > 8 { - log.Error("Skipping legacy tx index entry", "hash", txhash) - return false - } - bn := decodeNumber(v) - if bn < pruneBlock { - removed++ - return true - } - return false - }) + }() + for i := range workers { + wg.Add(1) + + // Split the keyspace by the first byte of the tx hash. + // Tx hashes are uniformly distributed so each worker gets + // roughly equal work. + rangeStart := byte(i * 256 / workers) + rangeEnd := byte((i + 1) * 256 / workers) + isLast := i == workers-1 + + go func() { + defer wg.Done() + + var it ethdb.Iterator + if rangeStart == 0 { + it = NewKeyLengthIterator(db.NewIterator(txLookupPrefix, nil), common.HashLength+len(txLookupPrefix)) + } else { + it = NewKeyLengthIterator(db.NewIterator(txLookupPrefix, []byte{rangeStart}), common.HashLength+len(txLookupPrefix)) + } + defer it.Release() + + batch := db.NewBatch() + for it.Next() { + // Stop if we've passed this worker's range. + if !isLast && it.Key()[len(txLookupPrefix)] >= rangeEnd { + break + } + scanned.Add(1) + v := it.Value() + if len(v) > 8 { + continue // skip legacy format entries + } + if decodeBlockNumber(v) < pruneBlock { + batch.Delete(it.Key()) + removed.Add(1) + } + if batch.ValueSize() >= ethdb.IdealBatchSize { + if err := batch.Write(); err != nil { + log.Crit("Failed to delete tx index entries", "err", err) + } + batch.Reset() + } + } + if batch.ValueSize() > 0 { + if err := batch.Write(); err != nil { + log.Crit("Failed to delete tx index entries", "err", err) + } + batch.Reset() + } + }() + } + wg.Wait() + close(done) WriteTxIndexTail(db, pruneBlock) + log.Info("Pruned transaction index", "removed", removed.Load(), "scanned", scanned.Load(), "elapsed", common.PrettyDuration(time.Since(start))) } -func decodeNumber(b []byte) uint64 { - var numBuffer [8]byte - copy(numBuffer[8-len(b):], b) - return binary.BigEndian.Uint64(numBuffer[:]) +func decodeBlockNumber(b []byte) uint64 { + var buf [8]byte + copy(buf[8-len(b):], b) + return binary.BigEndian.Uint64(buf[:]) } diff --git a/core/rawdb/chain_iterator_test.go b/core/rawdb/chain_iterator_test.go index 089ebfe828..0af2bd549f 100644 --- a/core/rawdb/chain_iterator_test.go +++ b/core/rawdb/chain_iterator_test.go @@ -103,69 +103,52 @@ func TestChainIterator(t *testing.T) { } } -func initDatabaseWithTransactions(db ethdb.Database) ([]*types.Block, []*types.Transaction) { - var blocks []*types.Block - var txs []*types.Transaction +func initDatabaseWithTransactions(db ethdb.Database, numBlocks, txsPerBlock int) []*types.Block { to := common.BytesToAddress([]byte{0x11}) - // Write empty genesis block - block := types.NewBlock(&types.Header{Number: big.NewInt(int64(0))}, nil, nil, newTestHasher()) + var blocks []*types.Block + block := types.NewBlock(&types.Header{Number: big.NewInt(0)}, nil, nil, newTestHasher()) WriteBlock(db, block) WriteCanonicalHash(db, block.Hash(), block.NumberU64()) blocks = append(blocks, block) - // Create transactions. - for i := uint64(1); i <= 10; i++ { - var tx *types.Transaction - if i%2 == 0 { - tx = types.NewTx(&types.LegacyTx{ - Nonce: i, + for i := 1; i <= numBlocks; i++ { + txs := make([]*types.Transaction, txsPerBlock) + for j := range txs { + txs[j] = types.NewTx(&types.LegacyTx{ + Nonce: uint64(i*txsPerBlock + j), GasPrice: big.NewInt(11111), Gas: 1111, To: &to, Value: big.NewInt(111), - Data: []byte{0x11, 0x11, 0x11}, - }) - } else { - tx = types.NewTx(&types.AccessListTx{ - ChainID: big.NewInt(1337), - Nonce: i, - GasPrice: big.NewInt(11111), - Gas: 1111, - To: &to, - Value: big.NewInt(111), - Data: []byte{0x11, 0x11, 0x11}, }) } - txs = append(txs, tx) - block := types.NewBlock(&types.Header{Number: big.NewInt(int64(i))}, &types.Body{Transactions: types.Transactions{tx}}, nil, newTestHasher()) + block := types.NewBlock(&types.Header{Number: big.NewInt(int64(i))}, &types.Body{Transactions: types.Transactions(txs)}, nil, newTestHasher()) WriteBlock(db, block) WriteCanonicalHash(db, block.Hash(), block.NumberU64()) blocks = append(blocks, block) } - - return blocks, txs + return blocks } func TestIndexTransactions(t *testing.T) { // Construct test chain db chainDB := NewMemoryDatabase() - _, txs := initDatabaseWithTransactions(chainDB) + blocks := initDatabaseWithTransactions(chainDB, 10, 1) - // verify checks whether the tx indices in the range [from, to) - // is expected. + // verify checks whether the tx indices in the block range [from, to) + // are present or absent. verify := func(from, to int, exist bool, tail uint64) { - for i := from; i < to; i++ { - if i == 0 { - continue - } - number := ReadTxLookupEntry(chainDB, txs[i-1].Hash()) - if exist && number == nil { - t.Fatalf("Transaction index %d missing", i) - } - if !exist && number != nil { - t.Fatalf("Transaction index %d is not deleted", i) + for _, block := range blocks[from:to] { + for _, tx := range block.Transactions() { + number := ReadTxLookupEntry(chainDB, tx.Hash()) + if exist && number == nil { + t.Fatalf("Transaction index missing for block %d", block.NumberU64()) + } + if !exist && number != nil { + t.Fatalf("Transaction index not deleted for block %d", block.NumberU64()) + } } } number := ReadTxIndexTail(chainDB) @@ -221,7 +204,7 @@ func TestIndexTransactions(t *testing.T) { func TestUnindexTransactionsMissingBody(t *testing.T) { // Construct test chain db chainDB := NewMemoryDatabase() - blocks, _ := initDatabaseWithTransactions(chainDB) + blocks := initDatabaseWithTransactions(chainDB, 10, 1) // Index the entire chain. lastBlock := blocks[len(blocks)-1].NumberU64() @@ -250,9 +233,12 @@ func TestUnindexTransactionsMissingBody(t *testing.T) { func TestPruneTransactionIndex(t *testing.T) { chainDB := NewMemoryDatabase() - blocks, _ := initDatabaseWithTransactions(chainDB) + + // Create 100 blocks with 8 txs each (800 total tx index entries) so that + // parallel workers each get a meaningful share of the keyspace. + blocks := initDatabaseWithTransactions(chainDB, 100, 8) lastBlock := blocks[len(blocks)-1].NumberU64() - pruneBlock := lastBlock - 3 + pruneBlock := lastBlock / 2 // prune the first half IndexTransactions(chainDB, 0, lastBlock+1, nil, false) From bfeb8284607ab7b260e9142ff9db4afdf22548a5 Mon Sep 17 00:00:00 2001 From: Sina Mahmoodi Date: Tue, 24 Mar 2026 21:08:18 +0000 Subject: [PATCH 2/4] rm special iterator case --- core/rawdb/chain_iterator.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/core/rawdb/chain_iterator.go b/core/rawdb/chain_iterator.go index 3232c7b0b4..89b0f3f684 100644 --- a/core/rawdb/chain_iterator.go +++ b/core/rawdb/chain_iterator.go @@ -422,12 +422,7 @@ func PruneTransactionIndex(db ethdb.Database, pruneBlock uint64) { go func() { defer wg.Done() - var it ethdb.Iterator - if rangeStart == 0 { - it = NewKeyLengthIterator(db.NewIterator(txLookupPrefix, nil), common.HashLength+len(txLookupPrefix)) - } else { - it = NewKeyLengthIterator(db.NewIterator(txLookupPrefix, []byte{rangeStart}), common.HashLength+len(txLookupPrefix)) - } + it := NewKeyLengthIterator(db.NewIterator(txLookupPrefix, []byte{rangeStart}), common.HashLength+len(txLookupPrefix)) defer it.Release() batch := db.NewBatch() From 749fe860c66c1e423b9215225734e44f4a79497c Mon Sep 17 00:00:00 2001 From: Sina Mahmoodi Date: Tue, 24 Mar 2026 21:29:02 +0000 Subject: [PATCH 3/4] improve readability of keyspace splitting --- core/rawdb/chain_iterator.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/core/rawdb/chain_iterator.go b/core/rawdb/chain_iterator.go index 89b0f3f684..378e4c9de5 100644 --- a/core/rawdb/chain_iterator.go +++ b/core/rawdb/chain_iterator.go @@ -415,20 +415,22 @@ func PruneTransactionIndex(db ethdb.Database, pruneBlock uint64) { // Split the keyspace by the first byte of the tx hash. // Tx hashes are uniformly distributed so each worker gets // roughly equal work. - rangeStart := byte(i * 256 / workers) - rangeEnd := byte((i + 1) * 256 / workers) - isLast := i == workers-1 + rangeStart := []byte{byte(i * 256 / workers)} + // nil rangeEnd lets the last worker run to the end of the iterator. + var rangeEnd []byte + if i < workers-1 { + rangeEnd = []byte{byte((i + 1) * 256 / workers)} + } go func() { defer wg.Done() - it := NewKeyLengthIterator(db.NewIterator(txLookupPrefix, []byte{rangeStart}), common.HashLength+len(txLookupPrefix)) + it := NewKeyLengthIterator(db.NewIterator(txLookupPrefix, rangeStart), common.HashLength+len(txLookupPrefix)) defer it.Release() batch := db.NewBatch() for it.Next() { - // Stop if we've passed this worker's range. - if !isLast && it.Key()[len(txLookupPrefix)] >= rangeEnd { + if rangeEnd != nil && it.Key()[len(txLookupPrefix)] >= rangeEnd[0] { break } scanned.Add(1) From 3934c452619229ebbfbb9b06e32cddbdde7c2de3 Mon Sep 17 00:00:00 2001 From: Sina Mahmoodi Date: Tue, 24 Mar 2026 21:51:55 +0000 Subject: [PATCH 4/4] use errgroup approach --- core/rawdb/chain_iterator.go | 41 +++++++++++++----------------------- 1 file changed, 15 insertions(+), 26 deletions(-) diff --git a/core/rawdb/chain_iterator.go b/core/rawdb/chain_iterator.go index 378e4c9de5..0803cf4184 100644 --- a/core/rawdb/chain_iterator.go +++ b/core/rawdb/chain_iterator.go @@ -19,7 +19,6 @@ package rawdb import ( "encoding/binary" "runtime" - "sync" "sync/atomic" "time" @@ -29,6 +28,7 @@ import ( "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" + "golang.org/x/sync/errgroup" ) // InitDatabaseFromFreezer reinitializes an empty database from a previous batch @@ -385,16 +385,14 @@ func PruneTransactionIndex(db ethdb.Database, pruneBlock uint64) { return // no index, or index ends above pruneBlock } - workers := runtime.NumCPU() - if workers > 16 { - workers = 16 - } start := time.Now() var ( - wg sync.WaitGroup + eg errgroup.Group removed atomic.Int64 scanned atomic.Int64 ) + eg.SetLimit(runtime.NumCPU()) + // Periodically log progress from the main goroutine. done := make(chan struct{}) go func() { @@ -409,28 +407,18 @@ func PruneTransactionIndex(db ethdb.Database, pruneBlock uint64) { } } }() - for i := range workers { - wg.Add(1) - - // Split the keyspace by the first byte of the tx hash. - // Tx hashes are uniformly distributed so each worker gets - // roughly equal work. - rangeStart := []byte{byte(i * 256 / workers)} - // nil rangeEnd lets the last worker run to the end of the iterator. - var rangeEnd []byte - if i < workers-1 { - rangeEnd = []byte{byte((i + 1) * 256 / workers)} - } - - go func() { - defer wg.Done() - - it := NewKeyLengthIterator(db.NewIterator(txLookupPrefix, rangeStart), common.HashLength+len(txLookupPrefix)) + // Split the keyspace into 256 ranges by the first byte of the tx hash. + // Tx hashes are uniformly distributed so each range gets roughly equal + // work. The errgroup limits concurrency to NumCPU. + for i := 0; i < 256; i++ { + prefix := byte(i) + eg.Go(func() error { + it := NewKeyLengthIterator(db.NewIterator(txLookupPrefix, []byte{prefix}), common.HashLength+len(txLookupPrefix)) defer it.Release() batch := db.NewBatch() for it.Next() { - if rangeEnd != nil && it.Key()[len(txLookupPrefix)] >= rangeEnd[0] { + if it.Key()[len(txLookupPrefix)] != prefix { break } scanned.Add(1) @@ -455,9 +443,10 @@ func PruneTransactionIndex(db ethdb.Database, pruneBlock uint64) { } batch.Reset() } - }() + return nil + }) } - wg.Wait() + eg.Wait() close(done) WriteTxIndexTail(db, pruneBlock) log.Info("Pruned transaction index", "removed", removed.Load(), "scanned", scanned.Load(), "elapsed", common.PrettyDuration(time.Since(start)))