Skip to content

Commit fd8a668

Browse files
committed
Feat: Add Tracing Headers and OTel Baggage with CueX Requests
Signed-off-by: Brian Kane <[email protected]>
1 parent b0310a9 commit fd8a668

File tree

6 files changed

+418
-4
lines changed

6 files changed

+418
-4
lines changed

cue/cuex/externalserver/server.go

+3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package externalserver
1919
import (
2020
"context"
2121
"encoding/json"
22+
"github.com/kubevela/pkg/cue/cuex/runtime"
2223
"io"
2324
"net/http"
2425

@@ -39,6 +40,8 @@ type GenericServerProviderFn[T any, U any] func(context.Context, *T) (*U, error)
3940

4041
// Call handle rest call for given request
4142
func (fn GenericServerProviderFn[T, U]) Call(request *restful.Request, response *restful.Response) {
43+
ctx := runtime.ContextFromHeaders(request.Request)
44+
request.Request = request.Request.WithContext(ctx)
4245
bs, err := io.ReadAll(request.Request.Body)
4346
if err != nil {
4447
_ = response.WriteError(http.StatusBadRequest, err)

cue/cuex/externalserver/server_test.go

+86
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,11 @@ import (
2121
"context"
2222
"encoding/json"
2323
"fmt"
24+
"github.com/stretchr/testify/assert"
25+
"go.opentelemetry.io/otel/propagation"
26+
"go.opentelemetry.io/otel/trace"
2427
"io"
28+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2529
"net/http"
2630
"net/http/httptest"
2731
"testing"
@@ -115,3 +119,85 @@ func TestExternalServer(t *testing.T) {
115119
})
116120
}
117121
}
122+
123+
func TestExternalServerBaggage(t *testing.T) {
124+
for name, tt := range map[string]struct {
125+
IncludeBaggage bool
126+
}{
127+
"withBaggage": {
128+
IncludeBaggage: true,
129+
},
130+
131+
"withoutBaggage": {
132+
IncludeBaggage: false,
133+
},
134+
} {
135+
t.Run(name, func(t *testing.T) {
136+
var ctx context.Context
137+
var span trace.Span
138+
if tt.IncludeBaggage {
139+
jsonCtx := &unstructured.Unstructured{
140+
Object: map[string]interface{}{
141+
"appName": "app-name",
142+
"namespace": "a-namespace",
143+
},
144+
}
145+
ctx, span, _ = cuexruntime.StartSpanWithBaggage(context.Background(), "span", jsonCtx)
146+
} else {
147+
ctx, span, _ = cuexruntime.StartSpan(context.Background(), "span")
148+
}
149+
defer span.End()
150+
151+
fn := externalserver.GenericServerProviderFn[null, propagationCheckResp](propagationCheck)
152+
payload := []byte("{}")
153+
154+
httpReq := httptest.
155+
NewRequest(http.MethodPost, "/propagationCheck", bytes.NewReader(payload)).
156+
WithContext(ctx)
157+
158+
traceHeaderPropagator := cuexruntime.TraceHeaderPropagator{}
159+
traceHeaderPropagator.Inject(ctx, propagation.HeaderCarrier(httpReq.Header))
160+
161+
httpResp := httptest.NewRecorder()
162+
163+
req := restful.NewRequest(httpReq)
164+
resp := restful.NewResponse(httpResp)
165+
166+
fn.Call(req, resp)
167+
168+
var response propagationCheckResp
169+
err := json.Unmarshal(httpResp.Body.Bytes(), &response)
170+
require.NoError(t, err)
171+
assert.Equal(t, httpResp.Code, 200)
172+
173+
assert.NotEmpty(t, response.TraceID)
174+
assert.NotEmpty(t, response.SpanID)
175+
if tt.IncludeBaggage {
176+
assert.Equal(t, "app-name", response.Context["appName"])
177+
assert.Equal(t, "a-namespace", response.Context["namespace"])
178+
} else {
179+
assert.Empty(t, response.Context)
180+
}
181+
})
182+
}
183+
}
184+
185+
type propagationCheckResp struct {
186+
TraceID string `json:"traceID"`
187+
SpanID string `json:"spanID"`
188+
Context map[string]interface{} `json:"context"`
189+
}
190+
191+
type null struct{}
192+
193+
func propagationCheck(ctx context.Context, input *null) (*propagationCheckResp, error) {
194+
span := trace.SpanFromContext(ctx)
195+
pCtx, _ := cuexruntime.GetPropagatedContext(ctx)
196+
var mapCtx map[string]interface{}
197+
_ = pCtx.UnmarshalContext(&mapCtx)
198+
return &propagationCheckResp{
199+
TraceID: span.SpanContext().TraceID().String(),
200+
SpanID: span.SpanContext().SpanID().String(),
201+
Context: mapCtx,
202+
}, nil
203+
}

cue/cuex/runtime/provider.go

+8
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"crypto/tls"
2323
"encoding/json"
2424
"fmt"
25+
"go.opentelemetry.io/otel/propagation"
2526
"io"
2627
"net/http"
2728
"strings"
@@ -35,6 +36,7 @@ import (
3536
)
3637

3738
var _ ProviderFn = GenericProviderFn[any, any](nil)
39+
var traceHeaderPropagator = TraceHeaderPropagator{}
3840

3941
// GenericProviderFn generic function that implements ProviderFn interface
4042
type GenericProviderFn[T any, U any] func(context.Context, *T) (*U, error)
@@ -90,6 +92,7 @@ func (in *ExternalProviderFn) Call(ctx context.Context, value cue.Value) (cue.Va
9092
case v1alpha1.ProtocolHTTP, v1alpha1.ProtocolHTTPS:
9193
ep := fmt.Sprintf("%s/%s", strings.TrimSuffix(in.Endpoint, "/"), in.Fn)
9294
req, err := http.NewRequest(http.MethodPost, ep, bytes.NewReader(bs))
95+
in.InjectHeaders(ctx, req)
9396
if err != nil {
9497
return value, err
9598
}
@@ -117,6 +120,11 @@ func (in *ExternalProviderFn) Call(ctx context.Context, value cue.Value) (cue.Va
117120
return value.FillPath(cue.ParsePath(providers.ReturnsKey), ret), nil
118121
}
119122

123+
// InjectHeaders Injects headers from the current span into the http request headers
124+
func (in *ExternalProviderFn) InjectHeaders(ctx context.Context, r *http.Request) {
125+
traceHeaderPropagator.Inject(ctx, propagation.HeaderCarrier(r.Header))
126+
}
127+
120128
var _ ProviderFn = NativeProviderFn(nil)
121129

122130
// NativeProviderFn native function that implements ProviderFn interface

cue/cuex/runtime/tracer.go

+187
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
package runtime
2+
3+
import (
4+
"context"
5+
"cuelang.org/go/cue"
6+
"cuelang.org/go/cue/cuecontext"
7+
"encoding/base64"
8+
"encoding/json"
9+
"fmt"
10+
"go.opentelemetry.io/otel"
11+
"go.opentelemetry.io/otel/baggage"
12+
"go.opentelemetry.io/otel/propagation"
13+
tracesdk "go.opentelemetry.io/otel/sdk/trace"
14+
"go.opentelemetry.io/otel/trace"
15+
"k8s.io/klog"
16+
"net/http"
17+
"strings"
18+
)
19+
20+
func init() {
21+
// set the default OTel implementation if the controller did not already handle the setup
22+
// fallback functionality - preferred to update the controller to handle this
23+
oTelTracerProviderSet := otel.GetTracerProvider().Tracer("check") != otel.Tracer("check")
24+
if !oTelTracerProviderSet {
25+
tp := tracesdk.NewTracerProvider()
26+
otel.SetTracerProvider(tp)
27+
}
28+
}
29+
30+
type ctxKey string
31+
32+
// PropagatedCtx .
33+
type PropagatedCtx struct {
34+
pCtx []byte
35+
context.Context
36+
}
37+
38+
const (
39+
key ctxKey = "PropagatedCtx"
40+
headerPrefix = "X-Vela"
41+
traceParent = "traceparent"
42+
traceState = "tracestate"
43+
encodedCtx = "Encoded-Context"
44+
)
45+
46+
// StartSpan creates a new OpenTelemetry span and returns the updated context and span.
47+
func StartSpan(ctx context.Context, name string) (context.Context, trace.Span, error) {
48+
tracer := otel.Tracer(name)
49+
ctx, span := tracer.Start(ctx, name)
50+
return ctx, span, nil
51+
}
52+
53+
// StartSpanWithBaggage creates a new OpenTelemetry span and attaches encoded JSON context as baggage.
54+
func StartSpanWithBaggage(ctx context.Context, name string, hCtx json.Marshaler) (context.Context, trace.Span, error) {
55+
jsonCtx, err := json.Marshal(hCtx)
56+
if err != nil {
57+
klog.Errorf("failed to marshal json: %v", err)
58+
return nil, nil, err
59+
}
60+
61+
member, err := baggage.NewMember(strings.ToLower(encodedCtx), base64.StdEncoding.EncodeToString(jsonCtx))
62+
if err != nil {
63+
klog.Errorf("failed to encode context baggage header: \n%v", err)
64+
return nil, nil, err
65+
}
66+
bag, err := baggage.New(member)
67+
if err != nil {
68+
klog.Errorf("failed to create context bag: \n%v", err)
69+
return nil, nil, err
70+
}
71+
ctx = baggage.ContextWithBaggage(ctx, bag)
72+
73+
ctx, span, err := StartSpan(ctx, name)
74+
return ctx, span, err
75+
}
76+
77+
// WithBaggage appends additional baggage entries to the context.
78+
func WithBaggage(ctx context.Context, b map[string]string) (context.Context, error) {
79+
members := baggage.FromContext(ctx).Members()
80+
for k, v := range b {
81+
m, err := baggage.NewMember(k, v)
82+
if err != nil {
83+
klog.Errorf("failed to create new member\n%v", err)
84+
continue
85+
}
86+
members = append(members, m)
87+
}
88+
bag, err := baggage.New(members...)
89+
if err != nil {
90+
klog.Errorf("failed to create context bag: \n%v", err)
91+
return ctx, err
92+
}
93+
return baggage.ContextWithBaggage(ctx, bag), nil
94+
}
95+
96+
// TraceHeaderPropagator .
97+
type TraceHeaderPropagator struct {
98+
traceContextPropagator propagation.TraceContext
99+
}
100+
101+
// Inject serializes the span baggage and trace context into the provided HTTP headers.
102+
func (cp TraceHeaderPropagator) Inject(ctx context.Context, carrier propagation.TextMapCarrier) {
103+
bag := baggage.FromContext(ctx)
104+
for _, member := range bag.Members() {
105+
headerName := fmt.Sprintf("%s-%s", headerPrefix, member.Key())
106+
carrier.Set(headerName, member.Value())
107+
}
108+
cp.traceContextPropagator.Inject(ctx, carrier)
109+
}
110+
111+
// Extract reconstructs the span context and baggage from the provided HTTP headers.
112+
func (cp TraceHeaderPropagator) Extract(ctx context.Context, carrier propagation.TextMapCarrier) context.Context {
113+
ctx = traceHeaderPropagator.traceContextPropagator.Extract(ctx, carrier)
114+
appContext := &PropagatedCtx{
115+
Context: ctx,
116+
}
117+
for _, hKey := range carrier.Keys() {
118+
value := carrier.Get(hKey)
119+
if strings.HasPrefix(hKey, headerPrefix) {
120+
strippedKey := strings.TrimPrefix(hKey, headerPrefix+"-")
121+
122+
switch strippedKey {
123+
124+
case encodedCtx:
125+
dataBytes, err := base64.StdEncoding.DecodeString(value)
126+
appContext.pCtx = dataBytes
127+
if err != nil {
128+
klog.Errorf("error decoding base64 context: %v\n", err)
129+
continue
130+
}
131+
132+
case traceParent, traceState: // do nothing
133+
134+
default:
135+
appContext.Context = context.WithValue(appContext.Context, strings.ToLower(strippedKey), value)
136+
}
137+
}
138+
}
139+
140+
return context.WithValue(ctx, key, appContext)
141+
}
142+
143+
// RawJSON returns the embedded JSON context as a json.RawMessage.
144+
func (p *PropagatedCtx) RawJSON() json.RawMessage {
145+
return json.RawMessage(p.pCtx)
146+
}
147+
148+
// UnmarshalContext unmarshals the embedded JSON context into the provided struct.
149+
func (p *PropagatedCtx) UnmarshalContext(out interface{}) error {
150+
return json.Unmarshal(p.pCtx, out)
151+
}
152+
153+
// GetCueContext returns the embedded JSON context as a CUE value.
154+
func (p *PropagatedCtx) GetCueContext() (*cue.Value, error) {
155+
cueCtx := cuecontext.New()
156+
cueVal := cueCtx.CompileString(string(p.pCtx))
157+
if cueVal.Err() != nil {
158+
return &cue.Value{}, cueVal.Err()
159+
}
160+
return &cueVal, nil
161+
}
162+
163+
// ContextFromHeaders extracts the span context and baggage from an HTTP request and returns the reconstructed ctx.
164+
func ContextFromHeaders(r *http.Request) context.Context {
165+
return traceHeaderPropagator.Extract(context.Background(), propagation.HeaderCarrier(r.Header))
166+
}
167+
168+
// GetPropagatedContext retrieves the PropagatedCtx from the given context if present.
169+
func GetPropagatedContext(ctx context.Context) (*PropagatedCtx, bool) {
170+
pCtx := ctx.Value(key)
171+
if pCtx != nil {
172+
if pc, ok := pCtx.(*PropagatedCtx); ok {
173+
return pc, true
174+
}
175+
return nil, false
176+
}
177+
return nil, false
178+
}
179+
180+
// Fields returns the list of HTTP headers used for trace context propagation.
181+
func (cp TraceHeaderPropagator) Fields() []string {
182+
return []string{
183+
traceParent,
184+
traceState,
185+
fmt.Sprintf("%s-%s", headerPrefix, encodedCtx),
186+
}
187+
}

0 commit comments

Comments
 (0)