diff --git a/beacon/engine/bapl_encode.go b/beacon/engine/bapl_encode.go
new file mode 100644
index 0000000000..b9f46ebf26
--- /dev/null
+++ b/beacon/engine/bapl_encode.go
@@ -0,0 +1,69 @@
+// Copyright 2026 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package engine
+
+import (
+ "github.com/fjl/jsonw"
+)
+
+// MarshalJSON implements json.Marshaler.
+func (list BlobAndProofListV1) MarshalJSON() ([]byte, error) {
+ var b jsonw.Buffer
+ b.Array(func() {
+ for _, item := range list {
+ marshalBlobAndProofV1(&b, item)
+ }
+ })
+ return b.Output(), nil
+}
+
+func marshalBlobAndProofV1(b *jsonw.Buffer, item *BlobAndProofV1) {
+ if item == nil {
+ b.Null()
+ } else {
+ b.Object(func() {
+ b.Key("blob")
+ b.HexBytes(item.Blob)
+ b.Key("proof")
+ b.HexBytes(item.Proof)
+ })
+ }
+}
+
+// MarshalJSON implements json.Marshaler.
+func (list BlobAndProofListV2) MarshalJSON() ([]byte, error) {
+ var b jsonw.Buffer
+ b.Array(func() {
+ for _, item := range list {
+ marshalBlobAndProofV2(&b, item)
+ }
+ })
+ return b.Output(), nil
+}
+
+func marshalBlobAndProofV2(b *jsonw.Buffer, item *BlobAndProofV2) {
+ if item == nil {
+ b.Null()
+ } else {
+ b.Object(func() {
+ b.Key("blob")
+ b.HexBytes(item.Blob)
+ b.Key("proofs")
+ appendHexBytesArray(b, item.CellProofs)
+ })
+ }
+}
diff --git a/beacon/engine/gen_ed.go b/beacon/engine/ed_codec.go
similarity index 100%
rename from beacon/engine/gen_ed.go
rename to beacon/engine/ed_codec.go
diff --git a/beacon/engine/gen_epe.go b/beacon/engine/epe_decode.go
similarity index 62%
rename from beacon/engine/gen_epe.go
rename to beacon/engine/epe_decode.go
index cf7bd9ee3f..a125daa030 100644
--- a/beacon/engine/gen_epe.go
+++ b/beacon/engine/epe_decode.go
@@ -12,31 +12,6 @@ import (
var _ = (*executionPayloadEnvelopeMarshaling)(nil)
-// MarshalJSON marshals as JSON.
-func (e ExecutionPayloadEnvelope) MarshalJSON() ([]byte, error) {
- type ExecutionPayloadEnvelope struct {
- ExecutionPayload *ExecutableData `json:"executionPayload" gencodec:"required"`
- BlockValue *hexutil.Big `json:"blockValue" gencodec:"required"`
- BlobsBundle *BlobsBundle `json:"blobsBundle"`
- Requests []hexutil.Bytes `json:"executionRequests"`
- Override bool `json:"shouldOverrideBuilder"`
- Witness *hexutil.Bytes `json:"witness,omitempty"`
- }
- var enc ExecutionPayloadEnvelope
- enc.ExecutionPayload = e.ExecutionPayload
- enc.BlockValue = (*hexutil.Big)(e.BlockValue)
- enc.BlobsBundle = e.BlobsBundle
- if e.Requests != nil {
- enc.Requests = make([]hexutil.Bytes, len(e.Requests))
- for k, v := range e.Requests {
- enc.Requests[k] = v
- }
- }
- enc.Override = e.Override
- enc.Witness = e.Witness
- return json.Marshal(&enc)
-}
-
// UnmarshalJSON unmarshals from JSON.
func (e *ExecutionPayloadEnvelope) UnmarshalJSON(input []byte) error {
type ExecutionPayloadEnvelope struct {
diff --git a/beacon/engine/epe_encode.go b/beacon/engine/epe_encode.go
new file mode 100644
index 0000000000..73deb8f0c6
--- /dev/null
+++ b/beacon/engine/epe_encode.go
@@ -0,0 +1,98 @@
+// Copyright 2026 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package engine
+
+import (
+ "encoding/json"
+ "errors"
+
+ "github.com/fjl/jsonw"
+)
+
+// marshalBlobsBundle writes BlobsBundle as JSON and appends it to buf.
+func marshalBlobsBundle(b *jsonw.Buffer, bundle *BlobsBundle) {
+ if bundle == nil {
+ b.Null()
+ return
+ }
+ b.Object(func() {
+ b.Key("commitments")
+ appendHexBytesArray(b, bundle.Commitments)
+ b.Key("proofs")
+ appendHexBytesArray(b, bundle.Proofs)
+ b.Key("blobs")
+ appendHexBytesArray(b, bundle.Blobs)
+ })
+}
+
+// MarshalJSON implements json.Marshaler.
+func (e ExecutionPayloadEnvelope) MarshalJSON() ([]byte, error) {
+ if e.ExecutionPayload == nil {
+ return nil, errors.New("missing required field 'executionPayload' for ExecutionPayloadEnvelope")
+ }
+
+ // Pre-marshal the execution payload using its gencodec MarshalJSON.
+ payload, err := e.ExecutionPayload.MarshalJSON()
+ if err != nil {
+ return nil, err
+ }
+ // Pre-marshal the witness.
+ var witness []byte
+ if e.Witness != nil {
+ witness, err = json.Marshal(e.Witness)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ // Write the execution payload to the buffer
+ var b jsonw.Buffer
+ b.Object(func() {
+ b.Key("executionPayload")
+ b.RawValue(payload)
+
+ b.Key("blockValue")
+ b.HexBigInt(e.BlockValue)
+
+ b.Key("blobsBundle")
+ marshalBlobsBundle(&b, e.BlobsBundle)
+
+ b.Key("executionRequests")
+ if e.Requests == nil {
+ b.Null()
+ } else {
+ appendHexBytesArray(&b, e.Requests)
+ }
+
+ b.Key("shouldOverrideBuilder")
+ b.Bool(e.Override)
+
+ if e.Witness != nil {
+ b.Key("witness")
+ b.RawValue(witness)
+ }
+ })
+ return b.Output(), nil
+}
+
+func appendHexBytesArray[T ~[]byte](b *jsonw.Buffer, slice []T) {
+ b.Array(func() {
+ for _, elem := range slice {
+ b.HexBytes(elem)
+ }
+ })
+}
diff --git a/beacon/engine/epe_test.go b/beacon/engine/epe_test.go
new file mode 100644
index 0000000000..e4ed5b6578
--- /dev/null
+++ b/beacon/engine/epe_test.go
@@ -0,0 +1,128 @@
+// Copyright 2026 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package engine
+
+import (
+ "bytes"
+ "encoding/json"
+ "math/big"
+ "reflect"
+ "testing"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/hexutil"
+)
+
+func makeTestPayload() *ExecutableData {
+ return &ExecutableData{
+ ParentHash: common.HexToHash("0x01"),
+ FeeRecipient: common.HexToAddress("0x02"),
+ StateRoot: common.HexToHash("0x03"),
+ ReceiptsRoot: common.HexToHash("0x04"),
+ LogsBloom: make([]byte, 256),
+ Random: common.HexToHash("0x05"),
+ Number: 100,
+ GasLimit: 1000000,
+ GasUsed: 500000,
+ Timestamp: 1234567890,
+ ExtraData: []byte("extra"),
+ BaseFeePerGas: big.NewInt(7),
+ BlockHash: common.HexToHash("0x08"),
+ Transactions: [][]byte{{0xaa, 0xbb}},
+ }
+}
+
+func TestMarshalJSONRoundtrip(t *testing.T) {
+ witness := hexutil.Bytes{0xde, 0xad}
+ original := ExecutionPayloadEnvelope{
+ ExecutionPayload: makeTestPayload(),
+ BlockValue: big.NewInt(12345),
+ BlobsBundle: &BlobsBundle{
+ Commitments: []hexutil.Bytes{{0x01, 0x02}},
+ Proofs: []hexutil.Bytes{{0x03, 0x04}},
+ Blobs: []hexutil.Bytes{{0x05, 0x06}},
+ },
+ Requests: [][]byte{{0xaa}, {0xbb, 0xcc}},
+ Override: true,
+ Witness: &witness,
+ }
+
+ data, err := original.MarshalJSON()
+ if err != nil {
+ t.Fatalf("MarshalJSON error: %v", err)
+ }
+
+ var decoded ExecutionPayloadEnvelope
+ if err := json.Unmarshal(data, &decoded); err != nil {
+ t.Fatalf("UnmarshalJSON error: %v", err)
+ }
+
+ if decoded.ExecutionPayload.Number != original.ExecutionPayload.Number {
+ t.Error("ExecutionPayload.Number mismatch")
+ }
+ if decoded.BlockValue.Cmp(original.BlockValue) != 0 {
+ t.Errorf("BlockValue mismatch: got %v, want %v", decoded.BlockValue, original.BlockValue)
+ }
+ if len(decoded.BlobsBundle.Blobs) != len(original.BlobsBundle.Blobs) {
+ t.Error("BlobsBundle.Blobs length mismatch")
+ }
+ if len(decoded.Requests) != len(original.Requests) {
+ t.Error("Requests length mismatch")
+ }
+ if decoded.Override != original.Override {
+ t.Error("Override mismatch")
+ }
+ if !bytes.Equal(*decoded.Witness, *original.Witness) {
+ t.Error("Witness mismatch")
+ }
+}
+
+func TestMarshalJSONNilPayload(t *testing.T) {
+ env := ExecutionPayloadEnvelope{
+ ExecutionPayload: nil,
+ BlockValue: big.NewInt(1),
+ }
+ _, err := env.MarshalJSON()
+ if err == nil {
+ t.Fatal("expected error for nil ExecutionPayload")
+ }
+}
+
+// TestExecutionPayloadEnvelopeFieldCoverage guards against structural drift.
+// If a field is added to or removed from ExecutionPayloadEnvelope, this test
+// fails, reminding the developer to update MarshalJSON in marshal_epe.go.
+func TestExecutionPayloadEnvelopeFieldCoverage(t *testing.T) {
+ expected := []string{
+ "ExecutionPayload",
+ "BlockValue",
+ "BlobsBundle",
+ "Requests",
+ "Override",
+ "Witness",
+ }
+ typ := reflect.TypeOf(ExecutionPayloadEnvelope{})
+ if typ.NumField() != len(expected) {
+ t.Fatalf("ExecutionPayloadEnvelope has %d fields, expected %d — update MarshalJSON in marshal_epe.go",
+ typ.NumField(), len(expected))
+ }
+ for i, name := range expected {
+ if typ.Field(i).Name != name {
+ t.Errorf("field %d: got %q, want %q — update MarshalJSON in marshal_epe.go",
+ i, typ.Field(i).Name, name)
+ }
+ }
+}
diff --git a/beacon/engine/gen_blockparams.go b/beacon/engine/pa_codec.go
similarity index 100%
rename from beacon/engine/gen_blockparams.go
rename to beacon/engine/pa_codec.go
diff --git a/beacon/engine/types.go b/beacon/engine/types.go
index 5c31ee4e98..60d564b877 100644
--- a/beacon/engine/types.go
+++ b/beacon/engine/types.go
@@ -60,7 +60,7 @@ var (
PayloadV4 PayloadVersion = 0x4
)
-//go:generate go run github.com/fjl/gencodec -type PayloadAttributes -field-override payloadAttributesMarshaling -out gen_blockparams.go
+//go:generate go run github.com/fjl/gencodec -type PayloadAttributes -field-override payloadAttributesMarshaling -out pa_codec.go
// PayloadAttributes describes the environment context in which a block should
// be built.
@@ -79,7 +79,7 @@ type payloadAttributesMarshaling struct {
SlotNumber *hexutil.Uint64
}
-//go:generate go run github.com/fjl/gencodec -type ExecutableData -field-override executableDataMarshaling -out gen_ed.go
+//go:generate go run github.com/fjl/gencodec -type ExecutableData -field-override executableDataMarshaling -out ed_codec.go
// ExecutableData is the data necessary to execute an EL payload.
type ExecutableData struct {
@@ -127,7 +127,7 @@ type StatelessPayloadStatusV1 struct {
ValidationError *string `json:"validationError"`
}
-//go:generate go run github.com/fjl/gencodec -type ExecutionPayloadEnvelope -field-override executionPayloadEnvelopeMarshaling -out gen_epe.go
+//go:generate go run github.com/fjl/gencodec -enc=false -type ExecutionPayloadEnvelope -field-override executionPayloadEnvelopeMarshaling -out epe_decode.go
type ExecutionPayloadEnvelope struct {
ExecutionPayload *ExecutableData `json:"executionPayload" gencodec:"required"`
@@ -138,6 +138,12 @@ type ExecutionPayloadEnvelope struct {
Witness *hexutil.Bytes `json:"witness,omitempty"`
}
+// JSON type overrides for ExecutionPayloadEnvelope.
+type executionPayloadEnvelopeMarshaling struct {
+ BlockValue *hexutil.Big
+ Requests []hexutil.Bytes
+}
+
// BlobsBundle includes the marshalled sidecar data. Note this structure is
// shared by BlobsBundleV1 and BlobsBundleV2 for the sake of simplicity.
//
@@ -154,16 +160,18 @@ type BlobAndProofV1 struct {
Proof hexutil.Bytes `json:"proof"`
}
+// BlobAndProofListV1 is a list of BlobAndProofV1 with a hand-rolled JSON marshaler
+// that avoids the overhead of encoding/json for large blob payloads.
+type BlobAndProofListV1 []*BlobAndProofV1
+
type BlobAndProofV2 struct {
Blob hexutil.Bytes `json:"blob"`
CellProofs []hexutil.Bytes `json:"proofs"` // proofs MUST contain exactly CELLS_PER_EXT_BLOB cell proofs.
}
-// JSON type overrides for ExecutionPayloadEnvelope.
-type executionPayloadEnvelopeMarshaling struct {
- BlockValue *hexutil.Big
- Requests []hexutil.Bytes
-}
+// BlobAndProofListV2 is a list of BlobAndProofV2 with a hand-rolled JSON marshaler
+// that avoids the overhead of encoding/json for large blob payloads.
+type BlobAndProofListV2 []*BlobAndProofV2
type PayloadStatusV1 struct {
Status string `json:"status"`
diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go
index 1def169ae0..b31185a40f 100644
--- a/eth/catalyst/api.go
+++ b/eth/catalyst/api.go
@@ -552,7 +552,7 @@ func (api *ConsensusAPI) getPayload(payloadID engine.PayloadID, full bool, versi
//
// Client software MAY return an array of all null entries if syncing or otherwise
// unable to serve blob pool data.
-func (api *ConsensusAPI) GetBlobsV1(hashes []common.Hash) ([]*engine.BlobAndProofV1, error) {
+func (api *ConsensusAPI) GetBlobsV1(hashes []common.Hash) (engine.BlobAndProofListV1, error) {
// Reject the request if Osaka has been activated.
// follow https://github.com/ethereum/execution-apis/blob/main/src/engine/osaka.md#cancun-api
head := api.eth.BlockChain().CurrentHeader()
@@ -566,7 +566,7 @@ func (api *ConsensusAPI) GetBlobsV1(hashes []common.Hash) ([]*engine.BlobAndProo
if err != nil {
return nil, engine.InvalidParams.With(err)
}
- res := make([]*engine.BlobAndProofV1, len(hashes))
+ res := make(engine.BlobAndProofListV1, len(hashes))
for i := 0; i < len(blobs); i++ {
// Skip the non-existing blob
if blobs[i] == nil {
@@ -605,7 +605,7 @@ func (api *ConsensusAPI) GetBlobsV1(hashes []common.Hash) ([]*engine.BlobAndProo
//
// Client software MUST return null if syncing or otherwise unable to serve
// blob pool data.
-func (api *ConsensusAPI) GetBlobsV2(hashes []common.Hash) ([]*engine.BlobAndProofV2, error) {
+func (api *ConsensusAPI) GetBlobsV2(hashes []common.Hash) (engine.BlobAndProofListV2, error) {
head := api.eth.BlockChain().CurrentHeader()
if api.config().LatestFork(head.Time) < forks.Osaka {
return nil, nil
@@ -616,7 +616,7 @@ func (api *ConsensusAPI) GetBlobsV2(hashes []common.Hash) ([]*engine.BlobAndProo
// GetBlobsV3 returns a set of blobs from the transaction pool. Same as
// GetBlobsV2, except will return partial responses in case there is a missing
// blob.
-func (api *ConsensusAPI) GetBlobsV3(hashes []common.Hash) ([]*engine.BlobAndProofV2, error) {
+func (api *ConsensusAPI) GetBlobsV3(hashes []common.Hash) (engine.BlobAndProofListV2, error) {
head := api.eth.BlockChain().CurrentHeader()
if api.config().LatestFork(head.Time) < forks.Osaka {
return nil, nil
@@ -626,7 +626,7 @@ func (api *ConsensusAPI) GetBlobsV3(hashes []common.Hash) ([]*engine.BlobAndProo
// getBlobs returns all available blobs. In v2, partial responses are not allowed,
// while v3 supports partial responses.
-func (api *ConsensusAPI) getBlobs(hashes []common.Hash, v2 bool) ([]*engine.BlobAndProofV2, error) {
+func (api *ConsensusAPI) getBlobs(hashes []common.Hash, v2 bool) (engine.BlobAndProofListV2, error) {
if len(hashes) > 128 {
return nil, engine.TooLargeRequest.With(fmt.Errorf("requested blob count too large: %v", len(hashes)))
}
@@ -647,7 +647,7 @@ func (api *ConsensusAPI) getBlobs(hashes []common.Hash, v2 bool) ([]*engine.Blob
}
// Validate the blobs from the pool and assemble the response
filled := 0
- res := make([]*engine.BlobAndProofV2, len(hashes))
+ res := make(engine.BlobAndProofListV2, len(hashes))
for i := range blobs {
// The blob has been evicted since the last AvailableBlobs call.
// Return null if partial response is not allowed.
diff --git a/eth/catalyst/api_benchmark_test.go b/eth/catalyst/api_benchmark_test.go
new file mode 100644
index 0000000000..377e5caa43
--- /dev/null
+++ b/eth/catalyst/api_benchmark_test.go
@@ -0,0 +1,739 @@
+// Copyright 2026 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+package catalyst
+
+import (
+ "context"
+ "crypto/ecdsa"
+ "crypto/sha256"
+ "encoding/json"
+ "fmt"
+ "io"
+ "math/big"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/beacon/engine"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/crypto/kzg4844"
+ "github.com/ethereum/go-ethereum/eth"
+ "github.com/ethereum/go-ethereum/node"
+ "github.com/ethereum/go-ethereum/params"
+ "github.com/ethereum/go-ethereum/rlp"
+ "github.com/ethereum/go-ethereum/rpc"
+ "github.com/holiman/uint256"
+)
+
+// encodingType specifies which encoding to use in benchmarks
+type encodingType int
+
+const (
+ encNone encodingType = iota
+ encJSON
+ encJSONCustom
+ encRLP
+)
+
+func (e encodingType) String() string {
+ switch e {
+ case encNone:
+ return "none"
+ case encJSON:
+ return "json"
+ case encJSONCustom:
+ return "json_custom"
+ case encRLP:
+ return "rlp"
+ default:
+ return "unknown"
+ }
+}
+
+var encodingTypes = []encodingType{encNone, encJSON, encJSONCustom, encRLP}
+
+// benchEncode encodes the value using the specified encoding type.
+// It fails the benchmark if encoding fails.
+func benchEncode(b *testing.B, enc encodingType, v any) {
+ var err error
+ switch enc {
+ case encJSON:
+ _, err = json.Marshal(v)
+ if err != nil {
+ b.Fatalf("JSON marshal failed: %v", err)
+ }
+ case encJSONCustom:
+ if m, ok := v.(json.Marshaler); ok {
+ _, err = m.MarshalJSON()
+ } else {
+ _, err = json.Marshal(v)
+ }
+ if err != nil {
+ b.Fatalf("JSON MarshalJSON failed: %v", err)
+ }
+ case encRLP:
+ _, err = rlp.EncodeToBytes(v)
+ if err != nil {
+ b.Fatalf("RLP encode failed: %v", err)
+ }
+ }
+}
+
+// benchmarkBlobCounts defines the blob counts for benchmarks
+var benchmarkBlobCounts = []int{21, 72}
+
+// maxBenchmarkBlobs is the maximum number of blobs we need for benchmarks
+var maxBenchmarkBlobs = benchmarkBlobCounts[len(benchmarkBlobCounts)-1]
+
+var (
+ // Pre-computed blobs for benchmarks
+ benchBlobs []*kzg4844.Blob
+ benchBlobCommits []kzg4844.Commitment
+ benchBlobProofs []kzg4844.Proof
+ benchBlobCellProofs [][]kzg4844.Proof
+ benchBlobVHashes []common.Hash
+)
+
+func init() {
+ // Pre-compute blobs for benchmarks
+ for i := 0; i < maxBenchmarkBlobs; i++ {
+ blob := &kzg4844.Blob{byte(i), byte(i >> 8)}
+ benchBlobs = append(benchBlobs, blob)
+
+ commit, _ := kzg4844.BlobToCommitment(blob)
+ benchBlobCommits = append(benchBlobCommits, commit)
+
+ proof, _ := kzg4844.ComputeBlobProof(blob, commit)
+ benchBlobProofs = append(benchBlobProofs, proof)
+
+ cellProofs, _ := kzg4844.ComputeCellProofs(blob)
+ benchBlobCellProofs = append(benchBlobCellProofs, cellProofs)
+
+ vhash := kzg4844.CalcBlobHashV1(sha256.New(), &commit)
+ benchBlobVHashes = append(benchBlobVHashes, vhash)
+ }
+}
+
+// benchFork specifies which fork to use in benchmark environments
+type benchFork int
+
+const (
+ forkCancun benchFork = iota
+ forkPrague
+ forkOsaka
+)
+
+// benchmarkBlobEnv holds the environment for blob benchmarks
+type benchmarkBlobEnv struct {
+ node *node.Node
+ eth *eth.Ethereum
+ api *ConsensusAPI
+ config *params.ChainConfig
+ keys []*ecdsa.PrivateKey
+ vhashes []common.Hash
+ version byte
+ blobCount int
+ nonces []uint64 // current nonce for each key
+}
+
+// makeBenchBlobTx creates a blob transaction with the specified number of blobs.
+// blobOffset indicates which pre-computed blobs to use.
+func makeBenchBlobTx(chainConfig *params.ChainConfig, nonce uint64, blobCount int, blobOffset int, key *ecdsa.PrivateKey, version byte) *types.Transaction {
+ var (
+ blobs []kzg4844.Blob
+ blobHashes []common.Hash
+ commitments []kzg4844.Commitment
+ proofs []kzg4844.Proof
+ )
+ for i := 0; i < blobCount; i++ {
+ idx := blobOffset + i
+ blobs = append(blobs, *benchBlobs[idx])
+ commitments = append(commitments, benchBlobCommits[idx])
+ if version == types.BlobSidecarVersion0 {
+ proofs = append(proofs, benchBlobProofs[idx])
+ } else {
+ proofs = append(proofs, benchBlobCellProofs[idx]...)
+ }
+ blobHashes = append(blobHashes, benchBlobVHashes[idx])
+ }
+ blobtx := &types.BlobTx{
+ ChainID: uint256.MustFromBig(chainConfig.ChainID),
+ Nonce: nonce,
+ GasTipCap: uint256.NewInt(params.GWei),
+ GasFeeCap: uint256.NewInt(10 * params.GWei),
+ Gas: 21000,
+ BlobFeeCap: uint256.NewInt(params.GWei),
+ BlobHashes: blobHashes,
+ Value: uint256.NewInt(100),
+ Sidecar: types.NewBlobTxSidecar(version, blobs, commitments, proofs),
+ }
+ return types.MustSignNewTx(key, types.LatestSigner(chainConfig), blobtx)
+}
+
+// newBenchmarkBlobEnv creates an environment for blob benchmarks.
+// It creates multiple keys and fills the pool with blob transactions totaling the specified blob count.
+// version: 0 = BlobSidecarVersion0 (pre-Osaka), 1 = BlobSidecarVersion1 (Osaka+)
+// fork: which fork to enable
+func newBenchmarkBlobEnv(b *testing.B, blobCount int, version byte, fork benchFork) *benchmarkBlobEnv {
+ // Create a configuration that allows enough blobs
+ config := *params.MergedTestChainConfig
+ // Set blob schedule to allow for large blob counts (up to 128 blobs per block)
+ config.BlobScheduleConfig = ¶ms.BlobScheduleConfig{
+ Cancun: ¶ms.BlobConfig{Target: 6, Max: 128, UpdateFraction: 3338477},
+ Prague: ¶ms.BlobConfig{Target: 6, Max: 128, UpdateFraction: 5007716},
+ Osaka: ¶ms.BlobConfig{Target: 6, Max: 128, UpdateFraction: 5007716},
+ }
+ // Configure fork times based on requested fork
+ switch fork {
+ case forkCancun:
+ config.PragueTime = nil
+ config.OsakaTime = nil
+ case forkPrague:
+ config.OsakaTime = nil
+ case forkOsaka:
+ // All forks enabled (default)
+ }
+
+ // Generate enough keys for all the blob transactions
+ // Each tx can have up to 6 blobs, so we need ceil(blobCount/6) keys
+ numTxs := (blobCount + 5) / 6
+ keys := make([]*ecdsa.PrivateKey, numTxs)
+ addrs := make([]common.Address, numTxs)
+ alloc := make(types.GenesisAlloc)
+ alloc[testAddr] = types.Account{Balance: testBalance}
+
+ for i := 0; i < numTxs; i++ {
+ key, _ := crypto.GenerateKey()
+ keys[i] = key
+ addrs[i] = crypto.PubkeyToAddress(key.PublicKey)
+ // Give each account enough balance for many transactions
+ alloc[addrs[i]] = types.Account{Balance: new(big.Int).Mul(big.NewInt(1e18), big.NewInt(10000))}
+ }
+
+ gspec := &core.Genesis{
+ Config: &config,
+ Alloc: alloc,
+ Difficulty: common.Big0,
+ }
+ n, ethServ := startEthService(b, gspec, nil)
+
+ // Collect versioned hashes for the blobs we'll use
+ var vhashes []common.Hash
+ for i := 0; i < blobCount; i++ {
+ vhashes = append(vhashes, benchBlobVHashes[i])
+ }
+
+ // Fill initial blob txs into the pool
+ env := &benchmarkBlobEnv{
+ node: n,
+ eth: ethServ,
+ api: newConsensusAPIWithoutHeartbeat(ethServ),
+ config: &config,
+ keys: keys,
+ vhashes: vhashes,
+ version: version,
+ blobCount: blobCount,
+ nonces: make([]uint64, numTxs),
+ }
+ env.addBlobTxs(b)
+ return env
+}
+
+// addBlobTxs adds blob transactions to the pool using the stored blobCount and per-key nonces.
+// It increments each key's nonce after adding transactions.
+func (env *benchmarkBlobEnv) addBlobTxs(b *testing.B) {
+ numTxs := (env.blobCount + 5) / 6
+ var txs []*types.Transaction
+ blobsRemaining := env.blobCount
+ blobOffset := 0
+
+ for i := 0; i < numTxs && blobsRemaining > 0; i++ {
+ // Each tx gets up to 6 blobs
+ txBlobCount := 6
+ if blobsRemaining < 6 {
+ txBlobCount = blobsRemaining
+ }
+ tx := makeBenchBlobTx(env.config, env.nonces[i], txBlobCount, blobOffset, env.keys[i], env.version)
+ txs = append(txs, tx)
+ blobOffset += txBlobCount
+ blobsRemaining -= txBlobCount
+ }
+ errs := env.eth.TxPool().Add(txs, true)
+ for i, err := range errs {
+ if err != nil {
+ b.Fatalf("Failed to add blob tx %d to pool: %v", i, err)
+ }
+ }
+ // Increment nonce for each key used
+ for i := 0; i < numTxs; i++ {
+ env.nonces[i]++
+ }
+}
+
+// Close closes the environment
+func (env *benchmarkBlobEnv) Close() {
+ env.node.Close()
+}
+
+// BenchmarkGetBlobsV1 benchmarks the GetBlobsV1 method with various blob counts.
+// GetBlobsV1 is available at Cancun/Prague (pre-Osaka).
+func BenchmarkGetBlobsV1(b *testing.B) {
+ for _, blobCount := range benchmarkBlobCounts {
+ for _, enc := range encodingTypes {
+ b.Run(fmt.Sprintf("blobs=%d/enc=%s", blobCount, enc), func(b *testing.B) {
+ env := newBenchmarkBlobEnv(b, blobCount, 0, forkPrague)
+ defer env.Close()
+
+ b.ResetTimer()
+ for b.Loop() {
+ result, err := env.api.GetBlobsV1(env.vhashes)
+ if err != nil {
+ b.Fatalf("GetBlobsV1 failed: %v", err)
+ }
+ // Verify we got the expected number of blobs
+ if len(result) != blobCount {
+ b.Fatalf("expected %d blobs, got %d", blobCount, len(result))
+ }
+ benchEncode(b, enc, result)
+ }
+ b.ReportMetric(float64(b.Elapsed().Milliseconds())/float64(b.N), "ms/op")
+ })
+ }
+ }
+}
+
+// BenchmarkGetBlobsV2Extended benchmarks the GetBlobsV2 method with various blob counts.
+// GetBlobsV2 is available at Osaka+.
+func BenchmarkGetBlobsV2Extended(b *testing.B) {
+ for _, blobCount := range benchmarkBlobCounts {
+ for _, enc := range encodingTypes {
+ b.Run(fmt.Sprintf("blobs=%d/enc=%s", blobCount, enc), func(b *testing.B) {
+ env := newBenchmarkBlobEnv(b, blobCount, 1, forkOsaka)
+ defer env.Close()
+
+ b.ResetTimer()
+ for b.Loop() {
+ result, err := env.api.GetBlobsV2(env.vhashes)
+ if err != nil {
+ b.Fatalf("GetBlobsV2 failed: %v", err)
+ }
+ // Verify we got the expected number of blobs
+ if len(result) != blobCount {
+ b.Fatalf("expected %d blobs, got %d", blobCount, len(result))
+ }
+ benchEncode(b, enc, result)
+ }
+ b.ReportMetric(float64(b.Elapsed().Milliseconds())/float64(b.N), "ms/op")
+ })
+ }
+ }
+}
+
+// BenchmarkGetBlobsV3 benchmarks the GetBlobsV3 method with various blob counts.
+// GetBlobsV3 is available at Osaka+.
+func BenchmarkGetBlobsV3(b *testing.B) {
+ for _, blobCount := range benchmarkBlobCounts {
+ for _, enc := range encodingTypes {
+ b.Run(fmt.Sprintf("blobs=%d/enc=%s", blobCount, enc), func(b *testing.B) {
+ env := newBenchmarkBlobEnv(b, blobCount, 1, forkOsaka)
+ defer env.Close()
+
+ b.ResetTimer()
+ for b.Loop() {
+ result, err := env.api.GetBlobsV3(env.vhashes)
+ if err != nil {
+ b.Fatalf("GetBlobsV3 failed: %v", err)
+ }
+ // Verify we got the expected number of blobs
+ if len(result) != blobCount {
+ b.Fatalf("expected %d blobs, got %d", blobCount, len(result))
+ }
+ benchEncode(b, enc, result)
+ }
+ b.ReportMetric(float64(b.Elapsed().Milliseconds())/float64(b.N), "ms/op")
+ })
+ }
+ }
+}
+
+// BenchmarkGetPayloadV5WithBlobs benchmarks GetPayloadV5 (Osaka fork) with blobs.
+// Note: Measures single iteration performance due to NewPayload complexity at Osaka.
+func BenchmarkGetPayloadV5WithBlobs(b *testing.B) {
+ for _, blobCount := range benchmarkBlobCounts {
+ for _, enc := range encodingTypes {
+ b.Run(fmt.Sprintf("blobs=%d/enc=%s", blobCount, enc), func(b *testing.B) {
+ env := newBenchmarkBlobEnv(b, blobCount, 1, forkOsaka)
+ defer env.Close()
+
+ parent := env.api.eth.BlockChain().CurrentHeader()
+ beaconRoot := common.Hash{0x42}
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ // Note: We don't call addBlobTxs here because we can't advance the chain
+ // (NewPayloadV5 requires execution requests). The same transactions are
+ // reused for each iteration, which still benchmarks the GetPayload performance.
+ timestamp := parent.Time + 12
+ fcState := engine.ForkchoiceStateV1{
+ HeadBlockHash: parent.Hash(),
+ SafeBlockHash: parent.Hash(),
+ FinalizedBlockHash: parent.Hash(),
+ }
+ payloadAttr := &engine.PayloadAttributes{
+ Timestamp: timestamp,
+ Random: common.Hash{byte(i)},
+ SuggestedFeeRecipient: testAddr,
+ Withdrawals: []*types.Withdrawal{},
+ BeaconRoot: &beaconRoot,
+ }
+ resp, err := env.api.ForkchoiceUpdatedV3(context.Background(), fcState, payloadAttr)
+ if err != nil {
+ b.Fatalf("ForkchoiceUpdatedV3 failed: %v", err)
+ }
+ if resp.PayloadID == nil {
+ b.Fatalf("ForkchoiceUpdatedV3 returned nil PayloadID")
+ }
+ // Wait for the payload to be built with transactions
+ time.Sleep(100 * time.Millisecond)
+
+ envelope, err := env.api.GetPayloadV5(*resp.PayloadID)
+ if err != nil {
+ b.Fatalf("GetPayloadV5 failed: %v", err)
+ }
+ if envelope.BlobsBundle == nil {
+ b.Fatalf("BlobsBundle is nil")
+ }
+ // Verify we got the expected number of blobs
+ if len(envelope.BlobsBundle.Blobs) != blobCount {
+ b.Fatalf("expected %d blobs, got %d", blobCount, len(envelope.BlobsBundle.Blobs))
+ }
+ benchEncode(b, enc, envelope)
+ }
+ b.ReportMetric(float64(b.Elapsed().Milliseconds())/float64(b.N), "ms/op")
+ })
+ }
+ }
+}
+
+// BenchmarkNewPayloadV3WithBlobs benchmarks the NewPayloadV3 method with various blob counts.
+// Each iteration processes a payload with the full blob count.
+func BenchmarkNewPayloadV3WithBlobs(b *testing.B) {
+ for _, blobCount := range benchmarkBlobCounts {
+ for _, enc := range encodingTypes {
+ b.Run(fmt.Sprintf("blobs=%d/enc=%s", blobCount, enc), func(b *testing.B) {
+ env := newBenchmarkBlobEnv(b, blobCount, 0, forkCancun)
+ defer env.Close()
+
+ parent := env.api.eth.BlockChain().CurrentHeader()
+ beaconRoot := common.Hash{0x42}
+
+ // Build a payload first to get valid executable data
+ timestamp := parent.Time + 12
+ fcState := engine.ForkchoiceStateV1{
+ HeadBlockHash: parent.Hash(),
+ SafeBlockHash: parent.Hash(),
+ FinalizedBlockHash: parent.Hash(),
+ }
+ payloadAttr := &engine.PayloadAttributes{
+ Timestamp: timestamp,
+ Random: common.Hash{0x01},
+ SuggestedFeeRecipient: testAddr,
+ Withdrawals: []*types.Withdrawal{},
+ BeaconRoot: &beaconRoot,
+ }
+ resp, err := env.api.ForkchoiceUpdatedV3(context.Background(), fcState, payloadAttr)
+ if err != nil {
+ b.Fatalf("ForkchoiceUpdatedV3 failed: %v", err)
+ }
+ if resp.PayloadID == nil {
+ b.Fatalf("ForkchoiceUpdatedV3 returned nil PayloadID")
+ }
+ // Wait for the payload to be built with transactions
+ time.Sleep(100 * time.Millisecond)
+
+ // Get the payload
+ envelope, err := env.api.GetPayloadV3(*resp.PayloadID)
+ if err != nil {
+ b.Fatalf("GetPayloadV3 failed: %v", err)
+ }
+ // Verify we got the expected number of blobs
+ if len(envelope.BlobsBundle.Blobs) != blobCount {
+ b.Fatalf("expected %d blobs in setup, got %d", blobCount, len(envelope.BlobsBundle.Blobs))
+ }
+
+ execData := envelope.ExecutionPayload
+ // Collect versioned hashes from blobs bundle
+ vhashes := make([]common.Hash, len(envelope.BlobsBundle.Commitments))
+ for j, commitment := range envelope.BlobsBundle.Commitments {
+ var commit kzg4844.Commitment
+ copy(commit[:], commitment)
+ vhashes[j] = kzg4844.CalcBlobHashV1(sha256.New(), &commit)
+ }
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ // NewPayload is idempotent, calling it multiple times with the same data
+ // should return the same result. The payload contains blobCount blobs.
+ result, err := env.api.NewPayloadV3(context.Background(), *execData, vhashes, &beaconRoot)
+ if err != nil {
+ b.Fatalf("NewPayloadV3 failed: %v", err)
+ }
+ benchEncode(b, enc, result)
+ }
+ b.ReportMetric(float64(b.Elapsed().Milliseconds())/float64(b.N), "ms/op")
+ })
+ }
+ }
+}
+
+// BenchmarkForkchoiceUpdatedWithBlobPayload benchmarks forkchoice updates that trigger
+// payload building with blob transactions.
+// Note: Measures ForkchoiceUpdated performance with blob transactions in the pool.
+func BenchmarkForkchoiceUpdatedWithBlobPayload(b *testing.B) {
+ for _, blobCount := range benchmarkBlobCounts {
+ for _, enc := range encodingTypes {
+ b.Run(fmt.Sprintf("blobs=%d/enc=%s", blobCount, enc), func(b *testing.B) {
+ env := newBenchmarkBlobEnv(b, blobCount, 0, forkCancun)
+ defer env.Close()
+
+ parent := env.api.eth.BlockChain().CurrentHeader()
+ beaconRoot := common.Hash{0x42}
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ // Note: We don't call addBlobTxs here because the blob pool has
+ // a per-account limit of 16 transactions. The same transactions are
+ // reused for each iteration, which still benchmarks the ForkchoiceUpdated
+ // performance with blob transactions in the pool.
+ timestamp := parent.Time + 12
+ fcState := engine.ForkchoiceStateV1{
+ HeadBlockHash: parent.Hash(),
+ SafeBlockHash: parent.Hash(),
+ FinalizedBlockHash: parent.Hash(),
+ }
+ payloadAttr := &engine.PayloadAttributes{
+ Timestamp: timestamp,
+ Random: common.Hash{byte(i)},
+ SuggestedFeeRecipient: testAddr,
+ Withdrawals: []*types.Withdrawal{},
+ BeaconRoot: &beaconRoot,
+ }
+ resp, err := env.api.ForkchoiceUpdatedV3(context.Background(), fcState, payloadAttr)
+ if err != nil {
+ b.Fatalf("ForkchoiceUpdatedV3 failed: %v", err)
+ }
+ if resp.PayloadID == nil {
+ b.Fatalf("ForkchoiceUpdatedV3 returned nil PayloadID")
+ }
+ benchEncode(b, enc, resp)
+ }
+ b.ReportMetric(float64(b.Elapsed().Milliseconds())/float64(b.N), "ms/op")
+ })
+ }
+ }
+}
+
+// BenchmarkFullBlobWorkflowOsaka benchmarks the complete blob workflow at Osaka:
+// ForkchoiceUpdated -> GetPayload
+// Note: Measures single iteration performance due to NewPayload complexity at Osaka.
+func BenchmarkFullBlobWorkflowOsaka(b *testing.B) {
+ for _, blobCount := range benchmarkBlobCounts {
+ for _, enc := range encodingTypes {
+ b.Run(fmt.Sprintf("blobs=%d/enc=%s", blobCount, enc), func(b *testing.B) {
+ env := newBenchmarkBlobEnv(b, blobCount, 1, forkOsaka)
+ defer env.Close()
+
+ parent := env.api.eth.BlockChain().CurrentHeader()
+ beaconRoot := common.Hash{0x42}
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ // Note: We don't call addBlobTxs here because we can't advance the chain
+ // (NewPayloadV5 requires execution requests). The same transactions are
+ // reused for each iteration, which still benchmarks the workflow performance.
+
+ // 1. ForkchoiceUpdated to build payload
+ timestamp := parent.Time + 12
+ fcState := engine.ForkchoiceStateV1{
+ HeadBlockHash: parent.Hash(),
+ SafeBlockHash: parent.Hash(),
+ FinalizedBlockHash: parent.Hash(),
+ }
+ payloadAttr := &engine.PayloadAttributes{
+ Timestamp: timestamp,
+ Random: common.Hash{byte(i)},
+ SuggestedFeeRecipient: testAddr,
+ Withdrawals: []*types.Withdrawal{},
+ BeaconRoot: &beaconRoot,
+ }
+ resp, err := env.api.ForkchoiceUpdatedV3(context.Background(), fcState, payloadAttr)
+ if err != nil {
+ b.Fatalf("ForkchoiceUpdatedV3 failed: %v", err)
+ }
+ if resp.PayloadID == nil {
+ b.Fatalf("ForkchoiceUpdatedV3 returned nil PayloadID")
+ }
+ // Encode ForkchoiceUpdated response
+ benchEncode(b, enc, resp)
+
+ // Wait for the payload to be built with transactions
+ time.Sleep(100 * time.Millisecond)
+
+ // 2. GetPayload
+ envelope, err := env.api.GetPayloadV5(*resp.PayloadID)
+ if err != nil {
+ b.Fatalf("GetPayloadV5 failed: %v", err)
+ }
+ if envelope.BlobsBundle == nil {
+ b.Fatalf("BlobsBundle is nil")
+ }
+ // Verify we got the expected number of blobs
+ if len(envelope.BlobsBundle.Blobs) != blobCount {
+ b.Fatalf("expected %d blobs, got %d", blobCount, len(envelope.BlobsBundle.Blobs))
+ }
+ // Encode GetPayload response
+ benchEncode(b, enc, envelope)
+ }
+ b.ReportMetric(float64(b.Elapsed().Milliseconds())/float64(b.N), "ms/op")
+ })
+ }
+ }
+}
+
+// discardConn is a net.Conn-like writer that discards all output.
+// Used to measure server-side RPC cost without client-side decoding.
+type discardConn struct {
+ io.Reader
+ io.Writer
+}
+
+func (discardConn) Close() error { return nil }
+func (discardConn) SetWriteDeadline(time.Time) error { return nil }
+
+// BenchmarkGetPayloadV5RPCServerOnly benchmarks only the EL server-side cost of
+// engine_getPayloadV5: method dispatch, JSON serialization, and wire encoding.
+// Client-side decoding is excluded by writing to io.Discard.
+func BenchmarkGetPayloadV5RPCServerOnly(b *testing.B) {
+ blobCount := 72
+ env := newBenchmarkBlobEnv(b, blobCount, 1, forkOsaka)
+ defer env.Close()
+
+ // Register the engine API on the running node's in-process RPC server.
+ rpcServer, err := env.node.RPCHandler()
+ if err != nil {
+ b.Fatalf("RPCHandler failed: %v", err)
+ }
+ rpcServer.RegisterName("engine", env.api)
+
+ parent := env.api.eth.BlockChain().CurrentHeader()
+ beaconRoot := common.Hash{0x42}
+
+ // Build one payload to get a valid payloadID.
+ fcState := engine.ForkchoiceStateV1{
+ HeadBlockHash: parent.Hash(),
+ SafeBlockHash: parent.Hash(),
+ FinalizedBlockHash: parent.Hash(),
+ }
+ payloadAttr := &engine.PayloadAttributes{
+ Timestamp: parent.Time + 12,
+ Random: common.Hash{0x01},
+ SuggestedFeeRecipient: testAddr,
+ Withdrawals: []*types.Withdrawal{},
+ BeaconRoot: &beaconRoot,
+ }
+ resp, err := env.api.ForkchoiceUpdatedV3(context.Background(), fcState, payloadAttr)
+ if err != nil {
+ b.Fatalf("ForkchoiceUpdatedV3 failed: %v", err)
+ }
+ if resp.PayloadID == nil {
+ b.Fatalf("ForkchoiceUpdatedV3 returned nil PayloadID")
+ }
+ time.Sleep(100 * time.Millisecond)
+
+ // Verify the payload has the expected blobs via the direct API first.
+ envelope, err := env.api.GetPayloadV5(*resp.PayloadID)
+ if err != nil {
+ b.Fatalf("GetPayloadV5 failed: %v", err)
+ }
+ if len(envelope.BlobsBundle.Blobs) != blobCount {
+ b.Fatalf("expected %d blobs, got %d", blobCount, len(envelope.BlobsBundle.Blobs))
+ }
+ b.Logf("payload size: %d blobs, %d txs", len(envelope.BlobsBundle.Blobs), len(envelope.ExecutionPayload.Transactions))
+
+ // Build the JSON-RPC request bytes once.
+ reqJSON := fmt.Sprintf(`{"jsonrpc":"2.0","id":1,"method":"engine_getPayloadV5","params":["%s"]}`, resp.PayloadID.String())
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ conn := discardConn{
+ Reader: strings.NewReader(reqJSON),
+ Writer: io.Discard,
+ }
+ codec := rpc.NewCodec(conn)
+ rpcServer.ServeCodec(codec, 0)
+ }
+ b.StopTimer()
+ b.ReportMetric(float64(b.Elapsed().Milliseconds())/float64(b.N), "ms/op")
+}
+
+// BenchmarkGetBlobsV3RPCServerOnly benchmarks only the EL server-side cost of
+// engine_getBlobsV3: method dispatch, JSON serialization, and wire encoding.
+// Client-side decoding is excluded by writing to io.Discard.
+func BenchmarkGetBlobsV3RPCServerOnly(b *testing.B) {
+ blobCount := 72
+ env := newBenchmarkBlobEnv(b, blobCount, 1, forkOsaka)
+ defer env.Close()
+
+ // Register the engine API on the running node's in-process RPC server.
+ rpcServer, err := env.node.RPCHandler()
+ if err != nil {
+ b.Fatalf("RPCHandler failed: %v", err)
+ }
+ rpcServer.RegisterName("engine", env.api)
+
+ // Verify the blobs are available via the direct API first.
+ result, err := env.api.GetBlobsV3(env.vhashes)
+ if err != nil {
+ b.Fatalf("GetBlobsV3 failed: %v", err)
+ }
+ if len(result) != blobCount {
+ b.Fatalf("expected %d blobs, got %d", blobCount, len(result))
+ }
+ b.Logf("blob count: %d", blobCount)
+
+ // Build the JSON-RPC request bytes once.
+ // Format the versioned hashes as a JSON array of hex strings.
+ var hashStrs []string
+ for _, h := range env.vhashes {
+ hashStrs = append(hashStrs, fmt.Sprintf(`"%s"`, h.Hex()))
+ }
+ reqJSON := fmt.Sprintf(`{"jsonrpc":"2.0","id":1,"method":"engine_getBlobsV3","params":[[%s]]}`, strings.Join(hashStrs, ","))
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ conn := discardConn{
+ Reader: strings.NewReader(reqJSON),
+ Writer: io.Discard,
+ }
+ codec := rpc.NewCodec(conn)
+ rpcServer.ServeCodec(codec, 0)
+ }
+ b.StopTimer()
+ b.ReportMetric(float64(b.Elapsed().Milliseconds())/float64(b.N), "ms/op")
+}
diff --git a/eth/catalyst/api_test.go b/eth/catalyst/api_test.go
index 1f38c4dd8a..65d78d84ee 100644
--- a/eth/catalyst/api_test.go
+++ b/eth/catalyst/api_test.go
@@ -1961,7 +1961,7 @@ func TestGetBlobsV1(t *testing.T) {
// Fill the request for retrieving blobs
var (
vhashes []common.Hash
- expect []*engine.BlobAndProofV1
+ expect engine.BlobAndProofListV1
)
// fill missing blob at the beginning
if suite.fillRandom {
@@ -2072,13 +2072,13 @@ func BenchmarkGetBlobsV2(b *testing.B) {
}
}
-type getBlobsFn func(hashes []common.Hash) ([]*engine.BlobAndProofV2, error)
+type getBlobsFn func(hashes []common.Hash) (engine.BlobAndProofListV2, error)
func runGetBlobs(t testing.TB, getBlobs getBlobsFn, start, limit int, fillRandom bool, expectPartialResponse bool, name string) {
// Fill the request for retrieving blobs
var (
vhashes []common.Hash
- expect []*engine.BlobAndProofV2
+ expect engine.BlobAndProofListV2
)
for j := start; j < limit; j++ {
vhashes = append(vhashes, testBlobVHashes[j])
diff --git a/go.mod b/go.mod
index 17897a62c0..56869d255d 100644
--- a/go.mod
+++ b/go.mod
@@ -26,6 +26,7 @@ require (
github.com/ethereum/hid v1.0.1-0.20260421154323-c2ab8d9bf68a
github.com/fatih/color v1.16.0
github.com/ferranbt/fastssz v0.1.4
+ github.com/fjl/jsonw v0.1.0
github.com/fsnotify/fsnotify v1.6.0
github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff
github.com/gofrs/flock v0.12.1
@@ -121,7 +122,7 @@ require (
github.com/deepmap/oapi-codegen v1.6.0 // indirect
github.com/dlclark/regexp2 v1.7.0 // indirect
github.com/emicklei/dot v1.6.2 // indirect
- github.com/fjl/gencodec v0.1.0 // indirect
+ github.com/fjl/gencodec v0.1.2 // indirect
github.com/garslo/gogen v0.0.0-20170306192744-1d203ffc1f61 // indirect
github.com/getsentry/sentry-go v0.27.0 // indirect
github.com/go-ole/go-ole v1.3.0 // indirect
diff --git a/go.sum b/go.sum
index bad8a44cfd..6335fb5698 100644
--- a/go.sum
+++ b/go.sum
@@ -123,8 +123,10 @@ github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
github.com/ferranbt/fastssz v0.1.4 h1:OCDB+dYDEQDvAgtAGnTSidK1Pe2tW3nFV40XyMkTeDY=
github.com/ferranbt/fastssz v0.1.4/go.mod h1:Ea3+oeoRGGLGm5shYAeDgu6PGUlcvQhE2fILyD9+tGg=
-github.com/fjl/gencodec v0.1.0 h1:B3K0xPfc52cw52BBgUbSPxYo+HlLfAgWMVKRWXUXBcs=
-github.com/fjl/gencodec v0.1.0/go.mod h1:Um1dFHPONZGTHog1qD1NaWjXJW/SPB38wPv0O8uZ2fI=
+github.com/fjl/gencodec v0.1.2 h1:nf+MMsmuii5ZQMbS6/xjZoe5LRkN0415FOJOSwmnuW8=
+github.com/fjl/gencodec v0.1.2/go.mod h1:chDHL3wKXuBgauP8x3XNZkl5EIAR5SoCTmmmDTZRzmw=
+github.com/fjl/jsonw v0.1.0 h1:V3MyR79fjLpn/+bMgvegdGUIhoJOzjmqWcKDgcOmY1I=
+github.com/fjl/jsonw v0.1.0/go.mod h1:2KMLevM6FXEJnfhtk7naXu9vZdVfOma1GlnGdPRlumU=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
diff --git a/rpc/client.go b/rpc/client.go
index 8d81503d59..9175626241 100644
--- a/rpc/client.go
+++ b/rpc/client.go
@@ -364,7 +364,7 @@ func (c *Client) CallContext(ctx context.Context, result interface{}, method str
resp := batchresp[0]
switch {
case resp.Error != nil:
- return resp.Error
+ return resp.decodeError()
case len(resp.Result) == 0:
return ErrNoResult
default:
@@ -419,7 +419,7 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error {
if c.isHTTP {
err = c.sendBatchHTTP(ctx, op, msgs)
} else {
- err = c.send(ctx, op, msgs)
+ err = c.sendBatch(ctx, op, msgs)
}
if err != nil {
return err
@@ -449,7 +449,7 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error {
elem := &b[index]
switch {
case resp.Error != nil:
- elem.Error = resp.Error
+ elem.Error = resp.decodeError()
case resp.Result == nil:
elem.Error = ErrNoResult
default:
@@ -552,7 +552,7 @@ func (c *Client) newMessage(method string, paramsIn ...interface{}) (*jsonrpcMes
// send registers op with the dispatch loop, then sends msg on the connection.
// if sending fails, op is deregistered.
-func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error {
+func (c *Client) send(ctx context.Context, op *requestOp, msg *jsonrpcMessage) error {
select {
case c.reqInit <- op:
err := c.write(ctx, msg, false)
@@ -567,7 +567,22 @@ func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error
}
}
-func (c *Client) write(ctx context.Context, msg interface{}, retry bool) error {
+// sendBatch registers op with the dispatch loop, then sends a batch of messages
+// on the connection. If sending fails, op is deregistered.
+func (c *Client) sendBatch(ctx context.Context, op *requestOp, msgs []*jsonrpcMessage) error {
+ select {
+ case c.reqInit <- op:
+ err := c.writeBatch(ctx, msgs, false)
+ c.reqSent <- err
+ return err
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-c.closing:
+ return ErrClientQuit
+ }
+}
+
+func (c *Client) write(ctx context.Context, msg *jsonrpcMessage, retry bool) error {
if c.writeConn == nil {
// The previous write failed. Try to establish a new connection.
if err := c.reconnect(ctx); err != nil {
@@ -584,6 +599,22 @@ func (c *Client) write(ctx context.Context, msg interface{}, retry bool) error {
return err
}
+func (c *Client) writeBatch(ctx context.Context, msgs []*jsonrpcMessage, retry bool) error {
+ if c.writeConn == nil {
+ if err := c.reconnect(ctx); err != nil {
+ return err
+ }
+ }
+ err := c.writeConn.writeJSONBatch(ctx, msgs, false)
+ if err != nil {
+ c.writeConn = nil
+ if !retry {
+ return c.writeBatch(ctx, msgs, true)
+ }
+ }
+ return err
+}
+
func (c *Client) reconnect(ctx context.Context) error {
if c.reconnectFunc == nil {
return errDead
diff --git a/rpc/handler.go b/rpc/handler.go
index c0af162f13..89fc78236c 100644
--- a/rpc/handler.go
+++ b/rpc/handler.go
@@ -169,7 +169,7 @@ func (b *batchCallBuffer) doWrite(ctx context.Context, conn jsonWriter, isErrorR
}
b.wrote = true // can only write once
if len(b.resp) > 0 {
- conn.writeJSON(ctx, b.resp, isErrorResponse)
+ conn.writeJSONBatch(ctx, b.resp, isErrorResponse)
}
}
@@ -237,7 +237,7 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
resp := h.handleCallMsg(cp, msg)
callBuffer.pushResponse(resp)
if resp != nil && h.batchResponseMaxSize != 0 {
- responseBytes += len(resp.Result)
+ responseBytes += len(resp.Result) + len(resp.Error)
if responseBytes > h.batchResponseMaxSize {
err := &internalServerError{errcodeResponseTooLarge, errMsgResponseTooLarge}
callBuffer.respondWithError(cp.ctx, h.conn, err)
@@ -268,7 +268,7 @@ func (h *handler) respondWithBatchTooLarge(cp *callProc, batch []*jsonrpcMessage
break
}
}
- h.conn.writeJSON(cp.ctx, []*jsonrpcMessage{resp}, true)
+ h.conn.writeJSONBatch(cp.ctx, []*jsonrpcMessage{resp}, true)
}
// handleMsg handles a single non-batch message.
@@ -415,7 +415,7 @@ func (h *handler) handleResponses(batch []*jsonrpcMessage, handleCall func(*json
// the op.resp channel.
if op.sub != nil {
if msg.Error != nil {
- op.err = msg.Error
+ op.err = msg.decodeError()
} else {
op.err = json.Unmarshal(msg.Result, &op.sub.subid)
if op.err == nil {
@@ -481,9 +481,10 @@ func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMess
var logctx []any
logctx = append(logctx, "reqid", idForLog{msg.ID}, "duration", time.Since(start))
if resp.Error != nil {
- logctx = append(logctx, "err", resp.Error.Message)
- if resp.Error.Data != nil {
- logctx = append(logctx, "errdata", formatErrorData(resp.Error.Data))
+ je := resp.decodeError()
+ logctx = append(logctx, "err", je.Message)
+ if je.Data != nil {
+ logctx = append(logctx, "errdata", formatErrorData(je.Data))
}
h.log.Warn("Served "+msg.Method, logctx...)
} else {
@@ -550,7 +551,7 @@ func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage
answer := h.runMethod(rctx, msg, callb, args)
var rErr error
if answer.Error != nil {
- rErr = errors.New(answer.Error.Message)
+ rErr = errors.New(answer.decodeError().Message)
}
rSpanEnd(&rErr)
@@ -623,7 +624,7 @@ func (h *handler) runMethod(ctx context.Context, msg *jsonrpcMessage, callb *cal
_, _, spanEnd := telemetry.StartSpanWithTracer(ctx, h.tracer(), "rpc.encodeJSONResponse", attributes...)
response := msg.response(result)
if response.Error != nil {
- err = errors.New(response.Error.Message)
+ err = errors.New(response.decodeError().Message)
}
spanEnd(&err)
return response
diff --git a/rpc/http.go b/rpc/http.go
index 55f0abfa72..49618244df 100644
--- a/rpc/http.go
+++ b/rpc/http.go
@@ -57,10 +57,14 @@ type httpConn struct {
// and some methods don't work. The panic() stubs here exist to ensure
// this special treatment is correct.
-func (hc *httpConn) writeJSON(context.Context, interface{}, bool) error {
+func (hc *httpConn) writeJSON(context.Context, *jsonrpcMessage, bool) error {
panic("writeJSON called on httpConn")
}
+func (hc *httpConn) writeJSONBatch(context.Context, []*jsonrpcMessage, bool) error {
+ panic("writeJSONBatch called on httpConn")
+}
+
func (hc *httpConn) peerInfo() PeerInfo {
panic("peerInfo called on httpConn")
}
@@ -179,9 +183,9 @@ func cleanlyCloseBody(body io.ReadCloser) error {
return body.Close()
}
-func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) error {
+func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg *jsonrpcMessage) error {
hc := c.writeConn.(*httpConn)
- respBody, err := hc.doRequest(ctx, msg)
+ respBody, err := hc.doRequest(ctx, appendMessage(nil, msg))
if err != nil {
return err
}
@@ -198,7 +202,7 @@ func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) e
func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonrpcMessage) error {
hc := c.writeConn.(*httpConn)
- respBody, err := hc.doRequest(ctx, msgs)
+ respBody, err := hc.doRequest(ctx, appendBatch(nil, msgs))
if err != nil {
return err
}
@@ -212,11 +216,7 @@ func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonr
return nil
}
-func (hc *httpConn) doRequest(ctx context.Context, msg interface{}) (io.ReadCloser, error) {
- body, err := json.Marshal(msg)
- if err != nil {
- return nil, err
- }
+func (hc *httpConn) doRequest(ctx context.Context, body []byte) (io.ReadCloser, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, hc.url, io.NopCloser(bytes.NewReader(body)))
if err != nil {
return nil, err
@@ -268,41 +268,51 @@ func (s *Server) newHTTPServerConn(r *http.Request, w http.ResponseWriter) Serve
body := io.LimitReader(r.Body, int64(s.httpBodyLimit))
conn := &httpServerConn{Reader: body, Writer: w, r: r}
- encoder := func(v any, isErrorResponse bool) error {
- if !isErrorResponse {
- return json.NewEncoder(conn).Encode(v)
- }
-
- // It's an error response and requires special treatment.
- //
- // In case of a timeout error, the response must be written before the HTTP
- // server's write timeout occurs. So we need to flush the response. The
- // Content-Length header also needs to be set to ensure the client knows
- // when it has the full response.
- encdata, err := json.Marshal(v)
- if err != nil {
- return err
- }
- w.Header().Set("content-length", strconv.Itoa(len(encdata)))
-
- // If this request is wrapped in a handler that might remove Content-Length (such
- // as the automatic gzip we do in package node), we need to ensure the HTTP server
- // doesn't perform chunked encoding. In case WriteTimeout is reached, the chunked
- // encoding might not be finished correctly, and some clients do not like it when
- // the final chunk is missing.
- w.Header().Set("transfer-encoding", "identity")
-
- _, err = w.Write(encdata)
- if f, ok := w.(http.Flusher); ok {
- f.Flush()
- }
- return err
+ var buf []byte
+ encodeMsg := func(msg *jsonrpcMessage, isError bool) error {
+ buf = appendMessage(buf[:0], msg)
+ return httpWriteResult(w, buf, isError)
+ }
+ encodeBatch := func(msgs []*jsonrpcMessage, isError bool) error {
+ buf = appendBatch(buf[:0], msgs)
+ return httpWriteResult(w, buf, isError)
}
dec := json.NewDecoder(conn)
dec.UseNumber()
- return NewFuncCodec(conn, encoder, dec.Decode)
+ return NewFuncCodec(conn, encodeMsg, encodeBatch, dec.Decode)
+}
+
+// httpWriteResult writes pre-encoded response data over HTTP.
+// For error responses, it sets Content-Length and flushes to ensure the response
+// is fully written before any HTTP server write timeout occurs.
+func httpWriteResult(w http.ResponseWriter, data []byte, isError bool) error {
+ if !isError {
+ _, err := w.Write(data)
+ return err
+ }
+
+ // It's an error response and requires special treatment.
+ //
+ // In case of a timeout error, the response must be written before the HTTP
+ // server's write timeout occurs. So we need to flush the response. The
+ // Content-Length header also needs to be set to ensure the client knows
+ // when it has the full response.
+ w.Header().Set("content-length", strconv.Itoa(len(data)))
+
+ // If this request is wrapped in a handler that might remove Content-Length (such
+ // as the automatic gzip we do in package node), we need to ensure the HTTP server
+ // doesn't perform chunked encoding. In case WriteTimeout is reached, the chunked
+ // encoding might not be finished correctly, and some clients do not like it when
+ // the final chunk is missing.
+ w.Header().Set("transfer-encoding", "identity")
+
+ _, err := w.Write(data)
+ if f, ok := w.(http.Flusher); ok {
+ f.Flush()
+ }
+ return err
}
// Close does nothing and always returns nil.
diff --git a/rpc/json.go b/rpc/json.go
index fcd801fc95..9813acae73 100644
--- a/rpc/json.go
+++ b/rpc/json.go
@@ -27,6 +27,8 @@ import (
"strings"
"sync"
"time"
+
+ "github.com/fjl/jsonw"
)
const (
@@ -52,12 +54,6 @@ type subscriptionResultEnc struct {
Result any `json:"result"`
}
-type jsonrpcSubscriptionNotification struct {
- Version string `json:"jsonrpc"`
- Method string `json:"method"`
- Params subscriptionResultEnc `json:"params"`
-}
-
// A value of this type can a JSON-RPC request, notification, successful response or
// error response. Which one it is depends on the fields.
type jsonrpcMessage struct {
@@ -65,7 +61,7 @@ type jsonrpcMessage struct {
ID json.RawMessage `json:"id,omitempty"`
Method string `json:"method,omitempty"`
Params json.RawMessage `json:"params,omitempty"`
- Error *jsonError `json:"error,omitempty"`
+ Error json.RawMessage `json:"error,omitempty"`
Result json.RawMessage `json:"result,omitempty"`
}
@@ -113,8 +109,29 @@ func (msg *jsonrpcMessage) errorResponse(err error) *jsonrpcMessage {
return resp
}
+// decodeError decodes the Error field into a jsonError struct.
+func (msg *jsonrpcMessage) decodeError() *jsonError {
+ if msg.Error == nil {
+ return nil
+ }
+ je := new(jsonError)
+ json.Unmarshal(msg.Error, je)
+ return je
+}
+
func (msg *jsonrpcMessage) response(result interface{}) *jsonrpcMessage {
- enc, err := json.Marshal(result)
+ var (
+ enc []byte
+ err error
+ )
+ // Call MarshalJSON directly for types that implement it. This avoids the
+ // expensive validation/compaction pass that json.Marshal performs on
+ // encoder output.
+ if m, ok := result.(json.Marshaler); ok {
+ enc, err = m.MarshalJSON()
+ } else {
+ enc, err = json.Marshal(result)
+ }
if err != nil {
return msg.errorResponse(&internalServerError{errcodeMarshalError, err.Error()})
}
@@ -122,19 +139,18 @@ func (msg *jsonrpcMessage) response(result interface{}) *jsonrpcMessage {
}
func errorMessage(err error) *jsonrpcMessage {
- msg := &jsonrpcMessage{Version: vsn, ID: null, Error: &jsonError{
+ je := &jsonError{
Code: errcodeDefault,
Message: err.Error(),
- }}
- ec, ok := err.(Error)
- if ok {
- msg.Error.Code = ec.ErrorCode()
}
- de, ok := err.(DataError)
- if ok {
- msg.Error.Data = de.ErrorData()
+ if ec, ok := err.(Error); ok {
+ je.Code = ec.ErrorCode()
}
- return msg
+ if de, ok := err.(DataError); ok {
+ je.Data = de.ErrorData()
+ }
+ enc, _ := json.Marshal(je)
+ return &jsonrpcMessage{Version: vsn, ID: null, Error: enc}
}
type jsonError struct {
@@ -179,28 +195,32 @@ type ConnRemoteAddr interface {
// jsonCodec reads and writes JSON-RPC messages to the underlying connection. It also has
// support for parsing arguments and serializing (result) objects.
type jsonCodec struct {
- remote string
- closer sync.Once // close closed channel once
- closeCh chan interface{} // closed on Close
- decode decodeFunc // decoder to allow multiple transports
- encMu sync.Mutex // guards the encoder
- encode encodeFunc // encoder to allow multiple transports
- conn deadlineCloser
+ remote string
+ closer sync.Once // close closed channel once
+ closeCh chan interface{} // closed on Close
+ decode decodeFunc // decoder to allow multiple transports
+ encMu sync.Mutex // guards the encoder
+ encodeMsg encodeMsgFunc // single-message encoder
+ encodeBatch encodeBatchFunc // batch encoder
+ conn deadlineCloser
}
-type encodeFunc = func(v interface{}, isErrorResponse bool) error
+type encodeMsgFunc = func(msg *jsonrpcMessage, isError bool) error
+
+type encodeBatchFunc = func(msgs []*jsonrpcMessage, isError bool) error
type decodeFunc = func(v interface{}) error
// NewFuncCodec creates a codec which uses the given functions to read and write. If conn
// implements ConnRemoteAddr, log messages will use it to include the remote address of
// the connection.
-func NewFuncCodec(conn deadlineCloser, encode encodeFunc, decode decodeFunc) ServerCodec {
+func NewFuncCodec(conn deadlineCloser, encodeMsg encodeMsgFunc, encodeBatch encodeBatchFunc, decode decodeFunc) ServerCodec {
codec := &jsonCodec{
- closeCh: make(chan interface{}),
- encode: encode,
- decode: decode,
- conn: conn,
+ closeCh: make(chan interface{}),
+ encodeMsg: encodeMsg,
+ encodeBatch: encodeBatch,
+ decode: decode,
+ conn: conn,
}
if ra, ok := conn.(ConnRemoteAddr); ok {
codec.remote = ra.RemoteAddr()
@@ -211,14 +231,62 @@ func NewFuncCodec(conn deadlineCloser, encode encodeFunc, decode decodeFunc) Ser
// NewCodec creates a codec on the given connection. If conn implements ConnRemoteAddr, log
// messages will use it to include the remote address of the connection.
func NewCodec(conn Conn) ServerCodec {
- enc := json.NewEncoder(conn)
dec := json.NewDecoder(conn)
dec.UseNumber()
-
- encode := func(v interface{}, isErrorResponse bool) error {
- return enc.Encode(v)
+ var buf []byte
+ encodeMsg := func(msg *jsonrpcMessage, isError bool) error {
+ buf = appendMessage(buf[:0], msg)
+ buf = append(buf, '\n')
+ _, err := conn.Write(buf)
+ return err
}
- return NewFuncCodec(conn, encode, dec.Decode)
+ encodeBatch := func(msgs []*jsonrpcMessage, isError bool) error {
+ buf = appendBatch(buf[:0], msgs)
+ buf = append(buf, '\n')
+ _, err := conn.Write(buf)
+ return err
+ }
+ return NewFuncCodec(conn, encodeMsg, encodeBatch, dec.Decode)
+}
+
+// appendMessage appends the JSON-RPC encoding of msg to buf.
+func appendMessage(buf []byte, msg *jsonrpcMessage) []byte {
+ buf = append(buf, `{"jsonrpc":"2.0"`...)
+ if msg.ID != nil {
+ buf = append(buf, `,"id":`...)
+ buf = append(buf, msg.ID...)
+ }
+ if msg.Method != "" {
+ buf = append(buf, `,"method":`...)
+ buf = jsonw.AppendQuotedString(buf, msg.Method)
+ }
+ if msg.Params != nil {
+ buf = append(buf, `,"params":`...)
+ buf = append(buf, msg.Params...)
+ }
+ if msg.Error != nil {
+ buf = append(buf, `,"error":`...)
+ buf = append(buf, msg.Error...)
+ }
+ if msg.Result != nil {
+ buf = append(buf, `,"result":`...)
+ buf = append(buf, msg.Result...)
+ }
+ buf = append(buf, '}')
+ return buf
+}
+
+// appendBatch appends the JSON-RPC encoding of a message batch to buf.
+func appendBatch(buf []byte, msgs []*jsonrpcMessage) []byte {
+ buf = append(buf, '[')
+ for i, msg := range msgs {
+ if i > 0 {
+ buf = append(buf, ',')
+ }
+ buf = appendMessage(buf, msg)
+ }
+ buf = append(buf, ']')
+ return buf
}
func (c *jsonCodec) peerInfo() PeerInfo {
@@ -248,7 +316,7 @@ func (c *jsonCodec) readBatch() (messages []*jsonrpcMessage, batch bool, err err
return messages, batch, nil
}
-func (c *jsonCodec) writeJSON(ctx context.Context, v interface{}, isErrorResponse bool) error {
+func (c *jsonCodec) writeJSON(ctx context.Context, msg *jsonrpcMessage, isError bool) error {
c.encMu.Lock()
defer c.encMu.Unlock()
@@ -257,7 +325,19 @@ func (c *jsonCodec) writeJSON(ctx context.Context, v interface{}, isErrorRespons
deadline = time.Now().Add(defaultWriteTimeout)
}
c.conn.SetWriteDeadline(deadline)
- return c.encode(v, isErrorResponse)
+ return c.encodeMsg(msg, isError)
+}
+
+func (c *jsonCodec) writeJSONBatch(ctx context.Context, msgs []*jsonrpcMessage, isError bool) error {
+ c.encMu.Lock()
+ defer c.encMu.Unlock()
+
+ deadline, ok := ctx.Deadline()
+ if !ok {
+ deadline = time.Now().Add(defaultWriteTimeout)
+ }
+ c.conn.SetWriteDeadline(deadline)
+ return c.encodeBatch(msgs, isError)
}
func (c *jsonCodec) close() {
diff --git a/rpc/server_test.go b/rpc/server_test.go
index 8334d4e80d..a2b8af2b7f 100644
--- a/rpc/server_test.go
+++ b/rpc/server_test.go
@@ -208,6 +208,48 @@ func TestServerBatchResponseSizeLimit(t *testing.T) {
}
}
+// TestServerBatchResponseSizeLimit_errorResponses verifies that error responses
+// are counted toward BatchResponseMaxSize.
+func TestServerBatchResponseSizeLimit_errorResponses(t *testing.T) {
+ t.Parallel()
+
+ server := newTestServer()
+ defer server.Stop()
+ // Each error response for test_returnError is ~58 bytes of JSON in the Error field.
+ // Set limit to 100 so 1 response fits (58 bytes) but the 2nd (116 bytes) exceeds it.
+ server.SetBatchLimits(100, 100)
+ var (
+ batch []BatchElem
+ client = DialInProc(server)
+ )
+ for i := 0; i < 5; i++ {
+ batch = append(batch, BatchElem{
+ Method: "test_returnError",
+ Result: new(int),
+ })
+ }
+ if err := client.BatchCall(batch); err != nil {
+ t.Fatal("error sending batch:", err)
+ }
+ for i := range batch {
+ re, ok := batch[i].Error.(Error)
+ if !ok {
+ t.Fatalf("batch elem %d has wrong error type: %v", i, batch[i].Error)
+ }
+ if i < 2 {
+ // First two: elem 0 fits under limit, elem 1 pushes over but is already processed.
+ if re.ErrorCode() != 444 {
+ t.Errorf("batch elem %d wrong error code, have %d want 444", i, re.ErrorCode())
+ }
+ } else {
+ // Remaining should be the response-too-large error.
+ if re.ErrorCode() != errcodeResponseTooLarge {
+ t.Errorf("batch elem %d wrong error code, have %d want %d", i, re.ErrorCode(), errcodeResponseTooLarge)
+ }
+ }
+ }
+}
+
func TestServerWebsocketReadLimit(t *testing.T) {
t.Parallel()
diff --git a/rpc/subscription.go b/rpc/subscription.go
index 9e400c8b60..6b90bd4a3b 100644
--- a/rpc/subscription.go
+++ b/rpc/subscription.go
@@ -171,13 +171,17 @@ func (n *Notifier) activate() error {
}
func (n *Notifier) send(sub *Subscription, data any) error {
- msg := jsonrpcSubscriptionNotification{
+ params, err := json.Marshal(subscriptionResultEnc{
+ ID: string(sub.ID),
+ Result: data,
+ })
+ if err != nil {
+ return err
+ }
+ msg := jsonrpcMessage{
Version: vsn,
Method: n.namespace + notificationMethodSuffix,
- Params: subscriptionResultEnc{
- ID: string(sub.ID),
- Result: data,
- },
+ Params: params,
}
return n.h.conn.writeJSON(context.Background(), &msg, false)
}
diff --git a/rpc/subscription_test.go b/rpc/subscription_test.go
index cd44d219de..623b496124 100644
--- a/rpc/subscription_test.go
+++ b/rpc/subscription_test.go
@@ -220,7 +220,7 @@ func readAndValidateMessage(in *json.Decoder) (*subConfirmation, *subscriptionRe
case msg.isResponse():
var c subConfirmation
if msg.Error != nil {
- return nil, nil, msg.Error
+ return nil, nil, msg.decodeError()
} else if err := json.Unmarshal(msg.Result, &c.subid); err != nil {
return nil, nil, fmt.Errorf("invalid response: %v", err)
} else {
@@ -233,12 +233,21 @@ func readAndValidateMessage(in *json.Decoder) (*subConfirmation, *subscriptionRe
}
type mockConn struct {
- enc *json.Encoder
+ w io.Writer
}
-// writeJSON writes a message to the connection.
-func (c *mockConn) writeJSON(ctx context.Context, msg interface{}, isError bool) error {
- return c.enc.Encode(msg)
+func (c *mockConn) writeJSON(ctx context.Context, msg *jsonrpcMessage, isError bool) error {
+ buf := appendMessage(nil, msg)
+ buf = append(buf, '\n')
+ _, err := c.w.Write(buf)
+ return err
+}
+
+func (c *mockConn) writeJSONBatch(ctx context.Context, msgs []*jsonrpcMessage, isError bool) error {
+ buf := appendBatch(nil, msgs)
+ buf = append(buf, '\n')
+ _, err := c.w.Write(buf)
+ return err
}
// closed returns a channel which is closed when the connection is closed.
@@ -251,7 +260,7 @@ func (c *mockConn) remoteAddr() string { return "" }
func BenchmarkNotify(b *testing.B) {
id := ID("test")
notifier := &Notifier{
- h: &handler{conn: &mockConn{json.NewEncoder(io.Discard)}},
+ h: &handler{conn: &mockConn{io.Discard}},
sub: &Subscription{ID: id},
activated: true,
}
@@ -271,7 +280,7 @@ func TestNotify(t *testing.T) {
out := new(bytes.Buffer)
id := ID("test")
notifier := &Notifier{
- h: &handler{conn: &mockConn{json.NewEncoder(out)}},
+ h: &handler{conn: &mockConn{out}},
sub: &Subscription{ID: id},
activated: true,
}
diff --git a/rpc/types.go b/rpc/types.go
index 85f15344e8..578d3f86dd 100644
--- a/rpc/types.go
+++ b/rpc/types.go
@@ -51,9 +51,10 @@ type ServerCodec interface {
// jsonWriter can write JSON messages to its underlying connection.
// Implementations must be safe for concurrent use.
type jsonWriter interface {
- // writeJSON writes a message to the connection.
- writeJSON(ctx context.Context, msg interface{}, isError bool) error
-
+ // writeJSON writes a single JSON-RPC message to the connection.
+ writeJSON(ctx context.Context, msg *jsonrpcMessage, isError bool) error
+ // writeJSONBatch writes a batch of JSON-RPC messages to the connection.
+ writeJSONBatch(ctx context.Context, msgs []*jsonrpcMessage, isError bool) error
// Closed returns a channel which is closed when the connection is closed.
closed() <-chan interface{}
// RemoteAddr returns the peer address of the connection.
diff --git a/rpc/websocket.go b/rpc/websocket.go
index ec676b9caf..e70498873a 100644
--- a/rpc/websocket.go
+++ b/rpc/websocket.go
@@ -293,11 +293,17 @@ type websocketCodec struct {
func newWebsocketCodec(conn *websocket.Conn, host string, req http.Header, readLimit int64) ServerCodec {
conn.SetReadLimit(readLimit)
- encode := func(v interface{}, isErrorResponse bool) error {
- return conn.WriteJSON(v)
+ var buf []byte
+ encodeMsg := func(msg *jsonrpcMessage, isError bool) error {
+ buf = appendMessage(buf[:0], msg)
+ return conn.WriteMessage(websocket.TextMessage, buf)
+ }
+ encodeBatch := func(msgs []*jsonrpcMessage, isError bool) error {
+ buf = appendBatch(buf[:0], msgs)
+ return conn.WriteMessage(websocket.TextMessage, buf)
}
wc := &websocketCodec{
- jsonCodec: NewFuncCodec(conn, encode, conn.ReadJSON).(*jsonCodec),
+ jsonCodec: NewFuncCodec(conn, encodeMsg, encodeBatch, conn.ReadJSON).(*jsonCodec),
conn: conn,
pingReset: make(chan struct{}, 1),
pongReceived: make(chan struct{}),
@@ -342,8 +348,15 @@ func (wc *websocketCodec) peerInfo() PeerInfo {
return wc.info
}
-func (wc *websocketCodec) writeJSON(ctx context.Context, v interface{}, isError bool) error {
- err := wc.jsonCodec.writeJSON(ctx, v, isError)
+func (wc *websocketCodec) writeJSON(ctx context.Context, msg *jsonrpcMessage, isError bool) error {
+ return wc.writeAndResetPing(wc.jsonCodec.writeJSON(ctx, msg, isError))
+}
+
+func (wc *websocketCodec) writeJSONBatch(ctx context.Context, msgs []*jsonrpcMessage, isError bool) error {
+ return wc.writeAndResetPing(wc.jsonCodec.writeJSONBatch(ctx, msgs, isError))
+}
+
+func (wc *websocketCodec) writeAndResetPing(err error) error {
if err == nil {
// Notify pingLoop to delay the next idle ping.
select {