Skip to content

Commit bde2766

Browse files
committed
reworked the telemetry package into helpers for SERVER spans and INTERNAL spans
1 parent d2372c0 commit bde2766

File tree

4 files changed

+110
-97
lines changed

4 files changed

+110
-97
lines changed

internal/telemetry/telemetry.go

Lines changed: 81 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@ package telemetry
1818

1919
import (
2020
"context"
21+
"fmt"
2122

2223
"go.opentelemetry.io/otel"
2324
"go.opentelemetry.io/otel/attribute"
2425
"go.opentelemetry.io/otel/codes"
26+
semconv "go.opentelemetry.io/otel/semconv/v1.38.0"
2527
"go.opentelemetry.io/otel/trace"
2628
)
2729

@@ -38,37 +40,96 @@ func Int64Attribute(key string, val int64) Attribute {
3840
return attribute.Int64(key, val)
3941
}
4042

41-
// StartSpan starts a tracing span on the default tracer and returns a function
42-
// to end the span. The function will record errors and set span status based
43-
// on the error value.
44-
func StartSpan(ctx context.Context, spanName string, attributes ...Attribute) (context.Context, func(*error)) {
45-
return StartSpanWithTracer(ctx, otel.Tracer(""), spanName, attributes...)
43+
// StartServerSpan creates a SpanKind=SERVER span at the JSON-RPC boundary.
44+
// The span name is formatted as $rpcSystem.$rpcService/$rpcMethod
45+
// (e.g. "jsonrpc.engine/newPayloadV4").
46+
func StartServerSpan(
47+
ctx context.Context,
48+
tracer trace.Tracer,
49+
rpcSystem string,
50+
rpcService string,
51+
rpcMethod string,
52+
requestID string,
53+
additionalAttributes ...Attribute,
54+
) (context.Context, func(error)) {
55+
spanName := fmt.Sprintf("%s.%s/%s", rpcSystem, rpcService, rpcMethod)
56+
ctx, span := tracer.Start(
57+
ctx,
58+
spanName,
59+
trace.WithSpanKind(trace.SpanKindServer),
60+
)
61+
62+
// Fast path: noop provider or span not sampled
63+
if !span.IsRecording() {
64+
return ctx, func(error) { span.End() }
65+
}
66+
67+
// Define required attributes
68+
attrs := []Attribute{
69+
semconv.RPCSystemKey.String(rpcSystem),
70+
semconv.RPCServiceKey.String(rpcService),
71+
semconv.RPCMethodKey.String(rpcMethod),
72+
semconv.RPCJSONRPCRequestID(requestID),
73+
}
74+
75+
// Add any additional attributes provided
76+
if len(additionalAttributes) > 0 {
77+
attrs = append(attrs, additionalAttributes...)
78+
}
79+
span.SetAttributes(attrs...)
80+
return ctx, endSpan(span)
4681
}
4782

48-
// StartSpanWithTracer starts a tracing span on the supplied tracer and returns
49-
// a function to end the span. The function will record errors and set span
50-
// status based on the error value.
51-
func StartSpanWithTracer(ctx context.Context, tracer trace.Tracer, spanName string, attributes ...Attribute) (context.Context, func(*error)) {
52-
ctx, span := tracer.Start(ctx, spanName)
83+
// StartInternalSpan creates a SpanKind=INTERNAL span.
84+
func StartInternalSpan(
85+
ctx context.Context,
86+
spanName string,
87+
attributes ...Attribute,
88+
) (context.Context, func(error)) {
89+
return StartInternalSpanWithTracer(ctx, otel.Tracer(""), spanName, attributes...)
90+
}
5391

54-
// Fast path: noop provider or span not sampled
92+
// StartInternalSpanWithTracer requires a tracer to be passed in and creates a SpanKind=INTERNAL span.
93+
func StartInternalSpanWithTracer(
94+
ctx context.Context,
95+
tracer trace.Tracer,
96+
spanName string,
97+
attributes ...Attribute,
98+
) (context.Context, func(error)) {
99+
return startInternalSpan(ctx, tracer, spanName, attributes...)
100+
}
101+
102+
// startInternalSpan creates a SpanKind=INTERNAL span.
103+
func startInternalSpan(
104+
ctx context.Context,
105+
tracer trace.Tracer,
106+
spanName string,
107+
attributes ...Attribute,
108+
) (context.Context, func(error)) {
109+
ctx, span := tracer.Start(
110+
ctx,
111+
spanName,
112+
trace.WithSpanKind(trace.SpanKindInternal),
113+
)
114+
115+
// Fast path
55116
if !span.IsRecording() {
56-
return ctx, func(*error) { span.End() }
117+
return ctx, func(error) { span.End() }
57118
}
58119

59-
// Set span attributes.
60120
if len(attributes) > 0 {
61121
span.SetAttributes(attributes...)
62122
}
123+
return ctx, endSpan(span)
124+
}
63125

64-
// Define the function to end the span and handle error recording
65-
spanEnd := func(err *error) {
66-
if *err != nil {
67-
// Error occurred, record it and set status on span
68-
span.RecordError(*err)
69-
span.SetStatus(codes.Error, (*err).Error())
126+
// endSpan ends the span and handles error recording.
127+
func endSpan(span trace.Span) func(error) {
128+
return func(err error) {
129+
if err != nil {
130+
span.RecordError(err)
131+
span.SetStatus(codes.Error, err.Error())
70132
}
71133
span.End()
72134
}
73-
return ctx, spanEnd
74135
}

rpc/handler.go

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -510,45 +510,38 @@ func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage
510510
return h.runMethod(cp.ctx, msg, h.unsubscribeCb, args)
511511
}
512512

513-
// Start root span for the request.
514-
var err error
515-
attributes := []telemetry.Attribute{
516-
telemetry.StringAttribute("rpc.method", msg.Method),
517-
telemetry.StringAttribute("rpc.id", string(msg.ID)),
518-
}
519-
ctx, spanEnd := telemetry.StartSpanWithTracer(cp.ctx, h.tracer(), "rpc.handleCall", attributes...)
520-
defer spanEnd(&err)
521-
522513
// Check method name length
523514
if len(msg.Method) > maxMethodNameLength {
524-
errMessage := fmt.Sprintf("method name too long: %d > %d", len(msg.Method), maxMethodNameLength)
525-
invalidRequestError := &invalidRequestError{errMessage}
526-
err = errors.New(invalidRequestError.Error())
527-
return msg.errorResponse(invalidRequestError)
515+
return msg.errorResponse(&invalidRequestError{fmt.Sprintf("method name too long: %d > %d", len(msg.Method), maxMethodNameLength)})
528516
}
529-
callb := h.reg.callback(msg.Method)
517+
callb, rpcService, rpcMethod := h.reg.callback(msg.Method)
518+
519+
// If the method is not found, return an error.
530520
if callb == nil {
531-
methodNotFoundError := &methodNotFoundError{method: msg.Method}
532-
err = errors.New(methodNotFoundError.Error())
533-
return msg.errorResponse(methodNotFoundError)
521+
return msg.errorResponse(&methodNotFoundError{method: msg.Method})
534522
}
535523

524+
// Start root span for the request.
525+
var err error
526+
ctx, spanEnd := telemetry.StartServerSpan(cp.ctx, h.tracer(), "jsonrpc", rpcService, rpcMethod, string(msg.ID))
527+
defer spanEnd(err)
528+
536529
// Start tracing span before parsing arguments.
537-
_, pSpanEnd := telemetry.StartSpanWithTracer(ctx, h.tracer(), "rpc.parsePositionalArguments", attributes...)
530+
_, pSpanEnd := telemetry.StartInternalSpanWithTracer(ctx, h.tracer(), "rpc.parsePositionalArguments")
538531
args, err := parsePositionalArguments(msg.Params, callb.argTypes)
539-
pSpanEnd(&err)
532+
pSpanEnd(err)
540533
if err != nil {
541534
return msg.errorResponse(&invalidParamsError{err.Error()})
542535
}
543536
start := time.Now()
544537

545538
// Start tracing span before running the method.
546-
rctx, rSpanEnd := telemetry.StartSpanWithTracer(ctx, h.tracer(), "rpc.runMethod", attributes...)
539+
rctx, rSpanEnd := telemetry.StartInternalSpanWithTracer(ctx, h.tracer(), "rpc.runMethod")
547540
answer := h.runMethod(rctx, msg, callb, args)
548541
if answer.Error != nil {
549542
err = errors.New(answer.Error.Message)
550543
}
551-
rSpanEnd(&err)
544+
rSpanEnd(err)
552545

553546
// Collect the statistics for RPC calls if metrics is enabled.
554547
rpcRequestGauge.Inc(1)
@@ -616,12 +609,12 @@ func (h *handler) runMethod(ctx context.Context, msg *jsonrpcMessage, callb *cal
616609
if err != nil {
617610
return msg.errorResponse(err)
618611
}
619-
_, spanEnd := telemetry.StartSpanWithTracer(ctx, h.tracer(), "rpc.msg.response", attributes...)
612+
_, spanEnd := telemetry.StartInternalSpanWithTracer(ctx, h.tracer(), "rpc.encodeJSONResponse", attributes...)
620613
response := msg.response(result)
621614
if response.Error != nil {
622615
err = errors.New(response.Error.Message)
623616
}
624-
spanEnd(&err)
617+
spanEnd(err)
625618
return response
626619
}
627620

rpc/service.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,14 +92,14 @@ func (r *serviceRegistry) registerName(name string, rcvr interface{}) error {
9292
}
9393

9494
// callback returns the callback corresponding to the given RPC method name.
95-
func (r *serviceRegistry) callback(method string) *callback {
95+
func (r *serviceRegistry) callback(method string) (cb *callback, service, methodName string) {
9696
before, after, found := strings.Cut(method, serviceMethodSeparator)
9797
if !found {
98-
return nil
98+
return nil, "", ""
9999
}
100100
r.mu.Lock()
101101
defer r.mu.Unlock()
102-
return r.services[before].callbacks[after]
102+
return r.services[before].callbacks[after], before, after
103103
}
104104

105105
// subscription returns a subscription callback in the given service.

rpc/tracing_test.go

Lines changed: 10 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -86,67 +86,26 @@ func TestTracingHTTP(t *testing.T) {
8686
}
8787
var rpcSpan *tracetest.SpanStub
8888
for i := range spans {
89-
if spans[i].Name == "rpc.handleCall" {
89+
if spans[i].Name == "jsonrpc.test/echo" {
9090
rpcSpan = &spans[i]
9191
break
9292
}
9393
}
9494
if rpcSpan == nil {
95-
t.Fatalf("rpc.handleCall span not found.")
95+
t.Fatalf("jsonrpc.test/echo span not found.")
9696
}
9797
attrs := attributeMap(rpcSpan.Attributes)
98-
if attrs["rpc.method"] != "test_echo" {
99-
t.Errorf("expected rpc.method=test_echo, got %v", attrs["rpc.method"])
98+
if attrs["rpc.system"] != "jsonrpc" {
99+
t.Errorf("expected rpc.system=jsonrpc, got %v", attrs["rpc.system"])
100100
}
101-
if _, ok := attrs["rpc.id"]; !ok {
102-
t.Errorf("expected rpc.id attribute to be set")
101+
if attrs["rpc.service"] != "test" {
102+
t.Errorf("expected rpc.service=test, got %v", attrs["rpc.service"])
103103
}
104-
}
105-
106-
// TestTracingHTTPMethodNotFound verifies that a span is emitted when rpc method does not exist.
107-
func TestTracingHTTPMethodNotFound(t *testing.T) {
108-
t.Parallel()
109-
server, tracer, exporter := newTracingServer(t)
110-
httpsrv := httptest.NewServer(server)
111-
t.Cleanup(httpsrv.Close)
112-
client, err := DialHTTP(httpsrv.URL)
113-
if err != nil {
114-
t.Fatalf("failed to dial: %v", err)
115-
}
116-
t.Cleanup(client.Close)
117-
118-
// Make a RPC call that should fail.
119-
var result echoResult
120-
if err := client.Call(&result, "testnonexistent", "hello", 42, &echoArgs{S: "world"}); err == nil {
121-
t.Fatalf("RPC call should have failed")
122-
}
123-
124-
// Flush spans.
125-
if err := tracer.ForceFlush(context.Background()); err != nil {
126-
t.Fatalf("failed to flush: %v", err)
127-
}
128-
129-
// Check spans.
130-
spans := exporter.GetSpans()
131-
if len(spans) == 0 {
132-
t.Fatal("no spans were emitted")
133-
}
134-
var rpcSpan *tracetest.SpanStub
135-
for i := range spans {
136-
if spans[i].Name == "rpc.handleCall" {
137-
rpcSpan = &spans[i]
138-
break
139-
}
140-
}
141-
if rpcSpan == nil {
142-
t.Fatalf("rpc.handleCall span not found.")
143-
}
144-
attrs := attributeMap(rpcSpan.Attributes)
145-
if attrs["rpc.method"] != "testnonexistent" {
146-
t.Errorf("expected rpc.method=testnonexistent, got %v", attrs["rpc.method"])
104+
if attrs["rpc.method"] != "echo" {
105+
t.Errorf("expected rpc.method=echo, got %v", attrs["rpc.method"])
147106
}
148-
if _, ok := attrs["rpc.id"]; !ok {
149-
t.Errorf("expected rpc.id attribute to be set")
107+
if _, ok := attrs["rpc.jsonrpc.request_id"]; !ok {
108+
t.Errorf("expected rpc.jsonrpc.request_id attribute to be set")
150109
}
151110
}
152111

0 commit comments

Comments
 (0)