From efe58eac003940e736717ab2207f6db22fdbd2e6 Mon Sep 17 00:00:00 2001 From: Jonny Rhea <5555162+jrhea@users.noreply.github.com> Date: Wed, 20 May 2026 13:25:56 -0500 Subject: [PATCH] beacon/engine, rpc: optimize JSON encoding for large blob payloads (#33969) Adds a fast path for ExecutionPayloadEnvelope and BlobAndProofListV* that bypasses encoding/json's reflection and re-validation, which are expensive for large payloads with many blobs. Also hand-rolls the jsonrpcMessage wire encoding in the RPC codec to avoid a second re-validation pass when writing responses to the connection. Resolves #33814 --------- Co-authored-by: Marius van der Wijden Co-authored-by: Felix Lange --- beacon/engine/bapl_encode.go | 69 ++ beacon/engine/{gen_ed.go => ed_codec.go} | 0 beacon/engine/{gen_epe.go => epe_decode.go} | 25 - beacon/engine/epe_encode.go | 98 +++ beacon/engine/epe_test.go | 128 +++ .../{gen_blockparams.go => pa_codec.go} | 0 beacon/engine/types.go | 24 +- eth/catalyst/api.go | 12 +- eth/catalyst/api_benchmark_test.go | 739 ++++++++++++++++++ eth/catalyst/api_test.go | 6 +- go.mod | 3 +- go.sum | 6 +- rpc/client.go | 41 +- rpc/handler.go | 19 +- rpc/http.go | 88 ++- rpc/json.go | 154 +++- rpc/server_test.go | 42 + rpc/subscription.go | 14 +- rpc/subscription_test.go | 23 +- rpc/types.go | 7 +- rpc/websocket.go | 23 +- 21 files changed, 1366 insertions(+), 155 deletions(-) create mode 100644 beacon/engine/bapl_encode.go rename beacon/engine/{gen_ed.go => ed_codec.go} (100%) rename beacon/engine/{gen_epe.go => epe_decode.go} (62%) create mode 100644 beacon/engine/epe_encode.go create mode 100644 beacon/engine/epe_test.go rename beacon/engine/{gen_blockparams.go => pa_codec.go} (100%) create mode 100644 eth/catalyst/api_benchmark_test.go diff --git a/beacon/engine/bapl_encode.go b/beacon/engine/bapl_encode.go new file mode 100644 index 0000000000..b9f46ebf26 --- /dev/null +++ b/beacon/engine/bapl_encode.go @@ -0,0 +1,69 @@ +// Copyright 2026 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package engine + +import ( + "github.com/fjl/jsonw" +) + +// MarshalJSON implements json.Marshaler. +func (list BlobAndProofListV1) MarshalJSON() ([]byte, error) { + var b jsonw.Buffer + b.Array(func() { + for _, item := range list { + marshalBlobAndProofV1(&b, item) + } + }) + return b.Output(), nil +} + +func marshalBlobAndProofV1(b *jsonw.Buffer, item *BlobAndProofV1) { + if item == nil { + b.Null() + } else { + b.Object(func() { + b.Key("blob") + b.HexBytes(item.Blob) + b.Key("proof") + b.HexBytes(item.Proof) + }) + } +} + +// MarshalJSON implements json.Marshaler. +func (list BlobAndProofListV2) MarshalJSON() ([]byte, error) { + var b jsonw.Buffer + b.Array(func() { + for _, item := range list { + marshalBlobAndProofV2(&b, item) + } + }) + return b.Output(), nil +} + +func marshalBlobAndProofV2(b *jsonw.Buffer, item *BlobAndProofV2) { + if item == nil { + b.Null() + } else { + b.Object(func() { + b.Key("blob") + b.HexBytes(item.Blob) + b.Key("proofs") + appendHexBytesArray(b, item.CellProofs) + }) + } +} diff --git a/beacon/engine/gen_ed.go b/beacon/engine/ed_codec.go similarity index 100% rename from beacon/engine/gen_ed.go rename to beacon/engine/ed_codec.go diff --git a/beacon/engine/gen_epe.go b/beacon/engine/epe_decode.go similarity index 62% rename from beacon/engine/gen_epe.go rename to beacon/engine/epe_decode.go index cf7bd9ee3f..a125daa030 100644 --- a/beacon/engine/gen_epe.go +++ b/beacon/engine/epe_decode.go @@ -12,31 +12,6 @@ import ( var _ = (*executionPayloadEnvelopeMarshaling)(nil) -// MarshalJSON marshals as JSON. -func (e ExecutionPayloadEnvelope) MarshalJSON() ([]byte, error) { - type ExecutionPayloadEnvelope struct { - ExecutionPayload *ExecutableData `json:"executionPayload" gencodec:"required"` - BlockValue *hexutil.Big `json:"blockValue" gencodec:"required"` - BlobsBundle *BlobsBundle `json:"blobsBundle"` - Requests []hexutil.Bytes `json:"executionRequests"` - Override bool `json:"shouldOverrideBuilder"` - Witness *hexutil.Bytes `json:"witness,omitempty"` - } - var enc ExecutionPayloadEnvelope - enc.ExecutionPayload = e.ExecutionPayload - enc.BlockValue = (*hexutil.Big)(e.BlockValue) - enc.BlobsBundle = e.BlobsBundle - if e.Requests != nil { - enc.Requests = make([]hexutil.Bytes, len(e.Requests)) - for k, v := range e.Requests { - enc.Requests[k] = v - } - } - enc.Override = e.Override - enc.Witness = e.Witness - return json.Marshal(&enc) -} - // UnmarshalJSON unmarshals from JSON. func (e *ExecutionPayloadEnvelope) UnmarshalJSON(input []byte) error { type ExecutionPayloadEnvelope struct { diff --git a/beacon/engine/epe_encode.go b/beacon/engine/epe_encode.go new file mode 100644 index 0000000000..73deb8f0c6 --- /dev/null +++ b/beacon/engine/epe_encode.go @@ -0,0 +1,98 @@ +// Copyright 2026 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package engine + +import ( + "encoding/json" + "errors" + + "github.com/fjl/jsonw" +) + +// marshalBlobsBundle writes BlobsBundle as JSON and appends it to buf. +func marshalBlobsBundle(b *jsonw.Buffer, bundle *BlobsBundle) { + if bundle == nil { + b.Null() + return + } + b.Object(func() { + b.Key("commitments") + appendHexBytesArray(b, bundle.Commitments) + b.Key("proofs") + appendHexBytesArray(b, bundle.Proofs) + b.Key("blobs") + appendHexBytesArray(b, bundle.Blobs) + }) +} + +// MarshalJSON implements json.Marshaler. +func (e ExecutionPayloadEnvelope) MarshalJSON() ([]byte, error) { + if e.ExecutionPayload == nil { + return nil, errors.New("missing required field 'executionPayload' for ExecutionPayloadEnvelope") + } + + // Pre-marshal the execution payload using its gencodec MarshalJSON. + payload, err := e.ExecutionPayload.MarshalJSON() + if err != nil { + return nil, err + } + // Pre-marshal the witness. + var witness []byte + if e.Witness != nil { + witness, err = json.Marshal(e.Witness) + if err != nil { + return nil, err + } + } + + // Write the execution payload to the buffer + var b jsonw.Buffer + b.Object(func() { + b.Key("executionPayload") + b.RawValue(payload) + + b.Key("blockValue") + b.HexBigInt(e.BlockValue) + + b.Key("blobsBundle") + marshalBlobsBundle(&b, e.BlobsBundle) + + b.Key("executionRequests") + if e.Requests == nil { + b.Null() + } else { + appendHexBytesArray(&b, e.Requests) + } + + b.Key("shouldOverrideBuilder") + b.Bool(e.Override) + + if e.Witness != nil { + b.Key("witness") + b.RawValue(witness) + } + }) + return b.Output(), nil +} + +func appendHexBytesArray[T ~[]byte](b *jsonw.Buffer, slice []T) { + b.Array(func() { + for _, elem := range slice { + b.HexBytes(elem) + } + }) +} diff --git a/beacon/engine/epe_test.go b/beacon/engine/epe_test.go new file mode 100644 index 0000000000..e4ed5b6578 --- /dev/null +++ b/beacon/engine/epe_test.go @@ -0,0 +1,128 @@ +// Copyright 2026 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package engine + +import ( + "bytes" + "encoding/json" + "math/big" + "reflect" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" +) + +func makeTestPayload() *ExecutableData { + return &ExecutableData{ + ParentHash: common.HexToHash("0x01"), + FeeRecipient: common.HexToAddress("0x02"), + StateRoot: common.HexToHash("0x03"), + ReceiptsRoot: common.HexToHash("0x04"), + LogsBloom: make([]byte, 256), + Random: common.HexToHash("0x05"), + Number: 100, + GasLimit: 1000000, + GasUsed: 500000, + Timestamp: 1234567890, + ExtraData: []byte("extra"), + BaseFeePerGas: big.NewInt(7), + BlockHash: common.HexToHash("0x08"), + Transactions: [][]byte{{0xaa, 0xbb}}, + } +} + +func TestMarshalJSONRoundtrip(t *testing.T) { + witness := hexutil.Bytes{0xde, 0xad} + original := ExecutionPayloadEnvelope{ + ExecutionPayload: makeTestPayload(), + BlockValue: big.NewInt(12345), + BlobsBundle: &BlobsBundle{ + Commitments: []hexutil.Bytes{{0x01, 0x02}}, + Proofs: []hexutil.Bytes{{0x03, 0x04}}, + Blobs: []hexutil.Bytes{{0x05, 0x06}}, + }, + Requests: [][]byte{{0xaa}, {0xbb, 0xcc}}, + Override: true, + Witness: &witness, + } + + data, err := original.MarshalJSON() + if err != nil { + t.Fatalf("MarshalJSON error: %v", err) + } + + var decoded ExecutionPayloadEnvelope + if err := json.Unmarshal(data, &decoded); err != nil { + t.Fatalf("UnmarshalJSON error: %v", err) + } + + if decoded.ExecutionPayload.Number != original.ExecutionPayload.Number { + t.Error("ExecutionPayload.Number mismatch") + } + if decoded.BlockValue.Cmp(original.BlockValue) != 0 { + t.Errorf("BlockValue mismatch: got %v, want %v", decoded.BlockValue, original.BlockValue) + } + if len(decoded.BlobsBundle.Blobs) != len(original.BlobsBundle.Blobs) { + t.Error("BlobsBundle.Blobs length mismatch") + } + if len(decoded.Requests) != len(original.Requests) { + t.Error("Requests length mismatch") + } + if decoded.Override != original.Override { + t.Error("Override mismatch") + } + if !bytes.Equal(*decoded.Witness, *original.Witness) { + t.Error("Witness mismatch") + } +} + +func TestMarshalJSONNilPayload(t *testing.T) { + env := ExecutionPayloadEnvelope{ + ExecutionPayload: nil, + BlockValue: big.NewInt(1), + } + _, err := env.MarshalJSON() + if err == nil { + t.Fatal("expected error for nil ExecutionPayload") + } +} + +// TestExecutionPayloadEnvelopeFieldCoverage guards against structural drift. +// If a field is added to or removed from ExecutionPayloadEnvelope, this test +// fails, reminding the developer to update MarshalJSON in marshal_epe.go. +func TestExecutionPayloadEnvelopeFieldCoverage(t *testing.T) { + expected := []string{ + "ExecutionPayload", + "BlockValue", + "BlobsBundle", + "Requests", + "Override", + "Witness", + } + typ := reflect.TypeOf(ExecutionPayloadEnvelope{}) + if typ.NumField() != len(expected) { + t.Fatalf("ExecutionPayloadEnvelope has %d fields, expected %d — update MarshalJSON in marshal_epe.go", + typ.NumField(), len(expected)) + } + for i, name := range expected { + if typ.Field(i).Name != name { + t.Errorf("field %d: got %q, want %q — update MarshalJSON in marshal_epe.go", + i, typ.Field(i).Name, name) + } + } +} diff --git a/beacon/engine/gen_blockparams.go b/beacon/engine/pa_codec.go similarity index 100% rename from beacon/engine/gen_blockparams.go rename to beacon/engine/pa_codec.go diff --git a/beacon/engine/types.go b/beacon/engine/types.go index 5c31ee4e98..60d564b877 100644 --- a/beacon/engine/types.go +++ b/beacon/engine/types.go @@ -60,7 +60,7 @@ var ( PayloadV4 PayloadVersion = 0x4 ) -//go:generate go run github.com/fjl/gencodec -type PayloadAttributes -field-override payloadAttributesMarshaling -out gen_blockparams.go +//go:generate go run github.com/fjl/gencodec -type PayloadAttributes -field-override payloadAttributesMarshaling -out pa_codec.go // PayloadAttributes describes the environment context in which a block should // be built. @@ -79,7 +79,7 @@ type payloadAttributesMarshaling struct { SlotNumber *hexutil.Uint64 } -//go:generate go run github.com/fjl/gencodec -type ExecutableData -field-override executableDataMarshaling -out gen_ed.go +//go:generate go run github.com/fjl/gencodec -type ExecutableData -field-override executableDataMarshaling -out ed_codec.go // ExecutableData is the data necessary to execute an EL payload. type ExecutableData struct { @@ -127,7 +127,7 @@ type StatelessPayloadStatusV1 struct { ValidationError *string `json:"validationError"` } -//go:generate go run github.com/fjl/gencodec -type ExecutionPayloadEnvelope -field-override executionPayloadEnvelopeMarshaling -out gen_epe.go +//go:generate go run github.com/fjl/gencodec -enc=false -type ExecutionPayloadEnvelope -field-override executionPayloadEnvelopeMarshaling -out epe_decode.go type ExecutionPayloadEnvelope struct { ExecutionPayload *ExecutableData `json:"executionPayload" gencodec:"required"` @@ -138,6 +138,12 @@ type ExecutionPayloadEnvelope struct { Witness *hexutil.Bytes `json:"witness,omitempty"` } +// JSON type overrides for ExecutionPayloadEnvelope. +type executionPayloadEnvelopeMarshaling struct { + BlockValue *hexutil.Big + Requests []hexutil.Bytes +} + // BlobsBundle includes the marshalled sidecar data. Note this structure is // shared by BlobsBundleV1 and BlobsBundleV2 for the sake of simplicity. // @@ -154,16 +160,18 @@ type BlobAndProofV1 struct { Proof hexutil.Bytes `json:"proof"` } +// BlobAndProofListV1 is a list of BlobAndProofV1 with a hand-rolled JSON marshaler +// that avoids the overhead of encoding/json for large blob payloads. +type BlobAndProofListV1 []*BlobAndProofV1 + type BlobAndProofV2 struct { Blob hexutil.Bytes `json:"blob"` CellProofs []hexutil.Bytes `json:"proofs"` // proofs MUST contain exactly CELLS_PER_EXT_BLOB cell proofs. } -// JSON type overrides for ExecutionPayloadEnvelope. -type executionPayloadEnvelopeMarshaling struct { - BlockValue *hexutil.Big - Requests []hexutil.Bytes -} +// BlobAndProofListV2 is a list of BlobAndProofV2 with a hand-rolled JSON marshaler +// that avoids the overhead of encoding/json for large blob payloads. +type BlobAndProofListV2 []*BlobAndProofV2 type PayloadStatusV1 struct { Status string `json:"status"` diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index 1def169ae0..b31185a40f 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -552,7 +552,7 @@ func (api *ConsensusAPI) getPayload(payloadID engine.PayloadID, full bool, versi // // Client software MAY return an array of all null entries if syncing or otherwise // unable to serve blob pool data. -func (api *ConsensusAPI) GetBlobsV1(hashes []common.Hash) ([]*engine.BlobAndProofV1, error) { +func (api *ConsensusAPI) GetBlobsV1(hashes []common.Hash) (engine.BlobAndProofListV1, error) { // Reject the request if Osaka has been activated. // follow https://github.com/ethereum/execution-apis/blob/main/src/engine/osaka.md#cancun-api head := api.eth.BlockChain().CurrentHeader() @@ -566,7 +566,7 @@ func (api *ConsensusAPI) GetBlobsV1(hashes []common.Hash) ([]*engine.BlobAndProo if err != nil { return nil, engine.InvalidParams.With(err) } - res := make([]*engine.BlobAndProofV1, len(hashes)) + res := make(engine.BlobAndProofListV1, len(hashes)) for i := 0; i < len(blobs); i++ { // Skip the non-existing blob if blobs[i] == nil { @@ -605,7 +605,7 @@ func (api *ConsensusAPI) GetBlobsV1(hashes []common.Hash) ([]*engine.BlobAndProo // // Client software MUST return null if syncing or otherwise unable to serve // blob pool data. -func (api *ConsensusAPI) GetBlobsV2(hashes []common.Hash) ([]*engine.BlobAndProofV2, error) { +func (api *ConsensusAPI) GetBlobsV2(hashes []common.Hash) (engine.BlobAndProofListV2, error) { head := api.eth.BlockChain().CurrentHeader() if api.config().LatestFork(head.Time) < forks.Osaka { return nil, nil @@ -616,7 +616,7 @@ func (api *ConsensusAPI) GetBlobsV2(hashes []common.Hash) ([]*engine.BlobAndProo // GetBlobsV3 returns a set of blobs from the transaction pool. Same as // GetBlobsV2, except will return partial responses in case there is a missing // blob. -func (api *ConsensusAPI) GetBlobsV3(hashes []common.Hash) ([]*engine.BlobAndProofV2, error) { +func (api *ConsensusAPI) GetBlobsV3(hashes []common.Hash) (engine.BlobAndProofListV2, error) { head := api.eth.BlockChain().CurrentHeader() if api.config().LatestFork(head.Time) < forks.Osaka { return nil, nil @@ -626,7 +626,7 @@ func (api *ConsensusAPI) GetBlobsV3(hashes []common.Hash) ([]*engine.BlobAndProo // getBlobs returns all available blobs. In v2, partial responses are not allowed, // while v3 supports partial responses. -func (api *ConsensusAPI) getBlobs(hashes []common.Hash, v2 bool) ([]*engine.BlobAndProofV2, error) { +func (api *ConsensusAPI) getBlobs(hashes []common.Hash, v2 bool) (engine.BlobAndProofListV2, error) { if len(hashes) > 128 { return nil, engine.TooLargeRequest.With(fmt.Errorf("requested blob count too large: %v", len(hashes))) } @@ -647,7 +647,7 @@ func (api *ConsensusAPI) getBlobs(hashes []common.Hash, v2 bool) ([]*engine.Blob } // Validate the blobs from the pool and assemble the response filled := 0 - res := make([]*engine.BlobAndProofV2, len(hashes)) + res := make(engine.BlobAndProofListV2, len(hashes)) for i := range blobs { // The blob has been evicted since the last AvailableBlobs call. // Return null if partial response is not allowed. diff --git a/eth/catalyst/api_benchmark_test.go b/eth/catalyst/api_benchmark_test.go new file mode 100644 index 0000000000..377e5caa43 --- /dev/null +++ b/eth/catalyst/api_benchmark_test.go @@ -0,0 +1,739 @@ +// Copyright 2026 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . +package catalyst + +import ( + "context" + "crypto/ecdsa" + "crypto/sha256" + "encoding/json" + "fmt" + "io" + "math/big" + "strings" + "testing" + "time" + + "github.com/ethereum/go-ethereum/beacon/engine" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/crypto/kzg4844" + "github.com/ethereum/go-ethereum/eth" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/rpc" + "github.com/holiman/uint256" +) + +// encodingType specifies which encoding to use in benchmarks +type encodingType int + +const ( + encNone encodingType = iota + encJSON + encJSONCustom + encRLP +) + +func (e encodingType) String() string { + switch e { + case encNone: + return "none" + case encJSON: + return "json" + case encJSONCustom: + return "json_custom" + case encRLP: + return "rlp" + default: + return "unknown" + } +} + +var encodingTypes = []encodingType{encNone, encJSON, encJSONCustom, encRLP} + +// benchEncode encodes the value using the specified encoding type. +// It fails the benchmark if encoding fails. +func benchEncode(b *testing.B, enc encodingType, v any) { + var err error + switch enc { + case encJSON: + _, err = json.Marshal(v) + if err != nil { + b.Fatalf("JSON marshal failed: %v", err) + } + case encJSONCustom: + if m, ok := v.(json.Marshaler); ok { + _, err = m.MarshalJSON() + } else { + _, err = json.Marshal(v) + } + if err != nil { + b.Fatalf("JSON MarshalJSON failed: %v", err) + } + case encRLP: + _, err = rlp.EncodeToBytes(v) + if err != nil { + b.Fatalf("RLP encode failed: %v", err) + } + } +} + +// benchmarkBlobCounts defines the blob counts for benchmarks +var benchmarkBlobCounts = []int{21, 72} + +// maxBenchmarkBlobs is the maximum number of blobs we need for benchmarks +var maxBenchmarkBlobs = benchmarkBlobCounts[len(benchmarkBlobCounts)-1] + +var ( + // Pre-computed blobs for benchmarks + benchBlobs []*kzg4844.Blob + benchBlobCommits []kzg4844.Commitment + benchBlobProofs []kzg4844.Proof + benchBlobCellProofs [][]kzg4844.Proof + benchBlobVHashes []common.Hash +) + +func init() { + // Pre-compute blobs for benchmarks + for i := 0; i < maxBenchmarkBlobs; i++ { + blob := &kzg4844.Blob{byte(i), byte(i >> 8)} + benchBlobs = append(benchBlobs, blob) + + commit, _ := kzg4844.BlobToCommitment(blob) + benchBlobCommits = append(benchBlobCommits, commit) + + proof, _ := kzg4844.ComputeBlobProof(blob, commit) + benchBlobProofs = append(benchBlobProofs, proof) + + cellProofs, _ := kzg4844.ComputeCellProofs(blob) + benchBlobCellProofs = append(benchBlobCellProofs, cellProofs) + + vhash := kzg4844.CalcBlobHashV1(sha256.New(), &commit) + benchBlobVHashes = append(benchBlobVHashes, vhash) + } +} + +// benchFork specifies which fork to use in benchmark environments +type benchFork int + +const ( + forkCancun benchFork = iota + forkPrague + forkOsaka +) + +// benchmarkBlobEnv holds the environment for blob benchmarks +type benchmarkBlobEnv struct { + node *node.Node + eth *eth.Ethereum + api *ConsensusAPI + config *params.ChainConfig + keys []*ecdsa.PrivateKey + vhashes []common.Hash + version byte + blobCount int + nonces []uint64 // current nonce for each key +} + +// makeBenchBlobTx creates a blob transaction with the specified number of blobs. +// blobOffset indicates which pre-computed blobs to use. +func makeBenchBlobTx(chainConfig *params.ChainConfig, nonce uint64, blobCount int, blobOffset int, key *ecdsa.PrivateKey, version byte) *types.Transaction { + var ( + blobs []kzg4844.Blob + blobHashes []common.Hash + commitments []kzg4844.Commitment + proofs []kzg4844.Proof + ) + for i := 0; i < blobCount; i++ { + idx := blobOffset + i + blobs = append(blobs, *benchBlobs[idx]) + commitments = append(commitments, benchBlobCommits[idx]) + if version == types.BlobSidecarVersion0 { + proofs = append(proofs, benchBlobProofs[idx]) + } else { + proofs = append(proofs, benchBlobCellProofs[idx]...) + } + blobHashes = append(blobHashes, benchBlobVHashes[idx]) + } + blobtx := &types.BlobTx{ + ChainID: uint256.MustFromBig(chainConfig.ChainID), + Nonce: nonce, + GasTipCap: uint256.NewInt(params.GWei), + GasFeeCap: uint256.NewInt(10 * params.GWei), + Gas: 21000, + BlobFeeCap: uint256.NewInt(params.GWei), + BlobHashes: blobHashes, + Value: uint256.NewInt(100), + Sidecar: types.NewBlobTxSidecar(version, blobs, commitments, proofs), + } + return types.MustSignNewTx(key, types.LatestSigner(chainConfig), blobtx) +} + +// newBenchmarkBlobEnv creates an environment for blob benchmarks. +// It creates multiple keys and fills the pool with blob transactions totaling the specified blob count. +// version: 0 = BlobSidecarVersion0 (pre-Osaka), 1 = BlobSidecarVersion1 (Osaka+) +// fork: which fork to enable +func newBenchmarkBlobEnv(b *testing.B, blobCount int, version byte, fork benchFork) *benchmarkBlobEnv { + // Create a configuration that allows enough blobs + config := *params.MergedTestChainConfig + // Set blob schedule to allow for large blob counts (up to 128 blobs per block) + config.BlobScheduleConfig = ¶ms.BlobScheduleConfig{ + Cancun: ¶ms.BlobConfig{Target: 6, Max: 128, UpdateFraction: 3338477}, + Prague: ¶ms.BlobConfig{Target: 6, Max: 128, UpdateFraction: 5007716}, + Osaka: ¶ms.BlobConfig{Target: 6, Max: 128, UpdateFraction: 5007716}, + } + // Configure fork times based on requested fork + switch fork { + case forkCancun: + config.PragueTime = nil + config.OsakaTime = nil + case forkPrague: + config.OsakaTime = nil + case forkOsaka: + // All forks enabled (default) + } + + // Generate enough keys for all the blob transactions + // Each tx can have up to 6 blobs, so we need ceil(blobCount/6) keys + numTxs := (blobCount + 5) / 6 + keys := make([]*ecdsa.PrivateKey, numTxs) + addrs := make([]common.Address, numTxs) + alloc := make(types.GenesisAlloc) + alloc[testAddr] = types.Account{Balance: testBalance} + + for i := 0; i < numTxs; i++ { + key, _ := crypto.GenerateKey() + keys[i] = key + addrs[i] = crypto.PubkeyToAddress(key.PublicKey) + // Give each account enough balance for many transactions + alloc[addrs[i]] = types.Account{Balance: new(big.Int).Mul(big.NewInt(1e18), big.NewInt(10000))} + } + + gspec := &core.Genesis{ + Config: &config, + Alloc: alloc, + Difficulty: common.Big0, + } + n, ethServ := startEthService(b, gspec, nil) + + // Collect versioned hashes for the blobs we'll use + var vhashes []common.Hash + for i := 0; i < blobCount; i++ { + vhashes = append(vhashes, benchBlobVHashes[i]) + } + + // Fill initial blob txs into the pool + env := &benchmarkBlobEnv{ + node: n, + eth: ethServ, + api: newConsensusAPIWithoutHeartbeat(ethServ), + config: &config, + keys: keys, + vhashes: vhashes, + version: version, + blobCount: blobCount, + nonces: make([]uint64, numTxs), + } + env.addBlobTxs(b) + return env +} + +// addBlobTxs adds blob transactions to the pool using the stored blobCount and per-key nonces. +// It increments each key's nonce after adding transactions. +func (env *benchmarkBlobEnv) addBlobTxs(b *testing.B) { + numTxs := (env.blobCount + 5) / 6 + var txs []*types.Transaction + blobsRemaining := env.blobCount + blobOffset := 0 + + for i := 0; i < numTxs && blobsRemaining > 0; i++ { + // Each tx gets up to 6 blobs + txBlobCount := 6 + if blobsRemaining < 6 { + txBlobCount = blobsRemaining + } + tx := makeBenchBlobTx(env.config, env.nonces[i], txBlobCount, blobOffset, env.keys[i], env.version) + txs = append(txs, tx) + blobOffset += txBlobCount + blobsRemaining -= txBlobCount + } + errs := env.eth.TxPool().Add(txs, true) + for i, err := range errs { + if err != nil { + b.Fatalf("Failed to add blob tx %d to pool: %v", i, err) + } + } + // Increment nonce for each key used + for i := 0; i < numTxs; i++ { + env.nonces[i]++ + } +} + +// Close closes the environment +func (env *benchmarkBlobEnv) Close() { + env.node.Close() +} + +// BenchmarkGetBlobsV1 benchmarks the GetBlobsV1 method with various blob counts. +// GetBlobsV1 is available at Cancun/Prague (pre-Osaka). +func BenchmarkGetBlobsV1(b *testing.B) { + for _, blobCount := range benchmarkBlobCounts { + for _, enc := range encodingTypes { + b.Run(fmt.Sprintf("blobs=%d/enc=%s", blobCount, enc), func(b *testing.B) { + env := newBenchmarkBlobEnv(b, blobCount, 0, forkPrague) + defer env.Close() + + b.ResetTimer() + for b.Loop() { + result, err := env.api.GetBlobsV1(env.vhashes) + if err != nil { + b.Fatalf("GetBlobsV1 failed: %v", err) + } + // Verify we got the expected number of blobs + if len(result) != blobCount { + b.Fatalf("expected %d blobs, got %d", blobCount, len(result)) + } + benchEncode(b, enc, result) + } + b.ReportMetric(float64(b.Elapsed().Milliseconds())/float64(b.N), "ms/op") + }) + } + } +} + +// BenchmarkGetBlobsV2Extended benchmarks the GetBlobsV2 method with various blob counts. +// GetBlobsV2 is available at Osaka+. +func BenchmarkGetBlobsV2Extended(b *testing.B) { + for _, blobCount := range benchmarkBlobCounts { + for _, enc := range encodingTypes { + b.Run(fmt.Sprintf("blobs=%d/enc=%s", blobCount, enc), func(b *testing.B) { + env := newBenchmarkBlobEnv(b, blobCount, 1, forkOsaka) + defer env.Close() + + b.ResetTimer() + for b.Loop() { + result, err := env.api.GetBlobsV2(env.vhashes) + if err != nil { + b.Fatalf("GetBlobsV2 failed: %v", err) + } + // Verify we got the expected number of blobs + if len(result) != blobCount { + b.Fatalf("expected %d blobs, got %d", blobCount, len(result)) + } + benchEncode(b, enc, result) + } + b.ReportMetric(float64(b.Elapsed().Milliseconds())/float64(b.N), "ms/op") + }) + } + } +} + +// BenchmarkGetBlobsV3 benchmarks the GetBlobsV3 method with various blob counts. +// GetBlobsV3 is available at Osaka+. +func BenchmarkGetBlobsV3(b *testing.B) { + for _, blobCount := range benchmarkBlobCounts { + for _, enc := range encodingTypes { + b.Run(fmt.Sprintf("blobs=%d/enc=%s", blobCount, enc), func(b *testing.B) { + env := newBenchmarkBlobEnv(b, blobCount, 1, forkOsaka) + defer env.Close() + + b.ResetTimer() + for b.Loop() { + result, err := env.api.GetBlobsV3(env.vhashes) + if err != nil { + b.Fatalf("GetBlobsV3 failed: %v", err) + } + // Verify we got the expected number of blobs + if len(result) != blobCount { + b.Fatalf("expected %d blobs, got %d", blobCount, len(result)) + } + benchEncode(b, enc, result) + } + b.ReportMetric(float64(b.Elapsed().Milliseconds())/float64(b.N), "ms/op") + }) + } + } +} + +// BenchmarkGetPayloadV5WithBlobs benchmarks GetPayloadV5 (Osaka fork) with blobs. +// Note: Measures single iteration performance due to NewPayload complexity at Osaka. +func BenchmarkGetPayloadV5WithBlobs(b *testing.B) { + for _, blobCount := range benchmarkBlobCounts { + for _, enc := range encodingTypes { + b.Run(fmt.Sprintf("blobs=%d/enc=%s", blobCount, enc), func(b *testing.B) { + env := newBenchmarkBlobEnv(b, blobCount, 1, forkOsaka) + defer env.Close() + + parent := env.api.eth.BlockChain().CurrentHeader() + beaconRoot := common.Hash{0x42} + + b.ResetTimer() + for i := 0; i < b.N; i++ { + // Note: We don't call addBlobTxs here because we can't advance the chain + // (NewPayloadV5 requires execution requests). The same transactions are + // reused for each iteration, which still benchmarks the GetPayload performance. + timestamp := parent.Time + 12 + fcState := engine.ForkchoiceStateV1{ + HeadBlockHash: parent.Hash(), + SafeBlockHash: parent.Hash(), + FinalizedBlockHash: parent.Hash(), + } + payloadAttr := &engine.PayloadAttributes{ + Timestamp: timestamp, + Random: common.Hash{byte(i)}, + SuggestedFeeRecipient: testAddr, + Withdrawals: []*types.Withdrawal{}, + BeaconRoot: &beaconRoot, + } + resp, err := env.api.ForkchoiceUpdatedV3(context.Background(), fcState, payloadAttr) + if err != nil { + b.Fatalf("ForkchoiceUpdatedV3 failed: %v", err) + } + if resp.PayloadID == nil { + b.Fatalf("ForkchoiceUpdatedV3 returned nil PayloadID") + } + // Wait for the payload to be built with transactions + time.Sleep(100 * time.Millisecond) + + envelope, err := env.api.GetPayloadV5(*resp.PayloadID) + if err != nil { + b.Fatalf("GetPayloadV5 failed: %v", err) + } + if envelope.BlobsBundle == nil { + b.Fatalf("BlobsBundle is nil") + } + // Verify we got the expected number of blobs + if len(envelope.BlobsBundle.Blobs) != blobCount { + b.Fatalf("expected %d blobs, got %d", blobCount, len(envelope.BlobsBundle.Blobs)) + } + benchEncode(b, enc, envelope) + } + b.ReportMetric(float64(b.Elapsed().Milliseconds())/float64(b.N), "ms/op") + }) + } + } +} + +// BenchmarkNewPayloadV3WithBlobs benchmarks the NewPayloadV3 method with various blob counts. +// Each iteration processes a payload with the full blob count. +func BenchmarkNewPayloadV3WithBlobs(b *testing.B) { + for _, blobCount := range benchmarkBlobCounts { + for _, enc := range encodingTypes { + b.Run(fmt.Sprintf("blobs=%d/enc=%s", blobCount, enc), func(b *testing.B) { + env := newBenchmarkBlobEnv(b, blobCount, 0, forkCancun) + defer env.Close() + + parent := env.api.eth.BlockChain().CurrentHeader() + beaconRoot := common.Hash{0x42} + + // Build a payload first to get valid executable data + timestamp := parent.Time + 12 + fcState := engine.ForkchoiceStateV1{ + HeadBlockHash: parent.Hash(), + SafeBlockHash: parent.Hash(), + FinalizedBlockHash: parent.Hash(), + } + payloadAttr := &engine.PayloadAttributes{ + Timestamp: timestamp, + Random: common.Hash{0x01}, + SuggestedFeeRecipient: testAddr, + Withdrawals: []*types.Withdrawal{}, + BeaconRoot: &beaconRoot, + } + resp, err := env.api.ForkchoiceUpdatedV3(context.Background(), fcState, payloadAttr) + if err != nil { + b.Fatalf("ForkchoiceUpdatedV3 failed: %v", err) + } + if resp.PayloadID == nil { + b.Fatalf("ForkchoiceUpdatedV3 returned nil PayloadID") + } + // Wait for the payload to be built with transactions + time.Sleep(100 * time.Millisecond) + + // Get the payload + envelope, err := env.api.GetPayloadV3(*resp.PayloadID) + if err != nil { + b.Fatalf("GetPayloadV3 failed: %v", err) + } + // Verify we got the expected number of blobs + if len(envelope.BlobsBundle.Blobs) != blobCount { + b.Fatalf("expected %d blobs in setup, got %d", blobCount, len(envelope.BlobsBundle.Blobs)) + } + + execData := envelope.ExecutionPayload + // Collect versioned hashes from blobs bundle + vhashes := make([]common.Hash, len(envelope.BlobsBundle.Commitments)) + for j, commitment := range envelope.BlobsBundle.Commitments { + var commit kzg4844.Commitment + copy(commit[:], commitment) + vhashes[j] = kzg4844.CalcBlobHashV1(sha256.New(), &commit) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + // NewPayload is idempotent, calling it multiple times with the same data + // should return the same result. The payload contains blobCount blobs. + result, err := env.api.NewPayloadV3(context.Background(), *execData, vhashes, &beaconRoot) + if err != nil { + b.Fatalf("NewPayloadV3 failed: %v", err) + } + benchEncode(b, enc, result) + } + b.ReportMetric(float64(b.Elapsed().Milliseconds())/float64(b.N), "ms/op") + }) + } + } +} + +// BenchmarkForkchoiceUpdatedWithBlobPayload benchmarks forkchoice updates that trigger +// payload building with blob transactions. +// Note: Measures ForkchoiceUpdated performance with blob transactions in the pool. +func BenchmarkForkchoiceUpdatedWithBlobPayload(b *testing.B) { + for _, blobCount := range benchmarkBlobCounts { + for _, enc := range encodingTypes { + b.Run(fmt.Sprintf("blobs=%d/enc=%s", blobCount, enc), func(b *testing.B) { + env := newBenchmarkBlobEnv(b, blobCount, 0, forkCancun) + defer env.Close() + + parent := env.api.eth.BlockChain().CurrentHeader() + beaconRoot := common.Hash{0x42} + + b.ResetTimer() + for i := 0; i < b.N; i++ { + // Note: We don't call addBlobTxs here because the blob pool has + // a per-account limit of 16 transactions. The same transactions are + // reused for each iteration, which still benchmarks the ForkchoiceUpdated + // performance with blob transactions in the pool. + timestamp := parent.Time + 12 + fcState := engine.ForkchoiceStateV1{ + HeadBlockHash: parent.Hash(), + SafeBlockHash: parent.Hash(), + FinalizedBlockHash: parent.Hash(), + } + payloadAttr := &engine.PayloadAttributes{ + Timestamp: timestamp, + Random: common.Hash{byte(i)}, + SuggestedFeeRecipient: testAddr, + Withdrawals: []*types.Withdrawal{}, + BeaconRoot: &beaconRoot, + } + resp, err := env.api.ForkchoiceUpdatedV3(context.Background(), fcState, payloadAttr) + if err != nil { + b.Fatalf("ForkchoiceUpdatedV3 failed: %v", err) + } + if resp.PayloadID == nil { + b.Fatalf("ForkchoiceUpdatedV3 returned nil PayloadID") + } + benchEncode(b, enc, resp) + } + b.ReportMetric(float64(b.Elapsed().Milliseconds())/float64(b.N), "ms/op") + }) + } + } +} + +// BenchmarkFullBlobWorkflowOsaka benchmarks the complete blob workflow at Osaka: +// ForkchoiceUpdated -> GetPayload +// Note: Measures single iteration performance due to NewPayload complexity at Osaka. +func BenchmarkFullBlobWorkflowOsaka(b *testing.B) { + for _, blobCount := range benchmarkBlobCounts { + for _, enc := range encodingTypes { + b.Run(fmt.Sprintf("blobs=%d/enc=%s", blobCount, enc), func(b *testing.B) { + env := newBenchmarkBlobEnv(b, blobCount, 1, forkOsaka) + defer env.Close() + + parent := env.api.eth.BlockChain().CurrentHeader() + beaconRoot := common.Hash{0x42} + + b.ResetTimer() + for i := 0; i < b.N; i++ { + // Note: We don't call addBlobTxs here because we can't advance the chain + // (NewPayloadV5 requires execution requests). The same transactions are + // reused for each iteration, which still benchmarks the workflow performance. + + // 1. ForkchoiceUpdated to build payload + timestamp := parent.Time + 12 + fcState := engine.ForkchoiceStateV1{ + HeadBlockHash: parent.Hash(), + SafeBlockHash: parent.Hash(), + FinalizedBlockHash: parent.Hash(), + } + payloadAttr := &engine.PayloadAttributes{ + Timestamp: timestamp, + Random: common.Hash{byte(i)}, + SuggestedFeeRecipient: testAddr, + Withdrawals: []*types.Withdrawal{}, + BeaconRoot: &beaconRoot, + } + resp, err := env.api.ForkchoiceUpdatedV3(context.Background(), fcState, payloadAttr) + if err != nil { + b.Fatalf("ForkchoiceUpdatedV3 failed: %v", err) + } + if resp.PayloadID == nil { + b.Fatalf("ForkchoiceUpdatedV3 returned nil PayloadID") + } + // Encode ForkchoiceUpdated response + benchEncode(b, enc, resp) + + // Wait for the payload to be built with transactions + time.Sleep(100 * time.Millisecond) + + // 2. GetPayload + envelope, err := env.api.GetPayloadV5(*resp.PayloadID) + if err != nil { + b.Fatalf("GetPayloadV5 failed: %v", err) + } + if envelope.BlobsBundle == nil { + b.Fatalf("BlobsBundle is nil") + } + // Verify we got the expected number of blobs + if len(envelope.BlobsBundle.Blobs) != blobCount { + b.Fatalf("expected %d blobs, got %d", blobCount, len(envelope.BlobsBundle.Blobs)) + } + // Encode GetPayload response + benchEncode(b, enc, envelope) + } + b.ReportMetric(float64(b.Elapsed().Milliseconds())/float64(b.N), "ms/op") + }) + } + } +} + +// discardConn is a net.Conn-like writer that discards all output. +// Used to measure server-side RPC cost without client-side decoding. +type discardConn struct { + io.Reader + io.Writer +} + +func (discardConn) Close() error { return nil } +func (discardConn) SetWriteDeadline(time.Time) error { return nil } + +// BenchmarkGetPayloadV5RPCServerOnly benchmarks only the EL server-side cost of +// engine_getPayloadV5: method dispatch, JSON serialization, and wire encoding. +// Client-side decoding is excluded by writing to io.Discard. +func BenchmarkGetPayloadV5RPCServerOnly(b *testing.B) { + blobCount := 72 + env := newBenchmarkBlobEnv(b, blobCount, 1, forkOsaka) + defer env.Close() + + // Register the engine API on the running node's in-process RPC server. + rpcServer, err := env.node.RPCHandler() + if err != nil { + b.Fatalf("RPCHandler failed: %v", err) + } + rpcServer.RegisterName("engine", env.api) + + parent := env.api.eth.BlockChain().CurrentHeader() + beaconRoot := common.Hash{0x42} + + // Build one payload to get a valid payloadID. + fcState := engine.ForkchoiceStateV1{ + HeadBlockHash: parent.Hash(), + SafeBlockHash: parent.Hash(), + FinalizedBlockHash: parent.Hash(), + } + payloadAttr := &engine.PayloadAttributes{ + Timestamp: parent.Time + 12, + Random: common.Hash{0x01}, + SuggestedFeeRecipient: testAddr, + Withdrawals: []*types.Withdrawal{}, + BeaconRoot: &beaconRoot, + } + resp, err := env.api.ForkchoiceUpdatedV3(context.Background(), fcState, payloadAttr) + if err != nil { + b.Fatalf("ForkchoiceUpdatedV3 failed: %v", err) + } + if resp.PayloadID == nil { + b.Fatalf("ForkchoiceUpdatedV3 returned nil PayloadID") + } + time.Sleep(100 * time.Millisecond) + + // Verify the payload has the expected blobs via the direct API first. + envelope, err := env.api.GetPayloadV5(*resp.PayloadID) + if err != nil { + b.Fatalf("GetPayloadV5 failed: %v", err) + } + if len(envelope.BlobsBundle.Blobs) != blobCount { + b.Fatalf("expected %d blobs, got %d", blobCount, len(envelope.BlobsBundle.Blobs)) + } + b.Logf("payload size: %d blobs, %d txs", len(envelope.BlobsBundle.Blobs), len(envelope.ExecutionPayload.Transactions)) + + // Build the JSON-RPC request bytes once. + reqJSON := fmt.Sprintf(`{"jsonrpc":"2.0","id":1,"method":"engine_getPayloadV5","params":["%s"]}`, resp.PayloadID.String()) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + conn := discardConn{ + Reader: strings.NewReader(reqJSON), + Writer: io.Discard, + } + codec := rpc.NewCodec(conn) + rpcServer.ServeCodec(codec, 0) + } + b.StopTimer() + b.ReportMetric(float64(b.Elapsed().Milliseconds())/float64(b.N), "ms/op") +} + +// BenchmarkGetBlobsV3RPCServerOnly benchmarks only the EL server-side cost of +// engine_getBlobsV3: method dispatch, JSON serialization, and wire encoding. +// Client-side decoding is excluded by writing to io.Discard. +func BenchmarkGetBlobsV3RPCServerOnly(b *testing.B) { + blobCount := 72 + env := newBenchmarkBlobEnv(b, blobCount, 1, forkOsaka) + defer env.Close() + + // Register the engine API on the running node's in-process RPC server. + rpcServer, err := env.node.RPCHandler() + if err != nil { + b.Fatalf("RPCHandler failed: %v", err) + } + rpcServer.RegisterName("engine", env.api) + + // Verify the blobs are available via the direct API first. + result, err := env.api.GetBlobsV3(env.vhashes) + if err != nil { + b.Fatalf("GetBlobsV3 failed: %v", err) + } + if len(result) != blobCount { + b.Fatalf("expected %d blobs, got %d", blobCount, len(result)) + } + b.Logf("blob count: %d", blobCount) + + // Build the JSON-RPC request bytes once. + // Format the versioned hashes as a JSON array of hex strings. + var hashStrs []string + for _, h := range env.vhashes { + hashStrs = append(hashStrs, fmt.Sprintf(`"%s"`, h.Hex())) + } + reqJSON := fmt.Sprintf(`{"jsonrpc":"2.0","id":1,"method":"engine_getBlobsV3","params":[[%s]]}`, strings.Join(hashStrs, ",")) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + conn := discardConn{ + Reader: strings.NewReader(reqJSON), + Writer: io.Discard, + } + codec := rpc.NewCodec(conn) + rpcServer.ServeCodec(codec, 0) + } + b.StopTimer() + b.ReportMetric(float64(b.Elapsed().Milliseconds())/float64(b.N), "ms/op") +} diff --git a/eth/catalyst/api_test.go b/eth/catalyst/api_test.go index 1f38c4dd8a..65d78d84ee 100644 --- a/eth/catalyst/api_test.go +++ b/eth/catalyst/api_test.go @@ -1961,7 +1961,7 @@ func TestGetBlobsV1(t *testing.T) { // Fill the request for retrieving blobs var ( vhashes []common.Hash - expect []*engine.BlobAndProofV1 + expect engine.BlobAndProofListV1 ) // fill missing blob at the beginning if suite.fillRandom { @@ -2072,13 +2072,13 @@ func BenchmarkGetBlobsV2(b *testing.B) { } } -type getBlobsFn func(hashes []common.Hash) ([]*engine.BlobAndProofV2, error) +type getBlobsFn func(hashes []common.Hash) (engine.BlobAndProofListV2, error) func runGetBlobs(t testing.TB, getBlobs getBlobsFn, start, limit int, fillRandom bool, expectPartialResponse bool, name string) { // Fill the request for retrieving blobs var ( vhashes []common.Hash - expect []*engine.BlobAndProofV2 + expect engine.BlobAndProofListV2 ) for j := start; j < limit; j++ { vhashes = append(vhashes, testBlobVHashes[j]) diff --git a/go.mod b/go.mod index 17897a62c0..56869d255d 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( github.com/ethereum/hid v1.0.1-0.20260421154323-c2ab8d9bf68a github.com/fatih/color v1.16.0 github.com/ferranbt/fastssz v0.1.4 + github.com/fjl/jsonw v0.1.0 github.com/fsnotify/fsnotify v1.6.0 github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff github.com/gofrs/flock v0.12.1 @@ -121,7 +122,7 @@ require ( github.com/deepmap/oapi-codegen v1.6.0 // indirect github.com/dlclark/regexp2 v1.7.0 // indirect github.com/emicklei/dot v1.6.2 // indirect - github.com/fjl/gencodec v0.1.0 // indirect + github.com/fjl/gencodec v0.1.2 // indirect github.com/garslo/gogen v0.0.0-20170306192744-1d203ffc1f61 // indirect github.com/getsentry/sentry-go v0.27.0 // indirect github.com/go-ole/go-ole v1.3.0 // indirect diff --git a/go.sum b/go.sum index bad8a44cfd..6335fb5698 100644 --- a/go.sum +++ b/go.sum @@ -123,8 +123,10 @@ github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/ferranbt/fastssz v0.1.4 h1:OCDB+dYDEQDvAgtAGnTSidK1Pe2tW3nFV40XyMkTeDY= github.com/ferranbt/fastssz v0.1.4/go.mod h1:Ea3+oeoRGGLGm5shYAeDgu6PGUlcvQhE2fILyD9+tGg= -github.com/fjl/gencodec v0.1.0 h1:B3K0xPfc52cw52BBgUbSPxYo+HlLfAgWMVKRWXUXBcs= -github.com/fjl/gencodec v0.1.0/go.mod h1:Um1dFHPONZGTHog1qD1NaWjXJW/SPB38wPv0O8uZ2fI= +github.com/fjl/gencodec v0.1.2 h1:nf+MMsmuii5ZQMbS6/xjZoe5LRkN0415FOJOSwmnuW8= +github.com/fjl/gencodec v0.1.2/go.mod h1:chDHL3wKXuBgauP8x3XNZkl5EIAR5SoCTmmmDTZRzmw= +github.com/fjl/jsonw v0.1.0 h1:V3MyR79fjLpn/+bMgvegdGUIhoJOzjmqWcKDgcOmY1I= +github.com/fjl/jsonw v0.1.0/go.mod h1:2KMLevM6FXEJnfhtk7naXu9vZdVfOma1GlnGdPRlumU= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= diff --git a/rpc/client.go b/rpc/client.go index 8d81503d59..9175626241 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -364,7 +364,7 @@ func (c *Client) CallContext(ctx context.Context, result interface{}, method str resp := batchresp[0] switch { case resp.Error != nil: - return resp.Error + return resp.decodeError() case len(resp.Result) == 0: return ErrNoResult default: @@ -419,7 +419,7 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error { if c.isHTTP { err = c.sendBatchHTTP(ctx, op, msgs) } else { - err = c.send(ctx, op, msgs) + err = c.sendBatch(ctx, op, msgs) } if err != nil { return err @@ -449,7 +449,7 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error { elem := &b[index] switch { case resp.Error != nil: - elem.Error = resp.Error + elem.Error = resp.decodeError() case resp.Result == nil: elem.Error = ErrNoResult default: @@ -552,7 +552,7 @@ func (c *Client) newMessage(method string, paramsIn ...interface{}) (*jsonrpcMes // send registers op with the dispatch loop, then sends msg on the connection. // if sending fails, op is deregistered. -func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error { +func (c *Client) send(ctx context.Context, op *requestOp, msg *jsonrpcMessage) error { select { case c.reqInit <- op: err := c.write(ctx, msg, false) @@ -567,7 +567,22 @@ func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error } } -func (c *Client) write(ctx context.Context, msg interface{}, retry bool) error { +// sendBatch registers op with the dispatch loop, then sends a batch of messages +// on the connection. If sending fails, op is deregistered. +func (c *Client) sendBatch(ctx context.Context, op *requestOp, msgs []*jsonrpcMessage) error { + select { + case c.reqInit <- op: + err := c.writeBatch(ctx, msgs, false) + c.reqSent <- err + return err + case <-ctx.Done(): + return ctx.Err() + case <-c.closing: + return ErrClientQuit + } +} + +func (c *Client) write(ctx context.Context, msg *jsonrpcMessage, retry bool) error { if c.writeConn == nil { // The previous write failed. Try to establish a new connection. if err := c.reconnect(ctx); err != nil { @@ -584,6 +599,22 @@ func (c *Client) write(ctx context.Context, msg interface{}, retry bool) error { return err } +func (c *Client) writeBatch(ctx context.Context, msgs []*jsonrpcMessage, retry bool) error { + if c.writeConn == nil { + if err := c.reconnect(ctx); err != nil { + return err + } + } + err := c.writeConn.writeJSONBatch(ctx, msgs, false) + if err != nil { + c.writeConn = nil + if !retry { + return c.writeBatch(ctx, msgs, true) + } + } + return err +} + func (c *Client) reconnect(ctx context.Context) error { if c.reconnectFunc == nil { return errDead diff --git a/rpc/handler.go b/rpc/handler.go index c0af162f13..89fc78236c 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -169,7 +169,7 @@ func (b *batchCallBuffer) doWrite(ctx context.Context, conn jsonWriter, isErrorR } b.wrote = true // can only write once if len(b.resp) > 0 { - conn.writeJSON(ctx, b.resp, isErrorResponse) + conn.writeJSONBatch(ctx, b.resp, isErrorResponse) } } @@ -237,7 +237,7 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { resp := h.handleCallMsg(cp, msg) callBuffer.pushResponse(resp) if resp != nil && h.batchResponseMaxSize != 0 { - responseBytes += len(resp.Result) + responseBytes += len(resp.Result) + len(resp.Error) if responseBytes > h.batchResponseMaxSize { err := &internalServerError{errcodeResponseTooLarge, errMsgResponseTooLarge} callBuffer.respondWithError(cp.ctx, h.conn, err) @@ -268,7 +268,7 @@ func (h *handler) respondWithBatchTooLarge(cp *callProc, batch []*jsonrpcMessage break } } - h.conn.writeJSON(cp.ctx, []*jsonrpcMessage{resp}, true) + h.conn.writeJSONBatch(cp.ctx, []*jsonrpcMessage{resp}, true) } // handleMsg handles a single non-batch message. @@ -415,7 +415,7 @@ func (h *handler) handleResponses(batch []*jsonrpcMessage, handleCall func(*json // the op.resp channel. if op.sub != nil { if msg.Error != nil { - op.err = msg.Error + op.err = msg.decodeError() } else { op.err = json.Unmarshal(msg.Result, &op.sub.subid) if op.err == nil { @@ -481,9 +481,10 @@ func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMess var logctx []any logctx = append(logctx, "reqid", idForLog{msg.ID}, "duration", time.Since(start)) if resp.Error != nil { - logctx = append(logctx, "err", resp.Error.Message) - if resp.Error.Data != nil { - logctx = append(logctx, "errdata", formatErrorData(resp.Error.Data)) + je := resp.decodeError() + logctx = append(logctx, "err", je.Message) + if je.Data != nil { + logctx = append(logctx, "errdata", formatErrorData(je.Data)) } h.log.Warn("Served "+msg.Method, logctx...) } else { @@ -550,7 +551,7 @@ func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage answer := h.runMethod(rctx, msg, callb, args) var rErr error if answer.Error != nil { - rErr = errors.New(answer.Error.Message) + rErr = errors.New(answer.decodeError().Message) } rSpanEnd(&rErr) @@ -623,7 +624,7 @@ func (h *handler) runMethod(ctx context.Context, msg *jsonrpcMessage, callb *cal _, _, spanEnd := telemetry.StartSpanWithTracer(ctx, h.tracer(), "rpc.encodeJSONResponse", attributes...) response := msg.response(result) if response.Error != nil { - err = errors.New(response.Error.Message) + err = errors.New(response.decodeError().Message) } spanEnd(&err) return response diff --git a/rpc/http.go b/rpc/http.go index 55f0abfa72..49618244df 100644 --- a/rpc/http.go +++ b/rpc/http.go @@ -57,10 +57,14 @@ type httpConn struct { // and some methods don't work. The panic() stubs here exist to ensure // this special treatment is correct. -func (hc *httpConn) writeJSON(context.Context, interface{}, bool) error { +func (hc *httpConn) writeJSON(context.Context, *jsonrpcMessage, bool) error { panic("writeJSON called on httpConn") } +func (hc *httpConn) writeJSONBatch(context.Context, []*jsonrpcMessage, bool) error { + panic("writeJSONBatch called on httpConn") +} + func (hc *httpConn) peerInfo() PeerInfo { panic("peerInfo called on httpConn") } @@ -179,9 +183,9 @@ func cleanlyCloseBody(body io.ReadCloser) error { return body.Close() } -func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) error { +func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg *jsonrpcMessage) error { hc := c.writeConn.(*httpConn) - respBody, err := hc.doRequest(ctx, msg) + respBody, err := hc.doRequest(ctx, appendMessage(nil, msg)) if err != nil { return err } @@ -198,7 +202,7 @@ func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) e func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonrpcMessage) error { hc := c.writeConn.(*httpConn) - respBody, err := hc.doRequest(ctx, msgs) + respBody, err := hc.doRequest(ctx, appendBatch(nil, msgs)) if err != nil { return err } @@ -212,11 +216,7 @@ func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonr return nil } -func (hc *httpConn) doRequest(ctx context.Context, msg interface{}) (io.ReadCloser, error) { - body, err := json.Marshal(msg) - if err != nil { - return nil, err - } +func (hc *httpConn) doRequest(ctx context.Context, body []byte) (io.ReadCloser, error) { req, err := http.NewRequestWithContext(ctx, http.MethodPost, hc.url, io.NopCloser(bytes.NewReader(body))) if err != nil { return nil, err @@ -268,41 +268,51 @@ func (s *Server) newHTTPServerConn(r *http.Request, w http.ResponseWriter) Serve body := io.LimitReader(r.Body, int64(s.httpBodyLimit)) conn := &httpServerConn{Reader: body, Writer: w, r: r} - encoder := func(v any, isErrorResponse bool) error { - if !isErrorResponse { - return json.NewEncoder(conn).Encode(v) - } - - // It's an error response and requires special treatment. - // - // In case of a timeout error, the response must be written before the HTTP - // server's write timeout occurs. So we need to flush the response. The - // Content-Length header also needs to be set to ensure the client knows - // when it has the full response. - encdata, err := json.Marshal(v) - if err != nil { - return err - } - w.Header().Set("content-length", strconv.Itoa(len(encdata))) - - // If this request is wrapped in a handler that might remove Content-Length (such - // as the automatic gzip we do in package node), we need to ensure the HTTP server - // doesn't perform chunked encoding. In case WriteTimeout is reached, the chunked - // encoding might not be finished correctly, and some clients do not like it when - // the final chunk is missing. - w.Header().Set("transfer-encoding", "identity") - - _, err = w.Write(encdata) - if f, ok := w.(http.Flusher); ok { - f.Flush() - } - return err + var buf []byte + encodeMsg := func(msg *jsonrpcMessage, isError bool) error { + buf = appendMessage(buf[:0], msg) + return httpWriteResult(w, buf, isError) + } + encodeBatch := func(msgs []*jsonrpcMessage, isError bool) error { + buf = appendBatch(buf[:0], msgs) + return httpWriteResult(w, buf, isError) } dec := json.NewDecoder(conn) dec.UseNumber() - return NewFuncCodec(conn, encoder, dec.Decode) + return NewFuncCodec(conn, encodeMsg, encodeBatch, dec.Decode) +} + +// httpWriteResult writes pre-encoded response data over HTTP. +// For error responses, it sets Content-Length and flushes to ensure the response +// is fully written before any HTTP server write timeout occurs. +func httpWriteResult(w http.ResponseWriter, data []byte, isError bool) error { + if !isError { + _, err := w.Write(data) + return err + } + + // It's an error response and requires special treatment. + // + // In case of a timeout error, the response must be written before the HTTP + // server's write timeout occurs. So we need to flush the response. The + // Content-Length header also needs to be set to ensure the client knows + // when it has the full response. + w.Header().Set("content-length", strconv.Itoa(len(data))) + + // If this request is wrapped in a handler that might remove Content-Length (such + // as the automatic gzip we do in package node), we need to ensure the HTTP server + // doesn't perform chunked encoding. In case WriteTimeout is reached, the chunked + // encoding might not be finished correctly, and some clients do not like it when + // the final chunk is missing. + w.Header().Set("transfer-encoding", "identity") + + _, err := w.Write(data) + if f, ok := w.(http.Flusher); ok { + f.Flush() + } + return err } // Close does nothing and always returns nil. diff --git a/rpc/json.go b/rpc/json.go index fcd801fc95..9813acae73 100644 --- a/rpc/json.go +++ b/rpc/json.go @@ -27,6 +27,8 @@ import ( "strings" "sync" "time" + + "github.com/fjl/jsonw" ) const ( @@ -52,12 +54,6 @@ type subscriptionResultEnc struct { Result any `json:"result"` } -type jsonrpcSubscriptionNotification struct { - Version string `json:"jsonrpc"` - Method string `json:"method"` - Params subscriptionResultEnc `json:"params"` -} - // A value of this type can a JSON-RPC request, notification, successful response or // error response. Which one it is depends on the fields. type jsonrpcMessage struct { @@ -65,7 +61,7 @@ type jsonrpcMessage struct { ID json.RawMessage `json:"id,omitempty"` Method string `json:"method,omitempty"` Params json.RawMessage `json:"params,omitempty"` - Error *jsonError `json:"error,omitempty"` + Error json.RawMessage `json:"error,omitempty"` Result json.RawMessage `json:"result,omitempty"` } @@ -113,8 +109,29 @@ func (msg *jsonrpcMessage) errorResponse(err error) *jsonrpcMessage { return resp } +// decodeError decodes the Error field into a jsonError struct. +func (msg *jsonrpcMessage) decodeError() *jsonError { + if msg.Error == nil { + return nil + } + je := new(jsonError) + json.Unmarshal(msg.Error, je) + return je +} + func (msg *jsonrpcMessage) response(result interface{}) *jsonrpcMessage { - enc, err := json.Marshal(result) + var ( + enc []byte + err error + ) + // Call MarshalJSON directly for types that implement it. This avoids the + // expensive validation/compaction pass that json.Marshal performs on + // encoder output. + if m, ok := result.(json.Marshaler); ok { + enc, err = m.MarshalJSON() + } else { + enc, err = json.Marshal(result) + } if err != nil { return msg.errorResponse(&internalServerError{errcodeMarshalError, err.Error()}) } @@ -122,19 +139,18 @@ func (msg *jsonrpcMessage) response(result interface{}) *jsonrpcMessage { } func errorMessage(err error) *jsonrpcMessage { - msg := &jsonrpcMessage{Version: vsn, ID: null, Error: &jsonError{ + je := &jsonError{ Code: errcodeDefault, Message: err.Error(), - }} - ec, ok := err.(Error) - if ok { - msg.Error.Code = ec.ErrorCode() } - de, ok := err.(DataError) - if ok { - msg.Error.Data = de.ErrorData() + if ec, ok := err.(Error); ok { + je.Code = ec.ErrorCode() } - return msg + if de, ok := err.(DataError); ok { + je.Data = de.ErrorData() + } + enc, _ := json.Marshal(je) + return &jsonrpcMessage{Version: vsn, ID: null, Error: enc} } type jsonError struct { @@ -179,28 +195,32 @@ type ConnRemoteAddr interface { // jsonCodec reads and writes JSON-RPC messages to the underlying connection. It also has // support for parsing arguments and serializing (result) objects. type jsonCodec struct { - remote string - closer sync.Once // close closed channel once - closeCh chan interface{} // closed on Close - decode decodeFunc // decoder to allow multiple transports - encMu sync.Mutex // guards the encoder - encode encodeFunc // encoder to allow multiple transports - conn deadlineCloser + remote string + closer sync.Once // close closed channel once + closeCh chan interface{} // closed on Close + decode decodeFunc // decoder to allow multiple transports + encMu sync.Mutex // guards the encoder + encodeMsg encodeMsgFunc // single-message encoder + encodeBatch encodeBatchFunc // batch encoder + conn deadlineCloser } -type encodeFunc = func(v interface{}, isErrorResponse bool) error +type encodeMsgFunc = func(msg *jsonrpcMessage, isError bool) error + +type encodeBatchFunc = func(msgs []*jsonrpcMessage, isError bool) error type decodeFunc = func(v interface{}) error // NewFuncCodec creates a codec which uses the given functions to read and write. If conn // implements ConnRemoteAddr, log messages will use it to include the remote address of // the connection. -func NewFuncCodec(conn deadlineCloser, encode encodeFunc, decode decodeFunc) ServerCodec { +func NewFuncCodec(conn deadlineCloser, encodeMsg encodeMsgFunc, encodeBatch encodeBatchFunc, decode decodeFunc) ServerCodec { codec := &jsonCodec{ - closeCh: make(chan interface{}), - encode: encode, - decode: decode, - conn: conn, + closeCh: make(chan interface{}), + encodeMsg: encodeMsg, + encodeBatch: encodeBatch, + decode: decode, + conn: conn, } if ra, ok := conn.(ConnRemoteAddr); ok { codec.remote = ra.RemoteAddr() @@ -211,14 +231,62 @@ func NewFuncCodec(conn deadlineCloser, encode encodeFunc, decode decodeFunc) Ser // NewCodec creates a codec on the given connection. If conn implements ConnRemoteAddr, log // messages will use it to include the remote address of the connection. func NewCodec(conn Conn) ServerCodec { - enc := json.NewEncoder(conn) dec := json.NewDecoder(conn) dec.UseNumber() - - encode := func(v interface{}, isErrorResponse bool) error { - return enc.Encode(v) + var buf []byte + encodeMsg := func(msg *jsonrpcMessage, isError bool) error { + buf = appendMessage(buf[:0], msg) + buf = append(buf, '\n') + _, err := conn.Write(buf) + return err } - return NewFuncCodec(conn, encode, dec.Decode) + encodeBatch := func(msgs []*jsonrpcMessage, isError bool) error { + buf = appendBatch(buf[:0], msgs) + buf = append(buf, '\n') + _, err := conn.Write(buf) + return err + } + return NewFuncCodec(conn, encodeMsg, encodeBatch, dec.Decode) +} + +// appendMessage appends the JSON-RPC encoding of msg to buf. +func appendMessage(buf []byte, msg *jsonrpcMessage) []byte { + buf = append(buf, `{"jsonrpc":"2.0"`...) + if msg.ID != nil { + buf = append(buf, `,"id":`...) + buf = append(buf, msg.ID...) + } + if msg.Method != "" { + buf = append(buf, `,"method":`...) + buf = jsonw.AppendQuotedString(buf, msg.Method) + } + if msg.Params != nil { + buf = append(buf, `,"params":`...) + buf = append(buf, msg.Params...) + } + if msg.Error != nil { + buf = append(buf, `,"error":`...) + buf = append(buf, msg.Error...) + } + if msg.Result != nil { + buf = append(buf, `,"result":`...) + buf = append(buf, msg.Result...) + } + buf = append(buf, '}') + return buf +} + +// appendBatch appends the JSON-RPC encoding of a message batch to buf. +func appendBatch(buf []byte, msgs []*jsonrpcMessage) []byte { + buf = append(buf, '[') + for i, msg := range msgs { + if i > 0 { + buf = append(buf, ',') + } + buf = appendMessage(buf, msg) + } + buf = append(buf, ']') + return buf } func (c *jsonCodec) peerInfo() PeerInfo { @@ -248,7 +316,7 @@ func (c *jsonCodec) readBatch() (messages []*jsonrpcMessage, batch bool, err err return messages, batch, nil } -func (c *jsonCodec) writeJSON(ctx context.Context, v interface{}, isErrorResponse bool) error { +func (c *jsonCodec) writeJSON(ctx context.Context, msg *jsonrpcMessage, isError bool) error { c.encMu.Lock() defer c.encMu.Unlock() @@ -257,7 +325,19 @@ func (c *jsonCodec) writeJSON(ctx context.Context, v interface{}, isErrorRespons deadline = time.Now().Add(defaultWriteTimeout) } c.conn.SetWriteDeadline(deadline) - return c.encode(v, isErrorResponse) + return c.encodeMsg(msg, isError) +} + +func (c *jsonCodec) writeJSONBatch(ctx context.Context, msgs []*jsonrpcMessage, isError bool) error { + c.encMu.Lock() + defer c.encMu.Unlock() + + deadline, ok := ctx.Deadline() + if !ok { + deadline = time.Now().Add(defaultWriteTimeout) + } + c.conn.SetWriteDeadline(deadline) + return c.encodeBatch(msgs, isError) } func (c *jsonCodec) close() { diff --git a/rpc/server_test.go b/rpc/server_test.go index 8334d4e80d..a2b8af2b7f 100644 --- a/rpc/server_test.go +++ b/rpc/server_test.go @@ -208,6 +208,48 @@ func TestServerBatchResponseSizeLimit(t *testing.T) { } } +// TestServerBatchResponseSizeLimit_errorResponses verifies that error responses +// are counted toward BatchResponseMaxSize. +func TestServerBatchResponseSizeLimit_errorResponses(t *testing.T) { + t.Parallel() + + server := newTestServer() + defer server.Stop() + // Each error response for test_returnError is ~58 bytes of JSON in the Error field. + // Set limit to 100 so 1 response fits (58 bytes) but the 2nd (116 bytes) exceeds it. + server.SetBatchLimits(100, 100) + var ( + batch []BatchElem + client = DialInProc(server) + ) + for i := 0; i < 5; i++ { + batch = append(batch, BatchElem{ + Method: "test_returnError", + Result: new(int), + }) + } + if err := client.BatchCall(batch); err != nil { + t.Fatal("error sending batch:", err) + } + for i := range batch { + re, ok := batch[i].Error.(Error) + if !ok { + t.Fatalf("batch elem %d has wrong error type: %v", i, batch[i].Error) + } + if i < 2 { + // First two: elem 0 fits under limit, elem 1 pushes over but is already processed. + if re.ErrorCode() != 444 { + t.Errorf("batch elem %d wrong error code, have %d want 444", i, re.ErrorCode()) + } + } else { + // Remaining should be the response-too-large error. + if re.ErrorCode() != errcodeResponseTooLarge { + t.Errorf("batch elem %d wrong error code, have %d want %d", i, re.ErrorCode(), errcodeResponseTooLarge) + } + } + } +} + func TestServerWebsocketReadLimit(t *testing.T) { t.Parallel() diff --git a/rpc/subscription.go b/rpc/subscription.go index 9e400c8b60..6b90bd4a3b 100644 --- a/rpc/subscription.go +++ b/rpc/subscription.go @@ -171,13 +171,17 @@ func (n *Notifier) activate() error { } func (n *Notifier) send(sub *Subscription, data any) error { - msg := jsonrpcSubscriptionNotification{ + params, err := json.Marshal(subscriptionResultEnc{ + ID: string(sub.ID), + Result: data, + }) + if err != nil { + return err + } + msg := jsonrpcMessage{ Version: vsn, Method: n.namespace + notificationMethodSuffix, - Params: subscriptionResultEnc{ - ID: string(sub.ID), - Result: data, - }, + Params: params, } return n.h.conn.writeJSON(context.Background(), &msg, false) } diff --git a/rpc/subscription_test.go b/rpc/subscription_test.go index cd44d219de..623b496124 100644 --- a/rpc/subscription_test.go +++ b/rpc/subscription_test.go @@ -220,7 +220,7 @@ func readAndValidateMessage(in *json.Decoder) (*subConfirmation, *subscriptionRe case msg.isResponse(): var c subConfirmation if msg.Error != nil { - return nil, nil, msg.Error + return nil, nil, msg.decodeError() } else if err := json.Unmarshal(msg.Result, &c.subid); err != nil { return nil, nil, fmt.Errorf("invalid response: %v", err) } else { @@ -233,12 +233,21 @@ func readAndValidateMessage(in *json.Decoder) (*subConfirmation, *subscriptionRe } type mockConn struct { - enc *json.Encoder + w io.Writer } -// writeJSON writes a message to the connection. -func (c *mockConn) writeJSON(ctx context.Context, msg interface{}, isError bool) error { - return c.enc.Encode(msg) +func (c *mockConn) writeJSON(ctx context.Context, msg *jsonrpcMessage, isError bool) error { + buf := appendMessage(nil, msg) + buf = append(buf, '\n') + _, err := c.w.Write(buf) + return err +} + +func (c *mockConn) writeJSONBatch(ctx context.Context, msgs []*jsonrpcMessage, isError bool) error { + buf := appendBatch(nil, msgs) + buf = append(buf, '\n') + _, err := c.w.Write(buf) + return err } // closed returns a channel which is closed when the connection is closed. @@ -251,7 +260,7 @@ func (c *mockConn) remoteAddr() string { return "" } func BenchmarkNotify(b *testing.B) { id := ID("test") notifier := &Notifier{ - h: &handler{conn: &mockConn{json.NewEncoder(io.Discard)}}, + h: &handler{conn: &mockConn{io.Discard}}, sub: &Subscription{ID: id}, activated: true, } @@ -271,7 +280,7 @@ func TestNotify(t *testing.T) { out := new(bytes.Buffer) id := ID("test") notifier := &Notifier{ - h: &handler{conn: &mockConn{json.NewEncoder(out)}}, + h: &handler{conn: &mockConn{out}}, sub: &Subscription{ID: id}, activated: true, } diff --git a/rpc/types.go b/rpc/types.go index 85f15344e8..578d3f86dd 100644 --- a/rpc/types.go +++ b/rpc/types.go @@ -51,9 +51,10 @@ type ServerCodec interface { // jsonWriter can write JSON messages to its underlying connection. // Implementations must be safe for concurrent use. type jsonWriter interface { - // writeJSON writes a message to the connection. - writeJSON(ctx context.Context, msg interface{}, isError bool) error - + // writeJSON writes a single JSON-RPC message to the connection. + writeJSON(ctx context.Context, msg *jsonrpcMessage, isError bool) error + // writeJSONBatch writes a batch of JSON-RPC messages to the connection. + writeJSONBatch(ctx context.Context, msgs []*jsonrpcMessage, isError bool) error // Closed returns a channel which is closed when the connection is closed. closed() <-chan interface{} // RemoteAddr returns the peer address of the connection. diff --git a/rpc/websocket.go b/rpc/websocket.go index ec676b9caf..e70498873a 100644 --- a/rpc/websocket.go +++ b/rpc/websocket.go @@ -293,11 +293,17 @@ type websocketCodec struct { func newWebsocketCodec(conn *websocket.Conn, host string, req http.Header, readLimit int64) ServerCodec { conn.SetReadLimit(readLimit) - encode := func(v interface{}, isErrorResponse bool) error { - return conn.WriteJSON(v) + var buf []byte + encodeMsg := func(msg *jsonrpcMessage, isError bool) error { + buf = appendMessage(buf[:0], msg) + return conn.WriteMessage(websocket.TextMessage, buf) + } + encodeBatch := func(msgs []*jsonrpcMessage, isError bool) error { + buf = appendBatch(buf[:0], msgs) + return conn.WriteMessage(websocket.TextMessage, buf) } wc := &websocketCodec{ - jsonCodec: NewFuncCodec(conn, encode, conn.ReadJSON).(*jsonCodec), + jsonCodec: NewFuncCodec(conn, encodeMsg, encodeBatch, conn.ReadJSON).(*jsonCodec), conn: conn, pingReset: make(chan struct{}, 1), pongReceived: make(chan struct{}), @@ -342,8 +348,15 @@ func (wc *websocketCodec) peerInfo() PeerInfo { return wc.info } -func (wc *websocketCodec) writeJSON(ctx context.Context, v interface{}, isError bool) error { - err := wc.jsonCodec.writeJSON(ctx, v, isError) +func (wc *websocketCodec) writeJSON(ctx context.Context, msg *jsonrpcMessage, isError bool) error { + return wc.writeAndResetPing(wc.jsonCodec.writeJSON(ctx, msg, isError)) +} + +func (wc *websocketCodec) writeJSONBatch(ctx context.Context, msgs []*jsonrpcMessage, isError bool) error { + return wc.writeAndResetPing(wc.jsonCodec.writeJSONBatch(ctx, msgs, isError)) +} + +func (wc *websocketCodec) writeAndResetPing(err error) error { if err == nil { // Notify pingLoop to delay the next idle ping. select {