cmd/utils, core/types: optimize erae history import with batched insertion and raw RLP receipt conversion

Rework ImportHistory to batch up to 2500 consecutive blocks per
InsertReceiptChain call, reducing processing overhead.

Replace full receipt deserialization with raw RLP-level conversion via
GetRawReceiptsByNumber, avoiding allocation of Receipt/Log/Bloom structs
and eliminating the decode-reencode round-trip for both era1 and erae
formats.

Centralize consensus receipt conversion into types.ConvertConsensusReceiptsToStorage
(shared by eradb runtime path and import command), and add a new slim
receipt converter for the erae format that strips the tx-type field and
validates tx/receipt count consistency.

Fix resource management by explicitly closing file handles and era objects
at each error return instead of relying on defer-in-closure patterns.
This commit is contained in:
Andrew Davis 2026-02-26 11:43:47 +10:00
parent 406a852ec8
commit d9939eb995
No known key found for this signature in database
GPG key ID: 30AB5B89A109D044
5 changed files with 463 additions and 105 deletions

View file

@ -249,9 +249,91 @@ func readList(filename string) ([]string, error) {
return strings.Split(string(b), "\n"), nil
}
// ImportHistory imports Era1 files containing historical block information,
type eraReceiptFormat uint8
const (
eraReceiptFormatConsensus eraReceiptFormat = iota // era1: full receipts
eraReceiptFormatSlim // erae: slim receipts
)
func receiptFormat(file string) (eraReceiptFormat, error) {
switch filepath.Ext(file) {
case ".era1":
return eraReceiptFormatConsensus, nil
case ".erae":
return eraReceiptFormatSlim, nil
default:
return 0, fmt.Errorf("unsupported era file: %s", file)
}
}
// convertSlimReceiptsToStorage converts slim receipt encoding
// [tx-type, post-state-or-status, gas-used, logs] into storage encoding
// [post-state-or-status, gas-used, logs].
func convertSlimReceiptsToStorage(input []byte, expectedTxs int) (rlp.RawValue, error) {
var (
out bytes.Buffer
enc = rlp.NewEncoderBuffer(&out)
)
blockListIter, err := rlp.NewListIterator(input)
if err != nil {
return nil, fmt.Errorf("invalid block receipts list: %w", err)
}
outerList := enc.List()
receipts := 0
for ; blockListIter.Next(); receipts++ {
dataIter, err := rlp.NewListIterator(blockListIter.Value())
if err != nil {
return nil, fmt.Errorf("slim receipt %d has invalid data: %w", receipts, err)
}
innerList := enc.List()
fields := 0
for dataIter.Next() {
switch fields {
case 0:
// Skip tx type.
case 1, 2, 3:
enc.Write(dataIter.Value())
default:
return nil, fmt.Errorf("slim receipt %d has too many fields", receipts)
}
fields++
}
enc.ListEnd(innerList)
if dataIter.Err() != nil {
return nil, fmt.Errorf("slim receipt %d iterator error: %w", receipts, dataIter.Err())
}
if fields != 4 {
return nil, fmt.Errorf("slim receipt %d has %d fields, want 4", receipts, fields)
}
}
enc.ListEnd(outerList)
if blockListIter.Err() != nil {
return nil, fmt.Errorf("block receipt list iterator error: %w", blockListIter.Err())
}
if expectedTxs >= 0 && receipts != expectedTxs {
return nil, fmt.Errorf("tx/receipt count mismatch: %d txs, %d receipts", expectedTxs, receipts)
}
if err := enc.Flush(); err != nil {
return nil, err
}
return out.Bytes(), nil
}
func convertReceiptsToStorage(input []byte, format eraReceiptFormat, expectedTxs int) (rlp.RawValue, error) {
switch format {
case eraReceiptFormatConsensus:
return types.ConvertConsensusReceiptsToStorage(input)
case eraReceiptFormatSlim:
return convertSlimReceiptsToStorage(input, expectedTxs)
default:
return nil, fmt.Errorf("unsupported receipt format: %d", format)
}
}
// ImportHistory imports Era files containing historical block information,
// starting from genesis. The assumption is held that the provided chain
// segment in Era1 file should all be canonical and verified.
// segment in era file should all be canonical and verified.
func ImportHistory(chain *core.BlockChain, dir string, network string, from func(f era.ReadAtSeekCloser) (era.Era, error)) error {
if chain.CurrentSnapBlock().Number.BitLen() != 0 {
return errors.New("history import only supported when starting from genesis")
@ -273,72 +355,107 @@ func ImportHistory(chain *core.BlockChain, dir string, network string, from func
start = time.Now()
reported = time.Now()
imported = 0
h = sha256.New()
scratch = bytes.NewBuffer(nil)
)
for i, file := range entries {
err := func() error {
path := filepath.Join(dir, file)
format, err := receiptFormat(file)
if err != nil {
return err
}
path := filepath.Join(dir, file)
// validate against checksum file in directory
f, err := os.Open(path)
if err != nil {
return fmt.Errorf("open %s: %w", path, err)
}
defer f.Close()
if _, err := io.Copy(h, f); err != nil {
return fmt.Errorf("checksum %s: %w", path, err)
}
got := common.BytesToHash(h.Sum(scratch.Bytes()[:])).Hex()
want := checksums[i]
h.Reset()
scratch.Reset()
// Validate against checksum file in directory.
f, err := os.Open(path)
if err != nil {
return fmt.Errorf("open %s: %w", path, err)
}
h := sha256.New()
if _, err := io.Copy(h, f); err != nil {
f.Close()
return fmt.Errorf("checksum %s: %w", path, err)
}
got := common.BytesToHash(h.Sum(nil)).Hex()
if got != checksums[i] {
f.Close()
return fmt.Errorf("%s checksum mismatch: have %s want %s", file, got, checksums[i])
}
if _, err := f.Seek(0, io.SeekStart); err != nil {
f.Close()
return fmt.Errorf("seek %s: %w", path, err)
}
if got != want {
return fmt.Errorf("%s checksum mismatch: have %s want %s", file, got, want)
}
// Import all block data from Era1.
e, err := from(f)
if err != nil {
return fmt.Errorf("error opening era: %w", err)
}
it, err := e.Iterator()
if err != nil {
return fmt.Errorf("error creating iterator: %w", err)
}
e, err := from(f)
if err != nil {
f.Close() // onedb.From does not close on metadata errors
return fmt.Errorf("error opening era: %w", err)
}
it, err := e.Iterator()
if err != nil {
e.Close()
return fmt.Errorf("error creating iterator: %w", err)
}
for it.Next() {
block, err := it.Block()
if err != nil {
return fmt.Errorf("error reading block %d: %w", it.Number(), err)
var (
blocks = make([]*types.Block, 0, importBatchSize)
receipts = make([]rlp.RawValue, 0, importBatchSize)
flush = func() error {
if len(blocks) == 0 {
return nil
}
if block.Number().BitLen() == 0 {
continue // skip genesis
if _, err := chain.InsertReceiptChain(blocks, receipts, math.MaxUint64); err != nil {
return fmt.Errorf("error inserting blocks %d-%d: %w",
blocks[0].NumberU64(), blocks[len(blocks)-1].NumberU64(), err)
}
receipts, err := it.Receipts()
if err != nil {
return fmt.Errorf("error reading receipts %d: %w", it.Number(), err)
}
enc := types.EncodeBlockReceiptLists([]types.Receipts{receipts})
if _, err := chain.InsertReceiptChain([]*types.Block{block}, enc, math.MaxUint64); err != nil {
return fmt.Errorf("error inserting body %d: %w", it.Number(), err)
}
imported++
imported += len(blocks)
if time.Since(reported) >= 8*time.Second {
log.Info("Importing Era files", "head", it.Number(), "imported", imported,
head := blocks[len(blocks)-1].NumberU64()
log.Info("Importing Era files", "head", head, "imported", imported,
"elapsed", common.PrettyDuration(time.Since(start)))
imported = 0
reported = time.Now()
}
blocks = blocks[:0]
receipts = receipts[:0]
return nil
}
if err := it.Error(); err != nil {
return err
)
for it.Next() {
block, err := it.Block()
if err != nil {
e.Close()
return fmt.Errorf("error reading block %d: %w", it.Number(), err)
}
return nil
}()
if err != nil {
if block.Number().BitLen() == 0 {
continue // skip genesis
}
raw, err := e.GetRawReceiptsByNumber(block.NumberU64())
if err != nil {
e.Close()
return fmt.Errorf("error reading receipts %d: %w", it.Number(), err)
}
enc, err := convertReceiptsToStorage(raw, format, len(block.Transactions()))
if err != nil {
e.Close()
return fmt.Errorf("error converting receipts %d: %w", it.Number(), err)
}
blocks = append(blocks, block)
receipts = append(receipts, enc)
if len(blocks) == importBatchSize {
if err := flush(); err != nil {
e.Close()
return err
}
}
}
if err := it.Error(); err != nil {
e.Close()
return err
}
if err := flush(); err != nil {
e.Close()
return err
}
if err := e.Close(); err != nil {
return err
}
}

View file

@ -0,0 +1,131 @@
// Copyright 2026 The go-ethereum Authors
// This file is part of go-ethereum.
//
// go-ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// go-ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
package utils
import (
"bytes"
"strings"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
)
func makeTestReceipt(typ uint8) *types.Receipt {
r := &types.Receipt{
Type: typ,
Status: types.ReceiptStatusSuccessful,
CumulativeGasUsed: 42_000,
Logs: []*types.Log{
{
Address: common.HexToAddress("0x1"),
Topics: []common.Hash{common.HexToHash("0x2"), common.HexToHash("0x3")},
Data: []byte{0xde, 0xad, 0xbe, 0xef},
},
},
}
r.Bloom = types.CreateBloom(r)
return r
}
func TestImportHistory_ConvertSlimReceiptsToStorage(t *testing.T) {
tests := []struct {
name string
receipts types.Receipts
}{
{
name: "typed-single",
receipts: types.Receipts{makeTestReceipt(types.DynamicFeeTxType)},
},
{
name: "legacy-single",
receipts: types.Receipts{makeTestReceipt(types.LegacyTxType)},
},
{
name: "mixed-multiple",
receipts: types.Receipts{
makeTestReceipt(types.LegacyTxType),
makeTestReceipt(types.DynamicFeeTxType),
},
},
{
name: "empty",
receipts: types.Receipts{},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
rawSlimReceipts := make([]*types.SlimReceipt, len(tc.receipts))
for i, receipt := range tc.receipts {
rawSlimReceipts[i] = (*types.SlimReceipt)(receipt)
}
rawSlim, err := rlp.EncodeToBytes(rawSlimReceipts)
if err != nil {
t.Fatalf("failed to encode slim receipts: %v", err)
}
got, err := convertReceiptsToStorage(rawSlim, eraReceiptFormatSlim, len(tc.receipts))
if err != nil {
t.Fatalf("conversion failed: %v", err)
}
want := types.EncodeBlockReceiptLists([]types.Receipts{tc.receipts})[0]
if !bytes.Equal(got, want) {
t.Fatalf("converted storage receipts mismatch\ngot: %x\nwant: %x", got, want)
}
})
}
}
func TestImportHistory_ConvertSlimReceiptsToStorageErrors(t *testing.T) {
tests := []struct {
name string
raw []byte
want string
}{
{
name: "invalid-rlp",
raw: []byte{0xff},
want: "invalid block receipts list",
},
{
name: "too-few-fields",
raw: []byte{0xc4, 0xc3, 0x01, 0x02, 0x03}, // [[1,2,3]]
want: "want 4",
},
{
name: "too-many-fields",
raw: []byte{0xc6, 0xc5, 0x01, 0x02, 0x03, 0x04, 0x05}, // [[1,2,3,4,5]]
want: "too many fields",
},
{
name: "tx-receipt-count-mismatch",
raw: []byte{0xc5, 0xc4, 0x01, 0x01, 0x02, 0xc0}, // [[1,1,2,[]]]
want: "tx/receipt count mismatch",
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
_, err := convertReceiptsToStorage(tc.raw, eraReceiptFormatSlim, 2)
if err == nil {
t.Fatalf("expected error")
}
if !strings.Contains(err.Error(), tc.want) {
t.Fatalf("unexpected error: %v", err)
}
})
}
}

View file

@ -18,7 +18,6 @@
package eradb
import (
"bytes"
"errors"
"fmt"
"io/fs"
@ -26,10 +25,10 @@ import (
"sync"
"github.com/ethereum/go-ethereum/common/lru"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/internal/era"
"github.com/ethereum/go-ethereum/internal/era/onedb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
)
const openFileLimit = 64
@ -138,56 +137,7 @@ func (db *Store) GetRawReceipts(number uint64) ([]byte, error) {
// convertReceipts transforms an encoded block receipts list from the format
// used by era1 into the 'storage' format used by the go-ethereum ancients database.
func convertReceipts(input []byte) ([]byte, error) {
var (
out bytes.Buffer
enc = rlp.NewEncoderBuffer(&out)
)
blockListIter, err := rlp.NewListIterator(input)
if err != nil {
return nil, fmt.Errorf("invalid block receipts list: %v", err)
}
outerList := enc.List()
for i := 0; blockListIter.Next(); i++ {
kind, content, _, err := rlp.Split(blockListIter.Value())
if err != nil {
return nil, fmt.Errorf("receipt %d invalid: %v", i, err)
}
var receiptData []byte
switch kind {
case rlp.Byte:
return nil, fmt.Errorf("receipt %d is single byte", i)
case rlp.String:
// Typed receipt - skip type.
receiptData = content[1:]
case rlp.List:
// Legacy receipt
receiptData = blockListIter.Value()
}
// Convert data list.
// Input is [status, gas-used, bloom, logs]
// Output is [status, gas-used, logs], i.e. we need to skip the bloom.
dataIter, err := rlp.NewListIterator(receiptData)
if err != nil {
return nil, fmt.Errorf("receipt %d has invalid data: %v", i, err)
}
innerList := enc.List()
for field := 0; dataIter.Next(); field++ {
if field == 2 {
continue // skip bloom
}
enc.Write(dataIter.Value())
}
enc.ListEnd(innerList)
if dataIter.Err() != nil {
return nil, fmt.Errorf("receipt %d iterator error: %v", i, dataIter.Err())
}
}
enc.ListEnd(outerList)
if blockListIter.Err() != nil {
return nil, fmt.Errorf("block receipt list iterator error: %v", blockListIter.Err())
}
enc.Flush()
return out.Bytes(), nil
return types.ConvertConsensusReceiptsToStorage(input)
}
// getEraByEpoch opens an era file or gets it from the cache.

View file

@ -425,6 +425,70 @@ func EncodeBlockReceiptLists(receipts []Receipts) []rlp.RawValue {
return result
}
// ConvertConsensusReceiptsToStorage converts canonical receipt encoding
// [post-state-or-status, gas-used, bloom, logs] into storage encoding
// [post-state-or-status, gas-used, logs].
func ConvertConsensusReceiptsToStorage(input []byte) (rlp.RawValue, error) {
var (
out bytes.Buffer
enc = rlp.NewEncoderBuffer(&out)
)
blockListIter, err := rlp.NewListIterator(input)
if err != nil {
return nil, fmt.Errorf("invalid block receipts list: %w", err)
}
outerList := enc.List()
for i := 0; blockListIter.Next(); i++ {
kind, content, _, err := rlp.Split(blockListIter.Value())
if err != nil {
return nil, fmt.Errorf("receipt %d invalid: %w", i, err)
}
var receiptData []byte
switch kind {
case rlp.Byte:
return nil, fmt.Errorf("receipt %d is single byte", i)
case rlp.String:
if len(content) == 0 {
return nil, fmt.Errorf("typed receipt %d has empty payload", i)
}
receiptData = content[1:] // strip tx type
case rlp.List:
receiptData = blockListIter.Value()
default:
return nil, fmt.Errorf("receipt %d has invalid RLP kind", i)
}
dataIter, err := rlp.NewListIterator(receiptData)
if err != nil {
return nil, fmt.Errorf("receipt %d has invalid data: %w", i, err)
}
innerList := enc.List()
fields := 0
for dataIter.Next() {
if fields == 2 {
fields++
continue // skip bloom
}
enc.Write(dataIter.Value())
fields++
}
enc.ListEnd(innerList)
if dataIter.Err() != nil {
return nil, fmt.Errorf("receipt %d iterator error: %w", i, dataIter.Err())
}
if fields != 4 {
return nil, fmt.Errorf("receipt %d has %d fields, want 4", i, fields)
}
}
enc.ListEnd(outerList)
if blockListIter.Err() != nil {
return nil, fmt.Errorf("block receipt list iterator error: %w", blockListIter.Err())
}
if err := enc.Flush(); err != nil {
return nil, err
}
return out.Bytes(), nil
}
// SlimReceipt is a wrapper around a Receipt with RLP serialization that omits
// the Bloom field and includes the tx type. Used for era files.
type SlimReceipt Receipt

View file

@ -22,6 +22,7 @@ import (
"math"
"math/big"
"reflect"
"strings"
"sync"
"testing"
@ -512,6 +513,101 @@ func TestReceiptUnmarshalBinary(t *testing.T) {
}
}
func makeStorageConversionTestReceipt(typ uint8) *Receipt {
r := &Receipt{
Type: typ,
Status: ReceiptStatusSuccessful,
CumulativeGasUsed: 42_000,
Logs: []*Log{
{
Address: common.HexToAddress("0x1"),
Topics: []common.Hash{common.HexToHash("0x2"), common.HexToHash("0x3")},
Data: []byte{0xde, 0xad, 0xbe, 0xef},
},
},
}
r.Bloom = CreateBloom(r)
return r
}
func TestConvertConsensusReceiptsToStorage(t *testing.T) {
tests := []struct {
name string
receipts Receipts
}{
{
name: "typed-single",
receipts: Receipts{makeStorageConversionTestReceipt(DynamicFeeTxType)},
},
{
name: "legacy-single",
receipts: Receipts{makeStorageConversionTestReceipt(LegacyTxType)},
},
{
name: "mixed-multiple",
receipts: Receipts{
makeStorageConversionTestReceipt(LegacyTxType),
makeStorageConversionTestReceipt(DynamicFeeTxType),
},
},
{
name: "empty",
receipts: Receipts{},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
raw, err := rlp.EncodeToBytes(tc.receipts)
if err != nil {
t.Fatalf("failed to encode consensus receipts: %v", err)
}
got, err := ConvertConsensusReceiptsToStorage(raw)
if err != nil {
t.Fatalf("conversion failed: %v", err)
}
want := EncodeBlockReceiptLists([]Receipts{tc.receipts})[0]
if !bytes.Equal(got, want) {
t.Fatalf("converted storage receipts mismatch\ngot: %x\nwant: %x", got, want)
}
})
}
}
func TestConvertConsensusReceiptsToStorageErrors(t *testing.T) {
tests := []struct {
name string
raw []byte
want string
}{
{
name: "invalid-rlp",
raw: []byte{0xff},
want: "invalid block receipts list",
},
{
name: "single-byte-receipt",
raw: []byte{0xc1, 0x01}, // list with one single-byte item
want: "single byte",
},
{
name: "typed-empty-payload",
raw: []byte{0xc1, 0x80}, // list with one empty-string item
want: "empty payload",
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
_, err := ConvertConsensusReceiptsToStorage(tc.raw)
if err == nil {
t.Fatalf("expected error")
}
if !strings.Contains(err.Error(), tc.want) {
t.Fatalf("unexpected error: %v", err)
}
})
}
}
func TestSlimReceiptEncodingDecoding(t *testing.T) {
tests := []*Receipt{
legacyReceipt,