beacon/engine, rpc: optimize JSON encoding for large blob payloads (#33969)
Some checks are pending
/ Linux Build (push) Waiting to run
/ Linux Build (arm) (push) Waiting to run
/ Keeper Build (push) Waiting to run
/ Windows Build (push) Waiting to run
/ Docker Image (push) Waiting to run

Adds a fast path for ExecutionPayloadEnvelope and BlobAndProofListV*
that bypasses encoding/json's reflection and re-validation, which are
expensive for large payloads with many blobs. Also hand-rolls the
jsonrpcMessage wire encoding in the RPC codec to avoid a second
re-validation pass when writing responses to the connection.

Resolves #33814

---------

Co-authored-by: Marius van der Wijden <m.vanderwijden@live.de>
Co-authored-by: Felix Lange <fjl@twurst.com>
This commit is contained in:
Jonny Rhea 2026-05-20 13:25:56 -05:00 committed by GitHub
parent 918d46b942
commit efe58eac00
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
21 changed files with 1366 additions and 155 deletions

View file

@ -0,0 +1,69 @@
// Copyright 2026 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package engine
import (
"github.com/fjl/jsonw"
)
// MarshalJSON implements json.Marshaler.
func (list BlobAndProofListV1) MarshalJSON() ([]byte, error) {
var b jsonw.Buffer
b.Array(func() {
for _, item := range list {
marshalBlobAndProofV1(&b, item)
}
})
return b.Output(), nil
}
func marshalBlobAndProofV1(b *jsonw.Buffer, item *BlobAndProofV1) {
if item == nil {
b.Null()
} else {
b.Object(func() {
b.Key("blob")
b.HexBytes(item.Blob)
b.Key("proof")
b.HexBytes(item.Proof)
})
}
}
// MarshalJSON implements json.Marshaler.
func (list BlobAndProofListV2) MarshalJSON() ([]byte, error) {
var b jsonw.Buffer
b.Array(func() {
for _, item := range list {
marshalBlobAndProofV2(&b, item)
}
})
return b.Output(), nil
}
func marshalBlobAndProofV2(b *jsonw.Buffer, item *BlobAndProofV2) {
if item == nil {
b.Null()
} else {
b.Object(func() {
b.Key("blob")
b.HexBytes(item.Blob)
b.Key("proofs")
appendHexBytesArray(b, item.CellProofs)
})
}
}

View file

@ -12,31 +12,6 @@ import (
var _ = (*executionPayloadEnvelopeMarshaling)(nil)
// MarshalJSON marshals as JSON.
func (e ExecutionPayloadEnvelope) MarshalJSON() ([]byte, error) {
type ExecutionPayloadEnvelope struct {
ExecutionPayload *ExecutableData `json:"executionPayload" gencodec:"required"`
BlockValue *hexutil.Big `json:"blockValue" gencodec:"required"`
BlobsBundle *BlobsBundle `json:"blobsBundle"`
Requests []hexutil.Bytes `json:"executionRequests"`
Override bool `json:"shouldOverrideBuilder"`
Witness *hexutil.Bytes `json:"witness,omitempty"`
}
var enc ExecutionPayloadEnvelope
enc.ExecutionPayload = e.ExecutionPayload
enc.BlockValue = (*hexutil.Big)(e.BlockValue)
enc.BlobsBundle = e.BlobsBundle
if e.Requests != nil {
enc.Requests = make([]hexutil.Bytes, len(e.Requests))
for k, v := range e.Requests {
enc.Requests[k] = v
}
}
enc.Override = e.Override
enc.Witness = e.Witness
return json.Marshal(&enc)
}
// UnmarshalJSON unmarshals from JSON.
func (e *ExecutionPayloadEnvelope) UnmarshalJSON(input []byte) error {
type ExecutionPayloadEnvelope struct {

View file

@ -0,0 +1,98 @@
// Copyright 2026 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package engine
import (
"encoding/json"
"errors"
"github.com/fjl/jsonw"
)
// marshalBlobsBundle writes BlobsBundle as JSON and appends it to buf.
func marshalBlobsBundle(b *jsonw.Buffer, bundle *BlobsBundle) {
if bundle == nil {
b.Null()
return
}
b.Object(func() {
b.Key("commitments")
appendHexBytesArray(b, bundle.Commitments)
b.Key("proofs")
appendHexBytesArray(b, bundle.Proofs)
b.Key("blobs")
appendHexBytesArray(b, bundle.Blobs)
})
}
// MarshalJSON implements json.Marshaler.
func (e ExecutionPayloadEnvelope) MarshalJSON() ([]byte, error) {
if e.ExecutionPayload == nil {
return nil, errors.New("missing required field 'executionPayload' for ExecutionPayloadEnvelope")
}
// Pre-marshal the execution payload using its gencodec MarshalJSON.
payload, err := e.ExecutionPayload.MarshalJSON()
if err != nil {
return nil, err
}
// Pre-marshal the witness.
var witness []byte
if e.Witness != nil {
witness, err = json.Marshal(e.Witness)
if err != nil {
return nil, err
}
}
// Write the execution payload to the buffer
var b jsonw.Buffer
b.Object(func() {
b.Key("executionPayload")
b.RawValue(payload)
b.Key("blockValue")
b.HexBigInt(e.BlockValue)
b.Key("blobsBundle")
marshalBlobsBundle(&b, e.BlobsBundle)
b.Key("executionRequests")
if e.Requests == nil {
b.Null()
} else {
appendHexBytesArray(&b, e.Requests)
}
b.Key("shouldOverrideBuilder")
b.Bool(e.Override)
if e.Witness != nil {
b.Key("witness")
b.RawValue(witness)
}
})
return b.Output(), nil
}
func appendHexBytesArray[T ~[]byte](b *jsonw.Buffer, slice []T) {
b.Array(func() {
for _, elem := range slice {
b.HexBytes(elem)
}
})
}

128
beacon/engine/epe_test.go Normal file
View file

@ -0,0 +1,128 @@
// Copyright 2026 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package engine
import (
"bytes"
"encoding/json"
"math/big"
"reflect"
"testing"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
)
func makeTestPayload() *ExecutableData {
return &ExecutableData{
ParentHash: common.HexToHash("0x01"),
FeeRecipient: common.HexToAddress("0x02"),
StateRoot: common.HexToHash("0x03"),
ReceiptsRoot: common.HexToHash("0x04"),
LogsBloom: make([]byte, 256),
Random: common.HexToHash("0x05"),
Number: 100,
GasLimit: 1000000,
GasUsed: 500000,
Timestamp: 1234567890,
ExtraData: []byte("extra"),
BaseFeePerGas: big.NewInt(7),
BlockHash: common.HexToHash("0x08"),
Transactions: [][]byte{{0xaa, 0xbb}},
}
}
func TestMarshalJSONRoundtrip(t *testing.T) {
witness := hexutil.Bytes{0xde, 0xad}
original := ExecutionPayloadEnvelope{
ExecutionPayload: makeTestPayload(),
BlockValue: big.NewInt(12345),
BlobsBundle: &BlobsBundle{
Commitments: []hexutil.Bytes{{0x01, 0x02}},
Proofs: []hexutil.Bytes{{0x03, 0x04}},
Blobs: []hexutil.Bytes{{0x05, 0x06}},
},
Requests: [][]byte{{0xaa}, {0xbb, 0xcc}},
Override: true,
Witness: &witness,
}
data, err := original.MarshalJSON()
if err != nil {
t.Fatalf("MarshalJSON error: %v", err)
}
var decoded ExecutionPayloadEnvelope
if err := json.Unmarshal(data, &decoded); err != nil {
t.Fatalf("UnmarshalJSON error: %v", err)
}
if decoded.ExecutionPayload.Number != original.ExecutionPayload.Number {
t.Error("ExecutionPayload.Number mismatch")
}
if decoded.BlockValue.Cmp(original.BlockValue) != 0 {
t.Errorf("BlockValue mismatch: got %v, want %v", decoded.BlockValue, original.BlockValue)
}
if len(decoded.BlobsBundle.Blobs) != len(original.BlobsBundle.Blobs) {
t.Error("BlobsBundle.Blobs length mismatch")
}
if len(decoded.Requests) != len(original.Requests) {
t.Error("Requests length mismatch")
}
if decoded.Override != original.Override {
t.Error("Override mismatch")
}
if !bytes.Equal(*decoded.Witness, *original.Witness) {
t.Error("Witness mismatch")
}
}
func TestMarshalJSONNilPayload(t *testing.T) {
env := ExecutionPayloadEnvelope{
ExecutionPayload: nil,
BlockValue: big.NewInt(1),
}
_, err := env.MarshalJSON()
if err == nil {
t.Fatal("expected error for nil ExecutionPayload")
}
}
// TestExecutionPayloadEnvelopeFieldCoverage guards against structural drift.
// If a field is added to or removed from ExecutionPayloadEnvelope, this test
// fails, reminding the developer to update MarshalJSON in marshal_epe.go.
func TestExecutionPayloadEnvelopeFieldCoverage(t *testing.T) {
expected := []string{
"ExecutionPayload",
"BlockValue",
"BlobsBundle",
"Requests",
"Override",
"Witness",
}
typ := reflect.TypeOf(ExecutionPayloadEnvelope{})
if typ.NumField() != len(expected) {
t.Fatalf("ExecutionPayloadEnvelope has %d fields, expected %d — update MarshalJSON in marshal_epe.go",
typ.NumField(), len(expected))
}
for i, name := range expected {
if typ.Field(i).Name != name {
t.Errorf("field %d: got %q, want %q — update MarshalJSON in marshal_epe.go",
i, typ.Field(i).Name, name)
}
}
}

View file

@ -60,7 +60,7 @@ var (
PayloadV4 PayloadVersion = 0x4
)
//go:generate go run github.com/fjl/gencodec -type PayloadAttributes -field-override payloadAttributesMarshaling -out gen_blockparams.go
//go:generate go run github.com/fjl/gencodec -type PayloadAttributes -field-override payloadAttributesMarshaling -out pa_codec.go
// PayloadAttributes describes the environment context in which a block should
// be built.
@ -79,7 +79,7 @@ type payloadAttributesMarshaling struct {
SlotNumber *hexutil.Uint64
}
//go:generate go run github.com/fjl/gencodec -type ExecutableData -field-override executableDataMarshaling -out gen_ed.go
//go:generate go run github.com/fjl/gencodec -type ExecutableData -field-override executableDataMarshaling -out ed_codec.go
// ExecutableData is the data necessary to execute an EL payload.
type ExecutableData struct {
@ -127,7 +127,7 @@ type StatelessPayloadStatusV1 struct {
ValidationError *string `json:"validationError"`
}
//go:generate go run github.com/fjl/gencodec -type ExecutionPayloadEnvelope -field-override executionPayloadEnvelopeMarshaling -out gen_epe.go
//go:generate go run github.com/fjl/gencodec -enc=false -type ExecutionPayloadEnvelope -field-override executionPayloadEnvelopeMarshaling -out epe_decode.go
type ExecutionPayloadEnvelope struct {
ExecutionPayload *ExecutableData `json:"executionPayload" gencodec:"required"`
@ -138,6 +138,12 @@ type ExecutionPayloadEnvelope struct {
Witness *hexutil.Bytes `json:"witness,omitempty"`
}
// JSON type overrides for ExecutionPayloadEnvelope.
type executionPayloadEnvelopeMarshaling struct {
BlockValue *hexutil.Big
Requests []hexutil.Bytes
}
// BlobsBundle includes the marshalled sidecar data. Note this structure is
// shared by BlobsBundleV1 and BlobsBundleV2 for the sake of simplicity.
//
@ -154,16 +160,18 @@ type BlobAndProofV1 struct {
Proof hexutil.Bytes `json:"proof"`
}
// BlobAndProofListV1 is a list of BlobAndProofV1 with a hand-rolled JSON marshaler
// that avoids the overhead of encoding/json for large blob payloads.
type BlobAndProofListV1 []*BlobAndProofV1
type BlobAndProofV2 struct {
Blob hexutil.Bytes `json:"blob"`
CellProofs []hexutil.Bytes `json:"proofs"` // proofs MUST contain exactly CELLS_PER_EXT_BLOB cell proofs.
}
// JSON type overrides for ExecutionPayloadEnvelope.
type executionPayloadEnvelopeMarshaling struct {
BlockValue *hexutil.Big
Requests []hexutil.Bytes
}
// BlobAndProofListV2 is a list of BlobAndProofV2 with a hand-rolled JSON marshaler
// that avoids the overhead of encoding/json for large blob payloads.
type BlobAndProofListV2 []*BlobAndProofV2
type PayloadStatusV1 struct {
Status string `json:"status"`

View file

@ -552,7 +552,7 @@ func (api *ConsensusAPI) getPayload(payloadID engine.PayloadID, full bool, versi
//
// Client software MAY return an array of all null entries if syncing or otherwise
// unable to serve blob pool data.
func (api *ConsensusAPI) GetBlobsV1(hashes []common.Hash) ([]*engine.BlobAndProofV1, error) {
func (api *ConsensusAPI) GetBlobsV1(hashes []common.Hash) (engine.BlobAndProofListV1, error) {
// Reject the request if Osaka has been activated.
// follow https://github.com/ethereum/execution-apis/blob/main/src/engine/osaka.md#cancun-api
head := api.eth.BlockChain().CurrentHeader()
@ -566,7 +566,7 @@ func (api *ConsensusAPI) GetBlobsV1(hashes []common.Hash) ([]*engine.BlobAndProo
if err != nil {
return nil, engine.InvalidParams.With(err)
}
res := make([]*engine.BlobAndProofV1, len(hashes))
res := make(engine.BlobAndProofListV1, len(hashes))
for i := 0; i < len(blobs); i++ {
// Skip the non-existing blob
if blobs[i] == nil {
@ -605,7 +605,7 @@ func (api *ConsensusAPI) GetBlobsV1(hashes []common.Hash) ([]*engine.BlobAndProo
//
// Client software MUST return null if syncing or otherwise unable to serve
// blob pool data.
func (api *ConsensusAPI) GetBlobsV2(hashes []common.Hash) ([]*engine.BlobAndProofV2, error) {
func (api *ConsensusAPI) GetBlobsV2(hashes []common.Hash) (engine.BlobAndProofListV2, error) {
head := api.eth.BlockChain().CurrentHeader()
if api.config().LatestFork(head.Time) < forks.Osaka {
return nil, nil
@ -616,7 +616,7 @@ func (api *ConsensusAPI) GetBlobsV2(hashes []common.Hash) ([]*engine.BlobAndProo
// GetBlobsV3 returns a set of blobs from the transaction pool. Same as
// GetBlobsV2, except will return partial responses in case there is a missing
// blob.
func (api *ConsensusAPI) GetBlobsV3(hashes []common.Hash) ([]*engine.BlobAndProofV2, error) {
func (api *ConsensusAPI) GetBlobsV3(hashes []common.Hash) (engine.BlobAndProofListV2, error) {
head := api.eth.BlockChain().CurrentHeader()
if api.config().LatestFork(head.Time) < forks.Osaka {
return nil, nil
@ -626,7 +626,7 @@ func (api *ConsensusAPI) GetBlobsV3(hashes []common.Hash) ([]*engine.BlobAndProo
// getBlobs returns all available blobs. In v2, partial responses are not allowed,
// while v3 supports partial responses.
func (api *ConsensusAPI) getBlobs(hashes []common.Hash, v2 bool) ([]*engine.BlobAndProofV2, error) {
func (api *ConsensusAPI) getBlobs(hashes []common.Hash, v2 bool) (engine.BlobAndProofListV2, error) {
if len(hashes) > 128 {
return nil, engine.TooLargeRequest.With(fmt.Errorf("requested blob count too large: %v", len(hashes)))
}
@ -647,7 +647,7 @@ func (api *ConsensusAPI) getBlobs(hashes []common.Hash, v2 bool) ([]*engine.Blob
}
// Validate the blobs from the pool and assemble the response
filled := 0
res := make([]*engine.BlobAndProofV2, len(hashes))
res := make(engine.BlobAndProofListV2, len(hashes))
for i := range blobs {
// The blob has been evicted since the last AvailableBlobs call.
// Return null if partial response is not allowed.

View file

@ -0,0 +1,739 @@
// Copyright 2026 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package catalyst
import (
"context"
"crypto/ecdsa"
"crypto/sha256"
"encoding/json"
"fmt"
"io"
"math/big"
"strings"
"testing"
"time"
"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/holiman/uint256"
)
// encodingType specifies which encoding to use in benchmarks
type encodingType int
const (
encNone encodingType = iota
encJSON
encJSONCustom
encRLP
)
func (e encodingType) String() string {
switch e {
case encNone:
return "none"
case encJSON:
return "json"
case encJSONCustom:
return "json_custom"
case encRLP:
return "rlp"
default:
return "unknown"
}
}
var encodingTypes = []encodingType{encNone, encJSON, encJSONCustom, encRLP}
// benchEncode encodes the value using the specified encoding type.
// It fails the benchmark if encoding fails.
func benchEncode(b *testing.B, enc encodingType, v any) {
var err error
switch enc {
case encJSON:
_, err = json.Marshal(v)
if err != nil {
b.Fatalf("JSON marshal failed: %v", err)
}
case encJSONCustom:
if m, ok := v.(json.Marshaler); ok {
_, err = m.MarshalJSON()
} else {
_, err = json.Marshal(v)
}
if err != nil {
b.Fatalf("JSON MarshalJSON failed: %v", err)
}
case encRLP:
_, err = rlp.EncodeToBytes(v)
if err != nil {
b.Fatalf("RLP encode failed: %v", err)
}
}
}
// benchmarkBlobCounts defines the blob counts for benchmarks
var benchmarkBlobCounts = []int{21, 72}
// maxBenchmarkBlobs is the maximum number of blobs we need for benchmarks
var maxBenchmarkBlobs = benchmarkBlobCounts[len(benchmarkBlobCounts)-1]
var (
// Pre-computed blobs for benchmarks
benchBlobs []*kzg4844.Blob
benchBlobCommits []kzg4844.Commitment
benchBlobProofs []kzg4844.Proof
benchBlobCellProofs [][]kzg4844.Proof
benchBlobVHashes []common.Hash
)
func init() {
// Pre-compute blobs for benchmarks
for i := 0; i < maxBenchmarkBlobs; i++ {
blob := &kzg4844.Blob{byte(i), byte(i >> 8)}
benchBlobs = append(benchBlobs, blob)
commit, _ := kzg4844.BlobToCommitment(blob)
benchBlobCommits = append(benchBlobCommits, commit)
proof, _ := kzg4844.ComputeBlobProof(blob, commit)
benchBlobProofs = append(benchBlobProofs, proof)
cellProofs, _ := kzg4844.ComputeCellProofs(blob)
benchBlobCellProofs = append(benchBlobCellProofs, cellProofs)
vhash := kzg4844.CalcBlobHashV1(sha256.New(), &commit)
benchBlobVHashes = append(benchBlobVHashes, vhash)
}
}
// benchFork specifies which fork to use in benchmark environments
type benchFork int
const (
forkCancun benchFork = iota
forkPrague
forkOsaka
)
// benchmarkBlobEnv holds the environment for blob benchmarks
type benchmarkBlobEnv struct {
node *node.Node
eth *eth.Ethereum
api *ConsensusAPI
config *params.ChainConfig
keys []*ecdsa.PrivateKey
vhashes []common.Hash
version byte
blobCount int
nonces []uint64 // current nonce for each key
}
// makeBenchBlobTx creates a blob transaction with the specified number of blobs.
// blobOffset indicates which pre-computed blobs to use.
func makeBenchBlobTx(chainConfig *params.ChainConfig, nonce uint64, blobCount int, blobOffset int, key *ecdsa.PrivateKey, version byte) *types.Transaction {
var (
blobs []kzg4844.Blob
blobHashes []common.Hash
commitments []kzg4844.Commitment
proofs []kzg4844.Proof
)
for i := 0; i < blobCount; i++ {
idx := blobOffset + i
blobs = append(blobs, *benchBlobs[idx])
commitments = append(commitments, benchBlobCommits[idx])
if version == types.BlobSidecarVersion0 {
proofs = append(proofs, benchBlobProofs[idx])
} else {
proofs = append(proofs, benchBlobCellProofs[idx]...)
}
blobHashes = append(blobHashes, benchBlobVHashes[idx])
}
blobtx := &types.BlobTx{
ChainID: uint256.MustFromBig(chainConfig.ChainID),
Nonce: nonce,
GasTipCap: uint256.NewInt(params.GWei),
GasFeeCap: uint256.NewInt(10 * params.GWei),
Gas: 21000,
BlobFeeCap: uint256.NewInt(params.GWei),
BlobHashes: blobHashes,
Value: uint256.NewInt(100),
Sidecar: types.NewBlobTxSidecar(version, blobs, commitments, proofs),
}
return types.MustSignNewTx(key, types.LatestSigner(chainConfig), blobtx)
}
// newBenchmarkBlobEnv creates an environment for blob benchmarks.
// It creates multiple keys and fills the pool with blob transactions totaling the specified blob count.
// version: 0 = BlobSidecarVersion0 (pre-Osaka), 1 = BlobSidecarVersion1 (Osaka+)
// fork: which fork to enable
func newBenchmarkBlobEnv(b *testing.B, blobCount int, version byte, fork benchFork) *benchmarkBlobEnv {
// Create a configuration that allows enough blobs
config := *params.MergedTestChainConfig
// Set blob schedule to allow for large blob counts (up to 128 blobs per block)
config.BlobScheduleConfig = &params.BlobScheduleConfig{
Cancun: &params.BlobConfig{Target: 6, Max: 128, UpdateFraction: 3338477},
Prague: &params.BlobConfig{Target: 6, Max: 128, UpdateFraction: 5007716},
Osaka: &params.BlobConfig{Target: 6, Max: 128, UpdateFraction: 5007716},
}
// Configure fork times based on requested fork
switch fork {
case forkCancun:
config.PragueTime = nil
config.OsakaTime = nil
case forkPrague:
config.OsakaTime = nil
case forkOsaka:
// All forks enabled (default)
}
// Generate enough keys for all the blob transactions
// Each tx can have up to 6 blobs, so we need ceil(blobCount/6) keys
numTxs := (blobCount + 5) / 6
keys := make([]*ecdsa.PrivateKey, numTxs)
addrs := make([]common.Address, numTxs)
alloc := make(types.GenesisAlloc)
alloc[testAddr] = types.Account{Balance: testBalance}
for i := 0; i < numTxs; i++ {
key, _ := crypto.GenerateKey()
keys[i] = key
addrs[i] = crypto.PubkeyToAddress(key.PublicKey)
// Give each account enough balance for many transactions
alloc[addrs[i]] = types.Account{Balance: new(big.Int).Mul(big.NewInt(1e18), big.NewInt(10000))}
}
gspec := &core.Genesis{
Config: &config,
Alloc: alloc,
Difficulty: common.Big0,
}
n, ethServ := startEthService(b, gspec, nil)
// Collect versioned hashes for the blobs we'll use
var vhashes []common.Hash
for i := 0; i < blobCount; i++ {
vhashes = append(vhashes, benchBlobVHashes[i])
}
// Fill initial blob txs into the pool
env := &benchmarkBlobEnv{
node: n,
eth: ethServ,
api: newConsensusAPIWithoutHeartbeat(ethServ),
config: &config,
keys: keys,
vhashes: vhashes,
version: version,
blobCount: blobCount,
nonces: make([]uint64, numTxs),
}
env.addBlobTxs(b)
return env
}
// addBlobTxs adds blob transactions to the pool using the stored blobCount and per-key nonces.
// It increments each key's nonce after adding transactions.
func (env *benchmarkBlobEnv) addBlobTxs(b *testing.B) {
numTxs := (env.blobCount + 5) / 6
var txs []*types.Transaction
blobsRemaining := env.blobCount
blobOffset := 0
for i := 0; i < numTxs && blobsRemaining > 0; i++ {
// Each tx gets up to 6 blobs
txBlobCount := 6
if blobsRemaining < 6 {
txBlobCount = blobsRemaining
}
tx := makeBenchBlobTx(env.config, env.nonces[i], txBlobCount, blobOffset, env.keys[i], env.version)
txs = append(txs, tx)
blobOffset += txBlobCount
blobsRemaining -= txBlobCount
}
errs := env.eth.TxPool().Add(txs, true)
for i, err := range errs {
if err != nil {
b.Fatalf("Failed to add blob tx %d to pool: %v", i, err)
}
}
// Increment nonce for each key used
for i := 0; i < numTxs; i++ {
env.nonces[i]++
}
}
// Close closes the environment
func (env *benchmarkBlobEnv) Close() {
env.node.Close()
}
// BenchmarkGetBlobsV1 benchmarks the GetBlobsV1 method with various blob counts.
// GetBlobsV1 is available at Cancun/Prague (pre-Osaka).
func BenchmarkGetBlobsV1(b *testing.B) {
for _, blobCount := range benchmarkBlobCounts {
for _, enc := range encodingTypes {
b.Run(fmt.Sprintf("blobs=%d/enc=%s", blobCount, enc), func(b *testing.B) {
env := newBenchmarkBlobEnv(b, blobCount, 0, forkPrague)
defer env.Close()
b.ResetTimer()
for b.Loop() {
result, err := env.api.GetBlobsV1(env.vhashes)
if err != nil {
b.Fatalf("GetBlobsV1 failed: %v", err)
}
// Verify we got the expected number of blobs
if len(result) != blobCount {
b.Fatalf("expected %d blobs, got %d", blobCount, len(result))
}
benchEncode(b, enc, result)
}
b.ReportMetric(float64(b.Elapsed().Milliseconds())/float64(b.N), "ms/op")
})
}
}
}
// BenchmarkGetBlobsV2Extended benchmarks the GetBlobsV2 method with various blob counts.
// GetBlobsV2 is available at Osaka+.
func BenchmarkGetBlobsV2Extended(b *testing.B) {
for _, blobCount := range benchmarkBlobCounts {
for _, enc := range encodingTypes {
b.Run(fmt.Sprintf("blobs=%d/enc=%s", blobCount, enc), func(b *testing.B) {
env := newBenchmarkBlobEnv(b, blobCount, 1, forkOsaka)
defer env.Close()
b.ResetTimer()
for b.Loop() {
result, err := env.api.GetBlobsV2(env.vhashes)
if err != nil {
b.Fatalf("GetBlobsV2 failed: %v", err)
}
// Verify we got the expected number of blobs
if len(result) != blobCount {
b.Fatalf("expected %d blobs, got %d", blobCount, len(result))
}
benchEncode(b, enc, result)
}
b.ReportMetric(float64(b.Elapsed().Milliseconds())/float64(b.N), "ms/op")
})
}
}
}
// BenchmarkGetBlobsV3 benchmarks the GetBlobsV3 method with various blob counts.
// GetBlobsV3 is available at Osaka+.
func BenchmarkGetBlobsV3(b *testing.B) {
for _, blobCount := range benchmarkBlobCounts {
for _, enc := range encodingTypes {
b.Run(fmt.Sprintf("blobs=%d/enc=%s", blobCount, enc), func(b *testing.B) {
env := newBenchmarkBlobEnv(b, blobCount, 1, forkOsaka)
defer env.Close()
b.ResetTimer()
for b.Loop() {
result, err := env.api.GetBlobsV3(env.vhashes)
if err != nil {
b.Fatalf("GetBlobsV3 failed: %v", err)
}
// Verify we got the expected number of blobs
if len(result) != blobCount {
b.Fatalf("expected %d blobs, got %d", blobCount, len(result))
}
benchEncode(b, enc, result)
}
b.ReportMetric(float64(b.Elapsed().Milliseconds())/float64(b.N), "ms/op")
})
}
}
}
// BenchmarkGetPayloadV5WithBlobs benchmarks GetPayloadV5 (Osaka fork) with blobs.
// Note: Measures single iteration performance due to NewPayload complexity at Osaka.
func BenchmarkGetPayloadV5WithBlobs(b *testing.B) {
for _, blobCount := range benchmarkBlobCounts {
for _, enc := range encodingTypes {
b.Run(fmt.Sprintf("blobs=%d/enc=%s", blobCount, enc), func(b *testing.B) {
env := newBenchmarkBlobEnv(b, blobCount, 1, forkOsaka)
defer env.Close()
parent := env.api.eth.BlockChain().CurrentHeader()
beaconRoot := common.Hash{0x42}
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Note: We don't call addBlobTxs here because we can't advance the chain
// (NewPayloadV5 requires execution requests). The same transactions are
// reused for each iteration, which still benchmarks the GetPayload performance.
timestamp := parent.Time + 12
fcState := engine.ForkchoiceStateV1{
HeadBlockHash: parent.Hash(),
SafeBlockHash: parent.Hash(),
FinalizedBlockHash: parent.Hash(),
}
payloadAttr := &engine.PayloadAttributes{
Timestamp: timestamp,
Random: common.Hash{byte(i)},
SuggestedFeeRecipient: testAddr,
Withdrawals: []*types.Withdrawal{},
BeaconRoot: &beaconRoot,
}
resp, err := env.api.ForkchoiceUpdatedV3(context.Background(), fcState, payloadAttr)
if err != nil {
b.Fatalf("ForkchoiceUpdatedV3 failed: %v", err)
}
if resp.PayloadID == nil {
b.Fatalf("ForkchoiceUpdatedV3 returned nil PayloadID")
}
// Wait for the payload to be built with transactions
time.Sleep(100 * time.Millisecond)
envelope, err := env.api.GetPayloadV5(*resp.PayloadID)
if err != nil {
b.Fatalf("GetPayloadV5 failed: %v", err)
}
if envelope.BlobsBundle == nil {
b.Fatalf("BlobsBundle is nil")
}
// Verify we got the expected number of blobs
if len(envelope.BlobsBundle.Blobs) != blobCount {
b.Fatalf("expected %d blobs, got %d", blobCount, len(envelope.BlobsBundle.Blobs))
}
benchEncode(b, enc, envelope)
}
b.ReportMetric(float64(b.Elapsed().Milliseconds())/float64(b.N), "ms/op")
})
}
}
}
// BenchmarkNewPayloadV3WithBlobs benchmarks the NewPayloadV3 method with various blob counts.
// Each iteration processes a payload with the full blob count.
func BenchmarkNewPayloadV3WithBlobs(b *testing.B) {
for _, blobCount := range benchmarkBlobCounts {
for _, enc := range encodingTypes {
b.Run(fmt.Sprintf("blobs=%d/enc=%s", blobCount, enc), func(b *testing.B) {
env := newBenchmarkBlobEnv(b, blobCount, 0, forkCancun)
defer env.Close()
parent := env.api.eth.BlockChain().CurrentHeader()
beaconRoot := common.Hash{0x42}
// Build a payload first to get valid executable data
timestamp := parent.Time + 12
fcState := engine.ForkchoiceStateV1{
HeadBlockHash: parent.Hash(),
SafeBlockHash: parent.Hash(),
FinalizedBlockHash: parent.Hash(),
}
payloadAttr := &engine.PayloadAttributes{
Timestamp: timestamp,
Random: common.Hash{0x01},
SuggestedFeeRecipient: testAddr,
Withdrawals: []*types.Withdrawal{},
BeaconRoot: &beaconRoot,
}
resp, err := env.api.ForkchoiceUpdatedV3(context.Background(), fcState, payloadAttr)
if err != nil {
b.Fatalf("ForkchoiceUpdatedV3 failed: %v", err)
}
if resp.PayloadID == nil {
b.Fatalf("ForkchoiceUpdatedV3 returned nil PayloadID")
}
// Wait for the payload to be built with transactions
time.Sleep(100 * time.Millisecond)
// Get the payload
envelope, err := env.api.GetPayloadV3(*resp.PayloadID)
if err != nil {
b.Fatalf("GetPayloadV3 failed: %v", err)
}
// Verify we got the expected number of blobs
if len(envelope.BlobsBundle.Blobs) != blobCount {
b.Fatalf("expected %d blobs in setup, got %d", blobCount, len(envelope.BlobsBundle.Blobs))
}
execData := envelope.ExecutionPayload
// Collect versioned hashes from blobs bundle
vhashes := make([]common.Hash, len(envelope.BlobsBundle.Commitments))
for j, commitment := range envelope.BlobsBundle.Commitments {
var commit kzg4844.Commitment
copy(commit[:], commitment)
vhashes[j] = kzg4844.CalcBlobHashV1(sha256.New(), &commit)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
// NewPayload is idempotent, calling it multiple times with the same data
// should return the same result. The payload contains blobCount blobs.
result, err := env.api.NewPayloadV3(context.Background(), *execData, vhashes, &beaconRoot)
if err != nil {
b.Fatalf("NewPayloadV3 failed: %v", err)
}
benchEncode(b, enc, result)
}
b.ReportMetric(float64(b.Elapsed().Milliseconds())/float64(b.N), "ms/op")
})
}
}
}
// BenchmarkForkchoiceUpdatedWithBlobPayload benchmarks forkchoice updates that trigger
// payload building with blob transactions.
// Note: Measures ForkchoiceUpdated performance with blob transactions in the pool.
func BenchmarkForkchoiceUpdatedWithBlobPayload(b *testing.B) {
for _, blobCount := range benchmarkBlobCounts {
for _, enc := range encodingTypes {
b.Run(fmt.Sprintf("blobs=%d/enc=%s", blobCount, enc), func(b *testing.B) {
env := newBenchmarkBlobEnv(b, blobCount, 0, forkCancun)
defer env.Close()
parent := env.api.eth.BlockChain().CurrentHeader()
beaconRoot := common.Hash{0x42}
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Note: We don't call addBlobTxs here because the blob pool has
// a per-account limit of 16 transactions. The same transactions are
// reused for each iteration, which still benchmarks the ForkchoiceUpdated
// performance with blob transactions in the pool.
timestamp := parent.Time + 12
fcState := engine.ForkchoiceStateV1{
HeadBlockHash: parent.Hash(),
SafeBlockHash: parent.Hash(),
FinalizedBlockHash: parent.Hash(),
}
payloadAttr := &engine.PayloadAttributes{
Timestamp: timestamp,
Random: common.Hash{byte(i)},
SuggestedFeeRecipient: testAddr,
Withdrawals: []*types.Withdrawal{},
BeaconRoot: &beaconRoot,
}
resp, err := env.api.ForkchoiceUpdatedV3(context.Background(), fcState, payloadAttr)
if err != nil {
b.Fatalf("ForkchoiceUpdatedV3 failed: %v", err)
}
if resp.PayloadID == nil {
b.Fatalf("ForkchoiceUpdatedV3 returned nil PayloadID")
}
benchEncode(b, enc, resp)
}
b.ReportMetric(float64(b.Elapsed().Milliseconds())/float64(b.N), "ms/op")
})
}
}
}
// BenchmarkFullBlobWorkflowOsaka benchmarks the complete blob workflow at Osaka:
// ForkchoiceUpdated -> GetPayload
// Note: Measures single iteration performance due to NewPayload complexity at Osaka.
func BenchmarkFullBlobWorkflowOsaka(b *testing.B) {
for _, blobCount := range benchmarkBlobCounts {
for _, enc := range encodingTypes {
b.Run(fmt.Sprintf("blobs=%d/enc=%s", blobCount, enc), func(b *testing.B) {
env := newBenchmarkBlobEnv(b, blobCount, 1, forkOsaka)
defer env.Close()
parent := env.api.eth.BlockChain().CurrentHeader()
beaconRoot := common.Hash{0x42}
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Note: We don't call addBlobTxs here because we can't advance the chain
// (NewPayloadV5 requires execution requests). The same transactions are
// reused for each iteration, which still benchmarks the workflow performance.
// 1. ForkchoiceUpdated to build payload
timestamp := parent.Time + 12
fcState := engine.ForkchoiceStateV1{
HeadBlockHash: parent.Hash(),
SafeBlockHash: parent.Hash(),
FinalizedBlockHash: parent.Hash(),
}
payloadAttr := &engine.PayloadAttributes{
Timestamp: timestamp,
Random: common.Hash{byte(i)},
SuggestedFeeRecipient: testAddr,
Withdrawals: []*types.Withdrawal{},
BeaconRoot: &beaconRoot,
}
resp, err := env.api.ForkchoiceUpdatedV3(context.Background(), fcState, payloadAttr)
if err != nil {
b.Fatalf("ForkchoiceUpdatedV3 failed: %v", err)
}
if resp.PayloadID == nil {
b.Fatalf("ForkchoiceUpdatedV3 returned nil PayloadID")
}
// Encode ForkchoiceUpdated response
benchEncode(b, enc, resp)
// Wait for the payload to be built with transactions
time.Sleep(100 * time.Millisecond)
// 2. GetPayload
envelope, err := env.api.GetPayloadV5(*resp.PayloadID)
if err != nil {
b.Fatalf("GetPayloadV5 failed: %v", err)
}
if envelope.BlobsBundle == nil {
b.Fatalf("BlobsBundle is nil")
}
// Verify we got the expected number of blobs
if len(envelope.BlobsBundle.Blobs) != blobCount {
b.Fatalf("expected %d blobs, got %d", blobCount, len(envelope.BlobsBundle.Blobs))
}
// Encode GetPayload response
benchEncode(b, enc, envelope)
}
b.ReportMetric(float64(b.Elapsed().Milliseconds())/float64(b.N), "ms/op")
})
}
}
}
// discardConn is a net.Conn-like writer that discards all output.
// Used to measure server-side RPC cost without client-side decoding.
type discardConn struct {
io.Reader
io.Writer
}
func (discardConn) Close() error { return nil }
func (discardConn) SetWriteDeadline(time.Time) error { return nil }
// BenchmarkGetPayloadV5RPCServerOnly benchmarks only the EL server-side cost of
// engine_getPayloadV5: method dispatch, JSON serialization, and wire encoding.
// Client-side decoding is excluded by writing to io.Discard.
func BenchmarkGetPayloadV5RPCServerOnly(b *testing.B) {
blobCount := 72
env := newBenchmarkBlobEnv(b, blobCount, 1, forkOsaka)
defer env.Close()
// Register the engine API on the running node's in-process RPC server.
rpcServer, err := env.node.RPCHandler()
if err != nil {
b.Fatalf("RPCHandler failed: %v", err)
}
rpcServer.RegisterName("engine", env.api)
parent := env.api.eth.BlockChain().CurrentHeader()
beaconRoot := common.Hash{0x42}
// Build one payload to get a valid payloadID.
fcState := engine.ForkchoiceStateV1{
HeadBlockHash: parent.Hash(),
SafeBlockHash: parent.Hash(),
FinalizedBlockHash: parent.Hash(),
}
payloadAttr := &engine.PayloadAttributes{
Timestamp: parent.Time + 12,
Random: common.Hash{0x01},
SuggestedFeeRecipient: testAddr,
Withdrawals: []*types.Withdrawal{},
BeaconRoot: &beaconRoot,
}
resp, err := env.api.ForkchoiceUpdatedV3(context.Background(), fcState, payloadAttr)
if err != nil {
b.Fatalf("ForkchoiceUpdatedV3 failed: %v", err)
}
if resp.PayloadID == nil {
b.Fatalf("ForkchoiceUpdatedV3 returned nil PayloadID")
}
time.Sleep(100 * time.Millisecond)
// Verify the payload has the expected blobs via the direct API first.
envelope, err := env.api.GetPayloadV5(*resp.PayloadID)
if err != nil {
b.Fatalf("GetPayloadV5 failed: %v", err)
}
if len(envelope.BlobsBundle.Blobs) != blobCount {
b.Fatalf("expected %d blobs, got %d", blobCount, len(envelope.BlobsBundle.Blobs))
}
b.Logf("payload size: %d blobs, %d txs", len(envelope.BlobsBundle.Blobs), len(envelope.ExecutionPayload.Transactions))
// Build the JSON-RPC request bytes once.
reqJSON := fmt.Sprintf(`{"jsonrpc":"2.0","id":1,"method":"engine_getPayloadV5","params":["%s"]}`, resp.PayloadID.String())
b.ResetTimer()
for i := 0; i < b.N; i++ {
conn := discardConn{
Reader: strings.NewReader(reqJSON),
Writer: io.Discard,
}
codec := rpc.NewCodec(conn)
rpcServer.ServeCodec(codec, 0)
}
b.StopTimer()
b.ReportMetric(float64(b.Elapsed().Milliseconds())/float64(b.N), "ms/op")
}
// BenchmarkGetBlobsV3RPCServerOnly benchmarks only the EL server-side cost of
// engine_getBlobsV3: method dispatch, JSON serialization, and wire encoding.
// Client-side decoding is excluded by writing to io.Discard.
func BenchmarkGetBlobsV3RPCServerOnly(b *testing.B) {
blobCount := 72
env := newBenchmarkBlobEnv(b, blobCount, 1, forkOsaka)
defer env.Close()
// Register the engine API on the running node's in-process RPC server.
rpcServer, err := env.node.RPCHandler()
if err != nil {
b.Fatalf("RPCHandler failed: %v", err)
}
rpcServer.RegisterName("engine", env.api)
// Verify the blobs are available via the direct API first.
result, err := env.api.GetBlobsV3(env.vhashes)
if err != nil {
b.Fatalf("GetBlobsV3 failed: %v", err)
}
if len(result) != blobCount {
b.Fatalf("expected %d blobs, got %d", blobCount, len(result))
}
b.Logf("blob count: %d", blobCount)
// Build the JSON-RPC request bytes once.
// Format the versioned hashes as a JSON array of hex strings.
var hashStrs []string
for _, h := range env.vhashes {
hashStrs = append(hashStrs, fmt.Sprintf(`"%s"`, h.Hex()))
}
reqJSON := fmt.Sprintf(`{"jsonrpc":"2.0","id":1,"method":"engine_getBlobsV3","params":[[%s]]}`, strings.Join(hashStrs, ","))
b.ResetTimer()
for i := 0; i < b.N; i++ {
conn := discardConn{
Reader: strings.NewReader(reqJSON),
Writer: io.Discard,
}
codec := rpc.NewCodec(conn)
rpcServer.ServeCodec(codec, 0)
}
b.StopTimer()
b.ReportMetric(float64(b.Elapsed().Milliseconds())/float64(b.N), "ms/op")
}

View file

@ -1961,7 +1961,7 @@ func TestGetBlobsV1(t *testing.T) {
// Fill the request for retrieving blobs
var (
vhashes []common.Hash
expect []*engine.BlobAndProofV1
expect engine.BlobAndProofListV1
)
// fill missing blob at the beginning
if suite.fillRandom {
@ -2072,13 +2072,13 @@ func BenchmarkGetBlobsV2(b *testing.B) {
}
}
type getBlobsFn func(hashes []common.Hash) ([]*engine.BlobAndProofV2, error)
type getBlobsFn func(hashes []common.Hash) (engine.BlobAndProofListV2, error)
func runGetBlobs(t testing.TB, getBlobs getBlobsFn, start, limit int, fillRandom bool, expectPartialResponse bool, name string) {
// Fill the request for retrieving blobs
var (
vhashes []common.Hash
expect []*engine.BlobAndProofV2
expect engine.BlobAndProofListV2
)
for j := start; j < limit; j++ {
vhashes = append(vhashes, testBlobVHashes[j])

3
go.mod
View file

@ -26,6 +26,7 @@ require (
github.com/ethereum/hid v1.0.1-0.20260421154323-c2ab8d9bf68a
github.com/fatih/color v1.16.0
github.com/ferranbt/fastssz v0.1.4
github.com/fjl/jsonw v0.1.0
github.com/fsnotify/fsnotify v1.6.0
github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff
github.com/gofrs/flock v0.12.1
@ -121,7 +122,7 @@ require (
github.com/deepmap/oapi-codegen v1.6.0 // indirect
github.com/dlclark/regexp2 v1.7.0 // indirect
github.com/emicklei/dot v1.6.2 // indirect
github.com/fjl/gencodec v0.1.0 // indirect
github.com/fjl/gencodec v0.1.2 // indirect
github.com/garslo/gogen v0.0.0-20170306192744-1d203ffc1f61 // indirect
github.com/getsentry/sentry-go v0.27.0 // indirect
github.com/go-ole/go-ole v1.3.0 // indirect

6
go.sum
View file

@ -123,8 +123,10 @@ github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
github.com/ferranbt/fastssz v0.1.4 h1:OCDB+dYDEQDvAgtAGnTSidK1Pe2tW3nFV40XyMkTeDY=
github.com/ferranbt/fastssz v0.1.4/go.mod h1:Ea3+oeoRGGLGm5shYAeDgu6PGUlcvQhE2fILyD9+tGg=
github.com/fjl/gencodec v0.1.0 h1:B3K0xPfc52cw52BBgUbSPxYo+HlLfAgWMVKRWXUXBcs=
github.com/fjl/gencodec v0.1.0/go.mod h1:Um1dFHPONZGTHog1qD1NaWjXJW/SPB38wPv0O8uZ2fI=
github.com/fjl/gencodec v0.1.2 h1:nf+MMsmuii5ZQMbS6/xjZoe5LRkN0415FOJOSwmnuW8=
github.com/fjl/gencodec v0.1.2/go.mod h1:chDHL3wKXuBgauP8x3XNZkl5EIAR5SoCTmmmDTZRzmw=
github.com/fjl/jsonw v0.1.0 h1:V3MyR79fjLpn/+bMgvegdGUIhoJOzjmqWcKDgcOmY1I=
github.com/fjl/jsonw v0.1.0/go.mod h1:2KMLevM6FXEJnfhtk7naXu9vZdVfOma1GlnGdPRlumU=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=

View file

@ -364,7 +364,7 @@ func (c *Client) CallContext(ctx context.Context, result interface{}, method str
resp := batchresp[0]
switch {
case resp.Error != nil:
return resp.Error
return resp.decodeError()
case len(resp.Result) == 0:
return ErrNoResult
default:
@ -419,7 +419,7 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error {
if c.isHTTP {
err = c.sendBatchHTTP(ctx, op, msgs)
} else {
err = c.send(ctx, op, msgs)
err = c.sendBatch(ctx, op, msgs)
}
if err != nil {
return err
@ -449,7 +449,7 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error {
elem := &b[index]
switch {
case resp.Error != nil:
elem.Error = resp.Error
elem.Error = resp.decodeError()
case resp.Result == nil:
elem.Error = ErrNoResult
default:
@ -552,7 +552,7 @@ func (c *Client) newMessage(method string, paramsIn ...interface{}) (*jsonrpcMes
// send registers op with the dispatch loop, then sends msg on the connection.
// if sending fails, op is deregistered.
func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error {
func (c *Client) send(ctx context.Context, op *requestOp, msg *jsonrpcMessage) error {
select {
case c.reqInit <- op:
err := c.write(ctx, msg, false)
@ -567,7 +567,22 @@ func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error
}
}
func (c *Client) write(ctx context.Context, msg interface{}, retry bool) error {
// sendBatch registers op with the dispatch loop, then sends a batch of messages
// on the connection. If sending fails, op is deregistered.
func (c *Client) sendBatch(ctx context.Context, op *requestOp, msgs []*jsonrpcMessage) error {
select {
case c.reqInit <- op:
err := c.writeBatch(ctx, msgs, false)
c.reqSent <- err
return err
case <-ctx.Done():
return ctx.Err()
case <-c.closing:
return ErrClientQuit
}
}
func (c *Client) write(ctx context.Context, msg *jsonrpcMessage, retry bool) error {
if c.writeConn == nil {
// The previous write failed. Try to establish a new connection.
if err := c.reconnect(ctx); err != nil {
@ -584,6 +599,22 @@ func (c *Client) write(ctx context.Context, msg interface{}, retry bool) error {
return err
}
func (c *Client) writeBatch(ctx context.Context, msgs []*jsonrpcMessage, retry bool) error {
if c.writeConn == nil {
if err := c.reconnect(ctx); err != nil {
return err
}
}
err := c.writeConn.writeJSONBatch(ctx, msgs, false)
if err != nil {
c.writeConn = nil
if !retry {
return c.writeBatch(ctx, msgs, true)
}
}
return err
}
func (c *Client) reconnect(ctx context.Context) error {
if c.reconnectFunc == nil {
return errDead

View file

@ -169,7 +169,7 @@ func (b *batchCallBuffer) doWrite(ctx context.Context, conn jsonWriter, isErrorR
}
b.wrote = true // can only write once
if len(b.resp) > 0 {
conn.writeJSON(ctx, b.resp, isErrorResponse)
conn.writeJSONBatch(ctx, b.resp, isErrorResponse)
}
}
@ -237,7 +237,7 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
resp := h.handleCallMsg(cp, msg)
callBuffer.pushResponse(resp)
if resp != nil && h.batchResponseMaxSize != 0 {
responseBytes += len(resp.Result)
responseBytes += len(resp.Result) + len(resp.Error)
if responseBytes > h.batchResponseMaxSize {
err := &internalServerError{errcodeResponseTooLarge, errMsgResponseTooLarge}
callBuffer.respondWithError(cp.ctx, h.conn, err)
@ -268,7 +268,7 @@ func (h *handler) respondWithBatchTooLarge(cp *callProc, batch []*jsonrpcMessage
break
}
}
h.conn.writeJSON(cp.ctx, []*jsonrpcMessage{resp}, true)
h.conn.writeJSONBatch(cp.ctx, []*jsonrpcMessage{resp}, true)
}
// handleMsg handles a single non-batch message.
@ -415,7 +415,7 @@ func (h *handler) handleResponses(batch []*jsonrpcMessage, handleCall func(*json
// the op.resp channel.
if op.sub != nil {
if msg.Error != nil {
op.err = msg.Error
op.err = msg.decodeError()
} else {
op.err = json.Unmarshal(msg.Result, &op.sub.subid)
if op.err == nil {
@ -481,9 +481,10 @@ func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMess
var logctx []any
logctx = append(logctx, "reqid", idForLog{msg.ID}, "duration", time.Since(start))
if resp.Error != nil {
logctx = append(logctx, "err", resp.Error.Message)
if resp.Error.Data != nil {
logctx = append(logctx, "errdata", formatErrorData(resp.Error.Data))
je := resp.decodeError()
logctx = append(logctx, "err", je.Message)
if je.Data != nil {
logctx = append(logctx, "errdata", formatErrorData(je.Data))
}
h.log.Warn("Served "+msg.Method, logctx...)
} else {
@ -550,7 +551,7 @@ func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage
answer := h.runMethod(rctx, msg, callb, args)
var rErr error
if answer.Error != nil {
rErr = errors.New(answer.Error.Message)
rErr = errors.New(answer.decodeError().Message)
}
rSpanEnd(&rErr)
@ -623,7 +624,7 @@ func (h *handler) runMethod(ctx context.Context, msg *jsonrpcMessage, callb *cal
_, _, spanEnd := telemetry.StartSpanWithTracer(ctx, h.tracer(), "rpc.encodeJSONResponse", attributes...)
response := msg.response(result)
if response.Error != nil {
err = errors.New(response.Error.Message)
err = errors.New(response.decodeError().Message)
}
spanEnd(&err)
return response

View file

@ -57,10 +57,14 @@ type httpConn struct {
// and some methods don't work. The panic() stubs here exist to ensure
// this special treatment is correct.
func (hc *httpConn) writeJSON(context.Context, interface{}, bool) error {
func (hc *httpConn) writeJSON(context.Context, *jsonrpcMessage, bool) error {
panic("writeJSON called on httpConn")
}
func (hc *httpConn) writeJSONBatch(context.Context, []*jsonrpcMessage, bool) error {
panic("writeJSONBatch called on httpConn")
}
func (hc *httpConn) peerInfo() PeerInfo {
panic("peerInfo called on httpConn")
}
@ -179,9 +183,9 @@ func cleanlyCloseBody(body io.ReadCloser) error {
return body.Close()
}
func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) error {
func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg *jsonrpcMessage) error {
hc := c.writeConn.(*httpConn)
respBody, err := hc.doRequest(ctx, msg)
respBody, err := hc.doRequest(ctx, appendMessage(nil, msg))
if err != nil {
return err
}
@ -198,7 +202,7 @@ func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) e
func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonrpcMessage) error {
hc := c.writeConn.(*httpConn)
respBody, err := hc.doRequest(ctx, msgs)
respBody, err := hc.doRequest(ctx, appendBatch(nil, msgs))
if err != nil {
return err
}
@ -212,11 +216,7 @@ func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonr
return nil
}
func (hc *httpConn) doRequest(ctx context.Context, msg interface{}) (io.ReadCloser, error) {
body, err := json.Marshal(msg)
if err != nil {
return nil, err
}
func (hc *httpConn) doRequest(ctx context.Context, body []byte) (io.ReadCloser, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, hc.url, io.NopCloser(bytes.NewReader(body)))
if err != nil {
return nil, err
@ -268,41 +268,51 @@ func (s *Server) newHTTPServerConn(r *http.Request, w http.ResponseWriter) Serve
body := io.LimitReader(r.Body, int64(s.httpBodyLimit))
conn := &httpServerConn{Reader: body, Writer: w, r: r}
encoder := func(v any, isErrorResponse bool) error {
if !isErrorResponse {
return json.NewEncoder(conn).Encode(v)
}
// It's an error response and requires special treatment.
//
// In case of a timeout error, the response must be written before the HTTP
// server's write timeout occurs. So we need to flush the response. The
// Content-Length header also needs to be set to ensure the client knows
// when it has the full response.
encdata, err := json.Marshal(v)
if err != nil {
return err
}
w.Header().Set("content-length", strconv.Itoa(len(encdata)))
// If this request is wrapped in a handler that might remove Content-Length (such
// as the automatic gzip we do in package node), we need to ensure the HTTP server
// doesn't perform chunked encoding. In case WriteTimeout is reached, the chunked
// encoding might not be finished correctly, and some clients do not like it when
// the final chunk is missing.
w.Header().Set("transfer-encoding", "identity")
_, err = w.Write(encdata)
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
return err
var buf []byte
encodeMsg := func(msg *jsonrpcMessage, isError bool) error {
buf = appendMessage(buf[:0], msg)
return httpWriteResult(w, buf, isError)
}
encodeBatch := func(msgs []*jsonrpcMessage, isError bool) error {
buf = appendBatch(buf[:0], msgs)
return httpWriteResult(w, buf, isError)
}
dec := json.NewDecoder(conn)
dec.UseNumber()
return NewFuncCodec(conn, encoder, dec.Decode)
return NewFuncCodec(conn, encodeMsg, encodeBatch, dec.Decode)
}
// httpWriteResult writes pre-encoded response data over HTTP.
// For error responses, it sets Content-Length and flushes to ensure the response
// is fully written before any HTTP server write timeout occurs.
func httpWriteResult(w http.ResponseWriter, data []byte, isError bool) error {
if !isError {
_, err := w.Write(data)
return err
}
// It's an error response and requires special treatment.
//
// In case of a timeout error, the response must be written before the HTTP
// server's write timeout occurs. So we need to flush the response. The
// Content-Length header also needs to be set to ensure the client knows
// when it has the full response.
w.Header().Set("content-length", strconv.Itoa(len(data)))
// If this request is wrapped in a handler that might remove Content-Length (such
// as the automatic gzip we do in package node), we need to ensure the HTTP server
// doesn't perform chunked encoding. In case WriteTimeout is reached, the chunked
// encoding might not be finished correctly, and some clients do not like it when
// the final chunk is missing.
w.Header().Set("transfer-encoding", "identity")
_, err := w.Write(data)
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
return err
}
// Close does nothing and always returns nil.

View file

@ -27,6 +27,8 @@ import (
"strings"
"sync"
"time"
"github.com/fjl/jsonw"
)
const (
@ -52,12 +54,6 @@ type subscriptionResultEnc struct {
Result any `json:"result"`
}
type jsonrpcSubscriptionNotification struct {
Version string `json:"jsonrpc"`
Method string `json:"method"`
Params subscriptionResultEnc `json:"params"`
}
// A value of this type can a JSON-RPC request, notification, successful response or
// error response. Which one it is depends on the fields.
type jsonrpcMessage struct {
@ -65,7 +61,7 @@ type jsonrpcMessage struct {
ID json.RawMessage `json:"id,omitempty"`
Method string `json:"method,omitempty"`
Params json.RawMessage `json:"params,omitempty"`
Error *jsonError `json:"error,omitempty"`
Error json.RawMessage `json:"error,omitempty"`
Result json.RawMessage `json:"result,omitempty"`
}
@ -113,8 +109,29 @@ func (msg *jsonrpcMessage) errorResponse(err error) *jsonrpcMessage {
return resp
}
// decodeError decodes the Error field into a jsonError struct.
func (msg *jsonrpcMessage) decodeError() *jsonError {
if msg.Error == nil {
return nil
}
je := new(jsonError)
json.Unmarshal(msg.Error, je)
return je
}
func (msg *jsonrpcMessage) response(result interface{}) *jsonrpcMessage {
enc, err := json.Marshal(result)
var (
enc []byte
err error
)
// Call MarshalJSON directly for types that implement it. This avoids the
// expensive validation/compaction pass that json.Marshal performs on
// encoder output.
if m, ok := result.(json.Marshaler); ok {
enc, err = m.MarshalJSON()
} else {
enc, err = json.Marshal(result)
}
if err != nil {
return msg.errorResponse(&internalServerError{errcodeMarshalError, err.Error()})
}
@ -122,19 +139,18 @@ func (msg *jsonrpcMessage) response(result interface{}) *jsonrpcMessage {
}
func errorMessage(err error) *jsonrpcMessage {
msg := &jsonrpcMessage{Version: vsn, ID: null, Error: &jsonError{
je := &jsonError{
Code: errcodeDefault,
Message: err.Error(),
}}
ec, ok := err.(Error)
if ok {
msg.Error.Code = ec.ErrorCode()
}
de, ok := err.(DataError)
if ok {
msg.Error.Data = de.ErrorData()
if ec, ok := err.(Error); ok {
je.Code = ec.ErrorCode()
}
return msg
if de, ok := err.(DataError); ok {
je.Data = de.ErrorData()
}
enc, _ := json.Marshal(je)
return &jsonrpcMessage{Version: vsn, ID: null, Error: enc}
}
type jsonError struct {
@ -179,28 +195,32 @@ type ConnRemoteAddr interface {
// jsonCodec reads and writes JSON-RPC messages to the underlying connection. It also has
// support for parsing arguments and serializing (result) objects.
type jsonCodec struct {
remote string
closer sync.Once // close closed channel once
closeCh chan interface{} // closed on Close
decode decodeFunc // decoder to allow multiple transports
encMu sync.Mutex // guards the encoder
encode encodeFunc // encoder to allow multiple transports
conn deadlineCloser
remote string
closer sync.Once // close closed channel once
closeCh chan interface{} // closed on Close
decode decodeFunc // decoder to allow multiple transports
encMu sync.Mutex // guards the encoder
encodeMsg encodeMsgFunc // single-message encoder
encodeBatch encodeBatchFunc // batch encoder
conn deadlineCloser
}
type encodeFunc = func(v interface{}, isErrorResponse bool) error
type encodeMsgFunc = func(msg *jsonrpcMessage, isError bool) error
type encodeBatchFunc = func(msgs []*jsonrpcMessage, isError bool) error
type decodeFunc = func(v interface{}) error
// NewFuncCodec creates a codec which uses the given functions to read and write. If conn
// implements ConnRemoteAddr, log messages will use it to include the remote address of
// the connection.
func NewFuncCodec(conn deadlineCloser, encode encodeFunc, decode decodeFunc) ServerCodec {
func NewFuncCodec(conn deadlineCloser, encodeMsg encodeMsgFunc, encodeBatch encodeBatchFunc, decode decodeFunc) ServerCodec {
codec := &jsonCodec{
closeCh: make(chan interface{}),
encode: encode,
decode: decode,
conn: conn,
closeCh: make(chan interface{}),
encodeMsg: encodeMsg,
encodeBatch: encodeBatch,
decode: decode,
conn: conn,
}
if ra, ok := conn.(ConnRemoteAddr); ok {
codec.remote = ra.RemoteAddr()
@ -211,14 +231,62 @@ func NewFuncCodec(conn deadlineCloser, encode encodeFunc, decode decodeFunc) Ser
// NewCodec creates a codec on the given connection. If conn implements ConnRemoteAddr, log
// messages will use it to include the remote address of the connection.
func NewCodec(conn Conn) ServerCodec {
enc := json.NewEncoder(conn)
dec := json.NewDecoder(conn)
dec.UseNumber()
encode := func(v interface{}, isErrorResponse bool) error {
return enc.Encode(v)
var buf []byte
encodeMsg := func(msg *jsonrpcMessage, isError bool) error {
buf = appendMessage(buf[:0], msg)
buf = append(buf, '\n')
_, err := conn.Write(buf)
return err
}
return NewFuncCodec(conn, encode, dec.Decode)
encodeBatch := func(msgs []*jsonrpcMessage, isError bool) error {
buf = appendBatch(buf[:0], msgs)
buf = append(buf, '\n')
_, err := conn.Write(buf)
return err
}
return NewFuncCodec(conn, encodeMsg, encodeBatch, dec.Decode)
}
// appendMessage appends the JSON-RPC encoding of msg to buf.
func appendMessage(buf []byte, msg *jsonrpcMessage) []byte {
buf = append(buf, `{"jsonrpc":"2.0"`...)
if msg.ID != nil {
buf = append(buf, `,"id":`...)
buf = append(buf, msg.ID...)
}
if msg.Method != "" {
buf = append(buf, `,"method":`...)
buf = jsonw.AppendQuotedString(buf, msg.Method)
}
if msg.Params != nil {
buf = append(buf, `,"params":`...)
buf = append(buf, msg.Params...)
}
if msg.Error != nil {
buf = append(buf, `,"error":`...)
buf = append(buf, msg.Error...)
}
if msg.Result != nil {
buf = append(buf, `,"result":`...)
buf = append(buf, msg.Result...)
}
buf = append(buf, '}')
return buf
}
// appendBatch appends the JSON-RPC encoding of a message batch to buf.
func appendBatch(buf []byte, msgs []*jsonrpcMessage) []byte {
buf = append(buf, '[')
for i, msg := range msgs {
if i > 0 {
buf = append(buf, ',')
}
buf = appendMessage(buf, msg)
}
buf = append(buf, ']')
return buf
}
func (c *jsonCodec) peerInfo() PeerInfo {
@ -248,7 +316,7 @@ func (c *jsonCodec) readBatch() (messages []*jsonrpcMessage, batch bool, err err
return messages, batch, nil
}
func (c *jsonCodec) writeJSON(ctx context.Context, v interface{}, isErrorResponse bool) error {
func (c *jsonCodec) writeJSON(ctx context.Context, msg *jsonrpcMessage, isError bool) error {
c.encMu.Lock()
defer c.encMu.Unlock()
@ -257,7 +325,19 @@ func (c *jsonCodec) writeJSON(ctx context.Context, v interface{}, isErrorRespons
deadline = time.Now().Add(defaultWriteTimeout)
}
c.conn.SetWriteDeadline(deadline)
return c.encode(v, isErrorResponse)
return c.encodeMsg(msg, isError)
}
func (c *jsonCodec) writeJSONBatch(ctx context.Context, msgs []*jsonrpcMessage, isError bool) error {
c.encMu.Lock()
defer c.encMu.Unlock()
deadline, ok := ctx.Deadline()
if !ok {
deadline = time.Now().Add(defaultWriteTimeout)
}
c.conn.SetWriteDeadline(deadline)
return c.encodeBatch(msgs, isError)
}
func (c *jsonCodec) close() {

View file

@ -208,6 +208,48 @@ func TestServerBatchResponseSizeLimit(t *testing.T) {
}
}
// TestServerBatchResponseSizeLimit_errorResponses verifies that error responses
// are counted toward BatchResponseMaxSize.
func TestServerBatchResponseSizeLimit_errorResponses(t *testing.T) {
t.Parallel()
server := newTestServer()
defer server.Stop()
// Each error response for test_returnError is ~58 bytes of JSON in the Error field.
// Set limit to 100 so 1 response fits (58 bytes) but the 2nd (116 bytes) exceeds it.
server.SetBatchLimits(100, 100)
var (
batch []BatchElem
client = DialInProc(server)
)
for i := 0; i < 5; i++ {
batch = append(batch, BatchElem{
Method: "test_returnError",
Result: new(int),
})
}
if err := client.BatchCall(batch); err != nil {
t.Fatal("error sending batch:", err)
}
for i := range batch {
re, ok := batch[i].Error.(Error)
if !ok {
t.Fatalf("batch elem %d has wrong error type: %v", i, batch[i].Error)
}
if i < 2 {
// First two: elem 0 fits under limit, elem 1 pushes over but is already processed.
if re.ErrorCode() != 444 {
t.Errorf("batch elem %d wrong error code, have %d want 444", i, re.ErrorCode())
}
} else {
// Remaining should be the response-too-large error.
if re.ErrorCode() != errcodeResponseTooLarge {
t.Errorf("batch elem %d wrong error code, have %d want %d", i, re.ErrorCode(), errcodeResponseTooLarge)
}
}
}
}
func TestServerWebsocketReadLimit(t *testing.T) {
t.Parallel()

View file

@ -171,13 +171,17 @@ func (n *Notifier) activate() error {
}
func (n *Notifier) send(sub *Subscription, data any) error {
msg := jsonrpcSubscriptionNotification{
params, err := json.Marshal(subscriptionResultEnc{
ID: string(sub.ID),
Result: data,
})
if err != nil {
return err
}
msg := jsonrpcMessage{
Version: vsn,
Method: n.namespace + notificationMethodSuffix,
Params: subscriptionResultEnc{
ID: string(sub.ID),
Result: data,
},
Params: params,
}
return n.h.conn.writeJSON(context.Background(), &msg, false)
}

View file

@ -220,7 +220,7 @@ func readAndValidateMessage(in *json.Decoder) (*subConfirmation, *subscriptionRe
case msg.isResponse():
var c subConfirmation
if msg.Error != nil {
return nil, nil, msg.Error
return nil, nil, msg.decodeError()
} else if err := json.Unmarshal(msg.Result, &c.subid); err != nil {
return nil, nil, fmt.Errorf("invalid response: %v", err)
} else {
@ -233,12 +233,21 @@ func readAndValidateMessage(in *json.Decoder) (*subConfirmation, *subscriptionRe
}
type mockConn struct {
enc *json.Encoder
w io.Writer
}
// writeJSON writes a message to the connection.
func (c *mockConn) writeJSON(ctx context.Context, msg interface{}, isError bool) error {
return c.enc.Encode(msg)
func (c *mockConn) writeJSON(ctx context.Context, msg *jsonrpcMessage, isError bool) error {
buf := appendMessage(nil, msg)
buf = append(buf, '\n')
_, err := c.w.Write(buf)
return err
}
func (c *mockConn) writeJSONBatch(ctx context.Context, msgs []*jsonrpcMessage, isError bool) error {
buf := appendBatch(nil, msgs)
buf = append(buf, '\n')
_, err := c.w.Write(buf)
return err
}
// closed returns a channel which is closed when the connection is closed.
@ -251,7 +260,7 @@ func (c *mockConn) remoteAddr() string { return "" }
func BenchmarkNotify(b *testing.B) {
id := ID("test")
notifier := &Notifier{
h: &handler{conn: &mockConn{json.NewEncoder(io.Discard)}},
h: &handler{conn: &mockConn{io.Discard}},
sub: &Subscription{ID: id},
activated: true,
}
@ -271,7 +280,7 @@ func TestNotify(t *testing.T) {
out := new(bytes.Buffer)
id := ID("test")
notifier := &Notifier{
h: &handler{conn: &mockConn{json.NewEncoder(out)}},
h: &handler{conn: &mockConn{out}},
sub: &Subscription{ID: id},
activated: true,
}

View file

@ -51,9 +51,10 @@ type ServerCodec interface {
// jsonWriter can write JSON messages to its underlying connection.
// Implementations must be safe for concurrent use.
type jsonWriter interface {
// writeJSON writes a message to the connection.
writeJSON(ctx context.Context, msg interface{}, isError bool) error
// writeJSON writes a single JSON-RPC message to the connection.
writeJSON(ctx context.Context, msg *jsonrpcMessage, isError bool) error
// writeJSONBatch writes a batch of JSON-RPC messages to the connection.
writeJSONBatch(ctx context.Context, msgs []*jsonrpcMessage, isError bool) error
// Closed returns a channel which is closed when the connection is closed.
closed() <-chan interface{}
// RemoteAddr returns the peer address of the connection.

View file

@ -293,11 +293,17 @@ type websocketCodec struct {
func newWebsocketCodec(conn *websocket.Conn, host string, req http.Header, readLimit int64) ServerCodec {
conn.SetReadLimit(readLimit)
encode := func(v interface{}, isErrorResponse bool) error {
return conn.WriteJSON(v)
var buf []byte
encodeMsg := func(msg *jsonrpcMessage, isError bool) error {
buf = appendMessage(buf[:0], msg)
return conn.WriteMessage(websocket.TextMessage, buf)
}
encodeBatch := func(msgs []*jsonrpcMessage, isError bool) error {
buf = appendBatch(buf[:0], msgs)
return conn.WriteMessage(websocket.TextMessage, buf)
}
wc := &websocketCodec{
jsonCodec: NewFuncCodec(conn, encode, conn.ReadJSON).(*jsonCodec),
jsonCodec: NewFuncCodec(conn, encodeMsg, encodeBatch, conn.ReadJSON).(*jsonCodec),
conn: conn,
pingReset: make(chan struct{}, 1),
pongReceived: make(chan struct{}),
@ -342,8 +348,15 @@ func (wc *websocketCodec) peerInfo() PeerInfo {
return wc.info
}
func (wc *websocketCodec) writeJSON(ctx context.Context, v interface{}, isError bool) error {
err := wc.jsonCodec.writeJSON(ctx, v, isError)
func (wc *websocketCodec) writeJSON(ctx context.Context, msg *jsonrpcMessage, isError bool) error {
return wc.writeAndResetPing(wc.jsonCodec.writeJSON(ctx, msg, isError))
}
func (wc *websocketCodec) writeJSONBatch(ctx context.Context, msgs []*jsonrpcMessage, isError bool) error {
return wc.writeAndResetPing(wc.jsonCodec.writeJSONBatch(ctx, msgs, isError))
}
func (wc *websocketCodec) writeAndResetPing(err error) error {
if err == nil {
// Notify pingLoop to delay the next idle ping.
select {