Skip to content

Commit aa3f8d9

Browse files
authored
refactor: Updated subscribers to have a common createSegment that creates, assigns attributes, starts segment (newrelic#3317)
1 parent e45ab27 commit aa3f8d9

File tree

5 files changed

+205
-26
lines changed

5 files changed

+205
-26
lines changed

lib/subscribers/base.js

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@
77
// eslint-disable-next-line n/no-unsupported-features/node-builtins
88
const { tracingChannel } = require('node:diagnostics_channel')
99

10+
/**
11+
* Base class for defining a subscriber.
12+
* events property is an array with the following possible event names:
13+
* `start`, `end`, `asyncStart`, `asyncEnd`, `error`
14+
* @link https://nodejs.org/api/diagnostics_channel.html#class-tracingchannel
15+
*/
1016
class Subscriber {
1117
constructor({ agent, logger, packageName, channelName }) {
1218
this.agent = agent
@@ -23,31 +29,88 @@ class Subscriber {
2329
this.store = agent.tracer._contextManager._asyncLocalStorage
2430
}
2531

32+
/**
33+
* Creates a segment with a name, parent, transaction and optional recorder.
34+
* If the segment is successfully created, it will be started and added to the context.
35+
* @param {Object} params - Parameters for creating the segment
36+
* @param {string} params.name - The name of the segment
37+
* @param {Object} params.recorder - Optional recorder for the segment
38+
* @param {Context} params.ctx - The context containing the parent segment and transaction
39+
* @returns {Context} - The updated context with the new segment or existing context if segment creation fails
40+
*/
41+
createSegment({ name, recorder, ctx }) {
42+
const segment = this.agent.tracer.createSegment({
43+
name,
44+
parent: ctx?.segment,
45+
recorder,
46+
transaction: ctx?.transaction,
47+
})
48+
49+
if (segment) {
50+
segment.opaque = this.opaque
51+
segment.start()
52+
this.logger.trace('Created segment %s', name)
53+
this.addAttributes(segment)
54+
const newCtx = ctx.enterSegment({ segment })
55+
return newCtx
56+
} else {
57+
this.logger.trace('Failed to create segment for %s', name)
58+
return ctx
59+
}
60+
}
61+
62+
/**
63+
* By default this is a no-op, but can be overridden by subclasses
64+
* @param {Segment} segment - The segment to which attributes will be added
65+
* @returns {void}
66+
*/
67+
addAttributes(segment) {
68+
69+
}
70+
71+
/**
72+
* Checks if the subscriber is enabled based on the agent's configuration.
73+
*/
2674
get enabled() {
2775
return this.config.instrumentation[this.packageName].enabled === true
2876
}
2977

78+
/**
79+
* Enables the subscriber by binding the store to the channel and setting up the handler.
80+
* If the subscriber requires an active transaction, it will check the context before passing the event to the handler.
81+
* @returns {Context} - The context after processing the event
82+
*/
3083
enable() {
3184
this.channel.start.bindStore(this.store, (data) => {
3285
const ctx = this.agent.tracer.getContext()
3386
if (this.requireActiveTx && !ctx?.transaction?.isActive()) {
3487
this.logger.debug('Not recording event for %s, transaction is not active', this.package)
35-
return
88+
return ctx
3689
}
3790

3891
return this.handler(data, ctx)
3992
})
4093
}
4194

95+
/**
96+
* Disables the subscriber by unbinding the store from the channel.
97+
*/
4298
disable() {
4399
this.channel.start.unbindStore(this.store)
44100
}
45101

102+
/**
103+
* Common handler for when async events end.
104+
* It gets the context and touches the segment if it exists.
105+
*/
46106
asyncEnd() {
47107
const ctx = this.agent.tracer.getContext()
48108
ctx?.segment?.touch()
49109
}
50110

111+
/*
112+
* Subscribes to the events defined in the `events` array.
113+
*/
51114
subscribe() {
52115
this.subscriptions = this.events.reduce((events, curr) => {
53116
events[curr] = this[curr].bind(this)
@@ -57,8 +120,12 @@ class Subscriber {
57120
this.channel.subscribe(this.subscriptions)
58121
}
59122

123+
/**
124+
* Unsubscribes from the events defined in the `events` array..
125+
*/
60126
unsubscribe() {
61127
this.channel.unsubscribe(this.subscriptions)
128+
this.subscriptions = null
62129
}
63130
}
64131

lib/subscribers/db-operation.js

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,11 @@ const { DB } = require('../metrics/names')
1010
class DbOperationSubscriber extends DbSubscriber {
1111
handler(data, ctx) {
1212
const name = `${DB.OPERATION}/${this.system}/${this.operation}`
13-
const segment = this.agent.tracer.createSegment({
13+
return this.createSegment({
1414
name,
15-
parent: ctx.segment,
15+
ctx,
1616
recorder: recordOperationMetrics.bind(this),
17-
transaction: ctx.transaction
1817
})
19-
20-
this.addAttributes(segment)
21-
const newCtx = ctx.enterSegment({ segment })
22-
return newCtx
2318
}
2419
}
2520

lib/subscribers/db-query.js

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,11 @@ class DbQuerySubscriber extends DbSubscriber {
1414
const queryString = this.queryString
1515
const parsed = this.parseQueryString(queryString)
1616
const name = `${DB.STATEMENT}/${this.system}/${parsed.collection}/${parsed.operation}`
17-
const segment = this.agent.tracer.createSegment({
17+
return this.createSegment({
1818
name,
19-
parent: ctx.segment,
19+
ctx,
2020
recorder: recordQueryMetrics.bind(parsed),
21-
transaction: ctx.transaction
2221
})
23-
24-
segment.opaque = this.opaque
25-
26-
this.addAttributes(segment)
27-
const newCtx = ctx.enterSegment({ segment })
28-
return newCtx
2922
}
3023

3124
parseQueryString(queryString) {

lib/subscribers/mcp-sdk/client.js

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,24 +11,18 @@ class McpClientSubscriber extends Subscriber {
1111
constructor({ agent, logger, channelName }) {
1212
super({ agent, logger, packageName: '@modelcontextprotocol/sdk', channelName })
1313
this.events = ['asyncEnd']
14-
this.requireActiveTx = true
1514
this.segmentName = 'Unknown'
1615
}
1716

1817
get enabled() {
19-
return this.config.instrumentation[this.packageName].enabled === true &&
20-
this.config.ai_monitoring.enabled === true
18+
return super.enabled && this.config.ai_monitoring.enabled === true
2119
}
2220

2321
handler(ctx) {
24-
const segment = this.agent.tracer.createSegment({
22+
return this.createSegment({
2523
name: this.segmentName,
26-
parent: ctx.segment,
27-
transaction: ctx.transaction
24+
ctx
2825
})
29-
30-
const newCtx = ctx.enterSegment({ segment })
31-
return newCtx
3226
}
3327
}
3428

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Copyright 2025 New Relic Corporation. All rights reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
'use strict'
7+
const assert = require('node:assert')
8+
const test = require('node:test')
9+
const Subscriber = require('#agentlib/subscribers/base.js')
10+
const helper = require('#testlib/agent_helper.js')
11+
const loggerMock = require('../../mocks/logger')
12+
const { tspl } = require('@matteo.collina/tspl')
13+
14+
test.beforeEach((ctx) => {
15+
const agent = helper.loadMockedAgent()
16+
const logger = loggerMock()
17+
const subscriber = new Subscriber({ agent, logger, packageName: 'test-package', channelName: 'test-channel' })
18+
ctx.nr = { agent, subscriber }
19+
})
20+
21+
test.afterEach((ctx) => {
22+
helper.unloadAgent(ctx.nr.agent)
23+
})
24+
25+
test('should define default properties to subscriber', (t) => {
26+
const { subscriber } = t.nr
27+
assert.ok(subscriber.agent)
28+
assert.ok(subscriber.logger)
29+
assert.ok(subscriber.config)
30+
assert.equal(subscriber.packageName, 'test-package')
31+
assert.equal(subscriber.channelName, 'test-channel')
32+
assert.deepEqual(subscriber.events, [])
33+
assert.equal(subscriber.opaque, false)
34+
assert.equal(subscriber.prefix, 'orchestrion:')
35+
assert.equal(subscriber.requireActiveTx, true)
36+
assert.equal(subscriber.id, 'orchestrion:test-package:test-channel')
37+
assert.ok(subscriber.channel)
38+
assert.ok(subscriber.store)
39+
})
40+
41+
test('addAttributes should not crash', (t) => {
42+
const { subscriber } = t.nr
43+
assert.doesNotThrow(() => {
44+
subscriber.addAttributes({})
45+
})
46+
})
47+
48+
test('enabled should return true if package is enabled', (t) => {
49+
const { subscriber } = t.nr
50+
subscriber.config.instrumentation['test-package'] = { enabled: true }
51+
assert.equal(subscriber.enabled, true)
52+
})
53+
54+
test('enabled should return false if package is not enabled', (t) => {
55+
const { subscriber } = t.nr
56+
subscriber.config.instrumentation['test-package'] = { enabled: false }
57+
assert.equal(subscriber.enabled, false)
58+
})
59+
60+
test('should create segment if active tx with proper parent', async (t) => {
61+
const { agent, subscriber } = t.nr
62+
const plan = tspl(t, { plan: 7 })
63+
subscriber.addAttributes = (segment) => {
64+
plan.equal(segment.name, 'test-segment')
65+
}
66+
helper.runInTransaction(agent, async () => {
67+
const ctx = agent.tracer.getContext()
68+
const newCtx = subscriber.createSegment({
69+
name: 'test-segment',
70+
ctx,
71+
})
72+
73+
plan.ok(newCtx)
74+
plan.equal(newCtx.transaction.id, ctx.transaction.id)
75+
const segment = newCtx.segment
76+
plan.equal(segment.name, 'test-segment')
77+
plan.equal(segment.parentId, ctx.segment.id)
78+
plan.equal(segment.opaque, false)
79+
// indicates that the segment timer is running
80+
plan.equal(segment.timer.state, 2)
81+
})
82+
83+
await plan.completed
84+
})
85+
86+
test('should not create segment if no active tx', (t) => {
87+
const { agent, subscriber } = t.nr
88+
const ctx = agent.tracer.getContext()
89+
const newCtx = subscriber.createSegment({
90+
name: 'test-segment',
91+
ctx,
92+
})
93+
94+
assert.deepEqual(newCtx, ctx)
95+
assert.ok(!newCtx.segment)
96+
})
97+
98+
test('should touch segment when asyncEnd is called', (t, end) => {
99+
const { agent, subscriber } = t.nr
100+
helper.runInTransaction(agent, () => {
101+
const ctx = agent.tracer.getContext()
102+
const segment = ctx.segment
103+
assert.equal(segment.timer.state, 2)
104+
assert.equal(segment.timer.touched, false)
105+
subscriber.asyncEnd()
106+
assert.equal(segment.timer.state, 2)
107+
assert.equal(segment.timer.touched, true)
108+
end()
109+
})
110+
})
111+
112+
test('should subscribe/unsubscribe to specific events on channel', (t) => {
113+
const { subscriber } = t.nr
114+
subscriber.bogus = () => {}
115+
subscriber.start = () => {}
116+
subscriber.end = () => {}
117+
subscriber.events = ['bogus', 'start', 'end']
118+
subscriber.subscribe()
119+
assert.equal(subscriber.channel.start.hasSubscribers, true)
120+
assert.equal(subscriber.channel.end.hasSubscribers, true)
121+
assert.ok(!subscriber.channel.bogus)
122+
assert.ok(subscriber.subscriptions.bogus)
123+
assert.ok(subscriber.subscriptions.start)
124+
assert.ok(subscriber.subscriptions.end)
125+
subscriber.unsubscribe()
126+
assert.equal(subscriber.channel.start.hasSubscribers, false)
127+
assert.equal(subscriber.channel.end.hasSubscribers, false)
128+
assert.ok(!subscriber.channel.bogus)
129+
assert.equal(subscriber.subscriptions, null)
130+
})

0 commit comments

Comments
 (0)