Skip to content

Commit 701d436

Browse files
committed
BB-764: All Kafka consumers use span Links, never parent-child
Convention: every Kafka consumer read starts a new root trace and adds an OTEL Link to the upstream span from the message headers. No Kafka hop uses parent-child.

Rationale:
- Async work (replication, lifecycle, GC, notifications) can fire minutes to hours after the original S3 request. Parent-child puts both in one trace → huge wall-clock gap in the waterfall, traces with millions of spans for high-volume buckets.
- Links keep each trace small and focused. Jaeger renders them as clickable cross-trace jumps.
- OTEL messaging semantic conventions recommend links for async hops with significant time gaps.

Changes:
- BackbeatConsumer._processTask always calls startLinkedSpanFromKafkaEntry. Removed entryHasLinkHeaders auto-detect and the parent-child code path.
- kafkaTraceContext.js simplified: removed linkHeadersFromCurrentContext() and linkContextFromKafkaHeaders(). startLinkedSpanFromKafkaEntry now reads standard traceparent/tracestate. startSpanFromKafkaEntry kept as backward-compat alias (same function ref).
- Producer-side: LifecycleTask.js and ReplicationAPI.js switched from linkHeadersFromCurrentContext() to traceHeadersFromCurrentContext() (same trace context, now carried in the standard traceparent/tracestate headers instead of the link-traceparent/link-tracestate variants).

Issue: BB-764
1 parent d4c1cea commit 701d436

5 files changed

Lines changed: 40 additions & 174 deletions

File tree

extensions/lifecycle/tasks/LifecycleTask.js

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,7 @@ const ReplicationAPI = require('../../replication/ReplicationAPI');
2424
const { LifecycleMetrics, LIFECYCLE_MARKER_METRICS_LOCATION } = require('../LifecycleMetrics');
2525
const locationsConfig = require('../../../conf/locationConfig.json') || {};
2626
const { rulesSupportTransition } = require('../util/rules');
27-
const {
28-
linkHeadersFromCurrentContext,
29-
traceHeadersFromCurrentContext,
30-
} = require('../../../lib/tracing/kafkaTraceContext');
27+
const { traceHeadersFromCurrentContext } = require('../../../lib/tracing/kafkaTraceContext');
3128
const { decode } = versioning.VersionID;
3229

3330
const errorTransitionInProgress = errors.InternalError.
@@ -194,11 +191,10 @@ class LifecycleTask extends BackbeatTask {
194191
location,
195192
Date.now() - entry.getAttribute('transitionTime'));
196193

197-
// Attach link-headers (NOT traceparent). The object-processor
198-
// consumer starts a NEW root trace that links back to this
199-
// bucket-processor span — prevents 1M-object traces from breaking
200-
// the Jaeger UI.
201-
const headers = linkHeadersFromCurrentContext();
194+
// Attach traceparent so the downstream consumer can link back to
195+
// this bucket-processor span. The consumer always creates a new
196+
// root trace with a Link (never parent-child across Kafka).
197+
const headers = traceHeadersFromCurrentContext();
202198
const kafkaEntry = { message: entry.toKafkaMessage() };
203199
if (headers) kafkaEntry.headers = headers;
204200
const entries = [kafkaEntry];

extensions/replication/ReplicationAPI.js

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ const locations = require('../../conf/locationConfig.json') || {};
33

44
const ActionQueueEntry = require('../../lib/models/ActionQueueEntry');
55
const ReplicationMetrics = require('./ReplicationMetrics');
6-
const { linkHeadersFromCurrentContext } = require('../../lib/tracing/kafkaTraceContext');
6+
const { traceHeadersFromCurrentContext } = require('../../lib/tracing/kafkaTraceContext');
77

88
let { dataMoverTopic } = config.extensions.replication;
99
const { coldStorageArchiveTopicPrefix } = config.extensions.lifecycle;
@@ -79,13 +79,10 @@ class ReplicationAPI {
7979
key: `${bucket}/${key}`,
8080
message: action.toKafkaMessage(),
8181
};
82-
// Fan-out transitions can produce many entries per bucket-processor
83-
// span. Use link-headers so the data-mover consumer starts a new
84-
// root trace rather than a child of this one. If no active OTEL
85-
// span (OTEL disabled), linkHeadersFromCurrentContext returns
86-
// undefined and we ship the entry without headers.
87-
const linkHeaders = linkHeadersFromCurrentContext();
88-
if (linkHeaders) kafkaEntry.headers = linkHeaders;
82+
// Attach traceparent so the downstream consumer can link back.
83+
// All Kafka consumers create linked spans (never parent-child).
84+
const traceHeaders = traceHeadersFromCurrentContext();
85+
if (traceHeaders) kafkaEntry.headers = traceHeaders;
8986
let topic = dataMoverTopic;
9087
const toLocation = action.getAttribute('toLocation');
9188
const locationConfig = locations[toLocation];

