Skip to content

Commit 7ae2f3d

Browse files
committed
OTEL-2540 Add SpanContext to persistent queue
1 parent 5ee4816 commit 7ae2f3d

File tree

2 files changed

+271
-8
lines changed

2 files changed

+271
-8
lines changed

exporter/exporterhelper/internal/queuebatch/persistent_queue.go

+92-8
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,14 @@ package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhel
66
import (
77
"context"
88
"encoding/binary"
9+
"encoding/hex"
10+
"encoding/json"
911
"errors"
1012
"fmt"
1113
"strconv"
1214
"sync"
1315

16+
"go.opentelemetry.io/otel/trace"
1417
"go.uber.org/zap"
1518

1619
"go.opentelemetry.io/collector/component"
@@ -29,6 +32,8 @@ const (
2932
writeIndexKey = "wi"
3033
currentlyDispatchedItemsKey = "di"
3134
queueSizeKey = "si"
35+
36+
errInvalidTraceFlagsLength = "trace flags must only be 1 byte"
3237
)
3338

3439
var (
@@ -238,6 +243,56 @@ func (pq *persistentQueue[T]) Offer(ctx context.Context, req T) error {
238243
return pq.putInternal(ctx, req)
239244
}
240245

246+
type marshaledRequestWithSpanContext struct {
247+
RequestBytes []byte `json:"request"`
248+
SpanContextJSON json.RawMessage `json:"span_context"`
249+
}
250+
251+
type spanContextConfigWrapper struct {
252+
TraceID string
253+
SpanID string
254+
TraceFlags string
255+
TraceState string
256+
Remote bool
257+
}
258+
259+
func SpanContextFromWrapper(wrapper spanContextConfigWrapper) (*trace.SpanContext, error) {
260+
traceID, err := trace.TraceIDFromHex(wrapper.TraceID)
261+
if err != nil {
262+
return nil, err
263+
}
264+
spanID, err := trace.SpanIDFromHex(wrapper.SpanID)
265+
if err != nil {
266+
return nil, err
267+
}
268+
decoded, err := hex.DecodeString(wrapper.TraceFlags)
269+
if err != nil {
270+
return nil, err
271+
}
272+
if len(decoded) != 1 {
273+
return nil, errors.New(errInvalidTraceFlagsLength)
274+
}
275+
traceFlags := trace.TraceFlags(decoded[0])
276+
traceState, err := trace.ParseTraceState(wrapper.TraceState)
277+
if err != nil {
278+
return nil, err
279+
}
280+
281+
sc := trace.NewSpanContext(trace.SpanContextConfig{
282+
TraceID: traceID,
283+
SpanID: spanID,
284+
TraceFlags: traceFlags,
285+
TraceState: traceState,
286+
Remote: wrapper.Remote,
287+
})
288+
289+
if !sc.IsValid() {
290+
return nil, nil
291+
}
292+
293+
return &sc, nil
294+
}
295+
241296
// putInternal is the internal version that requires caller to hold the mutex lock.
242297
func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
243298
reqSize := pq.set.sizer.Sizeof(req)
@@ -254,11 +309,24 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
254309
if err != nil {
255310
return err
256311
}
257-
312+
// Retrieve SpanContext object from provided context, and store alongside the request
313+
sc := trace.SpanContextFromContext(ctx)
314+
scJSON, err := json.Marshal(sc)
315+
if err != nil {
316+
return err
317+
}
318+
envelope := marshaledRequestWithSpanContext{
319+
RequestBytes: reqBuf,
320+
SpanContextJSON: scJSON,
321+
}
322+
envelopeBytes, err := json.Marshal(envelope)
323+
if err != nil {
324+
return err
325+
}
258326
// Carry out a transaction where we both add the item and update the write index
259327
ops := []*storage.Operation{
260328
storage.SetOperation(writeIndexKey, itemIndexToBytes(pq.writeIndex+1)),
261-
storage.SetOperation(getItemKey(pq.writeIndex), reqBuf),
329+
storage.SetOperation(getItemKey(pq.writeIndex), envelopeBytes),
262330
}
263331
if err = pq.client.Batch(ctx, ops...); err != nil {
264332
return err
@@ -291,7 +359,7 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don
291359

292360
// Read until either a successful retrieved element or no more elements in the storage.
293361
for pq.readIndex != pq.writeIndex {
294-
index, req, consumed := pq.getNextItem(ctx)
362+
index, req, consumed, restoredContext := pq.getNextItem(ctx)
295363
// Ensure the used size and the channel size are in sync.
296364
if pq.readIndex == pq.writeIndex {
297365
pq.queueSize = 0
@@ -300,7 +368,7 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don
300368
if consumed {
301369
id := indexDonePool.Get().(*indexDone)
302370
id.reset(index, pq.set.sizer.Sizeof(req), pq)
303-
return context.Background(), req, id, true
371+
return restoredContext, req, id, true
304372
}
305373
}
306374

@@ -313,7 +381,7 @@ func (pq *persistentQueue[T]) Read(ctx context.Context) (context.Context, T, Don
313381
// getNextItem pulls the next available item from the persistent storage along with its index. Once processing is
314382
// finished, the index should be called with onDone to clean up the storage. If no new item is available,
315383
// returns false.
316-
func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool) {
384+
func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool, context.Context) {
317385
index := pq.readIndex
318386
// Increase here, so even if errors happen below, it always iterates
319387
pq.readIndex++
@@ -325,8 +393,24 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool)
325393
getOp)
326394

327395
var request T
396+
restoredContext := context.Background()
328397
if err == nil {
329-
request, err = pq.set.encoding.Unmarshal(getOp.Value)
398+
var envelope marshaledRequestWithSpanContext
399+
if err = json.Unmarshal(getOp.Value, &envelope); err == nil {
400+
// Unmarshal the request using the specified encoding
401+
if request, err = pq.set.encoding.Unmarshal(envelope.RequestBytes); err == nil {
402+
// Unmarshal the SpanContext from JSON
403+
var wrapper spanContextConfigWrapper
404+
if len(envelope.SpanContextJSON) > 0 {
405+
if err = json.Unmarshal(envelope.SpanContextJSON, &wrapper); err == nil {
406+
var sc *trace.SpanContext
407+
if sc, err = SpanContextFromWrapper(wrapper); err == nil && sc != nil {
408+
restoredContext = trace.ContextWithSpanContext(restoredContext, *sc)
409+
}
410+
}
411+
}
412+
}
413+
}
330414
}
331415

332416
if err != nil {
@@ -336,14 +420,14 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool)
336420
pq.logger.Error("Error deleting item from queue", zap.Error(err))
337421
}
338422

339-
return 0, request, false
423+
return 0, request, false, restoredContext
340424
}
341425

342426
// Increase the reference count, so the client is not closed while the request is being processed.
343427
// The client cannot be closed because we hold the lock since last we checked `stopped`.
344428
pq.refClient++
345429

346-
return index, request, true
430+
return index, request, true, restoredContext
347431
}
348432

