Skip to content

Feat: Add OTel Tracing Headers and Baggage with CueX Requests #115

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cue/cuex/externalserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package externalserver
import (
"context"
"encoding/json"
"github.com/kubevela/pkg/cue/cuex/runtime"
"io"
"net/http"

Expand All @@ -39,6 +40,8 @@ type GenericServerProviderFn[T any, U any] func(context.Context, *T) (*U, error)

// Call handle rest call for given request
func (fn GenericServerProviderFn[T, U]) Call(request *restful.Request, response *restful.Response) {
ctx := runtime.ContextFromHeaders(request.Request)
request.Request = request.Request.WithContext(ctx)
bs, err := io.ReadAll(request.Request.Body)
if err != nil {
_ = response.WriteError(http.StatusBadRequest, err)
Expand Down
86 changes: 86 additions & 0 deletions cue/cuex/externalserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
"io"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"net/http"
"net/http/httptest"
"testing"
Expand Down Expand Up @@ -115,3 +119,85 @@ func TestExternalServer(t *testing.T) {
})
}
}

func TestExternalServerBaggage(t *testing.T) {
for name, tt := range map[string]struct {
IncludeBaggage bool
}{
"withBaggage": {
IncludeBaggage: true,
},

"withoutBaggage": {
IncludeBaggage: false,
},
} {
t.Run(name, func(t *testing.T) {
var ctx context.Context
var span trace.Span
if tt.IncludeBaggage {
jsonCtx := &unstructured.Unstructured{
Object: map[string]interface{}{
"appName": "app-name",
"namespace": "a-namespace",
},
}
ctx, span, _ = cuexruntime.StartSpanWithBaggage(context.Background(), "span", jsonCtx)
} else {
ctx, span = cuexruntime.StartSpan(context.Background(), "span")
}
defer span.End()

fn := externalserver.GenericServerProviderFn[null, propagationCheckResp](propagationCheck)
payload := []byte("{}")

httpReq := httptest.
NewRequest(http.MethodPost, "/propagationCheck", bytes.NewReader(payload)).
WithContext(ctx)

traceHeaderPropagator := cuexruntime.TraceHeaderPropagator{}
traceHeaderPropagator.Inject(ctx, propagation.HeaderCarrier(httpReq.Header))

httpResp := httptest.NewRecorder()

req := restful.NewRequest(httpReq)
resp := restful.NewResponse(httpResp)

fn.Call(req, resp)

var response propagationCheckResp
err := json.Unmarshal(httpResp.Body.Bytes(), &response)
require.NoError(t, err)
assert.Equal(t, httpResp.Code, 200)

assert.NotEmpty(t, response.TraceID)
assert.NotEmpty(t, response.SpanID)
if tt.IncludeBaggage {
assert.Equal(t, "app-name", response.Context["appName"])
assert.Equal(t, "a-namespace", response.Context["namespace"])
} else {
assert.Empty(t, response.Context)
}
})
}
}

type propagationCheckResp struct {
TraceID string `json:"traceID"`
SpanID string `json:"spanID"`
Context map[string]interface{} `json:"context"`
}

type null struct{}

