use errgroup approach

This commit is contained in:
Sina Mahmoodi 2026-03-24 21:51:55 +00:00
parent 749fe860c6
commit 3934c45261

View file

@ -19,7 +19,6 @@ package rawdb
import ( import (
"encoding/binary" "encoding/binary"
"runtime" "runtime"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -29,6 +28,7 @@ import (
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"golang.org/x/sync/errgroup"
) )
// InitDatabaseFromFreezer reinitializes an empty database from a previous batch // InitDatabaseFromFreezer reinitializes an empty database from a previous batch
@ -385,16 +385,14 @@ func PruneTransactionIndex(db ethdb.Database, pruneBlock uint64) {
return // no index, or index ends above pruneBlock return // no index, or index ends above pruneBlock
} }
workers := runtime.NumCPU()
if workers > 16 {
workers = 16
}
start := time.Now() start := time.Now()
var ( var (
wg sync.WaitGroup eg errgroup.Group
removed atomic.Int64 removed atomic.Int64
scanned atomic.Int64 scanned atomic.Int64
) )
eg.SetLimit(runtime.NumCPU())
// Periodically log progress from a separate goroutine until done is closed. // Periodically log progress from a separate goroutine until done is closed.
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
@ -409,28 +407,18 @@ func PruneTransactionIndex(db ethdb.Database, pruneBlock uint64) {
} }
} }
}() }()
for i := range workers { // Split the keyspace into 256 ranges by the first byte of the tx hash.
wg.Add(1) // Tx hashes are uniformly distributed so each range gets roughly equal
// work. The errgroup limits concurrency to NumCPU.
// Split the keyspace by the first byte of the tx hash. for i := 0; i < 256; i++ {
// Tx hashes are uniformly distributed so each worker gets prefix := byte(i)
// roughly equal work. eg.Go(func() error {
rangeStart := []byte{byte(i * 256 / workers)} it := NewKeyLengthIterator(db.NewIterator(txLookupPrefix, []byte{prefix}), common.HashLength+len(txLookupPrefix))
// nil rangeEnd lets the last worker run to the end of the iterator.
var rangeEnd []byte
if i < workers-1 {
rangeEnd = []byte{byte((i + 1) * 256 / workers)}
}
go func() {
defer wg.Done()
it := NewKeyLengthIterator(db.NewIterator(txLookupPrefix, rangeStart), common.HashLength+len(txLookupPrefix))
defer it.Release() defer it.Release()
batch := db.NewBatch() batch := db.NewBatch()
for it.Next() { for it.Next() {
if rangeEnd != nil && it.Key()[len(txLookupPrefix)] >= rangeEnd[0] { if it.Key()[len(txLookupPrefix)] != prefix {
break break
} }
scanned.Add(1) scanned.Add(1)
@ -455,9 +443,10 @@ func PruneTransactionIndex(db ethdb.Database, pruneBlock uint64) {
} }
batch.Reset() batch.Reset()
} }
}() return nil
})
} }
wg.Wait() eg.Wait()
close(done) close(done)
WriteTxIndexTail(db, pruneBlock) WriteTxIndexTail(db, pruneBlock)
log.Info("Pruned transaction index", "removed", removed.Load(), "scanned", scanned.Load(), "elapsed", common.PrettyDuration(time.Since(start))) log.Info("Pruned transaction index", "removed", removed.Load(), "scanned", scanned.Load(), "elapsed", common.PrettyDuration(time.Since(start)))