feat: enable txn queries on pruned node

This commit is contained in:
0xjvn 2026-04-13 14:39:57 +05:30
parent 26afa16777
commit 41d6664b0c
4 changed files with 32 additions and 244 deletions

View file

@ -172,18 +172,6 @@ from Era archives.
Description: `
The export-history command will export blocks and their corresponding receipts
into Era archives. Eras are typically packaged in steps of 8192 blocks.
`,
}
importEraIndexCommand = &cli.Command{
Action: importEraIndex,
Name: "import-era-index",
Usage: "Import transaction index from era archive files",
ArgsUsage: "<era-dir>",
Flags: slices.Concat(utils.DatabaseFlags, utils.NetworkFlags, []cli.Flag{utils.EraFormatFlag}),
Description: `
The import-era-index command indexes transactions from era files to enable
transaction lookups by hash for pruned block ranges. Era files must be present in the specified directory.
The command is idempotent and can be re-run to index newly added era files.
`,
}
importPreimagesCommand = &cli.Command{
@ -550,7 +538,7 @@ func importHistory(ctx *cli.Context) error {
default:
return fmt.Errorf("unknown --era.format %q (expected 'era1' or 'erae')", format)
}
if err := utils.ImportHistory(chain, dir, network, from); err != nil {
if err := utils.ImportHistory(chain, db, dir, network, from); err != nil {
return err
}
@ -608,79 +596,6 @@ func exportHistory(ctx *cli.Context) error {
return nil
}
func importEraIndex(ctx *cli.Context) error {
if ctx.Args().Len() != 1 {
utils.Fatalf("usage: %s", ctx.Command.ArgsUsage)
}
stack, _ := makeConfigNode(ctx)
defer stack.Close()
db := utils.MakeChainDatabase(ctx, stack, false)
defer db.Close()
var (
start = time.Now()
dir = ctx.Args().Get(0)
network string
)
// Determine network.
if utils.IsNetworkPreset(ctx) {
switch {
case ctx.Bool(utils.MainnetFlag.Name):
network = "mainnet"
case ctx.Bool(utils.SepoliaFlag.Name):
network = "sepolia"
case ctx.Bool(utils.HoleskyFlag.Name):
network = "holesky"
case ctx.Bool(utils.HoodiFlag.Name):
network = "hoodi"
}
} else {
// No network flag set, try to determine network based on files
// present in directory.
var networks []string
for _, n := range params.NetworkNames {
entries, err := era.ReadDir(dir, n)
if err != nil {
return fmt.Errorf("error reading %s: %w", dir, err)
}
if len(entries) > 0 {
networks = append(networks, n)
}
}
if len(networks) == 0 {
return fmt.Errorf("no era files found in %s", dir)
}
if len(networks) > 1 {
return errors.New("multiple networks found, use a network flag to specify network")
}
network = networks[0]
}
// Determine era format.
var (
format = ctx.String(utils.EraFormatFlag.Name)
from func(era.ReadAtSeekCloser) (era.Era, error)
)
switch format {
case "era1", "era":
from = onedb.From
case "erae":
from = execdb.From
default:
return fmt.Errorf("unknown --era.format %q (expected 'era1' or 'erae')", format)
}
if err := utils.ImportEraIndex(db, dir, network, from); err != nil {
return err
}
fmt.Printf("Era indexing done in %v\n", time.Since(start))
return nil
}
// importPreimages imports preimage data from the specified file.
// it is deprecated, and the export function has been removed, but
// the import function is kept around for the time being so that

View file

@ -236,7 +236,6 @@ func init() {
exportCommand,
importHistoryCommand,
exportHistoryCommand,
importEraIndexCommand,
importPreimagesCommand,
removedbCommand,
dumpCommand,

View file

@ -252,7 +252,7 @@ func readList(filename string) ([]string, error) {
// ImportHistory imports Era1 files containing historical block information,
// starting from genesis. The assumption is held that the provided chain
// segment in Era1 file should all be canonical and verified.
func ImportHistory(chain *core.BlockChain, dir string, network string, from func(f era.ReadAtSeekCloser) (era.Era, error)) error {
func ImportHistory(chain *core.BlockChain, db ethdb.Database, dir string, network string, from func(f era.ReadAtSeekCloser) (era.Era, error)) error {
if chain.CurrentSnapBlock().Number.BitLen() != 0 {
return errors.New("history import only supported when starting from genesis")
}
@ -275,6 +275,7 @@ func ImportHistory(chain *core.BlockChain, dir string, network string, from func
imported = 0
h = sha256.New()
buf = bytes.NewBuffer(nil)
idxBatch = db.NewBatch()
)
for i, file := range entries {
@ -297,6 +298,7 @@ func ImportHistory(chain *core.BlockChain, dir string, network string, from func
if got != checksums[i] {
return fmt.Errorf("%s checksum mismatch: have %s want %s", file, got, checksums[i])
}
// Import all block data from Era1.
e, err := from(f)
if err != nil {
@ -321,6 +323,24 @@ func ImportHistory(chain *core.BlockChain, dir string, network string, from func
return fmt.Errorf("error inserting blocks %d-%d: %w",
blocks[0].NumberU64(), blocks[len(blocks)-1].NumberU64(), err)
}
// Index tx lookups for the same batch we just inserted.
for _, block := range blocks {
txHashes := make([]common.Hash, len(block.Transactions()))
for j, tx := range block.Transactions() {
txHashes[j] = tx.Hash()
}
if len(txHashes) > 0 {
rawdb.WriteEraTxLookupEntries(idxBatch, block.NumberU64(), txHashes)
}
}
if idxBatch.ValueSize() >= ethdb.IdealBatchSize {
if err := idxBatch.Write(); err != nil {
return fmt.Errorf("error writing tx index batch: %w", err)
}
idxBatch.Reset()
}
imported += len(blocks)
if time.Since(reported) >= 8*time.Second {
head := blocks[len(blocks)-1].NumberU64()
@ -334,6 +354,7 @@ func ImportHistory(chain *core.BlockChain, dir string, network string, from func
return nil
}
)
for it.Next() {
block, err := it.Block()
if err != nil {
@ -357,172 +378,25 @@ func ImportHistory(chain *core.BlockChain, dir string, network string, from func
if err := it.Error(); err != nil {
return err
}
return flush()
}()
if err != nil {
return err
}
}
return nil
}
// ImportEraIndex indexes transactions from era files into the database to enable
// transaction lookups by hash for pruned block ranges.
func ImportEraIndex(db ethdb.Database, dir string, network string, from func(f era.ReadAtSeekCloser) (era.Era, error)) error {
entries, err := era.ReadDir(dir, network)
if err != nil {
return fmt.Errorf("error reading era directory: %w", err)
}
if len(entries) == 0 {
return fmt.Errorf("no era files found for network %s in %s", network, dir)
}
// Get the last indexed epoch to support resume.
tail := rawdb.ReadEraIndexTail(db)
startEpoch := uint64(0)
if tail != nil {
startEpoch = *tail + 1
log.Info("Resuming era indexing", "lastEpoch", *tail, "nextEpoch", startEpoch)
}
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM)
defer signal.Stop(interrupt)
var (
start = time.Now()
reported = time.Now()
batch = db.NewBatch()
totalBlocks uint64
totalTxs uint64
interrupted = false
)
// Index each era file.
for epoch, entry := range entries {
if uint64(epoch) < startEpoch {
continue
}
select {
case <-interrupt:
log.Warn("Interrupted, flushing and shutting down gracefully...")
interrupted = true
break
default:
}
if interrupted {
break
}
err := func() error {
path := filepath.Join(dir, entry)
f, err := os.Open(path)
if err != nil {
return fmt.Errorf("error opening era file %s: %w", path, err)
}
defer f.Close()
e, err := from(f)
if err != nil {
return fmt.Errorf("error opening era: %w", err)
if err := flush(); err != nil {
return err
}
it, err := e.Iterator()
if err != nil {
return fmt.Errorf("error creating iterator: %w", err)
// Flush any remaining index writes and mark this epoch fully indexed.
rawdb.WriteEraIndexTail(idxBatch, uint64(i))
if err := idxBatch.Write(); err != nil {
return fmt.Errorf("error writing era index tail for epoch %d: %w", i, err)
}
epochBlocks := uint64(0)
epochTxs := uint64(0)
// Iterate over all blocks in this epoch.
for it.Next() {
select {
case <-interrupt:
log.Warn("Interrupted during epoch processing, flushing current progress...")
interrupted = true
return nil
default:
}
if it.Error() != nil {
return fmt.Errorf("error iterating era file: %w", it.Error())
}
block, err := it.Block()
if err != nil {
return fmt.Errorf("error reading block: %w", err)
}
// Index all transactions in this block.
txHashes := make([]common.Hash, len(block.Transactions()))
for i, tx := range block.Transactions() {
txHashes[i] = tx.Hash()
}
if len(txHashes) > 0 {
rawdb.WriteEraTxLookupEntries(batch, block.NumberU64(), txHashes)
epochTxs += uint64(len(txHashes))
}
epochBlocks++
totalBlocks++
// Write batch if it's getting large.
if batch.ValueSize() >= ethdb.IdealBatchSize {
if err := batch.Write(); err != nil {
return fmt.Errorf("error writing index batch: %w", err)
}
batch.Reset()
}
}
// Flush remaining batch for this epoch.
if batch.ValueSize() > 0 {
if err := batch.Write(); err != nil {
return fmt.Errorf("error writing index batch: %w", err)
}
batch.Reset()
}
// Only mark epoch as complete if we weren't interrupted mid-epoch
if !interrupted {
// Mark this epoch as fully indexed.
rawdb.WriteEraIndexTail(batch, uint64(epoch))
if err := batch.Write(); err != nil {
return fmt.Errorf("error writing tail marker: %w", err)
}
batch.Reset()
totalTxs += epochTxs
if time.Since(reported) >= 8*time.Second {
log.Info("Indexing era files", "epoch", epoch, "blocks", epochBlocks, "txs", epochTxs,
"totalBlocks", totalBlocks, "totalTxs", totalTxs, "elapsed", common.PrettyDuration(time.Since(start)))
reported = time.Now()
}
}
idxBatch.Reset()
return nil
}()
if err != nil {
return err
}
if interrupted {
break
}
}
if interrupted {
log.Info("Era indexing interrupted", "totalBlocks", totalBlocks, "totalTxs", totalTxs, "elapsed", common.PrettyDuration(time.Since(start)))
} else {
log.Info("Era indexing complete", "totalBlocks", totalBlocks, "totalTxs", totalTxs, "elapsed", common.PrettyDuration(time.Since(start)))
}
return nil
}
func missingBlocks(chain *core.BlockChain, blocks []*types.Block) []*types.Block {
head := chain.CurrentBlock()
for i, block := range blocks {

View file

@ -182,7 +182,7 @@ func TestHistoryImportAndExport(t *testing.T) {
if err != nil {
t.Fatalf("unable to initialize chain: %v", err)
}
if err := ImportHistory(imported, dir, "mainnet", tt.from); err != nil {
if err := ImportHistory(imported, db2, dir, "mainnet", tt.from); err != nil {
t.Fatalf("failed to import chain: %v", err)
}
if have, want := imported.CurrentHeader(), chain.CurrentHeader(); have.Hash() != want.Hash() {