rpc: separate encoding from I/O in JSON-RPC wire format and address other feedback nits

This commit is contained in:
jonny rhea 2026-03-10 16:15:02 -05:00
parent d5ba7b0957
commit 3688824db3
6 changed files with 66 additions and 42 deletions

View file

@ -364,7 +364,7 @@ func (c *Client) CallContext(ctx context.Context, result interface{}, method str
resp := batchresp[0]
switch {
case resp.Error != nil:
return resp.jsonErr()
return resp.decodeError()
case len(resp.Result) == 0:
return ErrNoResult
default:
@ -449,7 +449,7 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error {
elem := &b[index]
switch {
case resp.Error != nil:
elem.Error = resp.jsonErr()
elem.Error = resp.decodeError()
case resp.Result == nil:
elem.Error = ErrNoResult
default:

View file

@ -415,7 +415,7 @@ func (h *handler) handleResponses(batch []*jsonrpcMessage, handleCall func(*json
// the op.resp channel.
if op.sub != nil {
if msg.Error != nil {
op.err = msg.jsonErr()
op.err = msg.decodeError()
} else {
op.err = json.Unmarshal(msg.Result, &op.sub.subid)
if op.err == nil {
@ -481,7 +481,7 @@ func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMess
var logctx []any
logctx = append(logctx, "reqid", idForLog{msg.ID}, "duration", time.Since(start))
if resp.Error != nil {
je := resp.jsonErr()
je := resp.decodeError()
logctx = append(logctx, "err", je.Message)
if je.Data != nil {
logctx = append(logctx, "errdata", formatErrorData(je.Data))
@ -551,7 +551,7 @@ func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage
answer := h.runMethod(rctx, msg, callb, args)
var rErr error
if answer.Error != nil {
rErr = errors.New(answer.jsonErr().Message)
rErr = errors.New(answer.decodeError().Message)
}
rSpanEnd(&rErr)
@ -624,7 +624,7 @@ func (h *handler) runMethod(ctx context.Context, msg *jsonrpcMessage, callb *cal
_, _, spanEnd := telemetry.StartSpanWithTracer(ctx, h.tracer(), "rpc.encodeJSONResponse", attributes...)
response := msg.response(result)
if response.Error != nil {
err = errors.New(response.jsonErr().Message)
err = errors.New(response.decodeError().Message)
}
spanEnd(&err)
return response

View file

@ -183,9 +183,9 @@ func cleanlyCloseBody(body io.ReadCloser) error {
return body.Close()
}
func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) error {
func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg *jsonrpcMessage) error {
hc := c.writeConn.(*httpConn)
respBody, err := hc.doRequest(ctx, msg)
respBody, err := hc.doRequest(ctx, appendMessage(nil, msg))
if err != nil {
return err
}
@ -202,7 +202,7 @@ func (c *Client) sendHTTP(ctx context.Context, op *requestOp, msg interface{}) e
func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonrpcMessage) error {
hc := c.writeConn.(*httpConn)
respBody, err := hc.doRequest(ctx, msgs)
respBody, err := hc.doRequest(ctx, appendBatch(nil, msgs))
if err != nil {
return err
}
@ -216,11 +216,7 @@ func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonr
return nil
}
func (hc *httpConn) doRequest(ctx context.Context, msg interface{}) (io.ReadCloser, error) {
body, err := json.Marshal(msg)
if err != nil {
return nil, err
}
func (hc *httpConn) doRequest(ctx context.Context, body []byte) (io.ReadCloser, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, hc.url, io.NopCloser(bytes.NewReader(body)))
if err != nil {
return nil, err
@ -272,11 +268,14 @@ func (s *Server) newHTTPServerConn(r *http.Request, w http.ResponseWriter) Serve
body := io.LimitReader(r.Body, int64(s.httpBodyLimit))
conn := &httpServerConn{Reader: body, Writer: w, r: r}
var buf []byte
encodeMsg := func(msg *jsonrpcMessage, isError bool) error {
return httpEncodeValue(conn, w, msg, isError)
buf = appendMessage(buf[:0], msg)
return httpWriteResult(w, buf, isError)
}
encodeBatch := func(msgs []*jsonrpcMessage, isError bool) error {
return httpEncodeValue(conn, w, msgs, isError)
buf = appendBatch(buf[:0], msgs)
return httpWriteResult(w, buf, isError)
}
dec := json.NewDecoder(conn)
@ -285,12 +284,13 @@ func (s *Server) newHTTPServerConn(r *http.Request, w http.ResponseWriter) Serve
return NewFuncCodec(conn, encodeMsg, encodeBatch, dec.Decode)
}
// httpEncodeValue handles the HTTP-specific JSON encoding logic for responses.
// httpWriteResult writes pre-encoded response data over HTTP.
// For error responses, it sets Content-Length and flushes to ensure the response
// is fully written before any HTTP server write timeout occurs.
func httpEncodeValue(conn *httpServerConn, w http.ResponseWriter, v any, isError bool) error {
func httpWriteResult(w http.ResponseWriter, data []byte, isError bool) error {
if !isError {
return json.NewEncoder(conn).Encode(v)
_, err := w.Write(data)
return err
}
// It's an error response and requires special treatment.
@ -299,11 +299,7 @@ func httpEncodeValue(conn *httpServerConn, w http.ResponseWriter, v any, isError
// server's write timeout occurs. So we need to flush the response. The
// Content-Length header also needs to be set to ensure the client knows
// when it has the full response.
encdata, err := json.Marshal(v)
if err != nil {
return err
}
w.Header().Set("content-length", strconv.Itoa(len(encdata)))
w.Header().Set("content-length", strconv.Itoa(len(data)))
// If this request is wrapped in a handler that might remove Content-Length (such
// as the automatic gzip we do in package node), we need to ensure the HTTP server
@ -312,7 +308,7 @@ func httpEncodeValue(conn *httpServerConn, w http.ResponseWriter, v any, isError
// the final chunk is missing.
w.Header().Set("transfer-encoding", "identity")
_, err = w.Write(encdata)
_, err := w.Write(data)
if f, ok := w.(http.Flusher); ok {
f.Flush()
}

View file

@ -108,8 +108,8 @@ func (msg *jsonrpcMessage) errorResponse(err error) *jsonrpcMessage {
return resp
}
// jsonErr decodes the Error field into a jsonError struct.
func (msg *jsonrpcMessage) jsonErr() *jsonError {
// decodeError decodes the Error field into a jsonError struct.
func (msg *jsonrpcMessage) decodeError() *jsonError {
if msg.Error == nil {
return nil
}
@ -123,6 +123,9 @@ func (msg *jsonrpcMessage) response(result interface{}) *jsonrpcMessage {
enc []byte
err error
)
// Call MarshalJSON directly for types that implement it. This avoids the
// expensive validation/compaction pass that json.Marshal performs on
// encoder output.
if m, ok := result.(json.Marshaler); ok {
enc, err = m.MarshalJSON()
} else {
@ -227,22 +230,26 @@ func NewFuncCodec(conn deadlineCloser, encodeMsg encodeMsgFunc, encodeBatch enco
// NewCodec creates a codec on the given connection. If conn implements ConnRemoteAddr, log
// messages will use it to include the remote address of the connection.
func NewCodec(conn Conn) ServerCodec {
enc := json.NewEncoder(conn)
dec := json.NewDecoder(conn)
dec.UseNumber()
var buf []byte
encodeMsg := func(msg *jsonrpcMessage, isError bool) error {
return writeMessage(conn, msg)
buf = appendMessage(buf[:0], msg)
buf = append(buf, '\n')
_, err := conn.Write(buf)
return err
}
encodeBatch := func(msgs []*jsonrpcMessage, isError bool) error {
return enc.Encode(msgs)
buf = appendBatch(buf[:0], msgs)
buf = append(buf, '\n')
_, err := conn.Write(buf)
return err
}
return NewFuncCodec(conn, encodeMsg, encodeBatch, dec.Decode)
}
// writeMessage writes a single jsonrpcMessage directly to the writer.
func writeMessage(w io.Writer, msg *jsonrpcMessage) error {
var buf []byte
// appendMessage appends the JSON-RPC encoding of msg to buf.
func appendMessage(buf []byte, msg *jsonrpcMessage) []byte {
buf = append(buf, `{"jsonrpc":"2.0"`...)
if msg.ID != nil {
buf = append(buf, `,"id":`...)
@ -264,9 +271,21 @@ func writeMessage(w io.Writer, msg *jsonrpcMessage) error {
buf = append(buf, `,"result":`...)
buf = append(buf, msg.Result...)
}
buf = append(buf, '}', '\n')
_, err := w.Write(buf)
return err
buf = append(buf, '}')
return buf
}
// appendBatch appends the JSON-RPC encoding of a message batch to buf.
func appendBatch(buf []byte, msgs []*jsonrpcMessage) []byte {
buf = append(buf, '[')
for i, msg := range msgs {
if i > 0 {
buf = append(buf, ',')
}
buf = appendMessage(buf, msg)
}
buf = append(buf, ']')
return buf
}
const hexDigits = "0123456789abcdef"

View file

@ -220,7 +220,7 @@ func readAndValidateMessage(in *json.Decoder) (*subConfirmation, *subscriptionRe
case msg.isResponse():
var c subConfirmation
if msg.Error != nil {
return nil, nil, msg.jsonErr()
return nil, nil, msg.decodeError()
} else if err := json.Unmarshal(msg.Result, &c.subid); err != nil {
return nil, nil, fmt.Errorf("invalid response: %v", err)
} else {
@ -237,11 +237,17 @@ type mockConn struct {
}
func (c *mockConn) writeJSON(ctx context.Context, msg *jsonrpcMessage, isError bool) error {
return writeMessage(c.w, msg)
buf := appendMessage(nil, msg)
buf = append(buf, '\n')
_, err := c.w.Write(buf)
return err
}
func (c *mockConn) writeJSONBatch(ctx context.Context, msgs []*jsonrpcMessage, isError bool) error {
return json.NewEncoder(c.w).Encode(msgs)
buf := appendBatch(nil, msgs)
buf = append(buf, '\n')
_, err := c.w.Write(buf)
return err
}
// closed returns a channel which is closed when the connection is closed.

View file

@ -293,11 +293,14 @@ type websocketCodec struct {
func newWebsocketCodec(conn *websocket.Conn, host string, req http.Header, readLimit int64) ServerCodec {
conn.SetReadLimit(readLimit)
var buf []byte
encodeMsg := func(msg *jsonrpcMessage, isError bool) error {
return conn.WriteJSON(msg)
buf = appendMessage(buf[:0], msg)
return conn.WriteMessage(websocket.TextMessage, buf)
}
encodeBatch := func(msgs []*jsonrpcMessage, isError bool) error {
return conn.WriteJSON(msgs)
buf = appendBatch(buf[:0], msgs)
return conn.WriteMessage(websocket.TextMessage, buf)
}
wc := &websocketCodec{
jsonCodec: NewFuncCodec(conn, encodeMsg, encodeBatch, conn.ReadJSON).(*jsonCodec),