Skip to content

Commit 03df3ab

Browse files
committed
Feat: Add Tracing Headers and OTel Baggage with CueX Requests
Signed-off-by: Brian Kane <[email protected]>
1 parent b0310a9 commit 03df3ab

File tree

6 files changed

+575
-4
lines changed

6 files changed

+575
-4
lines changed

cue/cuex/externalserver/server.go

+3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package externalserver
1919
import (
2020
"context"
2121
"encoding/json"
22+
"github.com/kubevela/pkg/cue/cuex/runtime"
2223
"io"
2324
"net/http"
2425

@@ -39,6 +40,8 @@ type GenericServerProviderFn[T any, U any] func(context.Context, *T) (*U, error)
3940

4041
// Call handle rest call for given request
4142
func (fn GenericServerProviderFn[T, U]) Call(request *restful.Request, response *restful.Response) {
43+
ctx := runtime.ContextFromHeaders(request.Request)
44+
request.Request = request.Request.WithContext(ctx)
4245
bs, err := io.ReadAll(request.Request.Body)
4346
if err != nil {
4447
_ = response.WriteError(http.StatusBadRequest, err)

cue/cuex/externalserver/server_test.go

+86
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,11 @@ import (
2121
"context"
2222
"encoding/json"
2323
"fmt"
24+
"github.com/stretchr/testify/assert"
25+
"go.opentelemetry.io/otel/propagation"
26+
"go.opentelemetry.io/otel/trace"
2427
"io"
28+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2529
"net/http"
2630
"net/http/httptest"
2731
"testing"
@@ -115,3 +119,85 @@ func TestExternalServer(t *testing.T) {
115119
})
116120
}
117121
}
122+
123+
func TestExternalServerBaggage(t *testing.T) {
124+
for name, tt := range map[string]struct {
125+
IncludeBaggage bool
126+
}{
127+
"withBaggage": {
128+
IncludeBaggage: true,
129+
},
130+
131+
"withoutBaggage": {
132+
IncludeBaggage: false,
133+
},
134+
} {
135+
t.Run(name, func(t *testing.T) {
136+
var ctx context.Context
137+
var span trace.Span
138+
if tt.IncludeBaggage {
139+
jsonCtx := &unstructured.Unstructured{
140+
Object: map[string]interface{}{
141+
"appName": "app-name",
142+
"namespace": "a-namespace",
143+
},
144+
}
145+
ctx, span, _ = cuexruntime.StartSpanWithBaggage(context.Background(), "span", jsonCtx)
146+
} else {
147+
ctx, span = cuexruntime.StartSpan(context.Background(), "span")
148+
}
149+
defer span.End()
150+
151+
fn := externalserver.GenericServerProviderFn[null, propagationCheckResp](propagationCheck)
152+
payload := []byte("{}")
153+
154+
httpReq := httptest.
155+
NewRequest(http.MethodPost, "/propagationCheck", bytes.NewReader(payload)).
156+
WithContext(ctx)
157+
158+
traceHeaderPropagator := cuexruntime.TraceHeaderPropagator{}
159+
traceHeaderPropagator.Inject(ctx, propagation.HeaderCarrier(httpReq.Header))
160+
161+
httpResp := httptest.NewRecorder()
162+
163+
req := restful.NewRequest(httpReq)
164+
resp := restful.NewResponse(httpResp)
165+
166+
fn.Call(req, resp)
167+
168+
var response propagationCheckResp
169+
err := json.Unmarshal(httpResp.Body.Bytes(), &response)
170+
require.NoError(t, err)
171+
assert.Equal(t, httpResp.Code, 200)
172+
173+
assert.NotEmpty(t, response.TraceID)
174+
assert.NotEmpty(t, response.SpanID)
175+
if tt.IncludeBaggage {
176+
assert.Equal(t, "app-name", response.Context["appName"])
177+
assert.Equal(t, "a-namespace", response.Context["namespace"])
178+
} else {
179+
assert.Empty(t, response.Context)
180+
}
181+
})
182+
}
183+
}
184+
185+
type propagationCheckResp struct {
186+
TraceID string `json:"traceID"`
187+
SpanID string `json:"spanID"`
188+
Context map[string]interface{} `json:"context"`
189+
}
190+
191+
type null struct{}
192+
193+
func propagationCheck(ctx context.Context, input *null) (*propagationCheckResp, error) {
194+
span := trace.SpanFromContext(ctx)
195+
pCtx, _ := cuexruntime.GetPropagatedContext(ctx)
196+
var mapCtx map[string]interface{}
197+
_ = pCtx.UnmarshalContext(&mapCtx)
198+
return &propagationCheckResp{
199+
TraceID: span.SpanContext().TraceID().String(),
200+
SpanID: span.SpanContext().SpanID().String(),
201+
Context: mapCtx,
202+
}, nil
203+
}

cue/cuex/runtime/provider.go

+7
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"crypto/tls"
2323
"encoding/json"
2424
"fmt"
25+
"go.opentelemetry.io/otel/propagation"
2526
"io"
2627
"net/http"
2728
"strings"
@@ -90,6 +91,7 @@ func (in *ExternalProviderFn) Call(ctx context.Context, value cue.Value) (cue.Va
9091
case v1alpha1.ProtocolHTTP, v1alpha1.ProtocolHTTPS:
9192
ep := fmt.Sprintf("%s/%s", strings.TrimSuffix(in.Endpoint, "/"), in.Fn)
9293
req, err := http.NewRequest(http.MethodPost, ep, bytes.NewReader(bs))
94+
in.InjectHeaders(ctx, req)
9395
if err != nil {
9496
return value, err
9597
}
@@ -117,6 +119,11 @@ func (in *ExternalProviderFn) Call(ctx context.Context, value cue.Value) (cue.Va
117119
return value.FillPath(cue.ParsePath(providers.ReturnsKey), ret), nil
118120
}
119121

122+
// InjectHeaders Injects headers from the current span into the http request headers
123+
func (in *ExternalProviderFn) InjectHeaders(ctx context.Context, r *http.Request) {
124+
TraceHeaderPropagator{}.Inject(ctx, propagation.HeaderCarrier(r.Header))
125+
}
126+
120127
var _ ProviderFn = NativeProviderFn(nil)
121128

122129
// NativeProviderFn native function that implements ProviderFn interface

cue/cuex/runtime/tracer.go

+200
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
package runtime
2+
3+
import (
4+
"context"
5+
"cuelang.org/go/cue"
6+
"cuelang.org/go/cue/cuecontext"
7+
"encoding/base64"
8+
"encoding/json"
9+
"errors"
10+
"fmt"
11+
"go.opentelemetry.io/otel"
12+
"go.opentelemetry.io/otel/baggage"
13+
"go.opentelemetry.io/otel/propagation"
14+
tracesdk "go.opentelemetry.io/otel/sdk/trace"
15+
"go.opentelemetry.io/otel/trace"
16+
"k8s.io/klog"
17+
"net/http"
18+
"strings"
19+
)
20+
21+
var newBaggageMember = baggage.NewMember // function injection for tests
22+
var newBaggage = baggage.New // function injection for tests
23+
24+
func init() {
25+
// set the default OTel implementation if the controller did not already handle the setup
26+
// fallback functionality - preferred to update the controller to handle this
27+
oTelTracerProviderSet := otel.GetTracerProvider().Tracer("check") != otel.Tracer("check")
28+
if !oTelTracerProviderSet {
29+
tp := tracesdk.NewTracerProvider()
30+
otel.SetTracerProvider(tp)
31+
}
32+
}
33+
34+
type ctxKey string
35+
36+
// PropagatedCtx .
37+
type PropagatedCtx struct {
38+
pCtx []byte
39+
context.Context
40+
}
41+
42+
const (
43+
key ctxKey = "PropagatedCtx"
44+
headerPrefix = "X-Vela"
45+
traceParent = "traceparent"
46+
traceState = "tracestate"
47+
encodedCtx = "Encoded-Context"
48+
)
49+
50+
// StartSpan creates a new OpenTelemetry span and returns the updated context and span.
51+
func StartSpan(ctx context.Context, name string) (context.Context, trace.Span) {
52+
tracer := otel.Tracer(name)
53+
ctx, span := tracer.Start(ctx, name)
54+
return ctx, span
55+
}
56+
57+
// StartSpanWithBaggage creates a new OpenTelemetry span and attaches encoded JSON context as baggage.
58+
func StartSpanWithBaggage(ctx context.Context, name string, hCtx json.Marshaler) (context.Context, trace.Span, error) {
59+
jsonCtx, err := json.Marshal(hCtx)
60+
if err != nil {
61+
klog.Errorf("failed to marshal json: %v", err)
62+
ctx, span := StartSpan(ctx, name)
63+
return ctx, span, err
64+
}
65+
66+
member, err := newBaggageMember(strings.ToLower(encodedCtx), base64.StdEncoding.EncodeToString(jsonCtx))
67+
if err != nil {
68+
klog.Warningf("failed to encode context baggage header: \n%v", err)
69+
} else {
70+
bag, err := newBaggage(member)
71+
if err != nil {
72+
klog.Errorf("failed to create context bag: \n%v", err)
73+
ctx, span := StartSpan(ctx, name)
74+
return ctx, span, err
75+
}
76+
ctx = baggage.ContextWithBaggage(ctx, bag)
77+
}
78+
79+
ctx, span := StartSpan(ctx, name)
80+
return ctx, span, err
81+
}
82+
83+
// WithBaggage appends additional baggage entries to the context.
84+
func WithBaggage(ctx context.Context, b map[string]string) (context.Context, error) {
85+
members := baggage.FromContext(ctx).Members()
86+
var failures []string
87+
for k, v := range b {
88+
m, err := newBaggageMember(k, v)
89+
if err != nil {
90+
klog.Errorf("failed to create new member\n%v", err)
91+
failures = append(failures, fmt.Sprintf("failed to create new member %s = %s", k, v))
92+
continue
93+
}
94+
members = append(members, m)
95+
}
96+
bag, err := newBaggage(members...)
97+
if err != nil {
98+
klog.Errorf("failed to create context bag: \n%v", err)
99+
return ctx, err
100+
}
101+
102+
if len(failures) > 0 {
103+
klog.Warningf("Baggage created with failed members!")
104+
return baggage.ContextWithBaggage(ctx, bag), errors.New(strings.Join(failures, "\n"))
105+
}
106+
return baggage.ContextWithBaggage(ctx, bag), nil
107+
}
108+
109+
// TraceHeaderPropagator .
110+
type TraceHeaderPropagator struct {
111+
traceContextPropagator propagation.TraceContext
112+
}
113+
114+
// Inject serializes the span baggage and trace context into the provided HTTP headers.
115+
func (cp TraceHeaderPropagator) Inject(ctx context.Context, carrier propagation.TextMapCarrier) {
116+
bag := baggage.FromContext(ctx)
117+
for _, member := range bag.Members() {
118+
headerName := fmt.Sprintf("%s-%s", headerPrefix, member.Key())
119+
carrier.Set(headerName, member.Value())
120+
}
121+
cp.traceContextPropagator.Inject(ctx, carrier)
122+
}
123+
124+
// Extract reconstructs the span context and baggage from the provided HTTP headers.
125+
func (cp TraceHeaderPropagator) Extract(ctx context.Context, carrier propagation.TextMapCarrier) context.Context {
126+
ctx = TraceHeaderPropagator{}.traceContextPropagator.Extract(ctx, carrier)
127+
appContext := &PropagatedCtx{
128+
Context: ctx,
129+
}
130+
for _, hKey := range carrier.Keys() {
131+
value := carrier.Get(hKey)
132+
if strings.HasPrefix(hKey, headerPrefix) {
133+
strippedKey := strings.TrimPrefix(hKey, headerPrefix+"-")
134+
135+
switch strippedKey {
136+
137+
case encodedCtx:
138+
dataBytes, err := base64.StdEncoding.DecodeString(value)
139+
if err != nil {
140+
klog.Errorf("error decoding base64 context: %v\n", err)
141+
continue
142+
}
143+
appContext.pCtx = dataBytes
144+
145+
case traceParent, traceState: // do nothing
146+
147+
default:
148+
appContext.Context = context.WithValue(appContext.Context, strings.ToLower(strippedKey), value)
149+
}
150+
}
151+
}
152+
153+
return context.WithValue(ctx, key, appContext)
154+
}
155+
156+
// RawJSON returns the embedded JSON context as a json.RawMessage.
157+
func (p *PropagatedCtx) RawJSON() json.RawMessage {
158+
return json.RawMessage(p.pCtx)
159+
}
160+
161+
// UnmarshalContext unmarshals the embedded JSON context into the provided struct.
162+
func (p *PropagatedCtx) UnmarshalContext(out interface{}) error {
163+
return json.Unmarshal(p.pCtx, out)
164+
}
165+
166+
// GetCueContext returns the embedded JSON context as a CUE value.
167+
func (p *PropagatedCtx) GetCueContext() (*cue.Value, error) {
168+
cueCtx := cuecontext.New()
169+
cueVal := cueCtx.CompileString(string(p.pCtx))
170+
if cueVal.Err() != nil {
171+
return &cue.Value{}, cueVal.Err()
172+
}
173+
return &cueVal, nil
174+
}
175+
176+
// ContextFromHeaders extracts the span context and baggage from an HTTP request and returns the reconstructed ctx.
177+
func ContextFromHeaders(r *http.Request) context.Context {
178+
return TraceHeaderPropagator{}.Extract(context.Background(), propagation.HeaderCarrier(r.Header))
179+
}
180+
181+
// GetPropagatedContext retrieves the PropagatedCtx from the given context if present.
182+
func GetPropagatedContext(ctx context.Context) (*PropagatedCtx, bool) {
183+
pCtx := ctx.Value(key)
184+
if pCtx != nil {
185+
if pc, ok := pCtx.(*PropagatedCtx); ok {
186+
return pc, true
187+
}
188+
return &PropagatedCtx{pCtx: []byte("")}, false
189+
}
190+
return &PropagatedCtx{pCtx: []byte("")}, false
191+
}
192+
193+
// Fields returns the list of HTTP headers used for trace context propagation.
194+
func (cp TraceHeaderPropagator) Fields() []string {
195+
return []string{
196+
traceParent,
197+
traceState,
198+
fmt.Sprintf("%s-%s", headerPrefix, encodedCtx),
199+
}
200+
}

0 commit comments

Comments
 (0)