Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bin/queuePopulator.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require('../lib/otel');

const async = require('async');
const schedule = require('node-schedule');

Expand Down
1 change: 1 addition & 0 deletions extensions/gc/service.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
'use strict';
require('../../lib/otel');

const { errors } = require('arsenal');
const werelogs = require('werelogs');
Expand Down
1 change: 1 addition & 0 deletions extensions/lifecycle/bucketProcessor/task.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
'use strict';
require('../../../lib/otel');

const async = require('async');
const werelogs = require('werelogs');
Expand Down
33 changes: 32 additions & 1 deletion extensions/lifecycle/conductor/LifecycleConductor.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
startCircuitBreakerMetricsExport,
updateCircuitBreakerConfigForImplicitOutputQueue,
} = require('../../../lib/CircuitBreaker');
const { context: otelContext, trace, SpanKind, SpanStatusCode } =
require('@opentelemetry/api');
const { traceHeadersFromCurrentContext } =
require('../../../lib/tracing/kafkaTraceContext');

const DEFAULT_CRON_RULE = '* * * * *';
const DEFAULT_CONCURRENCY = 10;
Expand Down Expand Up @@ -309,7 +313,11 @@
}

_taskToMessage(task, taskVersion, log) {
return {
// Attach traceparent so the bucket-processor consumer becomes a
// child of the conductor.scan root span. Returns undefined when
// OTEL is disabled (no active span).
const headers = traceHeadersFromCurrentContext();
const kafkaEntry = {
message: JSON.stringify({
action: 'processObjects',
contextInfo: {
Expand All @@ -324,6 +332,8 @@
details: {},
}),
};
if (headers) kafkaEntry.headers = headers;

Check warning on line 335 in extensions/lifecycle/conductor/LifecycleConductor.js

View workflow job for this annotation

GitHub Actions / lint

Expected { after 'if' condition
return kafkaEntry;
}

_getAccountIds(unknownCanonicalIds, log, cb) {
Expand Down Expand Up @@ -371,6 +381,27 @@
}

processBuckets(cb) {
// Wrap every cron firing in its own root trace so bucket-listing
// and downstream bucket-task produces appear as children. The
// trace is INTERNAL because there's no upstream HTTP/Kafka parent.
const tracer = trace.getTracer('backbeat');
const span = tracer.startSpan('lifecycle.conductor.scan', {
kind: SpanKind.INTERNAL,
});
const ctx = trace.setSpan(otelContext.active(), span);
otelContext.with(ctx, () => {
this._processBucketsInternal((err, res) => {
if (err) {
span.recordException(err);
span.setStatus({ code: SpanStatusCode.ERROR });
}
span.end();
if (cb) cb(err, res);

Check warning on line 399 in extensions/lifecycle/conductor/LifecycleConductor.js

View workflow job for this annotation

GitHub Actions / lint

Expected { after 'if' condition
});
});
}

_processBucketsInternal(cb) {
const log = this.logger.newRequestLogger();
const start = new Date();
let nBucketsQueued = 0;
Expand Down
1 change: 1 addition & 0 deletions extensions/lifecycle/conductor/service.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
'use strict';
require('../../../lib/otel');

const async = require('async');
const werelogs = require('werelogs');
Expand Down
1 change: 1 addition & 0 deletions extensions/lifecycle/objectProcessor/task.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
'use strict';
require('../../../lib/otel');

const async = require('async');
const werelogs = require('werelogs');
Expand Down
22 changes: 20 additions & 2 deletions extensions/lifecycle/tasks/LifecycleTask.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
const { LifecycleMetrics, LIFECYCLE_MARKER_METRICS_LOCATION } = require('../LifecycleMetrics');
const locationsConfig = require('../../../conf/locationConfig.json') || {};
const { rulesSupportTransition } = require('../util/rules');
const {
linkHeadersFromCurrentContext,
traceHeadersFromCurrentContext,
} = require('../../../lib/tracing/kafkaTraceContext');
const { decode } = versioning.VersionID;

const errorTransitionInProgress = errors.InternalError.
Expand Down Expand Up @@ -121,7 +125,14 @@
* @return {undefined}
*/
_sendBucketEntry(entry, cb) {
const entries = [{ message: JSON.stringify(entry) }];
// Bucket-task re-queue: keep normal parent-child semantics so the
// continuation stays in the same trace as the originating conductor
// scan. Re-queues happen rarely (only when a bucket is too big to
// finish in one batch) so the trace stays small.
const headers = traceHeadersFromCurrentContext();
const kafkaEntry = { message: JSON.stringify(entry) };
if (headers) kafkaEntry.headers = headers;

Check warning on line 134 in extensions/lifecycle/tasks/LifecycleTask.js

View workflow job for this annotation

GitHub Actions / lint

Expected { after 'if' condition
const entries = [kafkaEntry];
this.producer.sendToTopic(this.bucketTasksTopic, entries, err => {
LifecycleMetrics.onKafkaPublish(null, 'BucketTopic', 'bucket', err, 1);
return cb(err);
Expand Down Expand Up @@ -183,7 +194,14 @@
location,
Date.now() - entry.getAttribute('transitionTime'));

const entries = [{ message: entry.toKafkaMessage() }];
// Attach link-headers (NOT traceparent). The object-processor
// consumer starts a NEW root trace that links back to this
// bucket-processor span — prevents 1M-object traces from breaking
// the Jaeger UI.
const headers = linkHeadersFromCurrentContext();
const kafkaEntry = { message: entry.toKafkaMessage() };
if (headers) kafkaEntry.headers = headers;

Check warning on line 203 in extensions/lifecycle/tasks/LifecycleTask.js

View workflow job for this annotation

GitHub Actions / lint

Expected { after 'if' condition
const entries = [kafkaEntry];
this.producer.sendToTopic(this.objectTasksTopic, entries, err => {
LifecycleMetrics.onKafkaPublish(null, 'ObjectTopic', 'bucket', err, 1);
return cb(err);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,22 @@ class KafkaNotificationDestination extends NotificationDestination {
*/
send(messages, done) {
const starTime = Date.now();
this._notificationProducer.send(messages, error => {
// Trust boundary: strip any trace headers before producing to the
// external customer Kafka destination. There's no ingress-layer
// strip on the Kafka protocol (unlike HTTP via nginx), so the only
// place to drop headers is at the producer. Keeps everything else
// on the message intact.
const safeMessages = Array.isArray(messages)
? messages.map(m => {
if (m && m.headers) {
// eslint-disable-next-line no-unused-vars
const { headers, ...rest } = m;
return rest;
}
return m;
})
: messages;
this._notificationProducer.send(safeMessages, error => {
if (error) {
const { host, topic } = this._destinationConfig;
this._log.error('error in message delivery to external Kafka destination', {
Expand Down
2 changes: 2 additions & 0 deletions extensions/notification/queueProcessor/task.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
'use strict';
require('../../../lib/otel');

const assert = require('assert');
const { errors } = require('arsenal');
const async = require('async');
Expand Down
8 changes: 8 additions & 0 deletions extensions/replication/ReplicationAPI.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

const ActionQueueEntry = require('../../lib/models/ActionQueueEntry');
const ReplicationMetrics = require('./ReplicationMetrics');
const { linkHeadersFromCurrentContext } = require('../../lib/tracing/kafkaTraceContext');

let { dataMoverTopic } = config.extensions.replication;
const { coldStorageArchiveTopicPrefix } = config.extensions.lifecycle;
Expand Down Expand Up @@ -78,6 +79,13 @@
key: `${bucket}/${key}`,
message: action.toKafkaMessage(),
};
// Fan-out transitions can produce many entries per bucket-processor
// span. Use link-headers so the data-mover consumer starts a new
// root trace rather than a child of this one. If no active OTEL
// span (OTEL disabled), linkHeadersFromCurrentContext returns
// undefined and we ship the entry without headers.
const linkHeaders = linkHeadersFromCurrentContext();
if (linkHeaders) kafkaEntry.headers = linkHeaders;

Check warning on line 88 in extensions/replication/ReplicationAPI.js

View workflow job for this annotation

GitHub Actions / lint

Expected { after 'if' condition
let topic = dataMoverTopic;
const toLocation = action.getAttribute('toLocation');
const locationConfig = locations[toLocation];
Expand Down
7 changes: 6 additions & 1 deletion extensions/replication/ReplicationQueuePopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const QueuePopulatorExtension =
require('../../lib/queuePopulator/QueuePopulatorExtension');
const ObjectQueueEntry = require('../../lib/models/ObjectQueueEntry');
const locationsConfig = require('../../conf/locationConfig.json') || {};
const { traceHeadersFromEntry } = require('../../lib/tracing/kafkaTraceContext');

class ReplicationQueuePopulator extends QueuePopulatorExtension {
constructor(params) {
Expand Down Expand Up @@ -95,11 +96,15 @@ class ReplicationQueuePopulator extends QueuePopulatorExtension {
const publishedEntry = Object.assign({}, entry);
delete publishedEntry.logReader;

const traceHeaders = traceHeadersFromEntry(value);

this.log.trace('publishing object replication entry',
{ entry: queueEntry.getLogInfo() });
this.publish(this.repConfig.topic,
`${queueEntry.getBucket()}/${queueEntry.getObjectKey()}`,
JSON.stringify(publishedEntry));
JSON.stringify(publishedEntry),
undefined,
traceHeaders);
}

/**
Expand Down
2 changes: 2 additions & 0 deletions extensions/replication/queueProcessor/task.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
'use strict';
require('../../../lib/otel');

const async = require('async');
const assert = require('assert');
const werelogs = require('werelogs');
Expand Down
1 change: 1 addition & 0 deletions extensions/replication/replicationStatusProcessor/task.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
'use strict';
require('../../../lib/otel');

const werelogs = require('werelogs');

Expand Down
37 changes: 36 additions & 1 deletion lib/BackbeatConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,26 @@
}
} = require('./constants');

const { context: otelContext, SpanStatusCode } = require('@opentelemetry/api');
const {
startSpanFromKafkaEntry,
startLinkedSpanFromKafkaEntry,
} = require('./tracing/kafkaTraceContext');

/**
 * Detect whether a Kafka entry carries link-traceparent (fan-out break)
 * rather than standard traceparent. Link semantics mean the consumer
 * starts a NEW root trace that references the upstream via OTEL Link,
 * instead of becoming a child of it.
 *
 * @param {Object} [entry] - Kafka message entry; headers, when present,
 *   are an array of single-key objects (node-rdkafka header format)
 * @return {boolean} true when any header object has a `link-traceparent`
 *   key; false for missing/malformed entries or headers
 */
function entryHasLinkHeaders(entry) {
    if (!entry || !Array.isArray(entry.headers)) {
        return false;
    }
    // Headers come as an array of { name: value } objects.
    return entry.headers.some(h => Boolean(h['link-traceparent']));
}

const CLIENT_ID = 'BackbeatConsumer';
const { withTopicPrefix } = require('./util/topic');

Expand Down Expand Up @@ -514,7 +534,22 @@
const { topic, partition } = entry;
KafkaBacklogMetrics.onTaskStarted(topic, partition, this._groupId);

this._queueProcessor(entry, (err, completionArgs) => done(err, completionArgs, finishProcessingTask));
const { ctx, span } = entryHasLinkHeaders(entry)
? startLinkedSpanFromKafkaEntry(entry, `${topic}.process`)
: startSpanFromKafkaEntry(entry, `${topic}.process`);
span.setAttribute('messaging.kafka.topic', topic);
span.setAttribute('messaging.kafka.partition', partition);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding messaging.kafka.consumer_group — it helps distinguish spans from different consumer groups in Jaeger (e.g. replication vs lifecycle processors running in the same cluster).

```suggestion
span.setAttribute('messaging.kafka.topic', topic);
span.setAttribute('messaging.kafka.partition', partition);
span.setAttribute('messaging.kafka.consumer_group', this._groupId);

otelContext.with(ctx, () => {
this._queueProcessor(entry, (err, completionArgs) => {
if (err) {
span.recordException(err);
span.setStatus({ code: SpanStatusCode.ERROR });
}
span.end();
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

recordException only adds an event to the span — it does not set the span status. Without calling span.setStatus({ code: SpanStatusCode.ERROR }), failed spans will still appear as OK in Jaeger. Add span.setStatus({ code: 2 }) (SpanStatusCode.ERROR) alongside recordException. — Claude Code

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

recordException adds an event to the span but does not mark the span as failed. Without setStatus, error spans will appear as successful in Jaeger/Grafana Tempo. Add span.setStatus({ code: 2, message: err.message }) alongside recordException.

Suggested change
span.end();
if (err) {
span.recordException(err);
span.setStatus({ code: 2, message: err.message });
}

— Claude Code

done(err, completionArgs, finishProcessingTask);
});
});
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If _queueProcessor throws synchronously, span.end() is never called and the span leaks. Wrap the call in try/catch to ensure cleanup:

```suggestion
otelContext.with(ctx, () => {
    try {
        this._queueProcessor(entry, (err, completionArgs) => {
            if (err) {
                span.recordException(err);
                span.setStatus({ code: SpanStatusCode.ERROR });
            }
            span.end();
            done(err, completionArgs, finishProcessingTask);
        });
    } catch (err) {
        span.recordException(err);
        span.setStatus({ code: SpanStatusCode.ERROR });
        span.end();
        throw err;
    }
});
```

— Claude Code

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If _queueProcessor throws synchronously (e.g. TypeError from a null entry field), the span created at line 537 is never ended — it leaks. Wrap the _queueProcessor call in try/catch to ensure span.end() is always called. Same pattern applies to LifecycleConductor.processBuckets (line 393).

— Claude Code

}

/**
Expand Down
3 changes: 2 additions & 1 deletion lib/BackbeatProducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,8 @@ class BackbeatProducer extends EventEmitter {
Buffer.from(item.message), // value
item.key, // key (for keyed partitioning)
Date.now(), // timestamp
sendCtx // opaque
sendCtx, // opaque
item.headers || undefined // Kafka message headers
);
});
} catch (err) {
Expand Down
Loading
Loading