Skip to content

Commit da55c26

Browse files
committed
fixes
1 parent bbc4c41 commit da55c26

File tree

3 files changed

+33
-9
lines changed

3 files changed

+33
-9
lines changed

packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -287,8 +287,7 @@ describe('Plugin', () => {
287287
})
288288

289289
await consumer.run({ eachBatch: () => {} })
290-
await sendMessages(kafka, testTopic, batchMessages)
291-
return expectedSpanPromise
290+
return Promise.all([sendMessages(kafka, testTopic, batchMessages), expectedSpanPromise])
292291
})
293292

294293
it('should run the consumer in the context of the consumer span', done => {
@@ -329,8 +328,7 @@ describe('Plugin', () => {
329328
})
330329

331330
await consumer.run({ eachBatch: () => {} })
332-
await sendMessages(kafka, testTopic, batchMessages)
333-
await expectedSpanPromise
331+
await Promise.all([sendMessages(kafka, testTopic, batchMessages), expectedSpanPromise])
334332
})
335333
})
336334
})

packages/datadog-plugin-kafkajs/src/batch-consumer.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ class KafkajsBatchConsumerPlugin extends ConsumerPlugin {
3333
const headers = convertToTextMap(message.headers)
3434
if (headers) {
3535
const childOf = this.tracer.extract('text_map', headers)
36-
span.addLink(childOf)
36+
if (childOf) {
37+
span.addLink(childOf)
38+
}
3739
}
3840

3941
if (!this.config.dsmEnabled) continue

packages/datadog-plugin-kafkajs/test/index.spec.js

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -444,8 +444,7 @@ describe('Plugin', () => {
444444
await consumer.run({
445445
eachBatch: () => {},
446446
})
447-
await sendMessages(kafka, testTopic, batchMessages)
448-
return expectedSpanPromise
447+
return Promise.all([sendMessages(kafka, testTopic, batchMessages), expectedSpanPromise])
449448
})
450449

451450
it('should run the consumer in the context of the consumer span', done => {
@@ -485,8 +484,33 @@ describe('Plugin', () => {
485484
})
486485

487486
await consumer.run({ eachBatch: () => {} })
488-
await sendMessages(kafka, testTopic, batchMessages)
489-
await expectedSpanPromise
487+
await Promise.all([sendMessages(kafka, testTopic, batchMessages), expectedSpanPromise])
488+
})
489+
490+
it('should not fail when messages have headers without trace context', async () => {
491+
const messagesWithHeaders = [
492+
{ key: 'key1', value: 'test1', headers: { 'x-custom-header': 'value' } },
493+
]
494+
const meta = {
495+
'span.kind': 'consumer',
496+
component: 'kafkajs',
497+
'kafka.topic': testTopic,
498+
'messaging.destination.name': testTopic,
499+
'messaging.system': 'kafka',
500+
}
501+
if (clusterIdAvailable) meta['kafka.cluster_id'] = testKafkaClusterId
502+
503+
const expectedSpanPromise = expectSpanWithDefaults({
504+
name: expectedSchema.receive.opName,
505+
service: expectedSchema.receive.serviceName,
506+
meta,
507+
resource: testTopic,
508+
error: 0,
509+
type: 'worker',
510+
})
511+
512+
await consumer.run({ eachBatch: () => {} })
513+
return Promise.all([sendMessages(kafka, testTopic, messagesWithHeaders), expectedSpanPromise])
490514
})
491515

492516
withNamingSchema(

0 commit comments

Comments
 (0)