349433
// onDone should be called to remove the item of the given index from the queue once processing is finished.

exporter/exporterhelper/internal/queuebatch/persistent_queue_test.go

+179
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818

1919
"github.com/stretchr/testify/assert"
2020
"github.com/stretchr/testify/require"
21+
"go.opentelemetry.io/otel/trace"
2122

2223
"go.opentelemetry.io/collector/component"
2324
"go.opentelemetry.io/collector/component/componenttest"
@@ -1205,3 +1206,181 @@ func requireCurrentlyDispatchedItemsEqual(t *testing.T, pq *persistentQueue[uint
12051206
defer pq.mu.Unlock()
12061207
assert.ElementsMatch(t, compare, pq.currentlyDispatchedItems)
12071208
}
1209+
1210+
func TestSpanContextFromWrapper(t *testing.T) {
1211+
testCases := []struct {
1212+
name string
1213+
wrapper spanContextConfigWrapper
1214+
expectErr bool
1215+
errContains string
1216+
expectNil bool
1217+
expectValid bool
1218+
expectTraceID string
1219+
expectSpanID string
1220+
expectFlags string
1221+
expectState string
1222+
expectRemote bool
1223+
}{
1224+
{
1225+
name: "invalid trace id",
1226+
wrapper: spanContextConfigWrapper{
1227+
TraceID: "invalidtraceid",
1228+
SpanID: "0102030405060708",
1229+
TraceFlags: "01",
1230+
TraceState: "",
1231+
Remote: false,
1232+
},
1233+
expectErr: true,
1234+
expectNil: true,
1235+
},
1236+
{
1237+
name: "invalid span id",
1238+
wrapper: spanContextConfigWrapper{
1239+
TraceID: "0102030405060708090a0b0c0d0e0f10",
1240+
SpanID: "invalidspanid",
1241+
TraceFlags: "01",
1242+
TraceState: "",
1243+
Remote: false,
1244+
},
1245+
expectErr: true,
1246+
expectNil: true,
1247+
},
1248+
{
1249+
name: "invalid trace flags hex",
1250+
wrapper: spanContextConfigWrapper{
1251+
TraceID: "0102030405060708090a0b0c0d0e0f10",
1252+
SpanID: "0102030405060708",
1253+
TraceFlags: "zz",
1254+
TraceState: "",
1255+
Remote: false,
1256+
},
1257+
expectErr: true,
1258+
expectNil: true,
1259+
},
1260+
{
1261+
name: "invalid trace flags length",
1262+
wrapper: spanContextConfigWrapper{
1263+
TraceID: "0102030405060708090a0b0c0d0e0f10",
1264+
SpanID: "0102030405060708",
1265+
TraceFlags: "0102",
1266+
TraceState: "",
1267+
Remote: false,
1268+
},
1269+
expectErr: true,
1270+
expectNil: true,
1271+
errContains: errInvalidTraceFlagsLength,
1272+
},
1273+
{
1274+
name: "invalid trace state",
1275+
wrapper: spanContextConfigWrapper{
1276+
TraceID: "0102030405060708090a0b0c0d0e0f10",
1277+
SpanID: "0102030405060708",
1278+
TraceFlags: "01",
1279+
TraceState: "invalid=tracestate,=bad",
1280+
Remote: false,
1281+
},
1282+
expectErr: true,
1283+
expectNil: true,
1284+
},
1285+
{
1286+
name: "valid span context",
1287+
wrapper: spanContextConfigWrapper{
1288+
TraceID: "0102030405060708090a0b0c0d0e0f10",
1289+
SpanID: "0102030405060708",
1290+
TraceFlags: "01",
1291+
TraceState: "vendor=value",
1292+
Remote: true,
1293+
},
1294+
expectErr: false,
1295+
expectNil: false,
1296+
expectValid: true,
1297+
expectTraceID: "0102030405060708090a0b0c0d0e0f10",
1298+
expectSpanID: "0102030405060708",
1299+
expectFlags: "01",
1300+
expectState: "vendor=value",
1301+
expectRemote: true,
1302+
},
1303+
}
1304+
1305+
for _, tc := range testCases {
1306+
t.Run(tc.name, func(t *testing.T) {
1307+
scc, err := SpanContextFromWrapper(tc.wrapper)
1308+
if tc.expectErr {
1309+
require.Error(t, err)
1310+
if tc.errContains != "" {
1311+
assert.Contains(t, err.Error(), tc.errContains)
1312+
}
1313+
} else {
1314+
require.NoError(t, err)
1315+
}
1316+
if tc.expectNil {
1317+
assert.Nil(t, scc)
1318+
} else {
1319+
assert.NotNil(t, scc)
1320+
if tc.expectValid {
1321+
assert.True(t, scc.IsValid())
1322+
assert.Equal(t, tc.expectTraceID, scc.TraceID().String())
1323+
assert.Equal(t, tc.expectSpanID, scc.SpanID().String())
1324+
assert.Equal(t, tc.expectFlags, scc.TraceFlags().String())
1325+
assert.Equal(t, tc.expectState, scc.TraceState().String())
1326+
assert.Equal(t, tc.expectRemote, scc.IsRemote())
1327+
}
1328+
}
1329+
})
1330+
}
1331+
}
1332+
1333+
func TestPersistentQueue_SpanContextRoundTrip(t *testing.T) {
1334+
// Setup a minimal persistent queue using uint64Encoding and uint64
1335+
pq := newPersistentQueue[uint64](persistentQueueSettings[uint64]{
1336+
sizer: request.RequestsSizer[uint64]{},
1337+
capacity: 10,
1338+
signal: pipeline.SignalTraces,
1339+
storageID: component.ID{},
1340+
encoding: uint64Encoding{},
1341+
id: component.NewID(exportertest.NopType),
1342+
telemetry: componenttest.NewNopTelemetrySettings(),
1343+
}).(*persistentQueue[uint64])
1344+
1345+
ext := storagetest.NewMockStorageExtension(nil)
1346+
client, err := ext.GetClient(context.Background(), component.KindExporter, pq.set.id, pq.set.signal.String())
1347+
require.NoError(t, err)
1348+
pq.initClient(context.Background(), client)
1349+
1350+
// Create a valid SpanContext
1351+
traceID, _ := trace.TraceIDFromHex("0102030405060708090a0b0c0d0e0f10")
1352+
spanID, _ := trace.SpanIDFromHex("0102030405060708")
1353+
sc := trace.NewSpanContext(trace.SpanContextConfig{
1354+
TraceID: traceID,
1355+
SpanID: spanID,
1356+
TraceFlags: 0x01,
1357+
TraceState: trace.TraceState{},
1358+
Remote: true,
1359+
})
1360+
ctxWithSC := trace.ContextWithSpanContext(context.Background(), sc)
1361+
1362+
// Offer a request with this context
1363+
req := uint64(42)
1364+
require.NoError(t, pq.Offer(ctxWithSC, req))
1365+
1366+
// Read the request and restored context
1367+
restoredCtx, gotReq, _, ok := pq.Read(context.Background())
1368+
require.True(t, ok)
1369+
assert.Equal(t, req, gotReq)
1370+
restoredSC := trace.SpanContextFromContext(restoredCtx)
1371+
assert.True(t, restoredSC.IsValid())
1372+
assert.Equal(t, sc.TraceID(), restoredSC.TraceID())
1373+
assert.Equal(t, sc.SpanID(), restoredSC.SpanID())
1374+
assert.Equal(t, sc.TraceFlags(), restoredSC.TraceFlags())
1375+
assert.Equal(t, sc.TraceState().String(), restoredSC.TraceState().String())
1376+
assert.Equal(t, sc.IsRemote(), restoredSC.IsRemote())
1377+
1378+
// Also test with a context with no SpanContext
1379+
req2 := uint64(99)
1380+
require.NoError(t, pq.Offer(context.Background(), req2))
1381+
restoredCtx2, gotReq2, _, ok2 := pq.Read(context.Background())
1382+
require.True(t, ok2)
1383+
assert.Equal(t, req2, gotReq2)
1384+
restoredSC2 := trace.SpanContextFromContext(restoredCtx2)
1385+
assert.False(t, restoredSC2.IsValid())
1386+
}

0 commit comments

Comments
 (0)