diff --git a/core/state_processor.go b/core/state_processor.go index 5092379056..5f43206eb4 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -104,7 +104,7 @@ func (p *StateProcessor) Process(ctx context.Context, block *types.Block, stated statedb.SetTxContext(tx.Hash(), i, uint32(i+1)) _, _, spanEnd := telemetry.StartSpan(ctx, "core.ApplyTransactionWithEVM", telemetry.StringAttribute("tx.hash", tx.Hash().Hex()), - telemetry.Int64Attribute("tx.index", int64(i)), + telemetry.IntAttribute("tx.index", i), ) receipt, bal, err := ApplyTransactionWithEVM(msg, gp, statedb, blockNumber, blockHash, context.Time, tx, evm) if err != nil { diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index 71e92e315d..1de2c80848 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -556,12 +556,12 @@ func (api *ConsensusAPI) GetBlobsV1(ctx context.Context, hashes []common.Hash) ( var ( filled int attrs = []telemetry.Attribute{ - telemetry.Int64Attribute("blobs.requested", int64(len(hashes))), + telemetry.IntAttribute("blobs.requested", len(hashes)), } ) ctx, span, spanEnd := telemetry.StartSpan(ctx, "engine.getBlobsV1", attrs...) defer func() { - span.SetAttributes(telemetry.Int64Attribute("blobs.filled", int64(filled))) + span.SetAttributes(telemetry.IntAttribute("blobs.filled", filled)) spanEnd(&err) }() @@ -643,12 +643,12 @@ func (api *ConsensusAPI) getBlobs(ctx context.Context, hashes []common.Hash, v2 var ( filled int attrs = []telemetry.Attribute{ - telemetry.Int64Attribute("blobs.requested", int64(len(hashes))), + telemetry.IntAttribute("blobs.requested", len(hashes)), } ) ctx, span, spanEnd := telemetry.StartSpan(ctx, "engine.getBlobs", attrs...) 
defer func() { - span.SetAttributes(telemetry.Int64Attribute("blobs.filled", int64(filled))) + span.SetAttributes(telemetry.IntAttribute("blobs.filled", filled)) spanEnd(&err) }() @@ -833,7 +833,7 @@ func (api *ConsensusAPI) newPayload(ctx context.Context, params engine.Executabl var attrs = []telemetry.Attribute{ telemetry.Int64Attribute("block.number", int64(params.Number)), telemetry.StringAttribute("block.hash", params.BlockHash.Hex()), - telemetry.Int64Attribute("tx.count", int64(len(params.Transactions))), + telemetry.IntAttribute("tx.count", len(params.Transactions)), } ctx, _, spanEnd := telemetry.StartSpan(ctx, "engine.newPayload", attrs...) defer spanEnd(&err) diff --git a/eth/catalyst/simulated_beacon.go b/eth/catalyst/simulated_beacon.go index 8a77cd8abe..7120c14501 100644 --- a/eth/catalyst/simulated_beacon.go +++ b/eth/catalyst/simulated_beacon.go @@ -215,7 +215,7 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u // Create a server span for forkchoiceUpdated with payload attributes, // simulating an incoming engine API request from a real consensus client. - fcCtx, fcSpanEnd := telemetry.StartServerSpan(context.Background(), tracer, telemetry.RPCInfo{ + fcCtx, fcSpanEnd := telemetry.StartCallServerSpan(context.Background(), tracer, telemetry.RPCInfo{ System: "jsonrpc", Service: "engine", Method: "forkchoiceUpdatedV" + fmt.Sprintf("%d", version), @@ -237,7 +237,7 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u // Create a server span for getPayload, simulating the consensus client // coming back to retrieve the built payload. 
- _, gpSpanEnd := telemetry.StartServerSpan(context.Background(), tracer, telemetry.RPCInfo{ + _, gpSpanEnd := telemetry.StartCallServerSpan(context.Background(), tracer, telemetry.RPCInfo{ System: "jsonrpc", Service: "engine", Method: "getPayloadV" + fmt.Sprintf("%d", version), @@ -286,7 +286,7 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u // Create a server span for newPayload, simulating the consensus client // sending the execution payload for validation. - npCtx, npSpanEnd := telemetry.StartServerSpan(context.Background(), tracer, telemetry.RPCInfo{ + npCtx, npSpanEnd := telemetry.StartCallServerSpan(context.Background(), tracer, telemetry.RPCInfo{ System: "jsonrpc", Service: "engine", Method: "newPayloadV" + fmt.Sprintf("%d", version), @@ -302,7 +302,7 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u // Create a server span for the final forkchoiceUpdated (no payload attributes), // which sets the new block as the canonical chain head. - fcuCtx, fcuSpanEnd := telemetry.StartServerSpan(context.Background(), tracer, telemetry.RPCInfo{ + fcuCtx, fcuSpanEnd := telemetry.StartCallServerSpan(context.Background(), tracer, telemetry.RPCInfo{ System: "jsonrpc", Service: "engine", Method: "forkchoiceUpdatedV" + fmt.Sprintf("%d", version), diff --git a/internal/telemetry/telemetry.go b/internal/telemetry/telemetry.go index 27fe9b0a7a..ed598064b3 100644 --- a/internal/telemetry/telemetry.go +++ b/internal/telemetry/telemetry.go @@ -40,6 +40,11 @@ func Int64Attribute(key string, val int64) Attribute { return attribute.Int64(key, val) } +// IntAttribute creates an attribute with an int value. +func IntAttribute(key string, val int) Attribute { + return attribute.Int(key, val) +} + // BoolAttribute creates an attribute with a bool value. 
func BoolAttribute(key string, val bool) Attribute { return attribute.Bool(key, val) @@ -60,6 +65,13 @@ func StartSpanWithTracer(ctx context.Context, tracer trace.Tracer, name string, return startSpan(ctx, tracer, trace.SpanKindInternal, name, attributes...) } +// TracerFromContext returns a Tracer from the TracerProvider associated with the +// parent span in ctx. If ctx has no parent span, the returned tracer comes from +// the no-op provider, so spans created with it will not be exported. +func TracerFromContext(ctx context.Context) trace.Tracer { + return trace.SpanFromContext(ctx).TracerProvider().Tracer("") +} + // RPCInfo contains information about the RPC request. type RPCInfo struct { System string @@ -68,11 +80,11 @@ type RPCInfo struct { RequestID string } -// StartServerSpan creates a SpanKind=SERVER span at the JSON-RPC boundary. +// StartCallServerSpan creates a SpanKind=SERVER span for a JSON-RPC call. // The span name is formatted as $rpcSystem.$rpcService/$rpcMethod // (e.g. "jsonrpc.engine/newPayloadV4") which follows the Open Telemetry // semantic convensions: https://opentelemetry.io/docs/specs/semconv/rpc/rpc-spans/#span-name. -func StartServerSpan(ctx context.Context, tracer trace.Tracer, rpc RPCInfo, others ...Attribute) (context.Context, func(*error)) { +func StartCallServerSpan(ctx context.Context, tracer trace.Tracer, rpc RPCInfo, others ...Attribute) (context.Context, func(*error)) { var ( name = fmt.Sprintf("%s.%s/%s", rpc.System, rpc.Service, rpc.Method) attributes = append([]Attribute{ @@ -88,6 +100,36 @@ func StartServerSpan(ctx context.Context, tracer trace.Tracer, rpc RPCInfo, othe return ctx, end } +// StartBatchServerSpan creates a SpanKind=SERVER span representing a batched request. +// The span name is "$system.batch" (e.g. "jsonrpc.batch") and per-call spans are nested under it. +// batchSize is exposed as rpc.batch.size. 
+func StartBatchServerSpan(ctx context.Context, tracer trace.Tracer, system string, batchSize int, others ...Attribute) (context.Context, func(*error)) { + attributes := append([]Attribute{ + semconv.RPCSystemKey.String(system), + IntAttribute("rpc.batch.size", batchSize), + }, others...) + ctx, _, end := startSpan(ctx, tracer, trace.SpanKindServer, system+".batch", attributes...) + return ctx, end +} + +// StartBatchCallSpan creates a SpanKind=INTERNAL span for an individual RPC call as part of a batch. +// This carries the same name and attributes as StartCallServerSpan. +func StartBatchCallSpan(ctx context.Context, tracer trace.Tracer, rpc RPCInfo, others ...Attribute) (context.Context, func(*error)) { + var ( + name = fmt.Sprintf("%s.%s/%s", rpc.System, rpc.Service, rpc.Method) + attributes = append([]Attribute{ + semconv.RPCSystemKey.String(rpc.System), + semconv.RPCServiceKey.String(rpc.Service), + semconv.RPCMethodKey.String(rpc.Method), + semconv.RPCJSONRPCRequestID(rpc.RequestID), + }, + others..., + ) + ) + ctx, _, end := startSpan(ctx, tracer, trace.SpanKindInternal, name, attributes...) + return ctx, end +} + // startSpan creates a span with the given kind. 
func startSpan(ctx context.Context, tracer trace.Tracer, kind trace.SpanKind, spanName string, attributes ...Attribute) (context.Context, trace.Span, func(*error)) { ctx, span := tracer.Start(ctx, spanName, trace.WithSpanKind(kind)) diff --git a/miner/payload_building.go b/miner/payload_building.go index db8126828a..a2cc8df9d0 100644 --- a/miner/payload_building.go +++ b/miner/payload_building.go @@ -217,7 +217,7 @@ func (payload *Payload) ResolveFull() *engine.ExecutionPayloadEnvelope { func (miner *Miner) runBuildIteration(ctx context.Context, start time.Time, iteration int, payload *Payload, params *generateParams, witness bool) { ctx, span, spanEnd := telemetry.StartSpan(ctx, "miner.buildIteration", - telemetry.Int64Attribute("iteration", int64(iteration)), + telemetry.IntAttribute("iteration", iteration), ) var err error defer spanEnd(&err) @@ -271,7 +271,7 @@ func (miner *Miner) buildPayload(ctx context.Context, args *BuildPayloadArgs, wi telemetry.Int64Attribute("block.number", int64(empty.block.NumberU64())), ) defer func() { - bSpan.SetAttributes(telemetry.Int64Attribute("iterations.total", int64(iteration))) + bSpan.SetAttributes(telemetry.IntAttribute("iterations.total", iteration)) bSpanEnd(nil) }() diff --git a/miner/worker.go b/miner/worker.go index 21bc95cf92..b0e144c0ab 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -137,7 +137,7 @@ func (miner *Miner) generateWork(ctx context.Context, genParam *generateParams, defer func() { if result != nil && result.err == nil { span.SetAttributes( - telemetry.Int64Attribute("txs.count", int64(len(result.block.Transactions()))), + telemetry.IntAttribute("txs.count", len(result.block.Transactions())), telemetry.Int64Attribute("gas.used", int64(result.block.GasUsed())), telemetry.StringAttribute("fees", result.fees.String()), ) @@ -572,8 +572,8 @@ func (miner *Miner) fillTransactions(ctx context.Context, interrupt *atomic.Int3 } pendingBlobTxs, blobTxCount := miner.txpool.Pending(filter) span.SetAttributes( 
- telemetry.Int64Attribute("pending.plain.count", int64(plainTxCount)), - telemetry.Int64Attribute("pending.blob.count", int64(blobTxCount)), + telemetry.IntAttribute("pending.plain.count", plainTxCount), + telemetry.IntAttribute("pending.blob.count", blobTxCount), ) // Split the pending transactions into locals and remotes. diff --git a/rpc/handler.go b/rpc/handler.go index a9ffdc7071..e39ca78e2e 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -169,40 +169,49 @@ func (b *batchCallBuffer) doWrite(ctx context.Context, conn jsonWriter, isErrorR } b.wrote = true // can only write once if len(b.resp) > 0 { - conn.writeJSONBatch(ctx, b.resp, isErrorResponse) + spanCtx, _, spanEnd := telemetry.StartSpanWithTracer(ctx, telemetry.TracerFromContext(ctx), "rpc.writeJSONBatch") + err := conn.writeJSONBatch(spanCtx, b.resp, isErrorResponse) + spanEnd(&err) } } // handleBatch executes all messages in a batch and returns the responses. func (h *handler) handleBatch(msgs []*jsonrpcMessage) { - // Emit error response for empty batches: - if len(msgs) == 0 { - h.startCallProc(func(cp *callProc) { - resp := errorMessage(&invalidRequestError{"empty batch"}) - h.conn.writeJSON(cp.ctx, resp, true) + // For valid batches, filter response messages and subscription notifications + // out of msgs here. + var calls []*jsonrpcMessage + valid := len(msgs) > 0 && (h.batchRequestLimit == 0 || len(msgs) <= h.batchRequestLimit) + if valid { + calls = make([]*jsonrpcMessage, 0, len(msgs)) + h.handleResponses(msgs, func(msg *jsonrpcMessage) { + calls = append(calls, msg) }) - return - } - // Apply limit on total number of requests. - if h.batchRequestLimit != 0 && len(msgs) > h.batchRequestLimit { - h.startCallProc(func(cp *callProc) { - h.respondWithBatchTooLarge(cp, msgs) - }) - return - } - - // Handle non-call messages first. - // Here we need to find the requestOp that sent the request batch. 
- calls := make([]*jsonrpcMessage, 0, len(msgs)) - h.handleResponses(msgs, func(msg *jsonrpcMessage) { - calls = append(calls, msg) - }) - if len(calls) == 0 { - return + if len(calls) == 0 { + // Batch was entirely responses to our own requests; nothing to dispatch. + return + } } // Process calls on a goroutine because they may block indefinitely: h.startCallProc(func(cp *callProc) { + // Top-level batch SERVER span. + var batchSpanEnd func(*error) + cp.ctx, batchSpanEnd = telemetry.StartBatchServerSpan(cp.ctx, h.tracer(), "jsonrpc", len(msgs)) + var spanErr error + defer batchSpanEnd(&spanErr) + + switch { + case len(msgs) == 0: + spanErr = &invalidRequestError{"empty batch"} + resp := errorMessage(spanErr) + h.conn.writeJSON(cp.ctx, resp, true) + return + case h.batchRequestLimit != 0 && len(msgs) > h.batchRequestLimit: + spanErr = errors.New(errMsgBatchTooLarge) + h.respondWithBatchTooLarge(cp, msgs) + return + } + cp.isBatch = true var ( timer *time.Timer @@ -212,35 +221,50 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { cp.ctx, cancel = context.WithCancel(cp.ctx) defer cancel() + batchCtx := cp.ctx // Cancel the request context after timeout and send an error response. Since the // currently-running method might not return immediately on timeout, we must wait // for the timeout concurrently with processing the request. - if timeout, ok := ContextRequestTimeout(cp.ctx); ok { + if timeout, ok := ContextRequestTimeout(batchCtx); ok { timer = time.AfterFunc(timeout, func() { cancel() err := &internalServerError{errcodeTimeout, errMsgTimeout} - callBuffer.respondWithError(cp.ctx, h.conn, err) + callBuffer.respondWithError(batchCtx, h.conn, err) }) } responseBytes := 0 for { // No need to handle rest of calls if timed out. - if cp.ctx.Err() != nil { + if batchCtx.Err() != nil { break } msg := callBuffer.nextCall() if msg == nil { break } + + // Per-call INTERNAL span as a child of the batch SERVER span. 
+ var callSpanEnd func(*error) + cp.ctx, callSpanEnd = telemetry.StartBatchCallSpan(batchCtx, h.tracer(), rpcInfoFromMessage(msg)) resp := h.handleCallMsg(cp, msg) + var callErr error + if resp != nil && resp.Error != nil { + callErr = resp.decodeError() + } + callSpanEnd(&callErr) + + // Notifications don't get a response written into the batch reply. + if msg.isNotification() { + resp = nil + } callBuffer.pushResponse(resp) if resp != nil && h.batchResponseMaxSize != 0 { responseBytes += len(resp.Result) + len(resp.Error) if responseBytes > h.batchResponseMaxSize { err := &internalServerError{errcodeResponseTooLarge, errMsgResponseTooLarge} - callBuffer.respondWithError(cp.ctx, h.conn, err) + callBuffer.respondWithError(batchCtx, h.conn, err) break } } @@ -250,7 +274,7 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { } h.addSubscriptions(cp.notifiers) - callBuffer.write(cp.ctx, h.conn) + callBuffer.write(batchCtx, h.conn) for _, n := range cp.notifiers { n.activate() } @@ -283,22 +307,32 @@ func (h *handler) handleMsg(msg *jsonrpcMessage) { func (h *handler) handleNonBatchCall(cp *callProc, msg *jsonrpcMessage) { var ( - responded sync.Once - timer *time.Timer - cancel context.CancelFunc + responded sync.Once + timer *time.Timer + cancel context.CancelFunc + responseError error ) - cp.ctx, cancel = context.WithCancel(cp.ctx) - defer cancel() + + // Set up the SERVER span for tracing. + var serverSpanEnd func(*error) + cp.ctx, serverSpanEnd = telemetry.StartCallServerSpan(cp.ctx, h.tracer(), rpcInfoFromMessage(msg)) + defer serverSpanEnd(&responseError) // Cancel the request context after timeout and send an error response. Since the // running method might not return immediately on timeout, we must wait for the // timeout concurrently with processing the request. 
+ outerCtx := cp.ctx + cp.ctx, cancel = context.WithCancel(cp.ctx) + defer cancel() if timeout, ok := ContextRequestTimeout(cp.ctx); ok { timer = time.AfterFunc(timeout, func() { cancel() responded.Do(func() { + responseError = errors.New(errMsgTimeout) + writeCtx, _, writeSpanEnd := telemetry.StartSpanWithTracer(outerCtx, h.tracer(), "rpc.writeJSON") resp := msg.errorResponse(&internalServerError{errcodeTimeout, errMsgTimeout}) - h.conn.writeJSON(cp.ctx, resp, true) + err := h.conn.writeJSON(writeCtx, resp, true) + writeSpanEnd(&err) }) }) } @@ -310,9 +344,22 @@ func (h *handler) handleNonBatchCall(cp *callProc, msg *jsonrpcMessage) { h.addSubscriptions(cp.notifiers) if answer != nil { responded.Do(func() { - h.conn.writeJSON(cp.ctx, answer, false) + if answer.Error != nil { + responseError = answer.decodeError() + } + // Notifications don't get a response written, but their errors are + // still recorded on the SERVER span via responseError above. + if msg.isNotification() { + return + } + writeCtx, _, writeSpanEnd := telemetry.StartSpanWithTracer(outerCtx, h.tracer(), "rpc.writeJSON") + err := h.conn.writeJSON(writeCtx, answer, false) + writeSpanEnd(&err) }) } + + // Enable notification sending of subscriptions, since the response with + // subscription ID has now been sent. for _, n := range cp.notifiers { n.activate() } @@ -472,9 +519,11 @@ func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMess start := time.Now() switch { case msg.isNotification(): - h.handleCall(ctx, msg) + // Notifications don't get a response written to the client, but the + // answer is returned so the caller can record errors on the SERVER span. 
+ resp := h.handleCall(ctx, msg) h.log.Debug("Served "+msg.Method, "duration", time.Since(start)) - return nil + return resp case msg.isCall(): resp := h.handleCall(ctx, msg) @@ -516,28 +565,15 @@ func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage } return h.runMethod(cp.ctx, msg, h.unsubscribeCb, args) } - callb, service, method := h.reg.callback(msg.Method) + callb := h.reg.callback(msg.Method) // If the method is not found, return an error. if callb == nil { return msg.errorResponse(&methodNotFoundError{method: msg.Method}) } - // Start root span for the request. - rpcInfo := telemetry.RPCInfo{ - System: "jsonrpc", - Service: service, - Method: method, - RequestID: string(msg.ID), - } - attrib := []telemetry.Attribute{ - telemetry.BoolAttribute("rpc.batch", cp.isBatch), - } - ctx, spanEnd := telemetry.StartServerSpan(cp.ctx, h.tracer(), rpcInfo, attrib...) - defer spanEnd(nil) // don't propagate errors to parent spans - // Start tracing span before parsing arguments. - _, _, pSpanEnd := telemetry.StartSpanWithTracer(ctx, h.tracer(), "rpc.parsePositionalArguments") + _, _, pSpanEnd := telemetry.StartSpanWithTracer(cp.ctx, h.tracer(), "rpc.parsePositionalArguments") args, pErr := parsePositionalArguments(msg.Params, callb.argTypes) pSpanEnd(&pErr) if pErr != nil { @@ -546,11 +582,11 @@ func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage start := time.Now() // Start tracing span before running the method. 
- rctx, _, rSpanEnd := telemetry.StartSpanWithTracer(ctx, h.tracer(), "rpc.runMethod") + rctx, _, rSpanEnd := telemetry.StartSpanWithTracer(cp.ctx, h.tracer(), "rpc.runMethod") answer := h.runMethod(rctx, msg, callb, args) var rErr error if answer.Error != nil { - rErr = errors.New(answer.decodeError().Message) + rErr = answer.decodeError() } rSpanEnd(&rErr) @@ -603,6 +639,18 @@ func (h *handler) handleSubscribe(cp *callProc, msg *jsonrpcMessage) *jsonrpcMes return h.runMethod(ctx, msg, callb, args) } +// rpcInfoFromMessage builds the RPCInfo for a SERVER/INTERNAL RPC span from a +// JSON-RPC message. +func rpcInfoFromMessage(msg *jsonrpcMessage) telemetry.RPCInfo { + info := telemetry.RPCInfo{System: "jsonrpc", RequestID: string(msg.ID)} + if service, method, ok := serviceAndMethod(msg.Method); ok { + info.Service, info.Method = service, method + } else { + info.Method = msg.Method + } + return info +} + // tracer returns the OpenTelemetry Tracer for RPC call tracing. func (h *handler) tracer() trace.Tracer { if h.tracerProvider == nil { @@ -623,7 +671,7 @@ func (h *handler) runMethod(ctx context.Context, msg *jsonrpcMessage, callb *cal _, _, spanEnd := telemetry.StartSpanWithTracer(ctx, h.tracer(), "rpc.encodeJSONResponse", attributes...) 
response := msg.response(result) if response.Error != nil { - err = errors.New(response.decodeError().Message) + err = response.decodeError() } spanEnd(&err) return response diff --git a/rpc/http.go b/rpc/http.go index 93f5e26c30..2bd761e9cd 100644 --- a/rpc/http.go +++ b/rpc/http.go @@ -31,6 +31,7 @@ import ( "sync" "time" + "github.com/ethereum/go-ethereum/internal/telemetry" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/propagation" ) @@ -269,13 +270,13 @@ func (s *Server) newHTTPServerConn(r *http.Request, w http.ResponseWriter) Serve conn := &httpServerConn{Reader: body, Writer: w, r: r} var buf []byte - encodeMsg := func(msg *jsonrpcMessage, isError bool) error { + encodeMsg := func(ctx context.Context, msg *jsonrpcMessage, isError bool) error { buf = appendMessage(buf[:0], msg) - return httpWriteResult(w, buf, isError) + return httpWrite(ctx, w, buf, isError) } - encodeBatch := func(msgs []*jsonrpcMessage, isError bool) error { + encodeBatch := func(ctx context.Context, msgs []*jsonrpcMessage, isError bool) error { buf = appendBatch(buf[:0], msgs) - return httpWriteResult(w, buf, isError) + return httpWrite(ctx, w, buf, isError) } dec := json.NewDecoder(conn) @@ -284,16 +285,17 @@ func (s *Server) newHTTPServerConn(r *http.Request, w http.ResponseWriter) Serve return NewFuncCodec(conn, encodeMsg, encodeBatch, dec.Decode) } -// httpWriteResult writes pre-encoded response data over HTTP. -// For error responses, it sets Content-Length and flushes to ensure the response -// is fully written before any HTTP server write timeout occurs. -func httpWriteResult(w http.ResponseWriter, data []byte, isError bool) error { +// httpWrite writes pre-encoded response data over HTTP. 
+func httpWrite(ctx context.Context, w http.ResponseWriter, data []byte, isError bool) (err error) { + _, _, spanEnd := telemetry.StartSpanWithTracer(ctx, telemetry.TracerFromContext(ctx), "rpc.httpWrite") + defer spanEnd(&err) + w.Header().Set("content-length", strconv.Itoa(len(data))) if !isError { // Normal path, just send the response and let the HTTP server decide // when to flush. - _, err := w.Write(data) + _, err = w.Write(data) return err } @@ -309,7 +311,7 @@ func httpWriteResult(w http.ResponseWriter, data []byte, isError bool) error { // the final chunk is missing. To do this, we set TE = identity, which is a signal // recognized by outer handlers to avoid compression. w.Header().Set("transfer-encoding", "identity") - _, err := w.Write(data) + _, err = w.Write(data) if f, ok := w.(http.Flusher); ok { f.Flush() } diff --git a/rpc/json.go b/rpc/json.go index 9813acae73..b2a961d109 100644 --- a/rpc/json.go +++ b/rpc/json.go @@ -205,9 +205,9 @@ type jsonCodec struct { conn deadlineCloser } -type encodeMsgFunc = func(msg *jsonrpcMessage, isError bool) error +type encodeMsgFunc = func(ctx context.Context, msg *jsonrpcMessage, isError bool) error -type encodeBatchFunc = func(msgs []*jsonrpcMessage, isError bool) error +type encodeBatchFunc = func(ctx context.Context, msgs []*jsonrpcMessage, isError bool) error type decodeFunc = func(v interface{}) error @@ -234,13 +234,13 @@ func NewCodec(conn Conn) ServerCodec { dec := json.NewDecoder(conn) dec.UseNumber() var buf []byte - encodeMsg := func(msg *jsonrpcMessage, isError bool) error { + encodeMsg := func(ctx context.Context, msg *jsonrpcMessage, isError bool) error { buf = appendMessage(buf[:0], msg) buf = append(buf, '\n') _, err := conn.Write(buf) return err } - encodeBatch := func(msgs []*jsonrpcMessage, isError bool) error { + encodeBatch := func(ctx context.Context, msgs []*jsonrpcMessage, isError bool) error { buf = appendBatch(buf[:0], msgs) buf = append(buf, '\n') _, err := conn.Write(buf) @@ -325,7 
+325,7 @@ func (c *jsonCodec) writeJSON(ctx context.Context, msg *jsonrpcMessage, isError deadline = time.Now().Add(defaultWriteTimeout) } c.conn.SetWriteDeadline(deadline) - return c.encodeMsg(msg, isError) + return c.encodeMsg(ctx, msg, isError) } func (c *jsonCodec) writeJSONBatch(ctx context.Context, msgs []*jsonrpcMessage, isError bool) error { @@ -337,7 +337,7 @@ func (c *jsonCodec) writeJSONBatch(ctx context.Context, msgs []*jsonrpcMessage, deadline = time.Now().Add(defaultWriteTimeout) } c.conn.SetWriteDeadline(deadline) - return c.encodeBatch(msgs, isError) + return c.encodeBatch(ctx, msgs, isError) } func (c *jsonCodec) close() { diff --git a/rpc/service.go b/rpc/service.go index 8462a5a59a..b64f43b82d 100644 --- a/rpc/service.go +++ b/rpc/service.go @@ -91,15 +91,19 @@ func (r *serviceRegistry) registerName(name string, rcvr interface{}) error { return nil } +func serviceAndMethod(name string) (service, method string, ok bool) { + return strings.Cut(name, serviceMethodSeparator) +} + // callback returns the callback corresponding to the given RPC method name. -func (r *serviceRegistry) callback(method string) (cb *callback, service, methodName string) { - before, after, found := strings.Cut(method, serviceMethodSeparator) +func (r *serviceRegistry) callback(name string) (cb *callback) { + s, m, found := serviceAndMethod(name) if !found { - return nil, "", "" + return nil } r.mu.Lock() defer r.mu.Unlock() - return r.services[before].callbacks[after], before, after + return r.services[s].callbacks[m] } // subscription returns a subscription callback in the given service. 
diff --git a/rpc/tracing_test.go b/rpc/tracing_test.go index 5a04c901fd..302dff1384 100644 --- a/rpc/tracing_test.go +++ b/rpc/tracing_test.go @@ -18,8 +18,13 @@ package rpc import ( "context" + "io" + "net/http" "net/http/httptest" + "strconv" + "strings" "testing" + "time" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" @@ -27,6 +32,7 @@ import ( "go.opentelemetry.io/otel/propagation" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" + "go.opentelemetry.io/otel/trace" ) // attributeMap converts a slice of attributes to a map. @@ -105,15 +111,30 @@ func TestTracingHTTP(t *testing.T) { t.Fatal("no spans were emitted") } var rpcSpan *tracetest.SpanStub + var writeJSONSpan *tracetest.SpanStub + var httpWriteSpan *tracetest.SpanStub for i := range spans { - if spans[i].Name == "jsonrpc.test/echo" { + switch spans[i].Name { + case "jsonrpc.test/echo": rpcSpan = &spans[i] - break + case "rpc.writeJSON": + writeJSONSpan = &spans[i] + case "rpc.httpWrite": + httpWriteSpan = &spans[i] } } if rpcSpan == nil { t.Fatalf("jsonrpc.test/echo span not found") } + if writeJSONSpan == nil { + t.Fatalf("rpc.writeJSON span not found") + } + if httpWriteSpan == nil { + t.Fatalf("rpc.httpWrite span not found") + } + if got, want := httpWriteSpan.Parent.SpanID(), writeJSONSpan.SpanContext.SpanID(); got != want { + t.Errorf("rpc.httpWrite parent: got %s, want rpc.writeJSON (%s)", got, want) + } // Verify span attributes. attrs := attributeMap(rpcSpan.Attributes) @@ -167,13 +188,13 @@ func TestTracingHTTPErrorRecording(t *testing.T) { } spans := exporter.GetSpans() - // Only the runMethod span should have error status. + // The runMethod span and the SERVER span should both have error status. 
if len(spans) == 0 { t.Fatal("no spans were emitted") } for _, span := range spans { switch span.Name { - case "rpc.runMethod": + case "rpc.runMethod", "jsonrpc.test/returnError": if span.Status.Code != codes.Error { t.Errorf("expected %s span status Error, got %v", span.Name, span.Status.Code) } @@ -214,7 +235,11 @@ func TestTracingBatchHTTP(t *testing.T) { t.Fatalf("batch RPC call failed: %v", err) } - // Flush and verify we emitted spans for each batch element. + // Flush and verify the batch trace shape: + // jsonrpc.batch (SERVER, rpc.batch.size=N) + // - jsonrpc.test/echo (INTERNAL, x N) + // - rpc.writeJSONBatch (INTERNAL) + // - rpc.httpWrite (INTERNAL) if err := tracer.ForceFlush(context.Background()); err != nil { t.Fatalf("failed to flush: %v", err) } @@ -222,20 +247,68 @@ func TestTracingBatchHTTP(t *testing.T) { if len(spans) == 0 { t.Fatal("no spans were emitted") } - var found int + var ( + batchSpan *tracetest.SpanStub + callSpans []*tracetest.SpanStub + writeJSONBatchSpan *tracetest.SpanStub + httpWriteSpan *tracetest.SpanStub + ) for i := range spans { - if spans[i].Name == "jsonrpc.test/echo" { - attrs := attributeMap(spans[i].Attributes) - if attrs["rpc.system"] == "jsonrpc" && - attrs["rpc.service"] == "test" && - attrs["rpc.method"] == "echo" && - attrs["rpc.batch"] == "true" { - found++ - } + switch spans[i].Name { + case "jsonrpc.batch": + batchSpan = &spans[i] + case "jsonrpc.test/echo": + callSpans = append(callSpans, &spans[i]) + case "rpc.writeJSONBatch": + writeJSONBatchSpan = &spans[i] + case "rpc.httpWrite": + httpWriteSpan = &spans[i] } } - if found != len(batch) { - t.Fatalf("expected %d matching batch spans, got %d", len(batch), found) + if batchSpan == nil { + t.Fatal("jsonrpc.batch span not found") + } + if got, want := len(callSpans), len(batch); got != want { + t.Fatalf("got %d per-call spans, want %d", got, want) + } + if writeJSONBatchSpan == nil { + t.Fatal("rpc.writeJSONBatch span not found") + } + if httpWriteSpan == 
nil { + t.Fatal("rpc.httpWrite span not found") + } + + // Batch span: SERVER kind, rpc.batch.size=N. + if batchSpan.SpanKind != trace.SpanKindServer { + t.Errorf("jsonrpc.batch: got kind %v, want SERVER", batchSpan.SpanKind) + } + batchAttrs := attributeMap(batchSpan.Attributes) + if got, want := batchAttrs["rpc.batch.size"], strconv.Itoa(len(batch)); got != want { + t.Errorf("jsonrpc.batch rpc.batch.size: got %q, want %q", got, want) + } + + // Per-call spans: INTERNAL kind, parented to the batch span, carry rpc.* attrs. + for _, s := range callSpans { + if s.SpanKind != trace.SpanKindInternal { + t.Errorf("jsonrpc.test/echo: got kind %v, want INTERNAL", s.SpanKind) + } + if got, want := s.Parent.SpanID(), batchSpan.SpanContext.SpanID(); got != want { + t.Errorf("jsonrpc.test/echo parent: got %s, want %s (batch)", got, want) + } + attrs := attributeMap(s.Attributes) + if attrs["rpc.system"] != "jsonrpc" || attrs["rpc.service"] != "test" || attrs["rpc.method"] != "echo" { + t.Errorf("jsonrpc.test/echo attrs missing rpc.system/service/method: %v", attrs) + } + } + + // writeJSONBatch parented to the batch span. + if got, want := writeJSONBatchSpan.Parent.SpanID(), batchSpan.SpanContext.SpanID(); got != want { + t.Errorf("rpc.writeJSONBatch parent: got %s, want %s (batch)", got, want) + } + + // httpWrite parented to writeJSONBatch. + if got, want := httpWriteSpan.Parent.SpanID(), writeJSONBatchSpan.SpanContext.SpanID(); got != want { + t.Errorf("rpc.httpWriteResult parent: got %s, want %s (rpc.writeJSONBatch)", got, want) } } @@ -266,3 +339,235 @@ func TestTracingSubscribeUnsubscribe(t *testing.T) { t.Errorf("expected no spans for subscribe/unsubscribe, got %d", len(spans)) } } + +// postJSONRPC sends a raw JSON body to the given test server and discards the +// response body. Used to send messages the typed RPC client can't construct, +// like notifications (no "id" field). 
+func postJSONRPC(t *testing.T, url, body string) { + t.Helper() + req, err := http.NewRequest(http.MethodPost, url, strings.NewReader(body)) + if err != nil { + t.Fatalf("new request: %v", err) + } + req.Header.Set("Content-Type", "application/json") + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("request: %v", err) + } + _, _ = io.Copy(io.Discard, resp.Body) + resp.Body.Close() +} + +// TestTracingHTTPNotification verifies that a JSON-RPC notification emits the +// SERVER span (with error captured when applicable) but no rpc.writeJSON span, +// since notifications do not get a response written. +func TestTracingHTTPNotification(t *testing.T) { + t.Parallel() + server, tracer, exporter := newTracingServer(t) + httpsrv := httptest.NewServer(server) + t.Cleanup(httpsrv.Close) + + // Successful notification (no "id"): should produce a SERVER span without error, + // and no rpc.writeJSON span. + postJSONRPC(t, httpsrv.URL, `{"jsonrpc":"2.0","method":"test_echo","params":["hi",1,{"S":"x"}]}`) + + // Notification with unknown method: SERVER span should be present with error status. 
+ postJSONRPC(t, httpsrv.URL, `{"jsonrpc":"2.0","method":"test_doesNotExist"}`) + + if err := tracer.ForceFlush(context.Background()); err != nil { + t.Fatalf("failed to flush: %v", err) + } + spans := exporter.GetSpans() + + var ( + echoSpan *tracetest.SpanStub + unknownSpan *tracetest.SpanStub + writeJSONFound bool + ) + for i := range spans { + switch spans[i].Name { + case "jsonrpc.test/echo": + echoSpan = &spans[i] + case "jsonrpc.test/doesNotExist": + unknownSpan = &spans[i] + case "rpc.writeJSON": + writeJSONFound = true + } + } + if echoSpan == nil { + t.Fatal("jsonrpc.test/echo span not found for successful notification") + } + if echoSpan.Status.Code == codes.Error { + t.Errorf("successful notification: expected no error status, got %v", echoSpan.Status) + } + if unknownSpan == nil { + t.Fatal("jsonrpc.test/doesNotExist span not found for unknown-method notification") + } + if unknownSpan.Status.Code != codes.Error { + t.Errorf("unknown-method notification: expected error status, got %v", unknownSpan.Status.Code) + } + if writeJSONFound { + t.Error("notifications should not produce an rpc.writeJSON span") + } +} + +// TestTracingBatchHTTPErrorCapture verifies that errors on individual calls +// inside a batch are recorded on the per-call INTERNAL span, including the +// pre-dispatch cases (method not found / invalid params) where runMethod +// never runs. +func TestTracingBatchHTTPErrorCapture(t *testing.T) { + t.Parallel() + server, tracer, exporter := newTracingServer(t) + httpsrv := httptest.NewServer(server) + t.Cleanup(httpsrv.Close) + + // A batch with: one valid call, one unknown method, one method that + // returns an error from its handler. 
+ body := `[ + {"jsonrpc":"2.0","id":1,"method":"test_echo","params":["x",1,{"S":"a"}]}, + {"jsonrpc":"2.0","id":2,"method":"test_doesNotExist"}, + {"jsonrpc":"2.0","id":3,"method":"test_returnError"} + ]` + postJSONRPC(t, httpsrv.URL, body) + + if err := tracer.ForceFlush(context.Background()); err != nil { + t.Fatalf("failed to flush: %v", err) + } + spans := exporter.GetSpans() + + byName := make(map[string]*tracetest.SpanStub) + for i := range spans { + byName[spans[i].Name] = &spans[i] + } + + if byName["jsonrpc.batch"] == nil { + t.Fatal("jsonrpc.batch span not found") + } + if echo := byName["jsonrpc.test/echo"]; echo == nil { + t.Fatal("jsonrpc.test/echo span not found") + } else if echo.Status.Code == codes.Error { + t.Errorf("test/echo: unexpected error status %v", echo.Status) + } + if missing := byName["jsonrpc.test/doesNotExist"]; missing == nil { + t.Fatal("jsonrpc.test/doesNotExist span not found (method-not-found should still get a per-call span)") + } else if missing.Status.Code != codes.Error { + t.Errorf("test/doesNotExist: expected error status, got %v", missing.Status.Code) + } + if ret := byName["jsonrpc.test/returnError"]; ret == nil { + t.Fatal("jsonrpc.test/returnError span not found") + } else if ret.Status.Code != codes.Error { + t.Errorf("test/returnError: expected error status, got %v", ret.Status.Code) + } +} + +// TestTracingBatchHTTPEmpty verifies that an empty batch still emits a +// SERVER span, with rpc.batch.size=0 and error status. 
+func TestTracingBatchHTTPEmpty(t *testing.T) { + t.Parallel() + server, tracer, exporter := newTracingServer(t) + httpsrv := httptest.NewServer(server) + t.Cleanup(httpsrv.Close) + + postJSONRPC(t, httpsrv.URL, `[]`) + + if err := tracer.ForceFlush(context.Background()); err != nil { + t.Fatalf("failed to flush: %v", err) + } + spans := exporter.GetSpans() + + var batchSpan *tracetest.SpanStub + for i := range spans { + if spans[i].Name == "jsonrpc.batch" { + batchSpan = &spans[i] + } + } + if batchSpan == nil { + t.Fatal("jsonrpc.batch span not found for empty batch") + } + if batchSpan.Status.Code != codes.Error { + t.Errorf("empty batch: expected error status, got %v", batchSpan.Status.Code) + } + attrs := attributeMap(batchSpan.Attributes) + if got, want := attrs["rpc.batch.size"], "0"; got != want { + t.Errorf("empty batch rpc.batch.size: got %q, want %q", got, want) + } +} + +// TestTracingBatchHTTPTooLarge verifies that a batch exceeding the server's +// item limit emits a SERVER span with rpc.batch.size=N and error status. +func TestTracingBatchHTTPTooLarge(t *testing.T) { + t.Parallel() + server, tracer, exporter := newTracingServer(t) + server.SetBatchLimits(2, 100000) // limit to 2 items + httpsrv := httptest.NewServer(server) + t.Cleanup(httpsrv.Close) + + // 3 items > limit of 2. 
+ body := `[ + {"jsonrpc":"2.0","id":1,"method":"test_echo","params":["a",1,{"S":"x"}]}, + {"jsonrpc":"2.0","id":2,"method":"test_echo","params":["b",2,{"S":"y"}]}, + {"jsonrpc":"2.0","id":3,"method":"test_echo","params":["c",3,{"S":"z"}]} + ]` + postJSONRPC(t, httpsrv.URL, body) + + if err := tracer.ForceFlush(context.Background()); err != nil { + t.Fatalf("failed to flush: %v", err) + } + spans := exporter.GetSpans() + + var batchSpan *tracetest.SpanStub + for i := range spans { + if spans[i].Name == "jsonrpc.batch" { + batchSpan = &spans[i] + } + } + if batchSpan == nil { + t.Fatal("jsonrpc.batch span not found for too-large batch") + } + if batchSpan.Status.Code != codes.Error { + t.Errorf("batch-too-large: expected error status, got %v", batchSpan.Status.Code) + } + attrs := attributeMap(batchSpan.Attributes) + if got, want := attrs["rpc.batch.size"], "3"; got != want { + t.Errorf("batch-too-large rpc.batch.size: got %q, want %q", got, want) + } +} + +// TestTracingHTTPTimeout verifies that when a non-batch call exceeds the HTTP +// server's WriteTimeout, the SERVER span ends with error status (carrying the +// timeout error message). +func TestTracingHTTPTimeout(t *testing.T) { + t.Parallel() + server, tracer, exporter := newTracingServer(t) + + // Configure a short WriteTimeout so the internal request timer fires + // quickly. ContextRequestTimeout subtracts 100ms from WriteTimeout, so + // 250ms here gives ~150ms before the timeout response is sent. + httpsrv := httptest.NewUnstartedServer(server) + httpsrv.Config.WriteTimeout = 250 * time.Millisecond + httpsrv.Start() + t.Cleanup(httpsrv.Close) + + // test_block waits on ctx.Done() and returns an error. The internal + // timer cancels ctx, so test_block unblocks shortly after the timeout + // response goes out. 
+ postJSONRPC(t, httpsrv.URL, `{"jsonrpc":"2.0","id":1,"method":"test_block"}`) + + if err := tracer.ForceFlush(context.Background()); err != nil { + t.Fatalf("failed to flush: %v", err) + } + spans := exporter.GetSpans() + + var serverSpan *tracetest.SpanStub + for i := range spans { + if spans[i].Name == "jsonrpc.test/block" { + serverSpan = &spans[i] + } + } + if serverSpan == nil { + t.Fatal("jsonrpc.test/block span not found") + } + if serverSpan.Status.Code != codes.Error { + t.Errorf("timeout: expected SERVER span error status, got %v (%q)", serverSpan.Status.Code, serverSpan.Status.Description) + } +} diff --git a/rpc/websocket.go b/rpc/websocket.go index e70498873a..5e1e09c89d 100644 --- a/rpc/websocket.go +++ b/rpc/websocket.go @@ -294,11 +294,11 @@ type websocketCodec struct { func newWebsocketCodec(conn *websocket.Conn, host string, req http.Header, readLimit int64) ServerCodec { conn.SetReadLimit(readLimit) var buf []byte - encodeMsg := func(msg *jsonrpcMessage, isError bool) error { + encodeMsg := func(ctx context.Context, msg *jsonrpcMessage, isError bool) error { buf = appendMessage(buf[:0], msg) return conn.WriteMessage(websocket.TextMessage, buf) } - encodeBatch := func(msgs []*jsonrpcMessage, isError bool) error { + encodeBatch := func(ctx context.Context, msgs []*jsonrpcMessage, isError bool) error { buf = appendBatch(buf[:0], msgs) return conn.WriteMessage(websocket.TextMessage, buf) }