diff --git a/cmd/geth/chaincmd.go b/cmd/geth/chaincmd.go index 2e3c772fbc..9dcd2caee3 100644 --- a/cmd/geth/chaincmd.go +++ b/cmd/geth/chaincmd.go @@ -172,18 +172,6 @@ from Era archives. Description: ` The export-history command will export blocks and their corresponding receipts into Era archives. Eras are typically packaged in steps of 8192 blocks. -`, - } - importEraIndexCommand = &cli.Command{ - Action: importEraIndex, - Name: "import-era-index", - Usage: "Import transaction index from era archive files", - ArgsUsage: "", - Flags: slices.Concat(utils.DatabaseFlags, utils.NetworkFlags, []cli.Flag{utils.EraFormatFlag}), - Description: ` -The import-era-index command indexes transactions from era files to enable -transaction lookups by hash for pruned block ranges. Era files must be present in the specified directory. -The command is idempotent and can be re-run to index newly added era files. `, } importPreimagesCommand = &cli.Command{ @@ -550,7 +538,7 @@ func importHistory(ctx *cli.Context) error { default: return fmt.Errorf("unknown --era.format %q (expected 'era1' or 'erae')", format) } - if err := utils.ImportHistory(chain, dir, network, from); err != nil { + if err := utils.ImportHistory(chain, db, dir, network, from); err != nil { return err } @@ -608,79 +596,6 @@ func exportHistory(ctx *cli.Context) error { return nil } -func importEraIndex(ctx *cli.Context) error { - if ctx.Args().Len() != 1 { - utils.Fatalf("usage: %s", ctx.Command.ArgsUsage) - } - - stack, _ := makeConfigNode(ctx) - defer stack.Close() - - db := utils.MakeChainDatabase(ctx, stack, false) - defer db.Close() - - var ( - start = time.Now() - dir = ctx.Args().Get(0) - network string - ) - - // Determine network. - if utils.IsNetworkPreset(ctx) { - switch { - case ctx.Bool(utils.MainnetFlag.Name): - network = "mainnet" - case ctx.Bool(utils.SepoliaFlag.Name): - network = "sepolia" - case ctx.Bool(utils.HoleskyFlag.Name): - network = "holesky" - case ctx.Bool(utils.HoodiFlag.Name): - network = "hoodi" - } - } else { - // No network flag set, try to determine network based on files - // present in directory. - var networks []string - for _, n := range params.NetworkNames { - entries, err := era.ReadDir(dir, n) - if err != nil { - return fmt.Errorf("error reading %s: %w", dir, err) - } - if len(entries) > 0 { - networks = append(networks, n) - } - } - if len(networks) == 0 { - return fmt.Errorf("no era files found in %s", dir) - } - if len(networks) > 1 { - return errors.New("multiple networks found, use a network flag to specify network") - } - network = networks[0] - } - - // Determine era format. - var ( - format = ctx.String(utils.EraFormatFlag.Name) - from func(era.ReadAtSeekCloser) (era.Era, error) - ) - switch format { - case "era1", "era": - from = onedb.From - case "erae": - from = execdb.From - default: - return fmt.Errorf("unknown --era.format %q (expected 'era1' or 'erae')", format) - } - - if err := utils.ImportEraIndex(db, dir, network, from); err != nil { - return err - } - - fmt.Printf("Era indexing done in %v\n", time.Since(start)) - return nil -} - // importPreimages imports preimage data from the specified file. // it is deprecated, and the export function has been removed, but // the import function is kept around for the time being so that diff --git a/cmd/geth/main.go b/cmd/geth/main.go index af85aedaf6..e196ac8688 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -236,7 +236,6 @@ func init() { exportCommand, importHistoryCommand, exportHistoryCommand, - importEraIndexCommand, importPreimagesCommand, removedbCommand, dumpCommand, diff --git a/cmd/utils/cmd.go b/cmd/utils/cmd.go index dd41efcb9a..a6d4b64ef5 100644 --- a/cmd/utils/cmd.go +++ b/cmd/utils/cmd.go @@ -252,7 +252,7 @@ func readList(filename string) ([]string, error) { // ImportHistory imports Era1 files containing historical block information, // starting from genesis. The assumption is held that the provided chain // segment in Era1 file should all be canonical and verified. -func ImportHistory(chain *core.BlockChain, dir string, network string, from func(f era.ReadAtSeekCloser) (era.Era, error)) error { +func ImportHistory(chain *core.BlockChain, db ethdb.Database, dir string, network string, from func(f era.ReadAtSeekCloser) (era.Era, error)) error { if chain.CurrentSnapBlock().Number.BitLen() != 0 { return errors.New("history import only supported when starting from genesis") } @@ -275,6 +275,7 @@ func ImportHistory(chain *core.BlockChain, dir string, network string, from func imported = 0 h = sha256.New() buf = bytes.NewBuffer(nil) + idxBatch = db.NewBatch() ) for i, file := range entries { @@ -297,6 +298,7 @@ func ImportHistory(chain *core.BlockChain, dir string, network string, from func if got != checksums[i] { return fmt.Errorf("%s checksum mismatch: have %s want %s", file, got, checksums[i]) } + // Import all block data from Era1. e, err := from(f) if err != nil { @@ -321,6 +323,24 @@ func ImportHistory(chain *core.BlockChain, dir string, network string, from func return fmt.Errorf("error inserting blocks %d-%d: %w", blocks[0].NumberU64(), blocks[len(blocks)-1].NumberU64(), err) } + + // Index tx lookups for the same batch we just inserted. + for _, block := range blocks { + txHashes := make([]common.Hash, len(block.Transactions())) + for j, tx := range block.Transactions() { + txHashes[j] = tx.Hash() + } + if len(txHashes) > 0 { + rawdb.WriteEraTxLookupEntries(idxBatch, block.NumberU64(), txHashes) + } + } + if idxBatch.ValueSize() >= ethdb.IdealBatchSize { + if err := idxBatch.Write(); err != nil { + return fmt.Errorf("error writing tx index batch: %w", err) + } + idxBatch.Reset() + } + imported += len(blocks) if time.Since(reported) >= 8*time.Second { head := blocks[len(blocks)-1].NumberU64() @@ -334,6 +354,7 @@ func ImportHistory(chain *core.BlockChain, dir string, network string, from func return nil } ) + for it.Next() { block, err := it.Block() if err != nil { @@ -357,172 +378,25 @@ func ImportHistory(chain *core.BlockChain, dir string, network string, from func if err := it.Error(); err != nil { return err } - return flush() - }() - if err != nil { - return err - } - } - return nil -} - -// ImportEraIndex indexes transactions from era files into the database to enable -// transaction lookups by hash for pruned block ranges. -func ImportEraIndex(db ethdb.Database, dir string, network string, from func(f era.ReadAtSeekCloser) (era.Era, error)) error { - entries, err := era.ReadDir(dir, network) - if err != nil { - return fmt.Errorf("error reading era directory: %w", err) - } - if len(entries) == 0 { - return fmt.Errorf("no era files found for network %s in %s", network, dir) - } - - // Get the last indexed epoch to support resume. - tail := rawdb.ReadEraIndexTail(db) - startEpoch := uint64(0) - if tail != nil { - startEpoch = *tail + 1 - log.Info("Resuming era indexing", "lastEpoch", *tail, "nextEpoch", startEpoch) - } - - interrupt := make(chan os.Signal, 1) - signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM) - defer signal.Stop(interrupt) - - var ( - start = time.Now() - reported = time.Now() - batch = db.NewBatch() - totalBlocks uint64 - totalTxs uint64 - interrupted = false - ) - - // Index each era file. - for epoch, entry := range entries { - if uint64(epoch) < startEpoch { - continue - } - - select { - case <-interrupt: - log.Warn("Interrupted, flushing and shutting down gracefully...") - interrupted = true - break - default: - } - - if interrupted { - break - } - - err := func() error { - path := filepath.Join(dir, entry) - f, err := os.Open(path) - if err != nil { - return fmt.Errorf("error opening era file %s: %w", path, err) - } - defer f.Close() - - e, err := from(f) - if err != nil { - return fmt.Errorf("error opening era: %w", err) + if err := flush(); err != nil { + return err } - it, err := e.Iterator() - if err != nil { - return fmt.Errorf("error creating iterator: %w", err) + // Flush any remaining index writes and mark this epoch fully indexed. + rawdb.WriteEraIndexTail(idxBatch, uint64(i)) + if err := idxBatch.Write(); err != nil { + return fmt.Errorf("error writing era index tail for epoch %d: %w", i, err) } - - epochBlocks := uint64(0) - epochTxs := uint64(0) - - // Iterate over all blocks in this epoch. - for it.Next() { - select { - case <-interrupt: - log.Warn("Interrupted during epoch processing, flushing current progress...") - interrupted = true - return nil - default: - } - - if it.Error() != nil { - return fmt.Errorf("error iterating era file: %w", it.Error()) - } - - block, err := it.Block() - if err != nil { - return fmt.Errorf("error reading block: %w", err) - } - - // Index all transactions in this block. - txHashes := make([]common.Hash, len(block.Transactions())) - for i, tx := range block.Transactions() { - txHashes[i] = tx.Hash() - } - - if len(txHashes) > 0 { - rawdb.WriteEraTxLookupEntries(batch, block.NumberU64(), txHashes) - epochTxs += uint64(len(txHashes)) - } - - epochBlocks++ - totalBlocks++ - - // Write batch if it's getting large. - if batch.ValueSize() >= ethdb.IdealBatchSize { - if err := batch.Write(); err != nil { - return fmt.Errorf("error writing index batch: %w", err) - } - batch.Reset() - } - } - - // Flush remaining batch for this epoch. - if batch.ValueSize() > 0 { - if err := batch.Write(); err != nil { - return fmt.Errorf("error writing index batch: %w", err) - } - batch.Reset() - } - - // Only mark epoch as complete if we weren't interrupted mid-epoch - if !interrupted { - // Mark this epoch as fully indexed. - rawdb.WriteEraIndexTail(batch, uint64(epoch)) - if err := batch.Write(); err != nil { - return fmt.Errorf("error writing tail marker: %w", err) - } - batch.Reset() - - totalTxs += epochTxs - - if time.Since(reported) >= 8*time.Second { - log.Info("Indexing era files", "epoch", epoch, "blocks", epochBlocks, "txs", epochTxs, - "totalBlocks", totalBlocks, "totalTxs", totalTxs, "elapsed", common.PrettyDuration(time.Since(start))) - reported = time.Now() - } - } - + idxBatch.Reset() return nil }() if err != nil { return err } - - if interrupted { - break - } - } - - if interrupted { - log.Info("Era indexing interrupted", "totalBlocks", totalBlocks, "totalTxs", totalTxs, "elapsed", common.PrettyDuration(time.Since(start))) - } else { - log.Info("Era indexing complete", "totalBlocks", totalBlocks, "totalTxs", totalTxs, "elapsed", common.PrettyDuration(time.Since(start))) } return nil } + func missingBlocks(chain *core.BlockChain, blocks []*types.Block) []*types.Block { head := chain.CurrentBlock() for i, block := range blocks { diff --git a/cmd/utils/history_test.go b/cmd/utils/history_test.go index 6631946129..4b94117587 100644 --- a/cmd/utils/history_test.go +++ b/cmd/utils/history_test.go @@ -182,7 +182,7 @@ func TestHistoryImportAndExport(t *testing.T) { if err != nil { t.Fatalf("unable to initialize chain: %v", err) } - if err := ImportHistory(imported, dir, "mainnet", tt.from); err != nil { + if err := ImportHistory(imported, db2, dir, "mainnet", tt.from); err != nil { t.Fatalf("failed to import chain: %v", err) } if have, want := imported.CurrentHeader(), chain.CurrentHeader(); have.Hash() != want.Hash() {