lib/BackbeatConsumer.js

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,24 +24,7 @@ const {
2424
} = require('./constants');
2525

2626
const { context: otelContext, SpanStatusCode } = require('@opentelemetry/api');
27-
const {
28-
startSpanFromKafkaEntry,
29-
startLinkedSpanFromKafkaEntry,
30-
} = require('./tracing/kafkaTraceContext');
31-
32-
/**
33-
* Detect whether a Kafka entry carries link-traceparent (fan-out break)
34-
* rather than standard traceparent. Link semantics mean the consumer
35-
* starts a NEW root trace that references the upstream via OTEL Link,
36-
* instead of becoming a child of it.
37-
*/
38-
function entryHasLinkHeaders(entry) {
39-
if (!entry || !Array.isArray(entry.headers)) return false;
40-
for (const h of entry.headers) {
41-
if (h['link-traceparent']) return true;
42-
}
43-
return false;
44-
}
27+
const { startLinkedSpanFromKafkaEntry } = require('./tracing/kafkaTraceContext');
4528

4629
const CLIENT_ID = 'BackbeatConsumer';
4730
const { withTopicPrefix } = require('./util/topic');
@@ -534,9 +517,7 @@ class BackbeatConsumer extends EventEmitter {
534517
const { topic, partition } = entry;
535518
KafkaBacklogMetrics.onTaskStarted(topic, partition, this._groupId);
536519

537-
const { ctx, span } = entryHasLinkHeaders(entry)
538-
? startLinkedSpanFromKafkaEntry(entry, `${topic}.process`)
539-
: startSpanFromKafkaEntry(entry, `${topic}.process`);
520+
const { ctx, span } = startLinkedSpanFromKafkaEntry(entry, `${topic}.process`);
540521
span.setAttribute('messaging.kafka.topic', topic);
541522
span.setAttribute('messaging.kafka.partition', partition);
542523

lib/tracing/kafkaTraceContext.js

Lines changed: 19 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -33,47 +33,15 @@ function contextFromKafkaHeaders(kafkaHeaders) {
3333
return propagation.extract(context.active(), carrier);
3434
}
3535

36-
/**
37-
* Extract a link-trace-context from Kafka message headers.
38-
* Reads `link-traceparent` / `link-tracestate` (the link-variant that
39-
* marks the upstream as a related trace, not a parent).
40-
* @param {Array|undefined} kafkaHeaders - array of single-key objects from node-rdkafka
41-
* @returns {Object|null} OTEL context carrying the linked SpanContext, or null
42-
*/
43-
function linkContextFromKafkaHeaders(kafkaHeaders) {
44-
if (!kafkaHeaders) return null;
45-
const carrier = {};
46-
for (const h of kafkaHeaders) {
47-
if (h['link-traceparent']) carrier.traceparent = h['link-traceparent'].toString();
48-
if (h['link-tracestate']) carrier.tracestate = h['link-tracestate'].toString();
49-
}
50-
if (!carrier.traceparent) return null;
51-
return propagation.extract(context.active(), carrier);
52-
}
53-
54-
/**
55-
* Start a span linked to a Kafka entry's trace context.
56-
* Returns { ctx, span } — caller must call span.end() when done.
57-
* @param {Object} kafkaEntry - Kafka consumer entry with .headers
58-
* @param {string} operationName - name for the span
59-
* @returns {{ ctx: Object, span: Object }}
60-
*/
61-
function startSpanFromKafkaEntry(kafkaEntry, operationName) {
62-
const parentCtx = contextFromKafkaHeaders(kafkaEntry.headers);
63-
const tracer = trace.getTracer('backbeat');
64-
const span = tracer.startSpan(operationName, {
65-
kind: SpanKind.CONSUMER,
66-
}, parentCtx);
67-
return { ctx: trace.setSpan(parentCtx, span), span };
68-
}
69-
7036
/**
7137
* Start a NEW root span that LINKS to the span referenced by the Kafka
72-
* entry's `link-traceparent` header — as opposed to being a child of it.
38+
* entry's `traceparent` header — as opposed to being a child of it.
7339
*
74-
* Use this for high-volume fan-out consumers (e.g. the lifecycle
75-
* object-processor). Traces stay small and clickable from Jaeger's Link
76-
* UI, instead of producing 1M-span super-traces that break the UI.
40+
* Convention: every Kafka consumer read starts a new trace (new trace ID)
41+
* and adds an OTEL Link to the upstream span. Async work (replication,
42+
* lifecycle, GC, notifications) can fire minutes to hours after the
43+
* original S3 request — links keep traces small and navigable via
44+
* Jaeger's link UI.
7745
*
7846
* Returns { ctx, span } — caller must call span.end() when done.
7947
*
@@ -85,17 +53,13 @@ function startLinkedSpanFromKafkaEntry(kafkaEntry, operationName) {
8553
const tracer = trace.getTracer('backbeat');
8654
const links = [];
8755

88-
const linkedCtx = linkContextFromKafkaHeaders(kafkaEntry.headers);
89-
if (linkedCtx) {
90-
const linkedSpan = trace.getSpan(linkedCtx);
91-
const linkedSpanCtx = linkedSpan && linkedSpan.spanContext();
92-
if (linkedSpanCtx && linkedSpanCtx.traceId) {
93-
links.push({ context: linkedSpanCtx });
94-
}
56+
const parentCtx = contextFromKafkaHeaders(kafkaEntry.headers);
57+
const remoteSpan = trace.getSpan(parentCtx);
58+
const remoteSpanCtx = remoteSpan && remoteSpan.spanContext();
59+
if (remoteSpanCtx && remoteSpanCtx.traceId) {
60+
links.push({ context: remoteSpanCtx });
9561
}
9662

97-
// NEW root span — do not pass an active parent. The link is the only
98-
// reference to the upstream trace.
9963
const span = tracer.startSpan(operationName, {
10064
kind: SpanKind.CONSUMER,
10165
links,
@@ -104,10 +68,16 @@ function startLinkedSpanFromKafkaEntry(kafkaEntry, operationName) {
10468
return { ctx: trace.setSpan(context.active(), span), span };
10569
}
10670

71+
// Alias kept for backward-compatibility with any call sites that still
72+
// reference the old name. Behaviour is identical: always creates a linked
73+
// root span, never a parent-child.
74+
const startSpanFromKafkaEntry = startLinkedSpanFromKafkaEntry;
75+
10776
/**
10877
* Serialize the currently active OTEL context into standard
109-
* `traceparent` / `tracestate` Kafka headers — the consumer will
110-
* become a CHILD of the current span.
78+
* `traceparent` / `tracestate` Kafka headers. The downstream consumer
79+
* will create a LINK to this span (not a child — all Kafka hops use
80+
* links).
11181
*
11282
* Returns an array of single-key header objects compatible with
11383
* node-rdkafka's header format, or undefined if no active span.
@@ -128,40 +98,10 @@ function traceHeadersFromCurrentContext() {
12898
return headers.length > 0 ? headers : undefined;
12999
}
130100

131-
/**
132-
* Serialize the currently active OTEL context into `link-traceparent`
133-
* and `link-tracestate` Kafka headers — the link-variant, NOT the
134-
* standard `traceparent`. Producers use this when the downstream
135-
* consumer should LINK to the current span rather than become its
136-
* child.
137-
*
138-
* Returns an array of single-key header objects compatible with
139-
* node-rdkafka's header format, or undefined if no active span.
140-
*
141-
* @returns {Array|undefined}
142-
*/
143-
function linkHeadersFromCurrentContext() {
144-
const span = trace.getSpan(context.active());
145-
if (!span) return undefined;
146-
const spanCtx = span.spanContext();
147-
if (!spanCtx || !spanCtx.traceId || !spanCtx.spanId) return undefined;
148-
149-
const carrier = {};
150-
propagation.inject(context.active(), carrier);
151-
// carrier now has { traceparent, tracestate? } — rename them to the
152-
// link-variant so the consumer knows to LINK, not parent.
153-
const headers = [];
154-
if (carrier.traceparent) headers.push({ 'link-traceparent': carrier.traceparent });
155-
if (carrier.tracestate) headers.push({ 'link-tracestate': carrier.tracestate });
156-
return headers.length > 0 ? headers : undefined;
157-
}
158-
159101
module.exports = {
160102
traceHeadersFromEntry,
161103
traceHeadersFromCurrentContext,
162104
contextFromKafkaHeaders,
163-
linkContextFromKafkaHeaders,
164105
startSpanFromKafkaEntry,
165106
startLinkedSpanFromKafkaEntry,
166-
linkHeadersFromCurrentContext,
167107
};

tests/unit/lib/tracing/kafkaTraceContext.spec.js

Lines changed: 9 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,8 @@ const {
77
traceHeadersFromEntry,
88
traceHeadersFromCurrentContext,
99
contextFromKafkaHeaders,
10-
linkContextFromKafkaHeaders,
1110
startSpanFromKafkaEntry,
1211
startLinkedSpanFromKafkaEntry,
13-
linkHeadersFromCurrentContext,
1412
} = require('../../../../lib/tracing/kafkaTraceContext');
1513

1614
describe('kafkaTraceContext', () => {
@@ -74,9 +72,6 @@ describe('kafkaTraceContext', () => {
7472
assert.strictEqual(result, context.active());
7573
});
7674

77-
// NOTE: With a full OTEL SDK (propagator registered), extract()
78-
// returns a new context. Without SDK, the no-op propagator returns
79-
// the active context unchanged. Full extraction is tested on-cluster.
8075
it('should call propagation.extract with correct carrier', () => {
8176
const tp = '00-abcdef1234567890abcdef1234567890-1234567890abcdef-01';
8277
const headers = [
@@ -88,17 +83,17 @@ describe('kafkaTraceContext', () => {
8883
});
8984
});
9085

91-
describe('startSpanFromKafkaEntry', () => {
92-
it('should return ctx and span with no headers', () => {
86+
describe('startLinkedSpanFromKafkaEntry', () => {
87+
it('returns a span with no headers (no link)', () => {
9388
const entry = { topic: 'test-topic', partition: 0 };
94-
const { ctx, span } = startSpanFromKafkaEntry(entry, 'test-op');
89+
const { ctx, span } = startLinkedSpanFromKafkaEntry(entry, 'test-op');
9590
assert(ctx);
9691
assert(span);
9792
assert.strictEqual(typeof span.end, 'function');
9893
span.end();
9994
});
10095

101-
it('should return ctx and span with traceparent header', () => {
96+
it('returns a span with traceparent header (creates link)', () => {
10297
const tp = '00-abcdef1234567890abcdef1234567890-1234567890abcdef-01';
10398
const entry = {
10499
topic: 'test-topic',
@@ -107,7 +102,7 @@ describe('kafkaTraceContext', () => {
107102
{ traceparent: Buffer.from(tp) },
108103
],
109104
};
110-
const { ctx, span } = startSpanFromKafkaEntry(entry, 'test-op');
105+
const { ctx, span } = startLinkedSpanFromKafkaEntry(entry, 'test-op');
111106
assert(ctx);
112107
assert(span);
113108
assert.strictEqual(typeof span.setAttribute, 'function');
@@ -116,57 +111,14 @@ describe('kafkaTraceContext', () => {
116111
});
117112
});
118113

119-
describe('linkContextFromKafkaHeaders', () => {
120-
it('returns null when headers are missing', () => {
121-
assert.strictEqual(linkContextFromKafkaHeaders(undefined), null);
122-
assert.strictEqual(linkContextFromKafkaHeaders(null), null);
123-
});
124-
125-
it('returns null when no link-traceparent header', () => {
126-
const result = linkContextFromKafkaHeaders([
127-
{ unrelated: Buffer.from('x') },
128-
]);
129-
assert.strictEqual(result, null);
130-
});
131-
132-
it('returns a context when link-traceparent is present', () => {
133-
const tp = '00-abcdef1234567890abcdef1234567890-1234567890abcdef-01';
134-
const result = linkContextFromKafkaHeaders([
135-
{ 'link-traceparent': Buffer.from(tp) },
136-
]);
137-
// Without a propagator registered this may be the active
138-
// context; the important bit is the call doesn't throw.
139-
assert(result !== undefined);
140-
});
141-
});
142-
143-
describe('startLinkedSpanFromKafkaEntry', () => {
144-
it('returns a span even with no link headers', () => {
145-
const entry = { topic: 'lifecycle.object-tasks', partition: 0 };
146-
const { ctx, span } = startLinkedSpanFromKafkaEntry(entry, 'linked-op');
147-
assert(ctx);
148-
assert(span);
149-
span.end();
150-
});
151-
152-
it('returns a span with link headers present', () => {
153-
const tp = '00-abcdef1234567890abcdef1234567890-1234567890abcdef-01';
154-
const entry = {
155-
topic: 'lifecycle.object-tasks',
156-
partition: 0,
157-
headers: [{ 'link-traceparent': Buffer.from(tp) }],
158-
};
159-
const { ctx, span } = startLinkedSpanFromKafkaEntry(entry, 'linked-op');
160-
assert(ctx);
161-
assert(span);
162-
span.end();
114+
describe('startSpanFromKafkaEntry (backward-compat alias)', () => {
115+
it('is the same function as startLinkedSpanFromKafkaEntry', () => {
116+
assert.strictEqual(startSpanFromKafkaEntry, startLinkedSpanFromKafkaEntry);
163117
});
164118
});
165119

166-
describe('linkHeadersFromCurrentContext / traceHeadersFromCurrentContext', () => {
120+
describe('traceHeadersFromCurrentContext', () => {
167121
it('returns undefined when no active span', () => {
168-
// In a fresh test context, no span is active.
169-
assert.strictEqual(linkHeadersFromCurrentContext(), undefined);
170122
assert.strictEqual(traceHeadersFromCurrentContext(), undefined);
171123
});
172124
});

0 commit comments

Comments
 (0)