Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions internal/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ func BoolAttribute(key string, val bool) Attribute {
}

// StartSpan creates a SpanKind=INTERNAL span.
func StartSpan(ctx context.Context, spanName string, attributes ...Attribute) (context.Context, trace.Span, func(error)) {
func StartSpan(ctx context.Context, spanName string, attributes ...Attribute) (context.Context, trace.Span, func(*error)) {
return StartSpanWithTracer(ctx, otel.Tracer(""), spanName, attributes...)
}

// StartSpanWithTracer requires a tracer to be passed in and creates a SpanKind=INTERNAL span.
func StartSpanWithTracer(ctx context.Context, tracer trace.Tracer, name string, attributes ...Attribute) (context.Context, trace.Span, func(error)) {
func StartSpanWithTracer(ctx context.Context, tracer trace.Tracer, name string, attributes ...Attribute) (context.Context, trace.Span, func(*error)) {
return startSpan(ctx, tracer, trace.SpanKindInternal, name, attributes...)
}

Expand All @@ -67,7 +67,7 @@ type RPCInfo struct {
// The span name is formatted as $rpcSystem.$rpcService/$rpcMethod
// (e.g. "jsonrpc.engine/newPayloadV4") which follows the Open Telemetry
// semantic convensions: https://opentelemetry.io/docs/specs/semconv/rpc/rpc-spans/#span-name.
func StartServerSpan(ctx context.Context, tracer trace.Tracer, rpc RPCInfo, others ...Attribute) (context.Context, func(error)) {
func StartServerSpan(ctx context.Context, tracer trace.Tracer, rpc RPCInfo, others ...Attribute) (context.Context, func(*error)) {
var (
name = fmt.Sprintf("%s.%s/%s", rpc.System, rpc.Service, rpc.Method)
attributes = append([]Attribute{
Expand All @@ -84,7 +84,7 @@ func StartServerSpan(ctx context.Context, tracer trace.Tracer, rpc RPCInfo, othe
}

// startSpan creates a span with the given kind.
func startSpan(ctx context.Context, tracer trace.Tracer, kind trace.SpanKind, spanName string, attributes ...Attribute) (context.Context, trace.Span, func(error)) {
func startSpan(ctx context.Context, tracer trace.Tracer, kind trace.SpanKind, spanName string, attributes ...Attribute) (context.Context, trace.Span, func(*error)) {
ctx, span := tracer.Start(ctx, spanName, trace.WithSpanKind(kind))
if len(attributes) > 0 {
span.SetAttributes(attributes...)
Expand All @@ -93,11 +93,11 @@ func startSpan(ctx context.Context, tracer trace.Tracer, kind trace.SpanKind, sp
}

// endSpan ends the span and handles error recording.
func endSpan(span trace.Span) func(error) {
return func(err error) {
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
func endSpan(span trace.Span) func(*error) {
return func(err *error) {
if err != nil && *err != nil {
span.RecordError(*err)
span.SetStatus(codes.Error, (*err).Error())
}
span.End()
}
Expand Down
18 changes: 9 additions & 9 deletions rpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,6 @@ func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage
}

// Start root span for the request.
var err error
rpcInfo := telemetry.RPCInfo{
System: "jsonrpc",
Service: service,
Expand All @@ -535,24 +534,25 @@ func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage
telemetry.BoolAttribute("rpc.batch", cp.isBatch),
}
ctx, spanEnd := telemetry.StartServerSpan(cp.ctx, h.tracer(), rpcInfo, attrib...)
defer spanEnd(err)
defer spanEnd(nil) // don't propagate errors to parent spans

// Start tracing span before parsing arguments.
_, _, pSpanEnd := telemetry.StartSpanWithTracer(ctx, h.tracer(), "rpc.parsePositionalArguments")
args, err := parsePositionalArguments(msg.Params, callb.argTypes)
pSpanEnd(err)
if err != nil {
return msg.errorResponse(&invalidParamsError{err.Error()})
args, pErr := parsePositionalArguments(msg.Params, callb.argTypes)
pSpanEnd(&pErr)
if pErr != nil {
return msg.errorResponse(&invalidParamsError{pErr.Error()})
}
start := time.Now()

// Start tracing span before running the method.
rctx, _, rSpanEnd := telemetry.StartSpanWithTracer(ctx, h.tracer(), "rpc.runMethod")
answer := h.runMethod(rctx, msg, callb, args)
var rErr error
if answer.Error != nil {
err = errors.New(answer.Error.Message)
rErr = errors.New(answer.Error.Message)
}
rSpanEnd(err)
rSpanEnd(&rErr)

// Collect the statistics for RPC calls if metrics is enabled.
rpcRequestGauge.Inc(1)
Expand Down Expand Up @@ -625,7 +625,7 @@ func (h *handler) runMethod(ctx context.Context, msg *jsonrpcMessage, callb *cal
if response.Error != nil {
err = errors.New(response.Error.Message)
}
spanEnd(err)
spanEnd(&err)
return response
}

Expand Down
44 changes: 44 additions & 0 deletions rpc/tracing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
Expand Down Expand Up @@ -141,6 +142,49 @@ func TestTracingHTTP(t *testing.T) {
}
}

// TestTracingErrorRecording verifies that errors are recorded on spans.
func TestTracingHTTPErrorRecording(t *testing.T) {
t.Parallel()
server, tracer, exporter := newTracingServer(t)
httpsrv := httptest.NewServer(server)
t.Cleanup(httpsrv.Close)
client, err := DialHTTP(httpsrv.URL)
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
t.Cleanup(client.Close)

// Call a method that returns an error.
var result any
err = client.Call(&result, "test_returnError")
if err == nil {
t.Fatal("expected error from test_returnError")
}

// Flush and verify spans recorded the error.
if err := tracer.ForceFlush(context.Background()); err != nil {
t.Fatalf("failed to flush: %v", err)
}
spans := exporter.GetSpans()

// Only the runMethod span should have error status.
if len(spans) == 0 {
t.Fatal("no spans were emitted")
}
for _, span := range spans {
switch span.Name {
case "rpc.runMethod":
if span.Status.Code != codes.Error {
t.Errorf("expected %s span status Error, got %v", span.Name, span.Status.Code)
}
default:
if span.Status.Code == codes.Error {
t.Errorf("unexpected error status on span %s", span.Name)
}
}
}
}

// TestTracingBatchHTTP verifies that RPC spans are emitted for batched JSON-RPC calls over HTTP.
func TestTracingBatchHTTP(t *testing.T) {
t.Parallel()
Expand Down