diff --git a/rpc/client.go b/rpc/client.go index 314612c1bb..9175626241 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -364,7 +364,7 @@ func (c *Client) CallContext(ctx context.Context, result interface{}, method str resp := batchresp[0] switch { case resp.Error != nil: - return resp.jsonErr() + return resp.decodeError() case len(resp.Result) == 0: return ErrNoResult default: @@ -449,7 +449,7 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error { elem := &b[index] switch { case resp.Error != nil: - elem.Error = resp.jsonErr() + elem.Error = resp.decodeError() case resp.Result == nil: elem.Error = ErrNoResult default: diff --git a/rpc/handler.go b/rpc/handler.go index 1a1a31f920..89fc78236c 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -415,7 +415,7 @@ func (h *handler) handleResponses(batch []*jsonrpcMessage, handleCall func(*json // the op.resp channel. if op.sub != nil { if msg.Error != nil { - op.err = msg.jsonErr() + op.err = msg.decodeError() } else { op.err = json.Unmarshal(msg.Result, &op.sub.subid) if op.err == nil { @@ -481,7 +481,7 @@ func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMess var logctx []any logctx = append(logctx, "reqid", idForLog{msg.ID}, "duration", time.Since(start)) if resp.Error != nil { - je := resp.jsonErr() + je := resp.decodeError() logctx = append(logctx, "err", je.Message) if je.Data != nil { logctx = append(logctx, "errdata", formatErrorData(je.Data)) @@ -551,7 +551,7 @@ func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage answer := h.runMethod(rctx, msg, callb, args) var rErr error if answer.Error != nil { - rErr = errors.New(answer.jsonErr().Message) + rErr = errors.New(answer.decodeError().Message) } rSpanEnd(&rErr) @@ -624,7 +624,7 @@ func (h *handler) runMethod(ctx context.Context, msg *jsonrpcMessage, callb *cal _, _, spanEnd := telemetry.StartSpanWithTracer(ctx, h.tracer(), "rpc.encodeJSONResponse", attributes...) response := msg.response(result) if response.Error != nil { - err = errors.New(response.jsonErr().Message) + err = errors.New(response.decodeError().Message) } spanEnd(&err) return response diff --git a/rpc/http.go b/rpc/http.go index f61398889e..49618244df 100644 --- a/rpc/http.go +++ b/rpc/http.go @@ -183,9 +183,9 @@ func cleanlyCloseBody(body io.ReadCloser) error { return body.Close() } -func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) error { +func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg *jsonrpcMessage) error { hc := c.writeConn.(*httpConn) - respBody, err := hc.doRequest(ctx, msg) + respBody, err := hc.doRequest(ctx, appendMessage(nil, msg)) if err != nil { return err } @@ -202,7 +202,7 @@ func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) e func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonrpcMessage) error { hc := c.writeConn.(*httpConn) - respBody, err := hc.doRequest(ctx, msgs) + respBody, err := hc.doRequest(ctx, appendBatch(nil, msgs)) if err != nil { return err } @@ -216,11 +216,7 @@ func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonr return nil } -func (hc *httpConn) doRequest(ctx context.Context, msg interface{}) (io.ReadCloser, error) { - body, err := json.Marshal(msg) - if err != nil { - return nil, err - } +func (hc *httpConn) doRequest(ctx context.Context, body []byte) (io.ReadCloser, error) { req, err := http.NewRequestWithContext(ctx, http.MethodPost, hc.url, io.NopCloser(bytes.NewReader(body))) if err != nil { return nil, err @@ -272,11 +268,14 @@ func (s *Server) newHTTPServerConn(r *http.Request, w http.ResponseWriter) Serve body := io.LimitReader(r.Body, int64(s.httpBodyLimit)) conn := &httpServerConn{Reader: body, Writer: w, r: r} + var buf []byte encodeMsg := func(msg *jsonrpcMessage, isError bool) error { - return httpEncodeValue(conn, w, msg, isError) + buf = appendMessage(buf[:0], msg) + return httpWriteResult(w, buf, isError) } encodeBatch := func(msgs []*jsonrpcMessage, isError bool) error { - return httpEncodeValue(conn, w, msgs, isError) + buf = appendBatch(buf[:0], msgs) + return httpWriteResult(w, buf, isError) } dec := json.NewDecoder(conn) @@ -285,12 +284,13 @@ func (s *Server) newHTTPServerConn(r *http.Request, w http.ResponseWriter) Serve return NewFuncCodec(conn, encodeMsg, encodeBatch, dec.Decode) } -// httpEncodeValue handles the HTTP-specific JSON encoding logic for responses. +// httpWriteResult writes pre-encoded response data over HTTP. // For error responses, it sets Content-Length and flushes to ensure the response // is fully written before any HTTP server write timeout occurs. -func httpEncodeValue(conn *httpServerConn, w http.ResponseWriter, v any, isError bool) error { +func httpWriteResult(w http.ResponseWriter, data []byte, isError bool) error { if !isError { - return json.NewEncoder(conn).Encode(v) + _, err := w.Write(data) + return err } // It's an error response and requires special treatment. @@ -299,11 +299,7 @@ func httpEncodeValue(conn *httpServerConn, w http.ResponseWriter, v any, isError // server's write timeout occurs. So we need to flush the response. The // Content-Length header also needs to be set to ensure the client knows // when it has the full response. - encdata, err := json.Marshal(v) - if err != nil { - return err - } - w.Header().Set("content-length", strconv.Itoa(len(encdata))) + w.Header().Set("content-length", strconv.Itoa(len(data))) // If this request is wrapped in a handler that might remove Content-Length (such // as the automatic gzip we do in package node), we need to ensure the HTTP server @@ -312,7 +308,7 @@ func httpEncodeValue(conn *httpServerConn, w http.ResponseWriter, v any, isError // the final chunk is missing. w.Header().Set("transfer-encoding", "identity") - _, err = w.Write(encdata) + _, err := w.Write(data) if f, ok := w.(http.Flusher); ok { f.Flush() } diff --git a/rpc/json.go b/rpc/json.go index e74aadc695..120e6fa7c2 100644 --- a/rpc/json.go +++ b/rpc/json.go @@ -108,8 +108,8 @@ func (msg *jsonrpcMessage) errorResponse(err error) *jsonrpcMessage { return resp } -// jsonErr decodes the Error field into a jsonError struct. -func (msg *jsonrpcMessage) jsonErr() *jsonError { +// decodeError decodes the Error field into a jsonError struct. +func (msg *jsonrpcMessage) decodeError() *jsonError { if msg.Error == nil { return nil } @@ -123,6 +123,9 @@ func (msg *jsonrpcMessage) response(result interface{}) *jsonrpcMessage { enc []byte err error ) + // Call MarshalJSON directly for types that implement it. This avoids the + // expensive validation/compaction pass that json.Marshal performs on + // encoder output. if m, ok := result.(json.Marshaler); ok { enc, err = m.MarshalJSON() } else { @@ -227,22 +230,26 @@ func NewFuncCodec(conn deadlineCloser, encodeMsg encodeMsgFunc, encodeBatch enco // NewCodec creates a codec on the given connection. If conn implements ConnRemoteAddr, log // messages will use it to include the remote address of the connection. func NewCodec(conn Conn) ServerCodec { - enc := json.NewEncoder(conn) dec := json.NewDecoder(conn) dec.UseNumber() - + var buf []byte encodeMsg := func(msg *jsonrpcMessage, isError bool) error { - return writeMessage(conn, msg) + buf = appendMessage(buf[:0], msg) + buf = append(buf, '\n') + _, err := conn.Write(buf) + return err } encodeBatch := func(msgs []*jsonrpcMessage, isError bool) error { - return enc.Encode(msgs) + buf = appendBatch(buf[:0], msgs) + buf = append(buf, '\n') + _, err := conn.Write(buf) + return err } return NewFuncCodec(conn, encodeMsg, encodeBatch, dec.Decode) } -// writeMessage writes a single jsonrpcMessage directly to the writer. -func writeMessage(w io.Writer, msg *jsonrpcMessage) error { - var buf []byte +// appendMessage appends the JSON-RPC encoding of msg to buf. +func appendMessage(buf []byte, msg *jsonrpcMessage) []byte { buf = append(buf, `{"jsonrpc":"2.0"`...) if msg.ID != nil { buf = append(buf, `,"id":`...) @@ -264,9 +271,21 @@ func writeMessage(w io.Writer, msg *jsonrpcMessage) error { buf = append(buf, `,"result":`...) buf = append(buf, msg.Result...) } - buf = append(buf, '}', '\n') - _, err := w.Write(buf) - return err + buf = append(buf, '}') + return buf +} + +// appendBatch appends the JSON-RPC encoding of a message batch to buf. +func appendBatch(buf []byte, msgs []*jsonrpcMessage) []byte { + buf = append(buf, '[') + for i, msg := range msgs { + if i > 0 { + buf = append(buf, ',') + } + buf = appendMessage(buf, msg) + } + buf = append(buf, ']') + return buf } const hexDigits = "0123456789abcdef" diff --git a/rpc/subscription_test.go b/rpc/subscription_test.go index d1888c8a9a..623b496124 100644 --- a/rpc/subscription_test.go +++ b/rpc/subscription_test.go @@ -220,7 +220,7 @@ func readAndValidateMessage(in *json.Decoder) (*subConfirmation, *subscriptionRe case msg.isResponse(): var c subConfirmation if msg.Error != nil { - return nil, nil, msg.jsonErr() + return nil, nil, msg.decodeError() } else if err := json.Unmarshal(msg.Result, &c.subid); err != nil { return nil, nil, fmt.Errorf("invalid response: %v", err) } else { @@ -237,11 +237,17 @@ type mockConn struct { } func (c *mockConn) writeJSON(ctx context.Context, msg *jsonrpcMessage, isError bool) error { - return writeMessage(c.w, msg) + buf := appendMessage(nil, msg) + buf = append(buf, '\n') + _, err := c.w.Write(buf) + return err } func (c *mockConn) writeJSONBatch(ctx context.Context, msgs []*jsonrpcMessage, isError bool) error { - return json.NewEncoder(c.w).Encode(msgs) + buf := appendBatch(nil, msgs) + buf = append(buf, '\n') + _, err := c.w.Write(buf) + return err } // closed returns a channel which is closed when the connection is closed. diff --git a/rpc/websocket.go b/rpc/websocket.go index 9970c8d4b2..e70498873a 100644 --- a/rpc/websocket.go +++ b/rpc/websocket.go @@ -293,11 +293,14 @@ type websocketCodec struct { func newWebsocketCodec(conn *websocket.Conn, host string, req http.Header, readLimit int64) ServerCodec { conn.SetReadLimit(readLimit) + var buf []byte encodeMsg := func(msg *jsonrpcMessage, isError bool) error { - return conn.WriteJSON(msg) + buf = appendMessage(buf[:0], msg) + return conn.WriteMessage(websocket.TextMessage, buf) } encodeBatch := func(msgs []*jsonrpcMessage, isError bool) error { - return conn.WriteJSON(msgs) + buf = appendBatch(buf[:0], msgs) + return conn.WriteMessage(websocket.TextMessage, buf) } wc := &websocketCodec{ jsonCodec: NewFuncCodec(conn, encodeMsg, encodeBatch, conn.ReadJSON).(*jsonCodec),