triedb/pathdb: optimize history indexing efficiency (#33303)

This pull request optimizes history indexing by splitting a single large
database batch into multiple smaller chunks.

Originally, the indexer would resolve a batch of state histories and
commit all corresponding index entries atomically, together with the
indexing marker.

While indexing more state histories in a single batch improves
efficiency, excessively
large batches can cause significant memory issues.

To mitigate this, the pull request splits the mega-batch into several
smaller batches and flushes them independently during indexing. However,
this introduces a potential inconsistency: some index entries may be
flushed while the indexing marker is not, so an unclean shutdown may
leave the database in a partially updated state and corrupt the index
data.

To address this, head truncation is introduced. After a restart, any
extraneous index entries beyond the expected indexing marker are
removed, ensuring the index remains consistent after an unclean
shutdown.
This commit is contained in:
rjl493456442 2025-12-30 23:05:13 +08:00 committed by GitHub
parent b84097d22e
commit b3e7d9ee44
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 333 additions and 82 deletions

View file

@ -163,12 +163,15 @@ type indexWriter struct {
db ethdb.KeyValueReader
}
// newIndexWriter constructs the index writer for the specified state.
func newIndexWriter(db ethdb.KeyValueReader, state stateIdent) (*indexWriter, error) {
// newIndexWriter constructs the index writer for the specified state. Additionally,
// it takes an integer as the limit and prunes all existing elements above that ID.
// It's essential as the recovery mechanism after unclean shutdown during the history
// indexing.
func newIndexWriter(db ethdb.KeyValueReader, state stateIdent, limit uint64) (*indexWriter, error) {
blob := readStateIndex(state, db)
if len(blob) == 0 {
desc := newIndexBlockDesc(0)
bw, _ := newBlockWriter(nil, desc)
bw, _ := newBlockWriter(nil, desc, 0 /* useless if the block is empty */)
return &indexWriter{
descList: []*indexBlockDesc{desc},
bw: bw,
@ -180,15 +183,27 @@ func newIndexWriter(db ethdb.KeyValueReader, state stateIdent) (*indexWriter, er
if err != nil {
return nil, err
}
// Trim trailing blocks whose elements all exceed the limit.
for i := len(descList) - 1; i > 0 && descList[i].max > limit; i-- {
// The previous block has the elements that exceed the limit,
// therefore the current block can be entirely dropped.
if descList[i-1].max >= limit {
descList = descList[:i]
}
}
// Take the last block for appending new elements
lastDesc := descList[len(descList)-1]
indexBlock := readStateIndexBlock(state, db, lastDesc.id)
bw, err := newBlockWriter(indexBlock, lastDesc)
// Construct the writer for the last block. All elements in this block
// that exceed the limit will be truncated.
bw, err := newBlockWriter(indexBlock, lastDesc, limit)
if err != nil {
return nil, err
}
return &indexWriter{
descList: descList,
lastID: lastDesc.max,
lastID: bw.last(),
bw: bw,
state: state,
db: db,
@ -221,7 +236,7 @@ func (w *indexWriter) rotate() error {
desc = newIndexBlockDesc(w.bw.desc.id + 1)
)
w.frozen = append(w.frozen, w.bw)
w.bw, err = newBlockWriter(nil, desc)
w.bw, err = newBlockWriter(nil, desc, 0 /* useless if the block is empty */)
if err != nil {
return err
}
@ -271,13 +286,13 @@ type indexDeleter struct {
}
// newIndexDeleter constructs the index deleter for the specified state.
func newIndexDeleter(db ethdb.KeyValueReader, state stateIdent) (*indexDeleter, error) {
func newIndexDeleter(db ethdb.KeyValueReader, state stateIdent, limit uint64) (*indexDeleter, error) {
blob := readStateIndex(state, db)
if len(blob) == 0 {
// TODO(rjl493456442) we can probably return an error here,
// deleter with no data is meaningless.
desc := newIndexBlockDesc(0)
bw, _ := newBlockWriter(nil, desc)
bw, _ := newBlockWriter(nil, desc, 0 /* useless if the block is empty */)
return &indexDeleter{
descList: []*indexBlockDesc{desc},
bw: bw,
@ -289,22 +304,34 @@ func newIndexDeleter(db ethdb.KeyValueReader, state stateIdent) (*indexDeleter,
if err != nil {
return nil, err
}
// Trim trailing blocks whose elements all exceed the limit.
for i := len(descList) - 1; i > 0 && descList[i].max > limit; i-- {
// The previous block has the elements that exceed the limit,
// therefore the current block can be entirely dropped.
if descList[i-1].max >= limit {
descList = descList[:i]
}
}
// Take the block for deleting element from
lastDesc := descList[len(descList)-1]
indexBlock := readStateIndexBlock(state, db, lastDesc.id)
bw, err := newBlockWriter(indexBlock, lastDesc)
// Construct the writer for the last block. All elements in this block
// that exceed the limit will be truncated.
bw, err := newBlockWriter(indexBlock, lastDesc, limit)
if err != nil {
return nil, err
}
return &indexDeleter{
descList: descList,
lastID: lastDesc.max,
lastID: bw.last(),
bw: bw,
state: state,
db: db,
}, nil
}
// empty reports whether the state index holds no data at all, i.e. the
// only descriptor is the initial one and its block writer has no entries.
func (d *indexDeleter) empty() bool {
	return len(d.descList) == 1 && d.bw.empty()
}
@ -337,7 +364,7 @@ func (d *indexDeleter) pop(id uint64) error {
// Open the previous block writer for deleting
lastDesc := d.descList[len(d.descList)-1]
indexBlock := readStateIndexBlock(d.state, d.db, lastDesc.id)
bw, err := newBlockWriter(indexBlock, lastDesc)
bw, err := newBlockWriter(indexBlock, lastDesc, lastDesc.max)
if err != nil {
return err
}

View file

@ -21,13 +21,15 @@ import (
"errors"
"fmt"
"math"
"github.com/ethereum/go-ethereum/log"
)
const (
indexBlockDescSize = 14 // The size of index block descriptor
indexBlockEntriesCap = 4096 // The maximum number of entries can be grouped in a block
indexBlockRestartLen = 256 // The restart interval length of index block
historyIndexBatch = 512 * 1024 // The number of state history indexes for constructing or deleting as batch
indexBlockDescSize = 14 // The size of index block descriptor
indexBlockEntriesCap = 4096 // The maximum number of entries can be grouped in a block
indexBlockRestartLen = 256 // The restart interval length of index block
historyIndexBatch = 8 * 1024 * 1024 // The number of state history indexes for constructing or deleting as batch
)
// indexBlockDesc represents a descriptor for an index block, which contains a
@ -180,7 +182,11 @@ type blockWriter struct {
data []byte // Aggregated encoded data slice
}
func newBlockWriter(blob []byte, desc *indexBlockDesc) (*blockWriter, error) {
// newBlockWriter constructs a block writer. In addition to the existing data
// and block description, it takes an element ID and prunes all existing elements
// above that ID. It's essential as the recovery mechanism after unclean shutdown
// during the history indexing.
func newBlockWriter(blob []byte, desc *indexBlockDesc, limit uint64) (*blockWriter, error) {
if len(blob) == 0 {
return &blockWriter{
desc: desc,
@ -191,11 +197,22 @@ func newBlockWriter(blob []byte, desc *indexBlockDesc) (*blockWriter, error) {
if err != nil {
return nil, err
}
return &blockWriter{
writer := &blockWriter{
desc: desc,
restarts: restarts,
data: data, // safe to own the slice
}, nil
}
var trimmed int
for !writer.empty() && writer.last() > limit {
if err := writer.pop(writer.last()); err != nil {
return nil, err
}
trimmed += 1
}
if trimmed > 0 {
log.Debug("Truncated extraneous elements", "count", trimmed, "limit", limit)
}
return writer, nil
}
// append adds a new element to the block. The new element must be greater than
@ -271,6 +288,7 @@ func (b *blockWriter) sectionLast(section int) uint64 {
// sectionSearch looks up the specified value in the given section,
// the position and the preceding value will be returned if found.
// It assumes that the preceding element exists in the section.
func (b *blockWriter) sectionSearch(section int, n uint64) (found bool, prev uint64, pos int) {
b.scanSection(section, func(v uint64, p int) bool {
if n == v {
@ -295,7 +313,6 @@ func (b *blockWriter) pop(id uint64) error {
}
// If there is only one entry left, the entire block should be reset
if b.desc.entries == 1 {
//b.desc.min = 0
b.desc.max = 0
b.desc.entries = 0
b.restarts = nil
@ -331,6 +348,15 @@ func (b *blockWriter) full() bool {
return b.desc.full()
}
// last returns the ID of the final element stored in the block. It should
// only be called when the writer is not empty; for an empty block there is
// no meaningful answer and zero is returned.
func (b *blockWriter) last() uint64 {
	if !b.empty() {
		return b.desc.max
	}
	return 0
}
// finish finalizes the index block encoding by appending the encoded restart points
// and the restart counter to the end of the block.
//

View file

@ -28,7 +28,7 @@ func TestBlockReaderBasic(t *testing.T) {
elements := []uint64{
1, 5, 10, 11, 20,
}
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0))
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0), 0)
for i := 0; i < len(elements); i++ {
bw.append(elements[i])
}
@ -66,7 +66,7 @@ func TestBlockReaderLarge(t *testing.T) {
}
slices.Sort(elements)
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0))
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0), 0)
for i := 0; i < len(elements); i++ {
bw.append(elements[i])
}
@ -95,7 +95,7 @@ func TestBlockReaderLarge(t *testing.T) {
}
func TestBlockWriterBasic(t *testing.T) {
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0))
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0), 0)
if !bw.empty() {
t.Fatal("expected empty block")
}
@ -103,11 +103,13 @@ func TestBlockWriterBasic(t *testing.T) {
if err := bw.append(1); err == nil {
t.Fatal("out-of-order insertion is not expected")
}
var maxElem uint64
for i := 0; i < 10; i++ {
bw.append(uint64(i + 3))
maxElem = uint64(i + 3)
}
bw, err := newBlockWriter(bw.finish(), newIndexBlockDesc(0))
bw, err := newBlockWriter(bw.finish(), newIndexBlockDesc(0), maxElem)
if err != nil {
t.Fatalf("Failed to construct the block writer, %v", err)
}
@ -119,8 +121,71 @@ func TestBlockWriterBasic(t *testing.T) {
bw.finish()
}
// TestBlockWriterWithLimit verifies that constructing a block writer with a
// truncation limit drops every element above the limit, and that the block
// accepts new elements afterwards.
func TestBlockWriterWithLimit(t *testing.T) {
	bw, _ := newBlockWriter(nil, newIndexBlockDesc(0), 0)

	// Fill the block with elements [1, indexBlockRestartLen*2].
	var maxElem uint64
	for id := uint64(1); id <= indexBlockRestartLen*2; id++ {
		bw.append(id)
		maxElem = id
	}
	cases := []struct {
		limit  uint64
		expMax uint64
	}{
		{maxElem, maxElem},         // nothing to truncate
		{maxElem - 1, maxElem - 1}, // truncate the last element

		// truncation around the restart boundary, on either side
		{uint64(indexBlockRestartLen + 1), uint64(indexBlockRestartLen + 1)},
		{uint64(indexBlockRestartLen), uint64(indexBlockRestartLen)},

		{uint64(1), uint64(1)},
		{uint64(0), uint64(0)}, // truncate the entire block, it's in theory invalid
	}
	for i, tc := range cases {
		// Copy the descriptor so the truncated writer cannot mutate the
		// one owned by the source block.
		desc := *bw.desc
		block, err := newBlockWriter(bw.finish(), &desc, tc.limit)
		if err != nil {
			t.Fatalf("Failed to construct the block writer, %v", err)
		}
		if block.desc.max != tc.expMax {
			t.Fatalf("Test %d, unexpected max value, got %d, want %d", i, block.desc.max, tc.expMax)
		}
		// Re-fill the elements
		var refilled uint64
		for elem := tc.limit + 1; elem < indexBlockRestartLen*4; elem++ {
			if err := block.append(elem); err != nil {
				t.Fatalf("Failed to append value %d: %v", elem, err)
			}
			refilled = elem
		}
		if block.desc.max != refilled {
			t.Fatalf("Test %d, unexpected max value, got %d, want %d", i, block.desc.max, refilled)
		}
	}
}
func TestBlockWriterDelete(t *testing.T) {
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0))
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0), 0)
for i := 0; i < 10; i++ {
bw.append(uint64(i + 1))
}
@ -147,7 +212,7 @@ func TestBlcokWriterDeleteWithData(t *testing.T) {
elements := []uint64{
1, 5, 10, 11, 20,
}
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0))
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0), 0)
for i := 0; i < len(elements); i++ {
bw.append(elements[i])
}
@ -158,7 +223,7 @@ func TestBlcokWriterDeleteWithData(t *testing.T) {
max: 20,
entries: 5,
}
bw, err := newBlockWriter(bw.finish(), desc)
bw, err := newBlockWriter(bw.finish(), desc, elements[len(elements)-1])
if err != nil {
t.Fatalf("Failed to construct block writer %v", err)
}
@ -201,15 +266,18 @@ func TestBlcokWriterDeleteWithData(t *testing.T) {
}
func TestCorruptedIndexBlock(t *testing.T) {
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0))
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0), 0)
var maxElem uint64
for i := 0; i < 10; i++ {
bw.append(uint64(i + 1))
maxElem = uint64(i + 1)
}
buf := bw.finish()
// Mutate the buffer manually
buf[len(buf)-1]++
_, err := newBlockWriter(buf, newIndexBlockDesc(0))
_, err := newBlockWriter(buf, newIndexBlockDesc(0), maxElem)
if err == nil {
t.Fatal("Corrupted index block data is not detected")
}
@ -218,7 +286,7 @@ func TestCorruptedIndexBlock(t *testing.T) {
// BenchmarkParseIndexBlock benchmarks the performance of parseIndexBlock.
func BenchmarkParseIndexBlock(b *testing.B) {
// Generate a realistic index block blob
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0))
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0), 0)
for i := 0; i < 4096; i++ {
bw.append(uint64(i * 2))
}
@ -238,13 +306,15 @@ func BenchmarkBlockWriterAppend(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
desc := newIndexBlockDesc(0)
writer, _ := newBlockWriter(nil, desc)
var blockID uint32
desc := newIndexBlockDesc(blockID)
writer, _ := newBlockWriter(nil, desc, 0)
for i := 0; i < b.N; i++ {
if writer.full() {
desc = newIndexBlockDesc(0)
writer, _ = newBlockWriter(nil, desc)
blockID += 1
desc = newIndexBlockDesc(blockID)
writer, _ = newBlockWriter(nil, desc, 0)
}
if err := writer.append(writer.desc.max + 1); err != nil {
b.Error(err)

View file

@ -33,7 +33,7 @@ func makeTestIndexBlock(count int) ([]byte, []uint64) {
marks = make(map[uint64]bool)
elements []uint64
)
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0))
bw, _ := newBlockWriter(nil, newIndexBlockDesc(0), 0)
for i := 0; i < count; i++ {
n := uint64(rand.Uint32())
if marks[n] {
@ -67,7 +67,7 @@ func makeTestIndexBlocks(db ethdb.KeyValueStore, stateIdent stateIdent, count in
}
sort.Slice(elements, func(i, j int) bool { return elements[i] < elements[j] })
iw, _ := newIndexWriter(db, stateIdent)
iw, _ := newIndexWriter(db, stateIdent, 0)
for i := 0; i < len(elements); i++ {
iw.append(elements[i])
}

View file

@ -33,7 +33,7 @@ func TestIndexReaderBasic(t *testing.T) {
1, 5, 10, 11, 20,
}
db := rawdb.NewMemoryDatabase()
bw, _ := newIndexWriter(db, newAccountIdent(common.Hash{0xa}))
bw, _ := newIndexWriter(db, newAccountIdent(common.Hash{0xa}), 0)
for i := 0; i < len(elements); i++ {
bw.append(elements[i])
}
@ -75,7 +75,7 @@ func TestIndexReaderLarge(t *testing.T) {
slices.Sort(elements)
db := rawdb.NewMemoryDatabase()
bw, _ := newIndexWriter(db, newAccountIdent(common.Hash{0xa}))
bw, _ := newIndexWriter(db, newAccountIdent(common.Hash{0xa}), 0)
for i := 0; i < len(elements); i++ {
bw.append(elements[i])
}
@ -122,19 +122,21 @@ func TestEmptyIndexReader(t *testing.T) {
func TestIndexWriterBasic(t *testing.T) {
db := rawdb.NewMemoryDatabase()
iw, _ := newIndexWriter(db, newAccountIdent(common.Hash{0xa}))
iw, _ := newIndexWriter(db, newAccountIdent(common.Hash{0xa}), 0)
iw.append(2)
if err := iw.append(1); err == nil {
t.Fatal("out-of-order insertion is not expected")
}
var maxElem uint64
for i := 0; i < 10; i++ {
iw.append(uint64(i + 3))
maxElem = uint64(i + 3)
}
batch := db.NewBatch()
iw.finish(batch)
batch.Write()
iw, err := newIndexWriter(db, newAccountIdent(common.Hash{0xa}))
iw, err := newIndexWriter(db, newAccountIdent(common.Hash{0xa}), maxElem)
if err != nil {
t.Fatalf("Failed to construct the block writer, %v", err)
}
@ -146,18 +148,87 @@ func TestIndexWriterBasic(t *testing.T) {
iw.finish(db.NewBatch())
}
func TestIndexWriterDelete(t *testing.T) {
// TestIndexWriterWithLimit verifies that constructing an index writer with a
// truncation limit drops every persisted element above the limit, and that
// the writer accepts new elements afterwards.
func TestIndexWriterWithLimit(t *testing.T) {
	db := rawdb.NewMemoryDatabase()
	iw, _ := newIndexWriter(db, newAccountIdent(common.Hash{0xa}), 0)

	// Populate the index with elements [1, indexBlockEntriesCap*2] and
	// persist it into the database.
	var maxElem uint64
	for id := uint64(1); id <= indexBlockEntriesCap*2; id++ {
		iw.append(id)
		maxElem = id
	}
	batch := db.NewBatch()
	iw.finish(batch)
	batch.Write()

	cases := []struct {
		limit  uint64
		expMax uint64
	}{
		{maxElem, maxElem},         // nothing to truncate
		{maxElem - 1, maxElem - 1}, // truncate the last element

		// truncation around the block boundary, on either side
		{uint64(indexBlockEntriesCap + 1), uint64(indexBlockEntriesCap + 1)},
		{uint64(indexBlockEntriesCap), uint64(indexBlockEntriesCap)},

		{uint64(1), uint64(1)},
		{uint64(0), uint64(0)}, // truncate the entire index, it's in theory invalid
	}
	for i, tc := range cases {
		iw, err := newIndexWriter(db, newAccountIdent(common.Hash{0xa}), tc.limit)
		if err != nil {
			t.Fatalf("Failed to construct the index writer, %v", err)
		}
		if iw.lastID != tc.expMax {
			t.Fatalf("Test %d, unexpected max value, got %d, want %d", i, iw.lastID, tc.expMax)
		}
		// Re-fill the elements
		var refilled uint64
		for elem := tc.limit + 1; elem < indexBlockEntriesCap*4; elem++ {
			if err := iw.append(elem); err != nil {
				t.Fatalf("Failed to append value %d: %v", elem, err)
			}
			refilled = elem
		}
		if iw.lastID != refilled {
			t.Fatalf("Test %d, unexpected max value, got %d, want %d", i, iw.lastID, refilled)
		}
	}
}
func TestIndexDeleterBasic(t *testing.T) {
db := rawdb.NewMemoryDatabase()
iw, _ := newIndexWriter(db, newAccountIdent(common.Hash{0xa}), 0)
var maxElem uint64
for i := 0; i < indexBlockEntriesCap*4; i++ {
iw.append(uint64(i + 1))
maxElem = uint64(i + 1)
}
batch := db.NewBatch()
iw.finish(batch)
batch.Write()
// Delete unknown id, the request should be rejected
id, _ := newIndexDeleter(db, newAccountIdent(common.Hash{0xa}))
id, _ := newIndexDeleter(db, newAccountIdent(common.Hash{0xa}), maxElem)
if err := id.pop(indexBlockEntriesCap * 5); err == nil {
t.Fatal("Expect error to occur for unknown id")
}
@ -168,10 +239,66 @@ func TestIndexWriterDelete(t *testing.T) {
if id.lastID != uint64(i-1) {
t.Fatalf("Unexpected lastID, want: %d, got: %d", uint64(i-1), iw.lastID)
}
if rand.Intn(10) == 0 {
batch := db.NewBatch()
id.finish(batch)
batch.Write()
}
}
// TestIndexDeleterWithLimit verifies that constructing an index deleter with
// a truncation limit drops every persisted element above the limit, and that
// the remaining elements can all be popped afterwards.
func TestIndexDeleterWithLimit(t *testing.T) {
	db := rawdb.NewMemoryDatabase()
	iw, _ := newIndexWriter(db, newAccountIdent(common.Hash{0xa}), 0)

	// Populate the index with elements [1, indexBlockEntriesCap*2] and
	// persist it into the database.
	var maxElem uint64
	for i := 0; i < indexBlockEntriesCap*2; i++ {
		iw.append(uint64(i + 1))
		maxElem = uint64(i + 1)
	}
	batch := db.NewBatch()
	iw.finish(batch)
	batch.Write()

	suites := []struct {
		limit  uint64
		expMax uint64
	}{
		// nothing to truncate
		{
			maxElem, maxElem,
		},
		// truncate the last element
		{
			maxElem - 1, maxElem - 1,
		},
		// truncation around the block boundary
		{
			uint64(indexBlockEntriesCap + 1),
			uint64(indexBlockEntriesCap + 1),
		},
		// truncation around the block boundary
		{
			uint64(indexBlockEntriesCap),
			uint64(indexBlockEntriesCap),
		},
		{
			uint64(1), uint64(1),
		},
		// truncate the entire index, it's in theory invalid
		{
			uint64(0), uint64(0),
		},
	}
	for i, suite := range suites {
		id, err := newIndexDeleter(db, newAccountIdent(common.Hash{0xa}), suite.limit)
		if err != nil {
			// Fixed message: this constructs a deleter, not a writer.
			t.Fatalf("Failed to construct the index deleter, %v", err)
		}
		if id.lastID != suite.expMax {
			t.Fatalf("Test %d, unexpected max value, got %d, want %d", i, id.lastID, suite.expMax)
		}
		// Keep removing elements from the tail until the index is drained;
		// every pop must succeed since the elements were just truncated to
		// the limit.
		for elem := id.lastID; elem > 0; elem-- {
			if err := id.pop(elem); err != nil {
				t.Fatalf("Failed to pop value %d: %v", elem, err)
			}
		}
	}
}

View file

@ -40,11 +40,6 @@ const (
stateHistoryIndexVersion = stateHistoryIndexV0 // the current state index version
trienodeHistoryIndexV0 = uint8(0) // initial version of trienode index structure
trienodeHistoryIndexVersion = trienodeHistoryIndexV0 // the current trienode index version
// estimations for calculating the batch size for atomic database commit
estimatedStateHistoryIndexSize = 3 // The average size of each state history index entry is approximately 23 bytes
estimatedTrienodeHistoryIndexSize = 3 // The average size of each trienode history index entry is approximately 2-3 bytes
estimatedIndexBatchSizeFactor = 32 // The factor counts for the write amplification for each entry
)
// indexVersion returns the latest index version for the given history type.
@ -155,22 +150,6 @@ func (b *batchIndexer) process(h history, id uint64) error {
return b.finish(false)
}
// makeBatch constructs a database batch based on the number of pending entries.
// The batch size is roughly estimated to minimize repeated resizing rounds,
// as accurately predicting the exact size is technically challenging.
func (b *batchIndexer) makeBatch() ethdb.Batch {
var size int
switch b.typ {
case typeStateHistory:
size = estimatedStateHistoryIndexSize
case typeTrienodeHistory:
size = estimatedTrienodeHistoryIndexSize
default:
panic(fmt.Sprintf("unknown history type %d", b.typ))
}
return b.db.NewBatchWithSize(size * estimatedIndexBatchSizeFactor * b.pending)
}
// finish writes the accumulated state indexes into the disk if either the
// memory limitation is reached or it's requested forcibly.
func (b *batchIndexer) finish(force bool) error {
@ -181,17 +160,38 @@ func (b *batchIndexer) finish(force bool) error {
return nil
}
var (
batch = b.makeBatch()
batchMu sync.RWMutex
start = time.Now()
eg errgroup.Group
start = time.Now()
eg errgroup.Group
batch = b.db.NewBatchWithSize(ethdb.IdealBatchSize)
batchSize int
batchMu sync.RWMutex
writeBatch = func(fn func(batch ethdb.Batch)) error {
batchMu.Lock()
defer batchMu.Unlock()
fn(batch)
if batch.ValueSize() >= ethdb.IdealBatchSize {
batchSize += batch.ValueSize()
if err := batch.Write(); err != nil {
return err
}
batch.Reset()
}
return nil
}
)
eg.SetLimit(runtime.NumCPU())
var indexed uint64
if metadata := loadIndexMetadata(b.db, b.typ); metadata != nil {
indexed = metadata.Last
}
for ident, list := range b.index {
eg.Go(func() error {
if !b.delete {
iw, err := newIndexWriter(b.db, ident)
iw, err := newIndexWriter(b.db, ident, indexed)
if err != nil {
return err
}
@ -200,11 +200,11 @@ func (b *batchIndexer) finish(force bool) error {
return err
}
}
batchMu.Lock()
iw.finish(batch)
batchMu.Unlock()
return writeBatch(func(batch ethdb.Batch) {
iw.finish(batch)
})
} else {
id, err := newIndexDeleter(b.db, ident)
id, err := newIndexDeleter(b.db, ident, indexed)
if err != nil {
return err
}
@ -213,11 +213,10 @@ func (b *batchIndexer) finish(force bool) error {
return err
}
}
batchMu.Lock()
id.finish(batch)
batchMu.Unlock()
return writeBatch(func(batch ethdb.Batch) {
id.finish(batch)
})
}
return nil
})
}
if err := eg.Wait(); err != nil {
@ -233,10 +232,12 @@ func (b *batchIndexer) finish(force bool) error {
storeIndexMetadata(batch, b.typ, b.lastID-1)
}
}
batchSize += batch.ValueSize()
if err := batch.Write(); err != nil {
return err
}
log.Debug("Committed batch indexer", "type", b.typ, "entries", len(b.index), "records", b.pending, "elapsed", common.PrettyDuration(time.Since(start)))
log.Debug("Committed batch indexer", "type", b.typ, "entries", len(b.index), "records", b.pending, "size", common.StorageSize(batchSize), "elapsed", common.PrettyDuration(time.Since(start)))
b.pending = 0
b.index = make(map[stateIdent][]uint64)
return nil