feat: add interrupts

This commit is contained in:
jeevan-sid 2026-02-18 16:07:13 +05:30
parent e8a656cc95
commit f13740a146

View file

@ -364,12 +364,17 @@ func ImportEraIndex(db ethdb.Database, dir string, network string, from func(f e
log.Info("Resuming era indexing", "lastEpoch", *tail, "nextEpoch", startEpoch) log.Info("Resuming era indexing", "lastEpoch", *tail, "nextEpoch", startEpoch)
} }
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM)
defer signal.Stop(interrupt)
var ( var (
start = time.Now() start = time.Now()
reported = time.Now() reported = time.Now()
batch = db.NewBatch() batch = db.NewBatch()
totalBlocks uint64 totalBlocks uint64
totalTxs uint64 totalTxs uint64
interrupted = false
) )
// Index each era file. // Index each era file.
@ -378,6 +383,18 @@ func ImportEraIndex(db ethdb.Database, dir string, network string, from func(f e
continue continue
} }
select {
case <-interrupt:
log.Warn("Interrupted, flushing and shutting down gracefully...")
interrupted = true
break
default:
}
if interrupted {
break
}
err := func() error { err := func() error {
path := filepath.Join(dir, entry) path := filepath.Join(dir, entry)
f, err := os.Open(path) f, err := os.Open(path)
@ -401,6 +418,14 @@ func ImportEraIndex(db ethdb.Database, dir string, network string, from func(f e
// Iterate over all blocks in this epoch. // Iterate over all blocks in this epoch.
for it.Next() { for it.Next() {
select {
case <-interrupt:
log.Warn("Interrupted during epoch processing, flushing current progress...")
interrupted = true
return nil
default:
}
if it.Error() != nil { if it.Error() != nil {
return fmt.Errorf("error iterating era file: %w", it.Error()) return fmt.Errorf("error iterating era file: %w", it.Error())
} }
@ -441,15 +466,22 @@ func ImportEraIndex(db ethdb.Database, dir string, network string, from func(f e
batch.Reset() batch.Reset()
} }
// Mark this epoch as fully indexed. // Only mark epoch as complete if we weren't interrupted mid-epoch
rawdb.WriteEraIndexTail(db, uint64(epoch)) if !interrupted {
// Mark this epoch as fully indexed.
rawdb.WriteEraIndexTail(batch, uint64(epoch))
if err := batch.Write(); err != nil {
return fmt.Errorf("error writing tail marker: %w", err)
}
batch.Reset()
totalTxs += epochTxs totalTxs += epochTxs
if time.Since(reported) >= 8*time.Second { if time.Since(reported) >= 8*time.Second {
log.Info("Indexing era files", "epoch", epoch, "blocks", epochBlocks, "txs", epochTxs, log.Info("Indexing era files", "epoch", epoch, "blocks", epochBlocks, "txs", epochTxs,
"totalBlocks", totalBlocks, "totalTxs", totalTxs, "elapsed", common.PrettyDuration(time.Since(start))) "totalBlocks", totalBlocks, "totalTxs", totalTxs, "elapsed", common.PrettyDuration(time.Since(start)))
reported = time.Now() reported = time.Now()
}
} }
return nil return nil
@ -457,12 +489,19 @@ func ImportEraIndex(db ethdb.Database, dir string, network string, from func(f e
if err != nil { if err != nil {
return err return err
} }
if interrupted {
break
}
} }
log.Info("Era indexing complete", "totalBlocks", totalBlocks, "totalTxs", totalTxs, "elapsed", common.PrettyDuration(time.Since(start))) if interrupted {
log.Info("Era indexing interrupted", "totalBlocks", totalBlocks, "totalTxs", totalTxs, "elapsed", common.PrettyDuration(time.Since(start)))
} else {
log.Info("Era indexing complete", "totalBlocks", totalBlocks, "totalTxs", totalTxs, "elapsed", common.PrettyDuration(time.Since(start)))
}
return nil return nil
} }
func missingBlocks(chain *core.BlockChain, blocks []*types.Block) []*types.Block { func missingBlocks(chain *core.BlockChain, blocks []*types.Block) []*types.Block {
head := chain.CurrentBlock() head := chain.CurrentBlock()
for i, block := range blocks { for i, block := range blocks {