From 3934c452619229ebbfbb9b06e32cddbdde7c2de3 Mon Sep 17 00:00:00 2001 From: Sina Mahmoodi Date: Tue, 24 Mar 2026 21:51:55 +0000 Subject: [PATCH] use errgroup approach --- core/rawdb/chain_iterator.go | 41 +++++++++++++----------------------- 1 file changed, 15 insertions(+), 26 deletions(-) diff --git a/core/rawdb/chain_iterator.go b/core/rawdb/chain_iterator.go index 378e4c9de5..0803cf4184 100644 --- a/core/rawdb/chain_iterator.go +++ b/core/rawdb/chain_iterator.go @@ -19,7 +19,6 @@ package rawdb import ( "encoding/binary" "runtime" - "sync" "sync/atomic" "time" @@ -29,6 +28,7 @@ import ( "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" + "golang.org/x/sync/errgroup" ) // InitDatabaseFromFreezer reinitializes an empty database from a previous batch @@ -385,16 +385,14 @@ func PruneTransactionIndex(db ethdb.Database, pruneBlock uint64) { return // no index, or index ends above pruneBlock } - workers := runtime.NumCPU() - if workers > 16 { - workers = 16 - } start := time.Now() var ( - wg sync.WaitGroup + eg errgroup.Group removed atomic.Int64 scanned atomic.Int64 ) + eg.SetLimit(runtime.NumCPU()) + // Periodically log progress from the main goroutine. done := make(chan struct{}) go func() { @@ -409,28 +407,18 @@ func PruneTransactionIndex(db ethdb.Database, pruneBlock uint64) { } } }() - for i := range workers { - wg.Add(1) - - // Split the keyspace by the first byte of the tx hash. - // Tx hashes are uniformly distributed so each worker gets - // roughly equal work. - rangeStart := []byte{byte(i * 256 / workers)} - // nil rangeEnd lets the last worker run to the end of the iterator. - var rangeEnd []byte - if i < workers-1 { - rangeEnd = []byte{byte((i + 1) * 256 / workers)} - } - - go func() { - defer wg.Done() - - it := NewKeyLengthIterator(db.NewIterator(txLookupPrefix, rangeStart), common.HashLength+len(txLookupPrefix)) + // Split the keyspace into 256 ranges by the first byte of the tx hash. + // Tx hashes are uniformly distributed so each range gets roughly equal + // work. The errgroup limits concurrency to NumCPU. + for i := 0; i < 256; i++ { + prefix := byte(i) + eg.Go(func() error { + it := NewKeyLengthIterator(db.NewIterator(txLookupPrefix, []byte{prefix}), common.HashLength+len(txLookupPrefix)) defer it.Release() batch := db.NewBatch() for it.Next() { - if rangeEnd != nil && it.Key()[len(txLookupPrefix)] >= rangeEnd[0] { + if it.Key()[len(txLookupPrefix)] != prefix { break } scanned.Add(1) @@ -455,9 +443,10 @@ func PruneTransactionIndex(db ethdb.Database, pruneBlock uint64) { } batch.Reset() } - }() + return nil + }) } - wg.Wait() + eg.Wait() close(done) WriteTxIndexTail(db, pruneBlock) log.Info("Pruned transaction index", "removed", removed.Load(), "scanned", scanned.Load(), "elapsed", common.PrettyDuration(time.Since(start)))