rpc: replace interface{} codec writes with typed methods

This commit is contained in:
jonny rhea 2026-03-10 10:23:05 -05:00
parent ad11672233
commit 9d052bf2e1
8 changed files with 155 additions and 82 deletions

View file

@ -419,7 +419,7 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error {
if c.isHTTP {
err = c.sendBatchHTTP(ctx, op, msgs)
} else {
err = c.send(ctx, op, msgs)
err = c.sendBatch(ctx, op, msgs)
}
if err != nil {
return err
@ -552,7 +552,7 @@ func (c *Client) newMessage(method string, paramsIn ...interface{}) (*jsonrpcMes
// send registers op with the dispatch loop, then sends msg on the connection.
// if sending fails, op is deregistered.
func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error {
func (c *Client) send(ctx context.Context, op *requestOp, msg *jsonrpcMessage) error {
select {
case c.reqInit <- op:
err := c.write(ctx, msg, false)
@ -567,7 +567,22 @@ func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error
}
}
func (c *Client) write(ctx context.Context, msg interface{}, retry bool) error {
// sendBatch registers op with the dispatch loop, then sends a batch of messages
// on the connection. If sending fails, op is deregistered.
func (c *Client) sendBatch(ctx context.Context, op *requestOp, msgs []*jsonrpcMessage) error {
select {
case c.reqInit <- op:
err := c.writeBatch(ctx, msgs, false)
c.reqSent <- err
return err
case <-ctx.Done():
return ctx.Err()
case <-c.closing:
return ErrClientQuit
}
}
func (c *Client) write(ctx context.Context, msg *jsonrpcMessage, retry bool) error {
if c.writeConn == nil {
// The previous write failed. Try to establish a new connection.
if err := c.reconnect(ctx); err != nil {
@ -584,6 +599,22 @@ func (c *Client) write(ctx context.Context, msg interface{}, retry bool) error {
return err
}
func (c *Client) writeBatch(ctx context.Context, msgs []*jsonrpcMessage, retry bool) error {
if c.writeConn == nil {
if err := c.reconnect(ctx); err != nil {
return err
}
}
err := c.writeConn.writeJSONBatch(ctx, msgs, false)
if err != nil {
c.writeConn = nil
if !retry {
return c.writeBatch(ctx, msgs, true)
}
}
return err
}
func (c *Client) reconnect(ctx context.Context) error {
if c.reconnectFunc == nil {
return errDead

View file

@ -169,7 +169,7 @@ func (b *batchCallBuffer) doWrite(ctx context.Context, conn jsonWriter, isErrorR
}
b.wrote = true // can only write once
if len(b.resp) > 0 {
conn.writeJSON(ctx, b.resp, isErrorResponse)
conn.writeJSONBatch(ctx, b.resp, isErrorResponse)
}
}
@ -268,7 +268,7 @@ func (h *handler) respondWithBatchTooLarge(cp *callProc, batch []*jsonrpcMessage
break
}
}
h.conn.writeJSON(cp.ctx, []*jsonrpcMessage{resp}, true)
h.conn.writeJSONBatch(cp.ctx, []*jsonrpcMessage{resp}, true)
}
// handleMsg handles a single non-batch message.

View file

@ -57,10 +57,14 @@ type httpConn struct {
// and some methods don't work. The panic() stubs here exist to ensure
// this special treatment is correct.
func (hc *httpConn) writeJSON(context.Context, interface{}, bool) error {
func (hc *httpConn) writeJSON(context.Context, *jsonrpcMessage, bool) error {
panic("writeJSON called on httpConn")
}
func (hc *httpConn) writeJSONBatch(context.Context, []*jsonrpcMessage, bool) error {
panic("writeJSONBatch called on httpConn")
}
func (hc *httpConn) peerInfo() PeerInfo {
panic("peerInfo called on httpConn")
}
@ -268,41 +272,51 @@ func (s *Server) newHTTPServerConn(r *http.Request, w http.ResponseWriter) Serve
body := io.LimitReader(r.Body, int64(s.httpBodyLimit))
conn := &httpServerConn{Reader: body, Writer: w, r: r}
encoder := func(v any, isErrorResponse bool) error {
if !isErrorResponse {
return json.NewEncoder(conn).Encode(v)
}
// It's an error response and requires special treatment.
//
// In case of a timeout error, the response must be written before the HTTP
// server's write timeout occurs. So we need to flush the response. The
// Content-Length header also needs to be set to ensure the client knows
// when it has the full response.
encdata, err := json.Marshal(v)
if err != nil {
return err
}
w.Header().Set("content-length", strconv.Itoa(len(encdata)))
// If this request is wrapped in a handler that might remove Content-Length (such
// as the automatic gzip we do in package node), we need to ensure the HTTP server
// doesn't perform chunked encoding. In case WriteTimeout is reached, the chunked
// encoding might not be finished correctly, and some clients do not like it when
// the final chunk is missing.
w.Header().Set("transfer-encoding", "identity")
_, err = w.Write(encdata)
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
return err
encodeMsg := func(msg *jsonrpcMessage, isError bool) error {
return httpEncodeValue(conn, w, msg, isError)
}
encodeBatch := func(msgs []*jsonrpcMessage, isError bool) error {
return httpEncodeValue(conn, w, msgs, isError)
}
dec := json.NewDecoder(conn)
dec.UseNumber()
return NewFuncCodec(conn, encoder, dec.Decode)
return NewFuncCodec(conn, encodeMsg, encodeBatch, dec.Decode)
}
// httpEncodeValue handles the HTTP-specific JSON encoding logic for responses.
// For error responses, it sets Content-Length and flushes to ensure the response
// is fully written before any HTTP server write timeout occurs.
func httpEncodeValue(conn *httpServerConn, w http.ResponseWriter, v any, isError bool) error {
if !isError {
return json.NewEncoder(conn).Encode(v)
}
// It's an error response and requires special treatment.
//
// In case of a timeout error, the response must be written before the HTTP
// server's write timeout occurs. So we need to flush the response. The
// Content-Length header also needs to be set to ensure the client knows
// when it has the full response.
encdata, err := json.Marshal(v)
if err != nil {
return err
}
w.Header().Set("content-length", strconv.Itoa(len(encdata)))
// If this request is wrapped in a handler that might remove Content-Length (such
// as the automatic gzip we do in package node), we need to ensure the HTTP server
// doesn't perform chunked encoding. In case WriteTimeout is reached, the chunked
// encoding might not be finished correctly, and some clients do not like it when
// the final chunk is missing.
w.Header().Set("transfer-encoding", "identity")
_, err = w.Write(encdata)
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
return err
}
// Close does nothing and always returns nil.

View file

@ -53,12 +53,6 @@ type subscriptionResultEnc struct {
Result any `json:"result"`
}
type jsonrpcSubscriptionNotification struct {
Version string `json:"jsonrpc"`
Method string `json:"method"`
Params subscriptionResultEnc `json:"params"`
}
// A value of this type can a JSON-RPC request, notification, successful response or
// error response. Which one it is depends on the fields.
type jsonrpcMessage struct {
@ -188,28 +182,32 @@ type ConnRemoteAddr interface {
// jsonCodec reads and writes JSON-RPC messages to the underlying connection. It also has
// support for parsing arguments and serializing (result) objects.
type jsonCodec struct {
remote string
closer sync.Once // close closed channel once
closeCh chan interface{} // closed on Close
decode decodeFunc // decoder to allow multiple transports
encMu sync.Mutex // guards the encoder
encode encodeFunc // encoder to allow multiple transports
conn deadlineCloser
remote string
closer sync.Once // close closed channel once
closeCh chan interface{} // closed on Close
decode decodeFunc // decoder to allow multiple transports
encMu sync.Mutex // guards the encoder
encodeMsg encodeMsgFunc // single-message encoder
encodeBatch encodeBatchFunc // batch encoder
conn deadlineCloser
}
type encodeFunc = func(v interface{}, isErrorResponse bool) error
type encodeMsgFunc = func(msg *jsonrpcMessage, isError bool) error
type encodeBatchFunc = func(msgs []*jsonrpcMessage, isError bool) error
type decodeFunc = func(v interface{}) error
// NewFuncCodec creates a codec which uses the given functions to read and write. If conn
// implements ConnRemoteAddr, log messages will use it to include the remote address of
// the connection.
func NewFuncCodec(conn deadlineCloser, encode encodeFunc, decode decodeFunc) ServerCodec {
func NewFuncCodec(conn deadlineCloser, encodeMsg encodeMsgFunc, encodeBatch encodeBatchFunc, decode decodeFunc) ServerCodec {
codec := &jsonCodec{
closeCh: make(chan interface{}),
encode: encode,
decode: decode,
conn: conn,
closeCh: make(chan interface{}),
encodeMsg: encodeMsg,
encodeBatch: encodeBatch,
decode: decode,
conn: conn,
}
if ra, ok := conn.(ConnRemoteAddr); ok {
codec.remote = ra.RemoteAddr()
@ -224,13 +222,13 @@ func NewCodec(conn Conn) ServerCodec {
dec := json.NewDecoder(conn)
dec.UseNumber()
encode := func(v interface{}, isErrorResponse bool) error {
if msg, ok := v.(*jsonrpcMessage); ok {
return writeMessage(conn, msg)
}
return enc.Encode(v)
encodeMsg := func(msg *jsonrpcMessage, isError bool) error {
return writeMessage(conn, msg)
}
return NewFuncCodec(conn, encode, dec.Decode)
encodeBatch := func(msgs []*jsonrpcMessage, isError bool) error {
return enc.Encode(msgs)
}
return NewFuncCodec(conn, encodeMsg, encodeBatch, dec.Decode)
}
// writeMessage writes a single jsonrpcMessage directly to the writer.
@ -343,7 +341,7 @@ func (c *jsonCodec) readBatch() (messages []*jsonrpcMessage, batch bool, err err
return messages, batch, nil
}
func (c *jsonCodec) writeJSON(ctx context.Context, v interface{}, isErrorResponse bool) error {
func (c *jsonCodec) writeJSON(ctx context.Context, msg *jsonrpcMessage, isError bool) error {
c.encMu.Lock()
defer c.encMu.Unlock()
@ -352,7 +350,19 @@ func (c *jsonCodec) writeJSON(ctx context.Context, v interface{}, isErrorRespons
deadline = time.Now().Add(defaultWriteTimeout)
}
c.conn.SetWriteDeadline(deadline)
return c.encode(v, isErrorResponse)
return c.encodeMsg(msg, isError)
}
func (c *jsonCodec) writeJSONBatch(ctx context.Context, msgs []*jsonrpcMessage, isError bool) error {
c.encMu.Lock()
defer c.encMu.Unlock()
deadline, ok := ctx.Deadline()
if !ok {
deadline = time.Now().Add(defaultWriteTimeout)
}
c.conn.SetWriteDeadline(deadline)
return c.encodeBatch(msgs, isError)
}
func (c *jsonCodec) close() {

View file

@ -171,13 +171,17 @@ func (n *Notifier) activate() error {
}
func (n *Notifier) send(sub *Subscription, data any) error {
msg := jsonrpcSubscriptionNotification{
params, err := json.Marshal(subscriptionResultEnc{
ID: string(sub.ID),
Result: data,
})
if err != nil {
return err
}
msg := jsonrpcMessage{
Version: vsn,
Method: n.namespace + notificationMethodSuffix,
Params: subscriptionResultEnc{
ID: string(sub.ID),
Result: data,
},
Params: params,
}
return n.h.conn.writeJSON(context.Background(), &msg, false)
}

View file

@ -233,12 +233,15 @@ func readAndValidateMessage(in *json.Decoder) (*subConfirmation, *subscriptionRe
}
type mockConn struct {
enc *json.Encoder
w io.Writer
}
// writeJSON writes a message to the connection.
func (c *mockConn) writeJSON(ctx context.Context, msg interface{}, isError bool) error {
return c.enc.Encode(msg)
func (c *mockConn) writeJSON(ctx context.Context, msg *jsonrpcMessage, isError bool) error {
return writeMessage(c.w, msg)
}
func (c *mockConn) writeJSONBatch(ctx context.Context, msgs []*jsonrpcMessage, isError bool) error {
return json.NewEncoder(c.w).Encode(msgs)
}
// closed returns a channel which is closed when the connection is closed.
@ -251,7 +254,7 @@ func (c *mockConn) remoteAddr() string { return "" }
func BenchmarkNotify(b *testing.B) {
id := ID("test")
notifier := &Notifier{
h: &handler{conn: &mockConn{json.NewEncoder(io.Discard)}},
h: &handler{conn: &mockConn{io.Discard}},
sub: &Subscription{ID: id},
activated: true,
}
@ -271,7 +274,7 @@ func TestNotify(t *testing.T) {
out := new(bytes.Buffer)
id := ID("test")
notifier := &Notifier{
h: &handler{conn: &mockConn{json.NewEncoder(out)}},
h: &handler{conn: &mockConn{out}},
sub: &Subscription{ID: id},
activated: true,
}

View file

@ -51,9 +51,10 @@ type ServerCodec interface {
// jsonWriter can write JSON messages to its underlying connection.
// Implementations must be safe for concurrent use.
type jsonWriter interface {
// writeJSON writes a message to the connection.
writeJSON(ctx context.Context, msg interface{}, isError bool) error
// writeJSON writes a single JSON-RPC message to the connection.
writeJSON(ctx context.Context, msg *jsonrpcMessage, isError bool) error
// writeJSONBatch writes a batch of JSON-RPC messages to the connection.
writeJSONBatch(ctx context.Context, msgs []*jsonrpcMessage, isError bool) error
// Closed returns a channel which is closed when the connection is closed.
closed() <-chan interface{}
// RemoteAddr returns the peer address of the connection.

View file

@ -293,11 +293,14 @@ type websocketCodec struct {
func newWebsocketCodec(conn *websocket.Conn, host string, req http.Header, readLimit int64) ServerCodec {
conn.SetReadLimit(readLimit)
encode := func(v interface{}, isErrorResponse bool) error {
return conn.WriteJSON(v)
encodeMsg := func(msg *jsonrpcMessage, isError bool) error {
return conn.WriteJSON(msg)
}
encodeBatch := func(msgs []*jsonrpcMessage, isError bool) error {
return conn.WriteJSON(msgs)
}
wc := &websocketCodec{
jsonCodec: NewFuncCodec(conn, encode, conn.ReadJSON).(*jsonCodec),
jsonCodec: NewFuncCodec(conn, encodeMsg, encodeBatch, conn.ReadJSON).(*jsonCodec),
conn: conn,
pingReset: make(chan struct{}, 1),
pongReceived: make(chan struct{}),
@ -342,8 +345,15 @@ func (wc *websocketCodec) peerInfo() PeerInfo {
return wc.info
}
func (wc *websocketCodec) writeJSON(ctx context.Context, v interface{}, isError bool) error {
err := wc.jsonCodec.writeJSON(ctx, v, isError)
func (wc *websocketCodec) writeJSON(ctx context.Context, msg *jsonrpcMessage, isError bool) error {
return wc.writeAndResetPing(wc.jsonCodec.writeJSON(ctx, msg, isError))
}
func (wc *websocketCodec) writeJSONBatch(ctx context.Context, msgs []*jsonrpcMessage, isError bool) error {
return wc.writeAndResetPing(wc.jsonCodec.writeJSONBatch(ctx, msgs, isError))
}
func (wc *websocketCodec) writeAndResetPing(err error) error {
if err == nil {
// Notify pingLoop to delay the next idle ping.
select {