rpc, internal/telemetry: trace JSON-RPC response writes (#35049)
Some checks are pending
/ Linux Build (push) Waiting to run
/ Linux Build (arm) (push) Waiting to run
/ Keeper Build (push) Waiting to run
/ Windows Build (push) Waiting to run
/ Docker Image (push) Waiting to run

The per-call SERVER span ended inside `handleCall()`, so the JSON-RPC
response write happened after the span closed. For large responses like
`engine_getBlobsV*`, that write time was missing from traces.

- Extend the SERVER span past `writeJSON`. 
- For batches, add a top-level `jsonrpc.batch` SERVER span (with `rpc.batch.size`) covering the whole batch including `callBuffer.write`.
- Add `rpc.writeJSON` span around the non-batch response write.
- Add `rpc.writeJSONBatch` span around the batch response write.
- Add `rpc.httpWrite` span around the actual HTTP write, separating JSON encoding from network write.
- Add additional telemetry helpers.

---------

Co-authored-by: Felix Lange <fjl@twurst.com>
This commit is contained in:
Jonny Rhea 2026-06-02 07:13:06 -05:00 committed by GitHub
parent 77a2816468
commit 19f5fe079b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 512 additions and 111 deletions

View file

@ -104,7 +104,7 @@ func (p *StateProcessor) Process(ctx context.Context, block *types.Block, stated
statedb.SetTxContext(tx.Hash(), i, uint32(i+1))
_, _, spanEnd := telemetry.StartSpan(ctx, "core.ApplyTransactionWithEVM",
telemetry.StringAttribute("tx.hash", tx.Hash().Hex()),
telemetry.Int64Attribute("tx.index", int64(i)),
telemetry.IntAttribute("tx.index", i),
)
receipt, bal, err := ApplyTransactionWithEVM(msg, gp, statedb, blockNumber, blockHash, context.Time, tx, evm)
if err != nil {

View file

@ -556,12 +556,12 @@ func (api *ConsensusAPI) GetBlobsV1(ctx context.Context, hashes []common.Hash) (
var (
filled int
attrs = []telemetry.Attribute{
telemetry.Int64Attribute("blobs.requested", int64(len(hashes))),
telemetry.IntAttribute("blobs.requested", len(hashes)),
}
)
ctx, span, spanEnd := telemetry.StartSpan(ctx, "engine.getBlobsV1", attrs...)
defer func() {
span.SetAttributes(telemetry.Int64Attribute("blobs.filled", int64(filled)))
span.SetAttributes(telemetry.IntAttribute("blobs.filled", filled))
spanEnd(&err)
}()
@ -643,12 +643,12 @@ func (api *ConsensusAPI) getBlobs(ctx context.Context, hashes []common.Hash, v2
var (
filled int
attrs = []telemetry.Attribute{
telemetry.Int64Attribute("blobs.requested", int64(len(hashes))),
telemetry.IntAttribute("blobs.requested", len(hashes)),
}
)
ctx, span, spanEnd := telemetry.StartSpan(ctx, "engine.getBlobs", attrs...)
defer func() {
span.SetAttributes(telemetry.Int64Attribute("blobs.filled", int64(filled)))
span.SetAttributes(telemetry.IntAttribute("blobs.filled", filled))
spanEnd(&err)
}()
@ -833,7 +833,7 @@ func (api *ConsensusAPI) newPayload(ctx context.Context, params engine.Executabl
var attrs = []telemetry.Attribute{
telemetry.Int64Attribute("block.number", int64(params.Number)),
telemetry.StringAttribute("block.hash", params.BlockHash.Hex()),
telemetry.Int64Attribute("tx.count", int64(len(params.Transactions))),
telemetry.IntAttribute("tx.count", len(params.Transactions)),
}
ctx, _, spanEnd := telemetry.StartSpan(ctx, "engine.newPayload", attrs...)
defer spanEnd(&err)

View file

@ -215,7 +215,7 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u
// Create a server span for forkchoiceUpdated with payload attributes,
// simulating an incoming engine API request from a real consensus client.
fcCtx, fcSpanEnd := telemetry.StartServerSpan(context.Background(), tracer, telemetry.RPCInfo{
fcCtx, fcSpanEnd := telemetry.StartCallServerSpan(context.Background(), tracer, telemetry.RPCInfo{
System: "jsonrpc",
Service: "engine",
Method: "forkchoiceUpdatedV" + fmt.Sprintf("%d", version),
@ -237,7 +237,7 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u
// Create a server span for getPayload, simulating the consensus client
// coming back to retrieve the built payload.
_, gpSpanEnd := telemetry.StartServerSpan(context.Background(), tracer, telemetry.RPCInfo{
_, gpSpanEnd := telemetry.StartCallServerSpan(context.Background(), tracer, telemetry.RPCInfo{
System: "jsonrpc",
Service: "engine",
Method: "getPayloadV" + fmt.Sprintf("%d", version),
@ -286,7 +286,7 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u
// Create a server span for newPayload, simulating the consensus client
// sending the execution payload for validation.
npCtx, npSpanEnd := telemetry.StartServerSpan(context.Background(), tracer, telemetry.RPCInfo{
npCtx, npSpanEnd := telemetry.StartCallServerSpan(context.Background(), tracer, telemetry.RPCInfo{
System: "jsonrpc",
Service: "engine",
Method: "newPayloadV" + fmt.Sprintf("%d", version),
@ -302,7 +302,7 @@ func (c *SimulatedBeacon) sealBlock(withdrawals []*types.Withdrawal, timestamp u
// Create a server span for the final forkchoiceUpdated (no payload attributes),
// which sets the new block as the canonical chain head.
fcuCtx, fcuSpanEnd := telemetry.StartServerSpan(context.Background(), tracer, telemetry.RPCInfo{
fcuCtx, fcuSpanEnd := telemetry.StartCallServerSpan(context.Background(), tracer, telemetry.RPCInfo{
System: "jsonrpc",
Service: "engine",
Method: "forkchoiceUpdatedV" + fmt.Sprintf("%d", version),

View file

@ -40,6 +40,11 @@ func Int64Attribute(key string, val int64) Attribute {
return attribute.Int64(key, val)
}
// IntAttribute creates an attribute with an int value.
func IntAttribute(key string, val int) Attribute {
return attribute.Int(key, val)
}
// BoolAttribute creates an attribute with a bool value.
func BoolAttribute(key string, val bool) Attribute {
return attribute.Bool(key, val)
@ -60,6 +65,13 @@ func StartSpanWithTracer(ctx context.Context, tracer trace.Tracer, name string,
return startSpan(ctx, tracer, trace.SpanKindInternal, name, attributes...)
}
// TracerFromContext returns a Tracer from the TracerProvider associated with the
// parent span in ctx. If ctx has no parent span, the returned tracer comes from
// the no-op provider, so spans created with it will not be exported.
func TracerFromContext(ctx context.Context) trace.Tracer {
return trace.SpanFromContext(ctx).TracerProvider().Tracer("")
}
// RPCInfo contains information about the RPC request.
type RPCInfo struct {
System string
@ -68,11 +80,11 @@ type RPCInfo struct {
RequestID string
}
// StartServerSpan creates a SpanKind=SERVER span at the JSON-RPC boundary.
// StartCallServerSpan creates a SpanKind=SERVER span for a JSON-RPC call.
// The span name is formatted as $rpcSystem.$rpcService/$rpcMethod
// (e.g. "jsonrpc.engine/newPayloadV4") which follows the Open Telemetry
// semantic convensions: https://opentelemetry.io/docs/specs/semconv/rpc/rpc-spans/#span-name.
func StartServerSpan(ctx context.Context, tracer trace.Tracer, rpc RPCInfo, others ...Attribute) (context.Context, func(*error)) {
func StartCallServerSpan(ctx context.Context, tracer trace.Tracer, rpc RPCInfo, others ...Attribute) (context.Context, func(*error)) {
var (
name = fmt.Sprintf("%s.%s/%s", rpc.System, rpc.Service, rpc.Method)
attributes = append([]Attribute{
@ -88,6 +100,36 @@ func StartServerSpan(ctx context.Context, tracer trace.Tracer, rpc RPCInfo, othe
return ctx, end
}
// StartBatchServerSpan creates a SpanKind=SERVER span representing a batched request.
// The span name is "$system.batch" (e.g. "jsonrpc.batch") and per-call spans are nested under it.
// batchSize is exposed as rpc.batch.size.
func StartBatchServerSpan(ctx context.Context, tracer trace.Tracer, system string, batchSize int, others ...Attribute) (context.Context, func(*error)) {
attributes := append([]Attribute{
semconv.RPCSystemKey.String(system),
IntAttribute("rpc.batch.size", batchSize),
}, others...)
ctx, _, end := startSpan(ctx, tracer, trace.SpanKindServer, system+".batch", attributes...)
return ctx, end
}
// StartBatchCallSpan creates a SpanKind=INTERNAL span for an individual RPC call as part of a batch.
// This carries the same name and attributes as StartCallServerSpan.
func StartBatchCallSpan(ctx context.Context, tracer trace.Tracer, rpc RPCInfo, others ...Attribute) (context.Context, func(*error)) {
var (
name = fmt.Sprintf("%s.%s/%s", rpc.System, rpc.Service, rpc.Method)
attributes = append([]Attribute{
semconv.RPCSystemKey.String(rpc.System),
semconv.RPCServiceKey.String(rpc.Service),
semconv.RPCMethodKey.String(rpc.Method),
semconv.RPCJSONRPCRequestID(rpc.RequestID),
},
others...,
)
)
ctx, _, end := startSpan(ctx, tracer, trace.SpanKindInternal, name, attributes...)
return ctx, end
}
// startSpan creates a span with the given kind.
func startSpan(ctx context.Context, tracer trace.Tracer, kind trace.SpanKind, spanName string, attributes ...Attribute) (context.Context, trace.Span, func(*error)) {
ctx, span := tracer.Start(ctx, spanName, trace.WithSpanKind(kind))

View file

@ -217,7 +217,7 @@ func (payload *Payload) ResolveFull() *engine.ExecutionPayloadEnvelope {
func (miner *Miner) runBuildIteration(ctx context.Context, start time.Time, iteration int, payload *Payload, params *generateParams, witness bool) {
ctx, span, spanEnd := telemetry.StartSpan(ctx, "miner.buildIteration",
telemetry.Int64Attribute("iteration", int64(iteration)),
telemetry.IntAttribute("iteration", iteration),
)
var err error
defer spanEnd(&err)
@ -271,7 +271,7 @@ func (miner *Miner) buildPayload(ctx context.Context, args *BuildPayloadArgs, wi
telemetry.Int64Attribute("block.number", int64(empty.block.NumberU64())),
)
defer func() {
bSpan.SetAttributes(telemetry.Int64Attribute("iterations.total", int64(iteration)))
bSpan.SetAttributes(telemetry.IntAttribute("iterations.total", iteration))
bSpanEnd(nil)
}()

View file

@ -137,7 +137,7 @@ func (miner *Miner) generateWork(ctx context.Context, genParam *generateParams,
defer func() {
if result != nil && result.err == nil {
span.SetAttributes(
telemetry.Int64Attribute("txs.count", int64(len(result.block.Transactions()))),
telemetry.IntAttribute("txs.count", len(result.block.Transactions())),
telemetry.Int64Attribute("gas.used", int64(result.block.GasUsed())),
telemetry.StringAttribute("fees", result.fees.String()),
)
@ -572,8 +572,8 @@ func (miner *Miner) fillTransactions(ctx context.Context, interrupt *atomic.Int3
}
pendingBlobTxs, blobTxCount := miner.txpool.Pending(filter)
span.SetAttributes(
telemetry.Int64Attribute("pending.plain.count", int64(plainTxCount)),
telemetry.Int64Attribute("pending.blob.count", int64(blobTxCount)),
telemetry.IntAttribute("pending.plain.count", plainTxCount),
telemetry.IntAttribute("pending.blob.count", blobTxCount),
)
// Split the pending transactions into locals and remotes.

View file

@ -169,40 +169,49 @@ func (b *batchCallBuffer) doWrite(ctx context.Context, conn jsonWriter, isErrorR
}
b.wrote = true // can only write once
if len(b.resp) > 0 {
conn.writeJSONBatch(ctx, b.resp, isErrorResponse)
spanCtx, _, spanEnd := telemetry.StartSpanWithTracer(ctx, telemetry.TracerFromContext(ctx), "rpc.writeJSONBatch")
err := conn.writeJSONBatch(spanCtx, b.resp, isErrorResponse)
spanEnd(&err)
}
}
// handleBatch executes all messages in a batch and returns the responses.
func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
// Emit error response for empty batches:
if len(msgs) == 0 {
h.startCallProc(func(cp *callProc) {
resp := errorMessage(&invalidRequestError{"empty batch"})
h.conn.writeJSON(cp.ctx, resp, true)
// For valid batches, filter response messages and subscription notifications
// out of msgs here.
var calls []*jsonrpcMessage
valid := len(msgs) > 0 && (h.batchRequestLimit == 0 || len(msgs) <= h.batchRequestLimit)
if valid {
calls = make([]*jsonrpcMessage, 0, len(msgs))
h.handleResponses(msgs, func(msg *jsonrpcMessage) {
calls = append(calls, msg)
})
return
}
// Apply limit on total number of requests.
if h.batchRequestLimit != 0 && len(msgs) > h.batchRequestLimit {
h.startCallProc(func(cp *callProc) {
h.respondWithBatchTooLarge(cp, msgs)
})
return
}
// Handle non-call messages first.
// Here we need to find the requestOp that sent the request batch.
calls := make([]*jsonrpcMessage, 0, len(msgs))
h.handleResponses(msgs, func(msg *jsonrpcMessage) {
calls = append(calls, msg)
})
if len(calls) == 0 {
return
if len(calls) == 0 {
// Batch was entirely responses to our own requests; nothing to dispatch.
return
}
}
// Process calls on a goroutine because they may block indefinitely:
h.startCallProc(func(cp *callProc) {
// Top-level batch SERVER span.
var batchSpanEnd func(*error)
cp.ctx, batchSpanEnd = telemetry.StartBatchServerSpan(cp.ctx, h.tracer(), "jsonrpc", len(msgs))
var spanErr error
defer batchSpanEnd(&spanErr)
switch {
case len(msgs) == 0:
spanErr = &invalidRequestError{"empty batch"}
resp := errorMessage(spanErr)
h.conn.writeJSON(cp.ctx, resp, true)
return
case h.batchRequestLimit != 0 && len(msgs) > h.batchRequestLimit:
spanErr = errors.New(errMsgBatchTooLarge)
h.respondWithBatchTooLarge(cp, msgs)
return
}
cp.isBatch = true
var (
timer *time.Timer
@ -212,35 +221,50 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
cp.ctx, cancel = context.WithCancel(cp.ctx)
defer cancel()
batchCtx := cp.ctx
// Cancel the request context after timeout and send an error response. Since the
// currently-running method might not return immediately on timeout, we must wait
// for the timeout concurrently with processing the request.
if timeout, ok := ContextRequestTimeout(cp.ctx); ok {
if timeout, ok := ContextRequestTimeout(batchCtx); ok {
timer = time.AfterFunc(timeout, func() {
cancel()
err := &internalServerError{errcodeTimeout, errMsgTimeout}
callBuffer.respondWithError(cp.ctx, h.conn, err)
callBuffer.respondWithError(batchCtx, h.conn, err)
})
}
responseBytes := 0
for {
// No need to handle rest of calls if timed out.
if cp.ctx.Err() != nil {
if batchCtx.Err() != nil {
break
}
msg := callBuffer.nextCall()
if msg == nil {
break
}
// Per-call INTERNAL span as a child of the batch SERVER span.
var callSpanEnd func(*error)
cp.ctx, callSpanEnd = telemetry.StartBatchCallSpan(batchCtx, h.tracer(), rpcInfoFromMessage(msg))
resp := h.handleCallMsg(cp, msg)
var callErr error
if resp != nil && resp.Error != nil {
callErr = resp.decodeError()
}
callSpanEnd(&callErr)
// Notifications don't get a response written into the batch reply.
if msg.isNotification() {
resp = nil
}
callBuffer.pushResponse(resp)
if resp != nil && h.batchResponseMaxSize != 0 {
responseBytes += len(resp.Result) + len(resp.Error)
if responseBytes > h.batchResponseMaxSize {
err := &internalServerError{errcodeResponseTooLarge, errMsgResponseTooLarge}
callBuffer.respondWithError(cp.ctx, h.conn, err)
callBuffer.respondWithError(batchCtx, h.conn, err)
break
}
}
@ -250,7 +274,7 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
}
h.addSubscriptions(cp.notifiers)
callBuffer.write(cp.ctx, h.conn)
callBuffer.write(batchCtx, h.conn)
for _, n := range cp.notifiers {
n.activate()
}
@ -283,22 +307,32 @@ func (h *handler) handleMsg(msg *jsonrpcMessage) {
func (h *handler) handleNonBatchCall(cp *callProc, msg *jsonrpcMessage) {
var (
responded sync.Once
timer *time.Timer
cancel context.CancelFunc
responded sync.Once
timer *time.Timer
cancel context.CancelFunc
responseError error
)
cp.ctx, cancel = context.WithCancel(cp.ctx)
defer cancel()
// Set up the SERVER span for tracing.
var serverSpanEnd func(*error)
cp.ctx, serverSpanEnd = telemetry.StartCallServerSpan(cp.ctx, h.tracer(), rpcInfoFromMessage(msg))
defer serverSpanEnd(&responseError)
// Cancel the request context after timeout and send an error response. Since the
// running method might not return immediately on timeout, we must wait for the
// timeout concurrently with processing the request.
outerCtx := cp.ctx
cp.ctx, cancel = context.WithCancel(cp.ctx)
defer cancel()
if timeout, ok := ContextRequestTimeout(cp.ctx); ok {
timer = time.AfterFunc(timeout, func() {
cancel()
responded.Do(func() {
responseError = errors.New(errMsgTimeout)
writeCtx, _, writeSpanEnd := telemetry.StartSpanWithTracer(outerCtx, h.tracer(), "rpc.writeJSON")
resp := msg.errorResponse(&internalServerError{errcodeTimeout, errMsgTimeout})
h.conn.writeJSON(cp.ctx, resp, true)
err := h.conn.writeJSON(writeCtx, resp, true)
writeSpanEnd(&err)
})
})
}
@ -310,9 +344,22 @@ func (h *handler) handleNonBatchCall(cp *callProc, msg *jsonrpcMessage) {
h.addSubscriptions(cp.notifiers)
if answer != nil {
responded.Do(func() {
h.conn.writeJSON(cp.ctx, answer, false)
if answer.Error != nil {
responseError = answer.decodeError()
}
// Notifications don't get a response written, but their errors are
// still recorded on the SERVER span via responseError above.
if msg.isNotification() {
return
}
writeCtx, _, writeSpanEnd := telemetry.StartSpanWithTracer(outerCtx, h.tracer(), "rpc.writeJSON")
err := h.conn.writeJSON(writeCtx, answer, false)
writeSpanEnd(&err)
})
}
// Enable notification sending of subscriptions, since the response with
// subscription ID has now been sent.
for _, n := range cp.notifiers {
n.activate()
}
@ -472,9 +519,11 @@ func (h *handler) handleCallMsg(ctx *callProc, msg *jsonrpcMessage) *jsonrpcMess
start := time.Now()
switch {
case msg.isNotification():
h.handleCall(ctx, msg)
// Notifications don't get a response written to the client, but the
// answer is returned so the caller can record errors on the SERVER span.
resp := h.handleCall(ctx, msg)
h.log.Debug("Served "+msg.Method, "duration", time.Since(start))
return nil
return resp
case msg.isCall():
resp := h.handleCall(ctx, msg)
@ -516,28 +565,15 @@ func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage
}
return h.runMethod(cp.ctx, msg, h.unsubscribeCb, args)
}
callb, service, method := h.reg.callback(msg.Method)
callb := h.reg.callback(msg.Method)
// If the method is not found, return an error.
if callb == nil {
return msg.errorResponse(&methodNotFoundError{method: msg.Method})
}
// Start root span for the request.
rpcInfo := telemetry.RPCInfo{
System: "jsonrpc",
Service: service,
Method: method,
RequestID: string(msg.ID),
}
attrib := []telemetry.Attribute{
telemetry.BoolAttribute("rpc.batch", cp.isBatch),
}
ctx, spanEnd := telemetry.StartServerSpan(cp.ctx, h.tracer(), rpcInfo, attrib...)
defer spanEnd(nil) // don't propagate errors to parent spans
// Start tracing span before parsing arguments.
_, _, pSpanEnd := telemetry.StartSpanWithTracer(ctx, h.tracer(), "rpc.parsePositionalArguments")
_, _, pSpanEnd := telemetry.StartSpanWithTracer(cp.ctx, h.tracer(), "rpc.parsePositionalArguments")
args, pErr := parsePositionalArguments(msg.Params, callb.argTypes)
pSpanEnd(&pErr)
if pErr != nil {
@ -546,11 +582,11 @@ func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage
start := time.Now()
// Start tracing span before running the method.
rctx, _, rSpanEnd := telemetry.StartSpanWithTracer(ctx, h.tracer(), "rpc.runMethod")
rctx, _, rSpanEnd := telemetry.StartSpanWithTracer(cp.ctx, h.tracer(), "rpc.runMethod")
answer := h.runMethod(rctx, msg, callb, args)
var rErr error
if answer.Error != nil {
rErr = errors.New(answer.decodeError().Message)
rErr = answer.decodeError()
}
rSpanEnd(&rErr)
@ -603,6 +639,18 @@ func (h *handler) handleSubscribe(cp *callProc, msg *jsonrpcMessage) *jsonrpcMes
return h.runMethod(ctx, msg, callb, args)
}
// rpcInfoFromMessage builds the RPCInfo for a SERVER/INTERNAL RPC span from a
// JSON-RPC message.
func rpcInfoFromMessage(msg *jsonrpcMessage) telemetry.RPCInfo {
info := telemetry.RPCInfo{System: "jsonrpc", RequestID: string(msg.ID)}
if service, method, ok := serviceAndMethod(msg.Method); ok {
info.Service, info.Method = service, method
} else {
info.Method = msg.Method
}
return info
}
// tracer returns the OpenTelemetry Tracer for RPC call tracing.
func (h *handler) tracer() trace.Tracer {
if h.tracerProvider == nil {
@ -623,7 +671,7 @@ func (h *handler) runMethod(ctx context.Context, msg *jsonrpcMessage, callb *cal
_, _, spanEnd := telemetry.StartSpanWithTracer(ctx, h.tracer(), "rpc.encodeJSONResponse", attributes...)
response := msg.response(result)
if response.Error != nil {
err = errors.New(response.decodeError().Message)
err = response.decodeError()
}
spanEnd(&err)
return response

View file

@ -31,6 +31,7 @@ import (
"sync"
"time"
"github.com/ethereum/go-ethereum/internal/telemetry"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
)
@ -269,13 +270,13 @@ func (s *Server) newHTTPServerConn(r *http.Request, w http.ResponseWriter) Serve
conn := &httpServerConn{Reader: body, Writer: w, r: r}
var buf []byte
encodeMsg := func(msg *jsonrpcMessage, isError bool) error {
encodeMsg := func(ctx context.Context, msg *jsonrpcMessage, isError bool) error {
buf = appendMessage(buf[:0], msg)
return httpWriteResult(w, buf, isError)
return httpWrite(ctx, w, buf, isError)
}
encodeBatch := func(msgs []*jsonrpcMessage, isError bool) error {
encodeBatch := func(ctx context.Context, msgs []*jsonrpcMessage, isError bool) error {
buf = appendBatch(buf[:0], msgs)
return httpWriteResult(w, buf, isError)
return httpWrite(ctx, w, buf, isError)
}
dec := json.NewDecoder(conn)
@ -284,16 +285,17 @@ func (s *Server) newHTTPServerConn(r *http.Request, w http.ResponseWriter) Serve
return NewFuncCodec(conn, encodeMsg, encodeBatch, dec.Decode)
}
// httpWriteResult writes pre-encoded response data over HTTP.
// For error responses, it sets Content-Length and flushes to ensure the response
// is fully written before any HTTP server write timeout occurs.
func httpWriteResult(w http.ResponseWriter, data []byte, isError bool) error {
// httpWrite writes pre-encoded response data over HTTP.
func httpWrite(ctx context.Context, w http.ResponseWriter, data []byte, isError bool) (err error) {
_, _, spanEnd := telemetry.StartSpanWithTracer(ctx, telemetry.TracerFromContext(ctx), "rpc.httpWrite")
defer spanEnd(&err)
w.Header().Set("content-length", strconv.Itoa(len(data)))
if !isError {
// Normal path, just send the response and let the HTTP server decide
// when to flush.
_, err := w.Write(data)
_, err = w.Write(data)
return err
}
@ -309,7 +311,7 @@ func httpWriteResult(w http.ResponseWriter, data []byte, isError bool) error {
// the final chunk is missing. To do this, we set TE = identity, which is a signal
// recognized by outer handlers to avoid compression.
w.Header().Set("transfer-encoding", "identity")
_, err := w.Write(data)
_, err = w.Write(data)
if f, ok := w.(http.Flusher); ok {
f.Flush()
}

View file

@ -205,9 +205,9 @@ type jsonCodec struct {
conn deadlineCloser
}
type encodeMsgFunc = func(msg *jsonrpcMessage, isError bool) error
type encodeMsgFunc = func(ctx context.Context, msg *jsonrpcMessage, isError bool) error
type encodeBatchFunc = func(msgs []*jsonrpcMessage, isError bool) error
type encodeBatchFunc = func(ctx context.Context, msgs []*jsonrpcMessage, isError bool) error
type decodeFunc = func(v interface{}) error
@ -234,13 +234,13 @@ func NewCodec(conn Conn) ServerCodec {
dec := json.NewDecoder(conn)
dec.UseNumber()
var buf []byte
encodeMsg := func(msg *jsonrpcMessage, isError bool) error {
encodeMsg := func(ctx context.Context, msg *jsonrpcMessage, isError bool) error {
buf = appendMessage(buf[:0], msg)
buf = append(buf, '\n')
_, err := conn.Write(buf)
return err
}
encodeBatch := func(msgs []*jsonrpcMessage, isError bool) error {
encodeBatch := func(ctx context.Context, msgs []*jsonrpcMessage, isError bool) error {
buf = appendBatch(buf[:0], msgs)
buf = append(buf, '\n')
_, err := conn.Write(buf)
@ -325,7 +325,7 @@ func (c *jsonCodec) writeJSON(ctx context.Context, msg *jsonrpcMessage, isError
deadline = time.Now().Add(defaultWriteTimeout)
}
c.conn.SetWriteDeadline(deadline)
return c.encodeMsg(msg, isError)
return c.encodeMsg(ctx, msg, isError)
}
func (c *jsonCodec) writeJSONBatch(ctx context.Context, msgs []*jsonrpcMessage, isError bool) error {
@ -337,7 +337,7 @@ func (c *jsonCodec) writeJSONBatch(ctx context.Context, msgs []*jsonrpcMessage,
deadline = time.Now().Add(defaultWriteTimeout)
}
c.conn.SetWriteDeadline(deadline)
return c.encodeBatch(msgs, isError)
return c.encodeBatch(ctx, msgs, isError)
}
func (c *jsonCodec) close() {

View file

@ -91,15 +91,19 @@ func (r *serviceRegistry) registerName(name string, rcvr interface{}) error {
return nil
}
func serviceAndMethod(name string) (service, method string, ok bool) {
return strings.Cut(name, serviceMethodSeparator)
}
// callback returns the callback corresponding to the given RPC method name.
func (r *serviceRegistry) callback(method string) (cb *callback, service, methodName string) {
before, after, found := strings.Cut(method, serviceMethodSeparator)
func (r *serviceRegistry) callback(name string) (cb *callback) {
s, m, found := serviceAndMethod(name)
if !found {
return nil, "", ""
return nil
}
r.mu.Lock()
defer r.mu.Unlock()
return r.services[before].callbacks[after], before, after
return r.services[s].callbacks[m]
}
// subscription returns a subscription callback in the given service.

View file

@ -18,8 +18,13 @@ package rpc
import (
"context"
"io"
"net/http"
"net/http/httptest"
"strconv"
"strings"
"testing"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
@ -27,6 +32,7 @@ import (
"go.opentelemetry.io/otel/propagation"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.opentelemetry.io/otel/trace"
)
// attributeMap converts a slice of attributes to a map.
@ -105,15 +111,30 @@ func TestTracingHTTP(t *testing.T) {
t.Fatal("no spans were emitted")
}
var rpcSpan *tracetest.SpanStub
var writeJSONSpan *tracetest.SpanStub
var httpWriteSpan *tracetest.SpanStub
for i := range spans {
if spans[i].Name == "jsonrpc.test/echo" {
switch spans[i].Name {
case "jsonrpc.test/echo":
rpcSpan = &spans[i]
break
case "rpc.writeJSON":
writeJSONSpan = &spans[i]
case "rpc.httpWrite":
httpWriteSpan = &spans[i]
}
}
if rpcSpan == nil {
t.Fatalf("jsonrpc.test/echo span not found")
}
if writeJSONSpan == nil {
t.Fatalf("rpc.writeJSON span not found")
}
if httpWriteSpan == nil {
t.Fatalf("rpc.httpWrite span not found")
}
if got, want := httpWriteSpan.Parent.SpanID(), writeJSONSpan.SpanContext.SpanID(); got != want {
t.Errorf("rpc.httpWrite parent: got %s, want rpc.writeJSON (%s)", got, want)
}
// Verify span attributes.
attrs := attributeMap(rpcSpan.Attributes)
@ -167,13 +188,13 @@ func TestTracingHTTPErrorRecording(t *testing.T) {
}
spans := exporter.GetSpans()
// Only the runMethod span should have error status.
// The runMethod span and the SERVER span should both have error status.
if len(spans) == 0 {
t.Fatal("no spans were emitted")
}
for _, span := range spans {
switch span.Name {
case "rpc.runMethod":
case "rpc.runMethod", "jsonrpc.test/returnError":
if span.Status.Code != codes.Error {
t.Errorf("expected %s span status Error, got %v", span.Name, span.Status.Code)
}
@ -214,7 +235,11 @@ func TestTracingBatchHTTP(t *testing.T) {
t.Fatalf("batch RPC call failed: %v", err)
}
// Flush and verify we emitted spans for each batch element.
// Flush and verify the batch trace shape:
// jsonrpc.batch (SERVER, rpc.batch.size=N)
// - jsonrpc.test/echo (INTERNAL, x N)
// - rpc.writeJSONBatch (INTERNAL)
// - rpc.httpWriteResult (INTERNAL)
if err := tracer.ForceFlush(context.Background()); err != nil {
t.Fatalf("failed to flush: %v", err)
}
@ -222,20 +247,68 @@ func TestTracingBatchHTTP(t *testing.T) {
if len(spans) == 0 {
t.Fatal("no spans were emitted")
}
var found int
var (
batchSpan *tracetest.SpanStub
callSpans []*tracetest.SpanStub
writeJSONBatchSpan *tracetest.SpanStub
httpWriteSpan *tracetest.SpanStub
)
for i := range spans {
if spans[i].Name == "jsonrpc.test/echo" {
attrs := attributeMap(spans[i].Attributes)
if attrs["rpc.system"] == "jsonrpc" &&
attrs["rpc.service"] == "test" &&
attrs["rpc.method"] == "echo" &&
attrs["rpc.batch"] == "true" {
found++
}
switch spans[i].Name {
case "jsonrpc.batch":
batchSpan = &spans[i]
case "jsonrpc.test/echo":
callSpans = append(callSpans, &spans[i])
case "rpc.writeJSONBatch":
writeJSONBatchSpan = &spans[i]
case "rpc.httpWrite":
httpWriteSpan = &spans[i]
}
}
if found != len(batch) {
t.Fatalf("expected %d matching batch spans, got %d", len(batch), found)
if batchSpan == nil {
t.Fatal("jsonrpc.batch span not found")
}
if got, want := len(callSpans), len(batch); got != want {
t.Fatalf("got %d per-call spans, want %d", got, want)
}
if writeJSONBatchSpan == nil {
t.Fatal("rpc.writeJSONBatch span not found")
}
if httpWriteSpan == nil {
t.Fatal("rpc.httpWrite span not found")
}
// Batch span: SERVER kind, rpc.batch.size=N.
if batchSpan.SpanKind != trace.SpanKindServer {
t.Errorf("jsonrpc.batch: got kind %v, want SERVER", batchSpan.SpanKind)
}
batchAttrs := attributeMap(batchSpan.Attributes)
if got, want := batchAttrs["rpc.batch.size"], strconv.Itoa(len(batch)); got != want {
t.Errorf("jsonrpc.batch rpc.batch.size: got %q, want %q", got, want)
}
// Per-call spans: INTERNAL kind, parented to the batch span, carry rpc.* attrs.
for _, s := range callSpans {
if s.SpanKind != trace.SpanKindInternal {
t.Errorf("jsonrpc.test/echo: got kind %v, want INTERNAL", s.SpanKind)
}
if got, want := s.Parent.SpanID(), batchSpan.SpanContext.SpanID(); got != want {
t.Errorf("jsonrpc.test/echo parent: got %s, want %s (batch)", got, want)
}
attrs := attributeMap(s.Attributes)
if attrs["rpc.system"] != "jsonrpc" || attrs["rpc.service"] != "test" || attrs["rpc.method"] != "echo" {
t.Errorf("jsonrpc.test/echo attrs missing rpc.system/service/method: %v", attrs)
}
}
// writeJSONBatch parented to the batch span.
if got, want := writeJSONBatchSpan.Parent.SpanID(), batchSpan.SpanContext.SpanID(); got != want {
t.Errorf("rpc.writeJSONBatch parent: got %s, want %s (batch)", got, want)
}
// httpWriteResult parented to writeJSONBatch.
if got, want := httpWriteSpan.Parent.SpanID(), writeJSONBatchSpan.SpanContext.SpanID(); got != want {
t.Errorf("rpc.httpWriteResult parent: got %s, want %s (rpc.writeJSONBatch)", got, want)
}
}
@ -266,3 +339,235 @@ func TestTracingSubscribeUnsubscribe(t *testing.T) {
t.Errorf("expected no spans for subscribe/unsubscribe, got %d", len(spans))
}
}
// postJSONRPC sends a raw JSON body to the given test server and discards the
// response body. Used to send messages the typed RPC client can't construct,
// like notifications (no "id" field).
func postJSONRPC(t *testing.T, url, body string) {
t.Helper()
req, err := http.NewRequest(http.MethodPost, url, strings.NewReader(body))
if err != nil {
t.Fatalf("new request: %v", err)
}
req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("request: %v", err)
}
_, _ = io.Copy(io.Discard, resp.Body)
resp.Body.Close()
}
// TestTracingHTTPNotification verifies that a JSON-RPC notification emits the
// SERVER span (with error captured when applicable) but no rpc.writeJSON span,
// since notifications do not get a response written.
func TestTracingHTTPNotification(t *testing.T) {
t.Parallel()
server, tracer, exporter := newTracingServer(t)
httpsrv := httptest.NewServer(server)
t.Cleanup(httpsrv.Close)
// Successful notification (no "id"): should produce a SERVER span without error,
// and no rpc.writeJSON span.
postJSONRPC(t, httpsrv.URL, `{"jsonrpc":"2.0","method":"test_echo","params":["hi",1,{"S":"x"}]}`)
// Notification with unknown method: SERVER span should be present with error status.
postJSONRPC(t, httpsrv.URL, `{"jsonrpc":"2.0","method":"test_doesNotExist"}`)
if err := tracer.ForceFlush(context.Background()); err != nil {
t.Fatalf("failed to flush: %v", err)
}
spans := exporter.GetSpans()
var (
echoSpan *tracetest.SpanStub
unknownSpan *tracetest.SpanStub
writeJSONFound bool
)
for i := range spans {
switch spans[i].Name {
case "jsonrpc.test/echo":
echoSpan = &spans[i]
case "jsonrpc.test/doesNotExist":
unknownSpan = &spans[i]
case "rpc.writeJSON":
writeJSONFound = true
}
}
if echoSpan == nil {
t.Fatal("jsonrpc.test/echo span not found for successful notification")
}
if echoSpan.Status.Code == codes.Error {
t.Errorf("successful notification: expected no error status, got %v", echoSpan.Status)
}
if unknownSpan == nil {
t.Fatal("jsonrpc.test/doesNotExist span not found for unknown-method notification")
}
if unknownSpan.Status.Code != codes.Error {
t.Errorf("unknown-method notification: expected error status, got %v", unknownSpan.Status.Code)
}
if writeJSONFound {
t.Error("notifications should not produce an rpc.writeJSON span")
}
}
// TestTracingBatchHTTPErrorCapture verifies that errors on individual calls
// inside a batch are recorded on the per-call INTERNAL span, including the
// pre-dispatch cases (method not found / invalid params) where runMethod
// never runs.
func TestTracingBatchHTTPErrorCapture(t *testing.T) {
t.Parallel()
server, tracer, exporter := newTracingServer(t)
httpsrv := httptest.NewServer(server)
t.Cleanup(httpsrv.Close)
// A batch with: one valid call, one unknown method, one method that
// returns an error from its handler.
body := `[
{"jsonrpc":"2.0","id":1,"method":"test_echo","params":["x",1,{"S":"a"}]},
{"jsonrpc":"2.0","id":2,"method":"test_doesNotExist"},
{"jsonrpc":"2.0","id":3,"method":"test_returnError"}
]`
postJSONRPC(t, httpsrv.URL, body)
if err := tracer.ForceFlush(context.Background()); err != nil {
t.Fatalf("failed to flush: %v", err)
}
spans := exporter.GetSpans()
byName := make(map[string]*tracetest.SpanStub)
for i := range spans {
byName[spans[i].Name] = &spans[i]
}
if byName["jsonrpc.batch"] == nil {
t.Fatal("jsonrpc.batch span not found")
}
if echo := byName["jsonrpc.test/echo"]; echo == nil {
t.Fatal("jsonrpc.test/echo span not found")
} else if echo.Status.Code == codes.Error {
t.Errorf("test/echo: unexpected error status %v", echo.Status)
}
if missing := byName["jsonrpc.test/doesNotExist"]; missing == nil {
t.Fatal("jsonrpc.test/doesNotExist span not found (method-not-found should still get a per-call span)")
} else if missing.Status.Code != codes.Error {
t.Errorf("test/doesNotExist: expected error status, got %v", missing.Status.Code)
}
if ret := byName["jsonrpc.test/returnError"]; ret == nil {
t.Fatal("jsonrpc.test/returnError span not found")
} else if ret.Status.Code != codes.Error {
t.Errorf("test/returnError: expected error status, got %v", ret.Status.Code)
}
}
// TestTracingBatchHTTPEmpty verifies that an empty batch still emits a
// SERVER span, with rpc.batch.size=0 and error status.
func TestTracingBatchHTTPEmpty(t *testing.T) {
t.Parallel()
server, tracer, exporter := newTracingServer(t)
httpsrv := httptest.NewServer(server)
t.Cleanup(httpsrv.Close)
postJSONRPC(t, httpsrv.URL, `[]`)
if err := tracer.ForceFlush(context.Background()); err != nil {
t.Fatalf("failed to flush: %v", err)
}
spans := exporter.GetSpans()
var batchSpan *tracetest.SpanStub
for i := range spans {
if spans[i].Name == "jsonrpc.batch" {
batchSpan = &spans[i]
}
}
if batchSpan == nil {
t.Fatal("jsonrpc.batch span not found for empty batch")
}
if batchSpan.Status.Code != codes.Error {
t.Errorf("empty batch: expected error status, got %v", batchSpan.Status.Code)
}
attrs := attributeMap(batchSpan.Attributes)
if got, want := attrs["rpc.batch.size"], "0"; got != want {
t.Errorf("empty batch rpc.batch.size: got %q, want %q", got, want)
}
}
// TestTracingBatchHTTPTooLarge verifies that a batch exceeding the server's
// item limit emits a SERVER span with rpc.batch.size=N and error status.
func TestTracingBatchHTTPTooLarge(t *testing.T) {
t.Parallel()
server, tracer, exporter := newTracingServer(t)
server.SetBatchLimits(2, 100000) // limit to 2 items
httpsrv := httptest.NewServer(server)
t.Cleanup(httpsrv.Close)
// 3 items > limit of 2.
body := `[
{"jsonrpc":"2.0","id":1,"method":"test_echo","params":["a",1,{"S":"x"}]},
{"jsonrpc":"2.0","id":2,"method":"test_echo","params":["b",2,{"S":"y"}]},
{"jsonrpc":"2.0","id":3,"method":"test_echo","params":["c",3,{"S":"z"}]}
]`
postJSONRPC(t, httpsrv.URL, body)
if err := tracer.ForceFlush(context.Background()); err != nil {
t.Fatalf("failed to flush: %v", err)
}
spans := exporter.GetSpans()
var batchSpan *tracetest.SpanStub
for i := range spans {
if spans[i].Name == "jsonrpc.batch" {
batchSpan = &spans[i]
}
}
if batchSpan == nil {
t.Fatal("jsonrpc.batch span not found for too-large batch")
}
if batchSpan.Status.Code != codes.Error {
t.Errorf("batch-too-large: expected error status, got %v", batchSpan.Status.Code)
}
attrs := attributeMap(batchSpan.Attributes)
if got, want := attrs["rpc.batch.size"], "3"; got != want {
t.Errorf("batch-too-large rpc.batch.size: got %q, want %q", got, want)
}
}
// TestTracingHTTPTimeout verifies that when a non-batch call exceeds the HTTP
// server's WriteTimeout, the SERVER span ends with error status (carrying the
// timeout error message).
func TestTracingHTTPTimeout(t *testing.T) {
t.Parallel()
server, tracer, exporter := newTracingServer(t)
// Configure a short WriteTimeout so the internal request timer fires
// quickly. ContextRequestTimeout subtracts 100ms from WriteTimeout, so
// 250ms here gives ~150ms before the timeout response is sent.
httpsrv := httptest.NewUnstartedServer(server)
httpsrv.Config.WriteTimeout = 250 * time.Millisecond
httpsrv.Start()
t.Cleanup(httpsrv.Close)
// test_block waits on ctx.Done() and returns an error. The internal
// timer cancels ctx, so test_block unblocks shortly after the timeout
// response goes out.
postJSONRPC(t, httpsrv.URL, `{"jsonrpc":"2.0","id":1,"method":"test_block"}`)
if err := tracer.ForceFlush(context.Background()); err != nil {
t.Fatalf("failed to flush: %v", err)
}
spans := exporter.GetSpans()
var serverSpan *tracetest.SpanStub
for i := range spans {
if spans[i].Name == "jsonrpc.test/block" {
serverSpan = &spans[i]
}
}
if serverSpan == nil {
t.Fatal("jsonrpc.test/block span not found")
}
if serverSpan.Status.Code != codes.Error {
t.Errorf("timeout: expected SERVER span error status, got %v (%q)", serverSpan.Status.Code, serverSpan.Status.Description)
}
}

View file

@ -294,11 +294,11 @@ type websocketCodec struct {
func newWebsocketCodec(conn *websocket.Conn, host string, req http.Header, readLimit int64) ServerCodec {
conn.SetReadLimit(readLimit)
var buf []byte
encodeMsg := func(msg *jsonrpcMessage, isError bool) error {
encodeMsg := func(ctx context.Context, msg *jsonrpcMessage, isError bool) error {
buf = appendMessage(buf[:0], msg)
return conn.WriteMessage(websocket.TextMessage, buf)
}
encodeBatch := func(msgs []*jsonrpcMessage, isError bool) error {
encodeBatch := func(ctx context.Context, msgs []*jsonrpcMessage, isError bool) error {
buf = appendBatch(buf[:0], msgs)
return conn.WriteMessage(websocket.TextMessage, buf)
}