Skip to content

Commit 45667a0

Browse files
authored
fix: Updated MessageConsumerSubscriber to wait for consumer callback to end when it is a promise (newrelic#3510)
1 parent 724b218 commit 45667a0

File tree

5 files changed

+320
-44
lines changed

5 files changed

+320
-44
lines changed

lib/subscribers/base.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,9 @@ class Subscriber {
114114
asyncStart.runStores(context, () => {
115115
try {
116116
if (callback) {
117-
return Reflect.apply(callback, this, arguments)
117+
const cbResult = Reflect.apply(callback, this, arguments)
118+
context.cbResult = cbResult
119+
return cbResult
118120
}
119121
} finally {
120122
asyncEnd.publish(context)

lib/subscribers/message-consumer.js

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,22 @@ class MessageConsumerSubscriber extends Subscriber {
6666
}
6767

6868
/**
69-
* Ends the transaction created for the consumption callback.
69+
* Checks the result of the message handler.
70+
* If it's a promise, waits for it to resolve before ending transaction.
71+
* This ensures that the transaction stays active until the promise resolves.
72+
* @param {object} data the data associated with the `asyncEnd` event
7073
*/
71-
asyncEnd() {
74+
asyncEnd(data) {
7275
const ctx = this.agent.tracer.getContext()
73-
ctx?.transaction?.end()
76+
const result = data.cbResult
77+
if (typeof result?.then === 'function') {
78+
const prom = Promise.resolve(result)
79+
prom.finally(() => {
80+
ctx?.transaction?.end()
81+
})
82+
} else {
83+
ctx?.transaction?.end()
84+
}
7485
}
7586

7687
enable() {

test/unit/lib/subscribers/base.test.js

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,146 @@ test('should bind callback and invoke the asyncStart/error/asyncError events whe
328328
await plan.completed
329329
})
330330

331+
test('should bind callback and invoke the asyncStart/error/asyncError events with right context when callback succeeds', async (t) => {
332+
const plan = tspl(t, { plan: 10 })
333+
const { agent, subscriber } = t.nr
334+
const name = 'test-segment'
335+
const expectedResult = 'test-result'
336+
subscriber.callback = -1
337+
subscriber.error = (data) => {
338+
plan.fail('should not call error when cb succeeds')
339+
}
340+
341+
subscriber.asyncStart = (data) => {
342+
plan.equal(data.callback, true)
343+
plan.ok(!data.error)
344+
plan.equal(data.result, expectedResult)
345+
}
346+
subscriber.asyncEnd = (data) => {
347+
plan.equal(data.callback, true)
348+
plan.ok(!data.error)
349+
plan.equal(data.cbResult, undefined)
350+
plan.equal(data.result, expectedResult)
351+
}
352+
subscriber.enable()
353+
subscriber.events = ['asyncStart', 'asyncEnd', 'error']
354+
subscriber.subscribe()
355+
subscriber.handler = function handler(data, ctx) {
356+
plan.equal(data.name, name)
357+
return subscriber.createSegment({
358+
name: data?.name,
359+
ctx
360+
})
361+
}
362+
363+
function testCb(err, result) {
364+
plan.equal(result, expectedResult)
365+
plan.equal(err, null)
366+
}
367+
368+
helper.runInTransaction(agent, () => {
369+
const event = { name, arguments: [testCb] }
370+
subscriber.channel.start.runStores(event, () => {
371+
event.arguments[0](null, expectedResult)
372+
})
373+
})
374+
375+
await plan.completed
376+
})
377+
378+
test('should bind callback and invoke the asyncStart/error/asyncError events with right context when callback returns a promise', async (t) => {
379+
const plan = tspl(t, { plan: 10 })
380+
const { agent, subscriber } = t.nr
381+
const name = 'test-segment'
382+
const expectedResult = 'test-result'
383+
subscriber.callback = -1
384+
subscriber.error = (data) => {
385+
plan.fail('should not call error when cb succeeds')
386+
}
387+
388+
subscriber.asyncStart = (data) => {
389+
plan.equal(data.callback, true)
390+
plan.ok(!data.error)
391+
plan.equal(data.result, expectedResult)
392+
}
393+
subscriber.asyncEnd = (data) => {
394+
plan.equal(data.callback, true)
395+
plan.ok(!data.error)
396+
plan.ok(data.cbResult instanceof Promise)
397+
plan.equal(data.result, expectedResult)
398+
}
399+
subscriber.enable()
400+
subscriber.events = ['asyncStart', 'asyncEnd', 'error']
401+
subscriber.subscribe()
402+
subscriber.handler = function handler(data, ctx) {
403+
plan.equal(data.name, name)
404+
return subscriber.createSegment({
405+
name: data?.name,
406+
ctx
407+
})
408+
}
409+
410+
async function testCb(err, result) {
411+
plan.equal(result, expectedResult)
412+
plan.equal(err, null)
413+
}
414+
415+
helper.runInTransaction(agent, () => {
416+
const event = { name, arguments: [testCb] }
417+
subscriber.channel.start.runStores(event, () => {
418+
event.arguments[0](null, expectedResult)
419+
})
420+
})
421+
422+
await plan.completed
423+
})
424+
425+
test('should bind callback and invoke the asyncStart/error/asyncError events with right context when callback fails', async (t) => {
426+
const plan = tspl(t, { plan: 10 })
427+
const { agent, subscriber } = t.nr
428+
const name = 'test-segment'
429+
const expectedErr = new Error('cb failed')
430+
subscriber.callback = -1
431+
subscriber.error = (data) => {
432+
plan.equal(data.callback, true)
433+
plan.deepEqual(data.error, expectedErr)
434+
}
435+
436+
subscriber.asyncStart = (data) => {
437+
plan.equal(data.callback, true)
438+
plan.deepEqual(data.error, expectedErr)
439+
}
440+
subscriber.asyncEnd = (data) => {
441+
plan.equal(data.callback, true)
442+
plan.deepEqual(data.error, expectedErr)
443+
plan.equal(data.cbResult, undefined)
444+
}
445+
subscriber.enable()
446+
subscriber.events = ['asyncStart', 'asyncEnd', 'error']
447+
subscriber.subscribe()
448+
subscriber.handler = function handler(data, ctx) {
449+
plan.equal(data.name, name)
450+
return subscriber.createSegment({
451+
name: data?.name,
452+
ctx
453+
})
454+
}
455+
456+
function testCb(err, result) {
457+
plan.deepEqual(err, expectedErr)
458+
plan.equal(result, undefined)
459+
}
460+
461+
helper.runInTransaction(agent, () => {
462+
const event = { name, arguments: [testCb] }
463+
subscriber.channel.start.runStores(event, () => {
464+
event.arguments[0](expectedErr)
465+
})
466+
})
467+
468+
await plan.completed
469+
})
470+
331471
test('should bind callback and invoke asyncStart/asyncEnd events and propagateContext', async (t) => {
332472
const plan = tspl(t, { plan: 6 })
333473
const { agent, subscriber } = t.nr

test/versioned/amqplib/callback.test.js

Lines changed: 100 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,7 @@ const { removeMatchedModules } = require('../../lib/cache-buster')
1313
const promiseResolvers = require('../../lib/promise-resolvers')
1414
const { version } = require('amqplib/package.json')
1515
const { assertPackageMetrics } = require('../../lib/custom-assertions')
16-
17-
/*
18-
TODO:
19-
20-
- promise API
21-
- callback API
22-
23-
consumer
24-
- off by default for rum
25-
- value of the attribute is limited to 255 bytes
26-
27-
*/
16+
const PROMISE_WAIT = 100
2817

2918
test('amqplib callback instrumentation', async function (t) {
3019
t.beforeEach(async function (ctx) {
@@ -399,7 +388,7 @@ test('amqplib callback instrumentation', async function (t) {
399388
})
400389

401390
await t.test('consume out of transaction', function (t, end) {
402-
const { agent, api, channel } = t.nr
391+
const { agent, channel } = t.nr
403392
const exchange = amqpUtils.DIRECT_EXCHANGE
404393
let queue = null
405394

@@ -421,17 +410,12 @@ test('amqplib callback instrumentation', async function (t) {
421410
channel.consume(
422411
queue,
423412
function (msg) {
424-
const tx = api.getTransaction()
425413
assert.ok(msg, 'should receive a message')
426414

427415
const body = msg.content.toString('utf8')
428416
assert.equal(body, 'hello', 'should receive expected body')
429417

430418
channel.ack(msg)
431-
432-
setImmediate(function () {
433-
tx.end()
434-
})
435419
},
436420
null,
437421
function (err) {
@@ -472,14 +456,108 @@ test('amqplib callback instrumentation', async function (t) {
472456
channel.consume(
473457
queue,
474458
function (msg) {
475-
const tx = api.getTransaction()
476459
api.setTransactionName('foobar')
477460

478461
channel.ack(msg)
462+
},
463+
null,
464+
function (err) {
465+
assert.ok(!err, 'should not error subscribing consumer')
479466

480-
setImmediate(function () {
481-
tx.end()
482-
})
467+
channel.publish(amqpUtils.DIRECT_EXCHANGE, 'consume-tx-key', Buffer.from('hello'))
468+
}
469+
)
470+
})
471+
})
472+
})
473+
})
474+
475+
await t.test('consume async handler', function (t, end) {
476+
const { agent, channel } = t.nr
477+
const exchange = amqpUtils.DIRECT_EXCHANGE
478+
let queue = null
479+
480+
agent.on('transactionFinished', function (tx) {
481+
amqpUtils.verifyConsumeTransaction(tx, exchange, queue, 'consume-tx-key')
482+
assert.ok(tx.trace.getDurationInMillis() >= PROMISE_WAIT, 'transaction should account for async work')
483+
end()
484+
})
485+
486+
channel.assertExchange(exchange, 'direct', null, function (err) {
487+
assert.ok(!err, 'should not error asserting exchange')
488+
489+
channel.assertQueue('', { exclusive: true }, function (err, res) {
490+
assert.ok(!err, 'should not error asserting queue')
491+
queue = res.queue
492+
493+
channel.bindQueue(queue, exchange, 'consume-tx-key', null, function (err) {
494+
assert.ok(!err, 'should not error binding queue')
495+
496+
channel.consume(
497+
queue,
498+
async function (msg) {
499+
assert.ok(msg, 'should receive a message')
500+
501+
const body = msg.content.toString('utf8')
502+
assert.equal(body, 'hello', 'should receive expected body')
503+
504+
await new Promise((resolve) => setTimeout(resolve, PROMISE_WAIT))
505+
channel.ack(msg)
506+
},
507+
null,
508+
function (err) {
509+
assert.ok(!err, 'should not error subscribing consumer')
510+
511+
channel.publish(amqpUtils.DIRECT_EXCHANGE, 'consume-tx-key', Buffer.from('hello'))
512+
}
513+
)
514+
})
515+
})
516+
})
517+
})
518+
519+
await t.test('consume async handler that rejects', function (t, end) {
520+
const { agent, channel } = t.nr
521+
const exchange = amqpUtils.DIRECT_EXCHANGE
522+
let queue = null
523+
524+
agent.on('transactionFinished', function (tx) {
525+
amqpUtils.verifyConsumeTransaction(tx, exchange, queue, 'consume-tx-key')
526+
assert.ok(tx.trace.getDurationInMillis() >= PROMISE_WAIT, 'transaction should account for async work')
527+
end()
528+
})
529+
530+
channel.assertExchange(exchange, 'direct', null, function (err) {
531+
assert.ok(!err, 'should not error asserting exchange')
532+
533+
channel.assertQueue('', { exclusive: true }, function (err, res) {
534+
assert.ok(!err, 'should not error asserting queue')
535+
queue = res.queue
536+
537+
channel.bindQueue(queue, exchange, 'consume-tx-key', null, function (err) {
538+
assert.ok(!err, 'should not error binding queue')
539+
540+
channel.consume(
541+
queue,
542+
async function (msg) {
543+
assert.ok(msg, 'should receive a message')
544+
545+
const body = msg.content.toString('utf8')
546+
assert.equal(body, 'hello', 'should receive expected body')
547+
548+
try {
549+
const err = new Error('async handler failure')
550+
await new Promise((_resolve, reject) => {
551+
setTimeout(() => {
552+
reject(err)
553+
}, PROMISE_WAIT)
554+
})
555+
assert.fail('should not resolve successfully')
556+
} catch (err) {
557+
assert.equal(err.message, 'async handler failure')
558+
} finally {
559+
channel.ack(msg)
560+
}
483561
},
484562
null,
485563
function (err) {

0 commit comments

Comments
 (0)