func propagationCheck(ctx context.Context, input *null) (*propagationCheckResp, error) {
span := trace.SpanFromContext(ctx)
pCtx, _ := cuexruntime.GetPropagatedContext(ctx)
var mapCtx map[string]interface{}
_ = pCtx.UnmarshalContext(&mapCtx)
return &propagationCheckResp{
TraceID: span.SpanContext().TraceID().String(),
SpanID: span.SpanContext().SpanID().String(),
Context: mapCtx,
}, nil
}
7 changes: 7 additions & 0 deletions cue/cuex/runtime/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
"go.opentelemetry.io/otel/propagation"
"io"
"net/http"
"strings"
Expand Down Expand Up @@ -90,6 +91,7 @@ func (in *ExternalProviderFn) Call(ctx context.Context, value cue.Value) (cue.Va
case v1alpha1.ProtocolHTTP, v1alpha1.ProtocolHTTPS:
ep := fmt.Sprintf("%s/%s", strings.TrimSuffix(in.Endpoint, "/"), in.Fn)
req, err := http.NewRequest(http.MethodPost, ep, bytes.NewReader(bs))
in.InjectHeaders(ctx, req)
if err != nil {
return value, err
}
Expand Down Expand Up @@ -117,6 +119,11 @@ func (in *ExternalProviderFn) Call(ctx context.Context, value cue.Value) (cue.Va
return value.FillPath(cue.ParsePath(providers.ReturnsKey), ret), nil
}

// InjectHeaders Injects headers from the current span into the http request headers
func (in *ExternalProviderFn) InjectHeaders(ctx context.Context, r *http.Request) {
TraceHeaderPropagator{}.Inject(ctx, propagation.HeaderCarrier(r.Header))
}

var _ ProviderFn = NativeProviderFn(nil)

// NativeProviderFn native function that implements ProviderFn interface
Expand Down
200 changes: 200 additions & 0 deletions cue/cuex/runtime/tracer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
package runtime

import (
"context"
"cuelang.org/go/cue"
"cuelang.org/go/cue/cuecontext"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/baggage"
"go.opentelemetry.io/otel/propagation"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"k8s.io/klog"
"net/http"
"strings"
)

var newBaggageMember = baggage.NewMember // function injection for tests
var newBaggage = baggage.New // function injection for tests

func init() {
// set the default OTel implementation if the controller did not already handle the setup
// fallback functionality - preferred to update the controller to handle this
oTelTracerProviderSet := otel.GetTracerProvider().Tracer("check") != otel.Tracer("check")
if !oTelTracerProviderSet {
tp := tracesdk.NewTracerProvider()
otel.SetTracerProvider(tp)
}
}

type ctxKey string

// PropagatedCtx .
type PropagatedCtx struct {
pCtx []byte
context.Context
}

const (
key ctxKey = "PropagatedCtx"
headerPrefix = "X-Vela"
traceParent = "traceparent"
traceState = "tracestate"
encodedCtx = "Encoded-Context"
)

// StartSpan creates a new OpenTelemetry span and returns the updated context and span.
func StartSpan(ctx context.Context, name string) (context.Context, trace.Span) {
tracer := otel.Tracer(name)
ctx, span := tracer.Start(ctx, name)
return ctx, span
}

// StartSpanWithBaggage creates a new OpenTelemetry span and attaches encoded JSON context as baggage.
func StartSpanWithBaggage(ctx context.Context, name string, hCtx json.Marshaler) (context.Context, trace.Span, error) {
jsonCtx, err := json.Marshal(hCtx)
if err != nil {
klog.Errorf("failed to marshal json: %v", err)
ctx, span := StartSpan(ctx, name)
return ctx, span, err
}

member, err := newBaggageMember(strings.ToLower(encodedCtx), base64.StdEncoding.EncodeToString(jsonCtx))
if err != nil {
klog.Warningf("failed to encode context baggage header: \n%v", err)
} else {
bag, err := newBaggage(member)
if err != nil {
klog.Errorf("failed to create context bag: \n%v", err)
ctx, span := StartSpan(ctx, name)
return ctx, span, err
}
ctx = baggage.ContextWithBaggage(ctx, bag)
}

ctx, span := StartSpan(ctx, name)
return ctx, span, err
}

// WithBaggage appends additional baggage entries to the context.
func WithBaggage(ctx context.Context, b map[string]string) (context.Context, error) {
members := baggage.FromContext(ctx).Members()
var failures []string
for k, v := range b {
m, err := newBaggageMember(k, v)
if err != nil {
klog.Errorf("failed to create new member\n%v", err)
failures = append(failures, fmt.Sprintf("failed to create new member %s = %s", k, v))
continue
}
members = append(members, m)
}
bag, err := newBaggage(members...)
if err != nil {
klog.Errorf("failed to create context bag: \n%v", err)
return ctx, err
}

if len(failures) > 0 {
klog.Warningf("Baggage created with failed members!")
return baggage.ContextWithBaggage(ctx, bag), errors.New(strings.Join(failures, "\n"))
}
return baggage.ContextWithBaggage(ctx, bag), nil
}

// TraceHeaderPropagator .
type TraceHeaderPropagator struct {
traceContextPropagator propagation.TraceContext
}

// Inject serializes the span baggage and trace context into the provided HTTP headers.
func (cp TraceHeaderPropagator) Inject(ctx context.Context, carrier propagation.TextMapCarrier) {
bag := baggage.FromContext(ctx)
for _, member := range bag.Members() {
headerName := fmt.Sprintf("%s-%s", headerPrefix, member.Key())
carrier.Set(headerName, member.Value())
}
cp.traceContextPropagator.Inject(ctx, carrier)
}

// Extract reconstructs the span context and baggage from the provided HTTP headers.
func (cp TraceHeaderPropagator) Extract(ctx context.Context, carrier propagation.TextMapCarrier) context.Context {
ctx = TraceHeaderPropagator{}.traceContextPropagator.Extract(ctx, carrier)
appContext := &PropagatedCtx{
Context: ctx,
}
for _, hKey := range carrier.Keys() {
value := carrier.Get(hKey)
if strings.HasPrefix(hKey, headerPrefix) {
strippedKey := strings.TrimPrefix(hKey, headerPrefix+"-")

switch strippedKey {

case encodedCtx:
dataBytes, err := base64.StdEncoding.DecodeString(value)
if err != nil {
klog.Errorf("error decoding base64 context: %v\n", err)
continue
}
appContext.pCtx = dataBytes

case traceParent, traceState: // do nothing

Check warning on line 145 in cue/cuex/runtime/tracer.go

View check run for this annotation

Codecov / codecov/patch

cue/cuex/runtime/tracer.go#L145

Added line #L145 was not covered by tests

default:
appContext.Context = context.WithValue(appContext.Context, strings.ToLower(strippedKey), value)
}
}
}

return context.WithValue(ctx, key, appContext)
}

// RawJSON returns the embedded JSON context as a json.RawMessage.
func (p *PropagatedCtx) RawJSON() json.RawMessage {
return json.RawMessage(p.pCtx)
}

// UnmarshalContext unmarshals the embedded JSON context into the provided struct.
func (p *PropagatedCtx) UnmarshalContext(out interface{}) error {
return json.Unmarshal(p.pCtx, out)
}

// GetCueContext returns the embedded JSON context as a CUE value.
func (p *PropagatedCtx) GetCueContext() (*cue.Value, error) {
cueCtx := cuecontext.New()
cueVal := cueCtx.CompileString(string(p.pCtx))
if cueVal.Err() != nil {
return &cue.Value{}, cueVal.Err()
}
return &cueVal, nil
}

// ContextFromHeaders extracts the span context and baggage from an HTTP request and returns the reconstructed ctx.
func ContextFromHeaders(r *http.Request) context.Context {
return TraceHeaderPropagator{}.Extract(context.Background(), propagation.HeaderCarrier(r.Header))
}

// GetPropagatedContext retrieves the PropagatedCtx from the given context if present.
func GetPropagatedContext(ctx context.Context) (*PropagatedCtx, bool) {
pCtx := ctx.Value(key)
if pCtx != nil {
if pc, ok := pCtx.(*PropagatedCtx); ok {
return pc, true
}
return &PropagatedCtx{pCtx: []byte("")}, false

Check warning on line 188 in cue/cuex/runtime/tracer.go

View check run for this annotation

Codecov / codecov/patch

cue/cuex/runtime/tracer.go#L188

Added line #L188 was not covered by tests
}
return &PropagatedCtx{pCtx: []byte("")}, false
}

// Fields returns the list of HTTP headers used for trace context propagation.
func (cp TraceHeaderPropagator) Fields() []string {
return []string{
traceParent,
traceState,
fmt.Sprintf("%s-%s", headerPrefix, encodedCtx),
}

Check warning on line 199 in cue/cuex/runtime/tracer.go

View check run for this annotation

Codecov / codecov/patch

cue/cuex/runtime/tracer.go#L194-L199

Added lines #L194 - L199 were not covered by tests
}
Loading
Loading