mirror of
https://github.com/ethereum/go-ethereum.git
synced 2026-05-15 12:36:48 +00:00
core/rawdb: skip missing block bodies during tx unindexing (#33573)
This PR fixes an issue where the tx indexer would repeatedly try to “unindex” a block with a missing body, causing a spike in CPU usage. This change skips these blocks and advances the index tail. The fix was verified both manually on a local development chain and with a new test. resolves #33371
This commit is contained in:
parent
127d1f42bb
commit
c890637af9
2 changed files with 53 additions and 9 deletions
|
|
@ -87,6 +87,7 @@ func InitDatabaseFromFreezer(db ethdb.Database) {
|
||||||
type blockTxHashes struct {
|
type blockTxHashes struct {
|
||||||
number uint64
|
number uint64
|
||||||
hashes []common.Hash
|
hashes []common.Hash
|
||||||
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
// iterateTransactions iterates over all transactions in the (canon) block
|
// iterateTransactions iterates over all transactions in the (canon) block
|
||||||
|
|
@ -144,17 +145,22 @@ func iterateTransactions(db ethdb.Database, from uint64, to uint64, reverse bool
|
||||||
}()
|
}()
|
||||||
for data := range rlpCh {
|
for data := range rlpCh {
|
||||||
var body types.Body
|
var body types.Body
|
||||||
|
var result *blockTxHashes
|
||||||
if err := rlp.DecodeBytes(data.rlp, &body); err != nil {
|
if err := rlp.DecodeBytes(data.rlp, &body); err != nil {
|
||||||
log.Warn("Failed to decode block body", "block", data.number, "error", err)
|
log.Warn("Failed to decode block body", "block", data.number, "error", err)
|
||||||
return
|
result = &blockTxHashes{
|
||||||
}
|
number: data.number,
|
||||||
var hashes []common.Hash
|
err: err,
|
||||||
for _, tx := range body.Transactions {
|
}
|
||||||
hashes = append(hashes, tx.Hash())
|
} else {
|
||||||
}
|
var hashes []common.Hash
|
||||||
result := &blockTxHashes{
|
for _, tx := range body.Transactions {
|
||||||
hashes: hashes,
|
hashes = append(hashes, tx.Hash())
|
||||||
number: data.number,
|
}
|
||||||
|
result = &blockTxHashes{
|
||||||
|
hashes: hashes,
|
||||||
|
number: data.number,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// Feed the block to the aggregator, or abort on interrupt
|
// Feed the block to the aggregator, or abort on interrupt
|
||||||
select {
|
select {
|
||||||
|
|
@ -214,6 +220,10 @@ func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan
|
||||||
// Next block available, pop it off and index it
|
// Next block available, pop it off and index it
|
||||||
delivery := queue.PopItem()
|
delivery := queue.PopItem()
|
||||||
lastNum = delivery.number
|
lastNum = delivery.number
|
||||||
|
if delivery.err != nil {
|
||||||
|
log.Warn("Skipping tx indexing for block with missing/corrupt body", "block", delivery.number, "error", delivery.err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
WriteTxLookupEntries(batch, delivery.number, delivery.hashes)
|
WriteTxLookupEntries(batch, delivery.number, delivery.hashes)
|
||||||
blocks++
|
blocks++
|
||||||
txs += len(delivery.hashes)
|
txs += len(delivery.hashes)
|
||||||
|
|
@ -307,6 +317,10 @@ func unindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt ch
|
||||||
}
|
}
|
||||||
delivery := queue.PopItem()
|
delivery := queue.PopItem()
|
||||||
nextNum = delivery.number + 1
|
nextNum = delivery.number + 1
|
||||||
|
if delivery.err != nil {
|
||||||
|
log.Warn("Skipping tx unindexing for block with missing/corrupt body", "block", delivery.number, "error", delivery.err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
DeleteTxLookupEntries(batch, delivery.hashes)
|
DeleteTxLookupEntries(batch, delivery.hashes)
|
||||||
txs += len(delivery.hashes)
|
txs += len(delivery.hashes)
|
||||||
blocks++
|
blocks++
|
||||||
|
|
|
||||||
|
|
@ -218,6 +218,36 @@ func TestIndexTransactions(t *testing.T) {
|
||||||
verify(0, 8, false, 8)
|
verify(0, 8, false, 8)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestUnindexTransactionsMissingBody(t *testing.T) {
|
||||||
|
// Construct test chain db
|
||||||
|
chainDB := NewMemoryDatabase()
|
||||||
|
blocks, _ := initDatabaseWithTransactions(chainDB)
|
||||||
|
|
||||||
|
// Index the entire chain.
|
||||||
|
lastBlock := blocks[len(blocks)-1].NumberU64()
|
||||||
|
IndexTransactions(chainDB, 0, lastBlock+1, nil, false)
|
||||||
|
|
||||||
|
// Prove that block 2 body exists in the database.
|
||||||
|
if raw := ReadCanonicalBodyRLP(chainDB, 2, nil); len(raw) == 0 {
|
||||||
|
t.Fatalf("Block 2 body does not exist in the database.")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete body for block 2. This simulates a corrupted database.
|
||||||
|
key := blockBodyKey(2, blocks[2].Hash())
|
||||||
|
if err := chainDB.Delete(key); err != nil {
|
||||||
|
t.Fatalf("Failed to delete block body %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unindex blocks [0, 3)
|
||||||
|
UnindexTransactions(chainDB, 0, 3, nil, false)
|
||||||
|
|
||||||
|
// Verify that tx index tail is updated to 3.
|
||||||
|
tail := ReadTxIndexTail(chainDB)
|
||||||
|
if tail == nil || *tail != 3 {
|
||||||
|
t.Fatalf("The tx index tail is wrong: got %v want %d", *tail, 3)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestPruneTransactionIndex(t *testing.T) {
|
func TestPruneTransactionIndex(t *testing.T) {
|
||||||
chainDB := NewMemoryDatabase()
|
chainDB := NewMemoryDatabase()
|
||||||
blocks, _ := initDatabaseWithTransactions(chainDB)
|
blocks, _ := initDatabaseWithTransactions(chainDB)
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue