diff --git a/rpc/client.go b/rpc/client.go index 8d81503d59..b644a04c48 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -419,7 +419,7 @@ func (c *Client) BatchCallContext(ctx context.Context, b []BatchElem) error { if c.isHTTP { err = c.sendBatchHTTP(ctx, op, msgs) } else { - err = c.send(ctx, op, msgs) + err = c.sendBatch(ctx, op, msgs) } if err != nil { return err @@ -552,7 +552,7 @@ func (c *Client) newMessage(method string, paramsIn ...interface{}) (*jsonrpcMes // send registers op with the dispatch loop, then sends msg on the connection. // if sending fails, op is deregistered. -func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error { +func (c *Client) send(ctx context.Context, op *requestOp, msg *jsonrpcMessage) error { select { case c.reqInit <- op: err := c.write(ctx, msg, false) @@ -567,7 +567,22 @@ func (c *Client) send(ctx context.Context, op *requestOp, msg interface{}) error } } -func (c *Client) write(ctx context.Context, msg interface{}, retry bool) error { +// sendBatch registers op with the dispatch loop, then sends a batch of messages +// on the connection. If sending fails, op is deregistered. +func (c *Client) sendBatch(ctx context.Context, op *requestOp, msgs []*jsonrpcMessage) error { + select { + case c.reqInit <- op: + err := c.writeBatch(ctx, msgs, false) + c.reqSent <- err + return err + case <-ctx.Done(): + return ctx.Err() + case <-c.closing: + return ErrClientQuit + } +} + +func (c *Client) write(ctx context.Context, msg *jsonrpcMessage, retry bool) error { if c.writeConn == nil { // The previous write failed. Try to establish a new connection. if err := c.reconnect(ctx); err != nil { @@ -584,6 +599,22 @@ func (c *Client) write(ctx context.Context, msg interface{}, retry bool) error { return err } +func (c *Client) writeBatch(ctx context.Context, msgs []*jsonrpcMessage, retry bool) error { + if c.writeConn == nil { + if err := c.reconnect(ctx); err != nil { + return err + } + } + err := c.writeConn.writeJSONBatch(ctx, msgs, false) + if err != nil { + c.writeConn = nil + if !retry { + return c.writeBatch(ctx, msgs, true) + } + } + return err +} + func (c *Client) reconnect(ctx context.Context) error { if c.reconnectFunc == nil { return errDead diff --git a/rpc/handler.go b/rpc/handler.go index c0af162f13..12828b3350 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -169,7 +169,7 @@ func (b *batchCallBuffer) doWrite(ctx context.Context, conn jsonWriter, isErrorR } b.wrote = true // can only write once if len(b.resp) > 0 { - conn.writeJSON(ctx, b.resp, isErrorResponse) + conn.writeJSONBatch(ctx, b.resp, isErrorResponse) } } @@ -268,7 +268,7 @@ func (h *handler) respondWithBatchTooLarge(cp *callProc, batch []*jsonrpcMessage break } } - h.conn.writeJSON(cp.ctx, []*jsonrpcMessage{resp}, true) + h.conn.writeJSONBatch(cp.ctx, []*jsonrpcMessage{resp}, true) } // handleMsg handles a single non-batch message. diff --git a/rpc/http.go b/rpc/http.go index 55f0abfa72..f61398889e 100644 --- a/rpc/http.go +++ b/rpc/http.go @@ -57,10 +57,14 @@ type httpConn struct { // and some methods don't work. The panic() stubs here exist to ensure // this special treatment is correct. -func (hc *httpConn) writeJSON(context.Context, interface{}, bool) error { +func (hc *httpConn) writeJSON(context.Context, *jsonrpcMessage, bool) error { panic("writeJSON called on httpConn") } +func (hc *httpConn) writeJSONBatch(context.Context, []*jsonrpcMessage, bool) error { + panic("writeJSONBatch called on httpConn") +} + func (hc *httpConn) peerInfo() PeerInfo { panic("peerInfo called on httpConn") } @@ -268,41 +272,51 @@ func (s *Server) newHTTPServerConn(r *http.Request, w http.ResponseWriter) Serve body := io.LimitReader(r.Body, int64(s.httpBodyLimit)) conn := &httpServerConn{Reader: body, Writer: w, r: r} - encoder := func(v any, isErrorResponse bool) error { - if !isErrorResponse { - return json.NewEncoder(conn).Encode(v) - } - - // It's an error response and requires special treatment. - // - // In case of a timeout error, the response must be written before the HTTP - // server's write timeout occurs. So we need to flush the response. The - // Content-Length header also needs to be set to ensure the client knows - // when it has the full response. - encdata, err := json.Marshal(v) - if err != nil { - return err - } - w.Header().Set("content-length", strconv.Itoa(len(encdata))) - - // If this request is wrapped in a handler that might remove Content-Length (such - // as the automatic gzip we do in package node), we need to ensure the HTTP server - // doesn't perform chunked encoding. In case WriteTimeout is reached, the chunked - // encoding might not be finished correctly, and some clients do not like it when - // the final chunk is missing. - w.Header().Set("transfer-encoding", "identity") - - _, err = w.Write(encdata) - if f, ok := w.(http.Flusher); ok { - f.Flush() - } - return err + encodeMsg := func(msg *jsonrpcMessage, isError bool) error { + return httpEncodeValue(conn, w, msg, isError) + } + encodeBatch := func(msgs []*jsonrpcMessage, isError bool) error { + return httpEncodeValue(conn, w, msgs, isError) } dec := json.NewDecoder(conn) dec.UseNumber() - return NewFuncCodec(conn, encoder, dec.Decode) + return NewFuncCodec(conn, encodeMsg, encodeBatch, dec.Decode) +} + +// httpEncodeValue handles the HTTP-specific JSON encoding logic for responses. +// For error responses, it sets Content-Length and flushes to ensure the response +// is fully written before any HTTP server write timeout occurs. +func httpEncodeValue(conn *httpServerConn, w http.ResponseWriter, v any, isError bool) error { + if !isError { + return json.NewEncoder(conn).Encode(v) + } + + // It's an error response and requires special treatment. + // + // In case of a timeout error, the response must be written before the HTTP + // server's write timeout occurs. So we need to flush the response. The + // Content-Length header also needs to be set to ensure the client knows + // when it has the full response. + encdata, err := json.Marshal(v) + if err != nil { + return err + } + w.Header().Set("content-length", strconv.Itoa(len(encdata))) + + // If this request is wrapped in a handler that might remove Content-Length (such + // as the automatic gzip we do in package node), we need to ensure the HTTP server + // doesn't perform chunked encoding. In case WriteTimeout is reached, the chunked + // encoding might not be finished correctly, and some clients do not like it when + // the final chunk is missing. + w.Header().Set("transfer-encoding", "identity") + + _, err = w.Write(encdata) + if f, ok := w.(http.Flusher); ok { + f.Flush() + } + return err } // Close does nothing and always returns nil. diff --git a/rpc/json.go b/rpc/json.go index daeaffcc35..863998c56a 100644 --- a/rpc/json.go +++ b/rpc/json.go @@ -53,12 +53,6 @@ type subscriptionResultEnc struct { Result any `json:"result"` } -type jsonrpcSubscriptionNotification struct { - Version string `json:"jsonrpc"` - Method string `json:"method"` - Params subscriptionResultEnc `json:"params"` -} - // A value of this type can a JSON-RPC request, notification, successful response or // error response. Which one it is depends on the fields. type jsonrpcMessage struct { @@ -188,28 +182,32 @@ type ConnRemoteAddr interface { // jsonCodec reads and writes JSON-RPC messages to the underlying connection. It also has // support for parsing arguments and serializing (result) objects. type jsonCodec struct { - remote string - closer sync.Once // close closed channel once - closeCh chan interface{} // closed on Close - decode decodeFunc // decoder to allow multiple transports - encMu sync.Mutex // guards the encoder - encode encodeFunc // encoder to allow multiple transports - conn deadlineCloser + remote string + closer sync.Once // close closed channel once + closeCh chan interface{} // closed on Close + decode decodeFunc // decoder to allow multiple transports + encMu sync.Mutex // guards the encoder + encodeMsg encodeMsgFunc // single-message encoder + encodeBatch encodeBatchFunc // batch encoder + conn deadlineCloser } -type encodeFunc = func(v interface{}, isErrorResponse bool) error +type encodeMsgFunc = func(msg *jsonrpcMessage, isError bool) error + +type encodeBatchFunc = func(msgs []*jsonrpcMessage, isError bool) error type decodeFunc = func(v interface{}) error // NewFuncCodec creates a codec which uses the given functions to read and write. If conn // implements ConnRemoteAddr, log messages will use it to include the remote address of // the connection. -func NewFuncCodec(conn deadlineCloser, encode encodeFunc, decode decodeFunc) ServerCodec { +func NewFuncCodec(conn deadlineCloser, encodeMsg encodeMsgFunc, encodeBatch encodeBatchFunc, decode decodeFunc) ServerCodec { codec := &jsonCodec{ - closeCh: make(chan interface{}), - encode: encode, - decode: decode, - conn: conn, + closeCh: make(chan interface{}), + encodeMsg: encodeMsg, + encodeBatch: encodeBatch, + decode: decode, + conn: conn, } if ra, ok := conn.(ConnRemoteAddr); ok { codec.remote = ra.RemoteAddr() @@ -224,13 +222,13 @@ func NewCodec(conn Conn) ServerCodec { dec := json.NewDecoder(conn) dec.UseNumber() - encode := func(v interface{}, isErrorResponse bool) error { - if msg, ok := v.(*jsonrpcMessage); ok { - return writeMessage(conn, msg) - } - return enc.Encode(v) + encodeMsg := func(msg *jsonrpcMessage, isError bool) error { + return writeMessage(conn, msg) } - return NewFuncCodec(conn, encode, dec.Decode) + encodeBatch := func(msgs []*jsonrpcMessage, isError bool) error { + return enc.Encode(msgs) + } + return NewFuncCodec(conn, encodeMsg, encodeBatch, dec.Decode) } // writeMessage writes a single jsonrpcMessage directly to the writer. @@ -343,7 +341,7 @@ func (c *jsonCodec) readBatch() (messages []*jsonrpcMessage, batch bool, err err return messages, batch, nil } -func (c *jsonCodec) writeJSON(ctx context.Context, v interface{}, isErrorResponse bool) error { +func (c *jsonCodec) writeJSON(ctx context.Context, msg *jsonrpcMessage, isError bool) error { c.encMu.Lock() defer c.encMu.Unlock() @@ -352,7 +350,19 @@ func (c *jsonCodec) writeJSON(ctx context.Context, v interface{}, isErrorRespons deadline = time.Now().Add(defaultWriteTimeout) } c.conn.SetWriteDeadline(deadline) - return c.encode(v, isErrorResponse) + return c.encodeMsg(msg, isError) +} + +func (c *jsonCodec) writeJSONBatch(ctx context.Context, msgs []*jsonrpcMessage, isError bool) error { + c.encMu.Lock() + defer c.encMu.Unlock() + + deadline, ok := ctx.Deadline() + if !ok { + deadline = time.Now().Add(defaultWriteTimeout) + } + c.conn.SetWriteDeadline(deadline) + return c.encodeBatch(msgs, isError) } func (c *jsonCodec) close() { diff --git a/rpc/subscription.go b/rpc/subscription.go index 9e400c8b60..6b90bd4a3b 100644 --- a/rpc/subscription.go +++ b/rpc/subscription.go @@ -171,13 +171,17 @@ func (n *Notifier) activate() error { } func (n *Notifier) send(sub *Subscription, data any) error { - msg := jsonrpcSubscriptionNotification{ + params, err := json.Marshal(subscriptionResultEnc{ + ID: string(sub.ID), + Result: data, + }) + if err != nil { + return err + } + msg := jsonrpcMessage{ Version: vsn, Method: n.namespace + notificationMethodSuffix, - Params: subscriptionResultEnc{ - ID: string(sub.ID), - Result: data, - }, + Params: params, } return n.h.conn.writeJSON(context.Background(), &msg, false) } diff --git a/rpc/subscription_test.go b/rpc/subscription_test.go index cd44d219de..89b629f7c9 100644 --- a/rpc/subscription_test.go +++ b/rpc/subscription_test.go @@ -233,12 +233,15 @@ func readAndValidateMessage(in *json.Decoder) (*subConfirmation, *subscriptionRe } type mockConn struct { - enc *json.Encoder + w io.Writer } -// writeJSON writes a message to the connection. -func (c *mockConn) writeJSON(ctx context.Context, msg interface{}, isError bool) error { - return c.enc.Encode(msg) +func (c *mockConn) writeJSON(ctx context.Context, msg *jsonrpcMessage, isError bool) error { + return writeMessage(c.w, msg) +} + +func (c *mockConn) writeJSONBatch(ctx context.Context, msgs []*jsonrpcMessage, isError bool) error { + return json.NewEncoder(c.w).Encode(msgs) } // closed returns a channel which is closed when the connection is closed. @@ -251,7 +254,7 @@ func (c *mockConn) remoteAddr() string { return "" } func BenchmarkNotify(b *testing.B) { id := ID("test") notifier := &Notifier{ - h: &handler{conn: &mockConn{json.NewEncoder(io.Discard)}}, + h: &handler{conn: &mockConn{io.Discard}}, sub: &Subscription{ID: id}, activated: true, } @@ -271,7 +274,7 @@ func TestNotify(t *testing.T) { out := new(bytes.Buffer) id := ID("test") notifier := &Notifier{ - h: &handler{conn: &mockConn{json.NewEncoder(out)}}, + h: &handler{conn: &mockConn{out}}, sub: &Subscription{ID: id}, activated: true, } diff --git a/rpc/types.go b/rpc/types.go index 85f15344e8..578d3f86dd 100644 --- a/rpc/types.go +++ b/rpc/types.go @@ -51,9 +51,10 @@ type ServerCodec interface { // jsonWriter can write JSON messages to its underlying connection. // Implementations must be safe for concurrent use. type jsonWriter interface { - // writeJSON writes a message to the connection. - writeJSON(ctx context.Context, msg interface{}, isError bool) error - + // writeJSON writes a single JSON-RPC message to the connection. + writeJSON(ctx context.Context, msg *jsonrpcMessage, isError bool) error + // writeJSONBatch writes a batch of JSON-RPC messages to the connection. + writeJSONBatch(ctx context.Context, msgs []*jsonrpcMessage, isError bool) error // Closed returns a channel which is closed when the connection is closed. closed() <-chan interface{} // RemoteAddr returns the peer address of the connection. diff --git a/rpc/websocket.go b/rpc/websocket.go index ec676b9caf..9970c8d4b2 100644 --- a/rpc/websocket.go +++ b/rpc/websocket.go @@ -293,11 +293,14 @@ type websocketCodec struct { func newWebsocketCodec(conn *websocket.Conn, host string, req http.Header, readLimit int64) ServerCodec { conn.SetReadLimit(readLimit) - encode := func(v interface{}, isErrorResponse bool) error { - return conn.WriteJSON(v) + encodeMsg := func(msg *jsonrpcMessage, isError bool) error { + return conn.WriteJSON(msg) + } + encodeBatch := func(msgs []*jsonrpcMessage, isError bool) error { + return conn.WriteJSON(msgs) } wc := &websocketCodec{ - jsonCodec: NewFuncCodec(conn, encode, conn.ReadJSON).(*jsonCodec), + jsonCodec: NewFuncCodec(conn, encodeMsg, encodeBatch, conn.ReadJSON).(*jsonCodec), conn: conn, pingReset: make(chan struct{}, 1), pongReceived: make(chan struct{}), @@ -342,8 +345,15 @@ func (wc *websocketCodec) peerInfo() PeerInfo { return wc.info } -func (wc *websocketCodec) writeJSON(ctx context.Context, v interface{}, isError bool) error { - err := wc.jsonCodec.writeJSON(ctx, v, isError) +func (wc *websocketCodec) writeJSON(ctx context.Context, msg *jsonrpcMessage, isError bool) error { + return wc.writeAndResetPing(wc.jsonCodec.writeJSON(ctx, msg, isError)) +} + +func (wc *websocketCodec) writeJSONBatch(ctx context.Context, msgs []*jsonrpcMessage, isError bool) error { + return wc.writeAndResetPing(wc.jsonCodec.writeJSONBatch(ctx, msgs, isError)) +} + +func (wc *websocketCodec) writeAndResetPing(err error) error { if err == nil { // Notify pingLoop to delay the next idle ping. select {