Skip to content

Commit 2e336d3

Browse files
committed
Refactor stream concatenation, add streaming disabled metric
1 parent a8a239a commit 2e336d3

File tree

4 files changed

+156
-124
lines changed

4 files changed

+156
-124
lines changed

lib/subscribers/langchain/runnable-stream.js

Lines changed: 58 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ class LangchainRunnableStreamSubscriber extends LangchainRunnableSubscriber {
1212
}
1313

1414
asyncEnd(data) {
15-
// Exit early if disabled.
1615
if (!this.enabled) {
1716
this.logger.debug('`ai_monitoring.enabled` is set to false, stream will not be instrumented.')
1817
return
@@ -23,15 +22,16 @@ class LangchainRunnableStreamSubscriber extends LangchainRunnableSubscriber {
2322
return
2423
}
2524

26-
// Get context.
2725
const ctx = this.agent.tracer.getContext()
2826
const { transaction } = ctx
2927
if (transaction?.isActive() !== true) {
3028
return
3129
}
3230

33-
// Extract data.
3431
const request = data?.arguments?.[0]
32+
// Requests via LangGraph API have the `messages` property with the
33+
// information we need, otherwise it just lives on the `request`
34+
// object directly.
3535
const userRequest = request?.messages ? request.messages?.[0] : request
3636
const params = data?.arguments?.[1] || {}
3737
const metadata = params?.metadata ?? {}
@@ -41,7 +41,7 @@ class LangchainRunnableStreamSubscriber extends LangchainRunnableSubscriber {
4141
// Note: as of 18.x `ReadableStream` is a global
4242
// eslint-disable-next-line n/no-unsupported-features/node-builtins
4343
if (response instanceof ReadableStream) {
44-
this.wrapNextHandler({ response, ctx, request: userRequest, metadata, tags })
44+
this.instrumentStream({ response, ctx, request: userRequest, metadata, tags })
4545
} else {
4646
// Input error occurred which means a stream was not created.
4747
// Skip instrumenting streaming and create Llm Events from
@@ -67,22 +67,22 @@ class LangchainRunnableStreamSubscriber extends LangchainRunnableSubscriber {
6767
* @param {object} params.metadata metadata for the call
6868
* @param {Array} params.tags tags for the call
6969
*/
70-
wrapNextHandler({ ctx, response, request, metadata, tags }) {
70+
instrumentStream({ ctx, response, request, metadata, tags }) {
7171
const self = this
7272
const orig = response.getReader
7373
response.getReader = function wrapedGetReader() {
7474
const reader = orig.apply(this, arguments)
7575
const origRead = reader.read
76-
let content = ''
77-
let langgraphMessages = []
76+
const accumulator = { content: '', langgraphMessages: [] }
7877
reader.read = async function wrappedRead(...args) {
7978
try {
8079
const result = await origRead.apply(this, args)
8180
if (result?.done) {
8281
// only create Llm events when stream iteration is done
83-
const responseMsgs = langgraphMessages.length > 0
84-
? langgraphMessages.filter((msg) => msg.constructor?.name !== 'HumanMessage')
85-
: content
82+
const responseMsgs = self.getResponseMessages(
83+
accumulator.langgraphMessages,
84+
accumulator.content
85+
)
8686
self.recordChatCompletionEvents({
8787
ctx,
8888
response: responseMsgs,
@@ -92,29 +92,14 @@ class LangchainRunnableStreamSubscriber extends LangchainRunnableSubscriber {
9292
})
9393
} else {
9494
// Concat the streamed content
95-
if (result?.value?.messages || result?.value?.agent?.messages) {
96-
// LangGraph case:
97-
// The result.value.messages field contains all messages,
98-
// request and response, and adds new events for the length
99-
// of the stream. The last iteration will contain all messages
100-
// in the stream so we can just re-assign it.
101-
langgraphMessages = result?.value?.messages ?? result?.value?.agent?.messages
102-
} else if (typeof result?.value?.content === 'string') {
103-
// LangChain MessageChunk case
104-
content += result.value.content
105-
} else if (typeof result?.value === 'string') {
106-
// Base LangChain case
107-
content += result.value
108-
} else if (typeof result?.value?.[0] === 'string') {
109-
// Array parser case
110-
content += result.value[0]
111-
}
95+
self.accumulateStreamContent(result, accumulator)
11296
}
11397
return result
11498
} catch (error) {
115-
const responseMsgs = langgraphMessages.length > 0
116-
? langgraphMessages.filter((msg) => msg.constructor?.name !== 'HumanMessage')
117-
: content
99+
const responseMsgs = self.getResponseMessages(
100+
accumulator.langgraphMessages,
101+
accumulator.content
102+
)
118103
self.recordChatCompletionEvents({
119104
ctx,
120105
request,
@@ -125,14 +110,55 @@ class LangchainRunnableStreamSubscriber extends LangchainRunnableSubscriber {
125110
})
126111
throw error
127112
} finally {
128-
// update segment duration on every stream iteration to extend
129-
// the timer
113+
// update segment duration on every stream
114+
// iteration to extend the timer
130115
ctx.segment.touch()
131116
}
132117
}
133118
return reader
134119
}
135120
}
121+
122+
/**
123+
* Accumulates streamed content from various LangChain/LangGraph result formats.
124+
*
125+
* @param {object} result the stream result chunk
126+
* @param {object} accumulator object containing content string and langgraphMessages array
127+
*/
128+
accumulateStreamContent(result, accumulator) {
129+
if (result?.value?.messages || result?.value?.agent?.messages) {
130+
// LangGraph case:
131+
// The result.value.messages field contains all messages,
132+
// request and response, and adds new events for the length
133+
// of the stream. The last iteration will contain all messages
134+
// in the stream, so we can just re-assign `langgraphMessages`.
135+
accumulator.langgraphMessages = result?.value?.messages ?? result?.value?.agent?.messages
136+
} else if (typeof result?.value?.content === 'string') {
137+
// LangChain MessageChunk case
138+
accumulator.content += result.value.content
139+
} else if (typeof result?.value === 'string') {
140+
// Base LangChain case
141+
accumulator.content += result.value
142+
} else if (typeof result?.value?.[0] === 'string') {
143+
// LangChain array parser case
144+
accumulator.content += result.value[0]
145+
}
146+
}
147+
148+
/**
149+
* Extracts response messages from accumulated stream data.
150+
* For LangGraph, filters out HumanMessages.
151+
* For LangChain, returns concatenated content.
152+
*
153+
* @param {object[]} langgraphMessages accumulated LangGraph messages
154+
* @param {string} content accumulated LangChain content
155+
* @returns {object[]|string} the response messages
156+
*/
157+
getResponseMessages(langgraphMessages, content) {
158+
return langgraphMessages.length > 0
159+
? langgraphMessages.filter((msg) => msg.constructor?.name !== 'HumanMessage')
160+
: content
161+
}
136162
}
137163

138164
module.exports = LangchainRunnableStreamSubscriber

lib/subscribers/langchain/runnable.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ class LangchainRunnableSubscriber extends AiMonitoringChatSubscriber {
2020
asyncEnd(data) {
2121
const ctx = this.agent.tracer.getContext()
2222
const request = data?.arguments?.[0]
23+
// Requests via LangGraph API have the `messages` property with the
24+
// information we need, otherwise it just lives on the `request`
25+
// object directly.
2326
const userRequest = request?.messages ? request.messages?.[0] : request
2427
const params = data?.arguments?.[1] || {}
2528
const { result: response, error: err } = data

lib/subscribers/langgraph/graph-stream.js

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,38 +4,44 @@
44
*/
55

66
const AiMonitoringSubscriber = require('../ai-monitoring/base')
7-
const { DESTINATIONS } = require('#agentlib/config/attribute-filter.js')
87
const LlmErrorMessage = require('#agentlib/llm-events/error-message.js')
98
const LangGraphAgentEvent = require('#agentlib/llm-events/langgraph/agent.js')
10-
const { AI: { LANGGRAPH } } = require('#agentlib/metrics/names.js')
9+
const { AI: { LANGGRAPH, STREAMING_DISABLED } } = require('#agentlib/metrics/names.js')
1110

1211
class LangGraphStreamSubscriber extends AiMonitoringSubscriber {
1312
constructor({ agent, logger }) {
1413
super({ agent,
1514
logger,
1615
packageName: '@langchain/langgraph',
1716
channelName: 'nr_stream',
18-
name: `${LANGGRAPH.AGENT}/stream`,
17+
// 'agent' is the default name for an unknown AI
18+
// agent or one with no name
19+
name: `${LANGGRAPH.AGENT}/stream/agent`,
1920
trackingPrefix: LANGGRAPH.TRACKING_PREFIX })
2021
this.events = ['asyncEnd']
2122
}
2223

23-
get enabled() {
24-
return super.enabled && this.agent.config.ai_monitoring.streaming.enabled
24+
get streamingEnabled() {
25+
return this.agent.config.ai_monitoring.streaming.enabled
2526
}
2627

2728
handler(data, ctx) {
28-
// Update segment name with LangGraph agent name.
29-
const aiAgentName = data?.self?.name ?? 'agent'
30-
this.name = `${LANGGRAPH.AGENT}/stream/${aiAgentName}`
29+
// Store LangGraph AI agent name and update
30+
// segment name to use it.
31+
this.aiAgentName = data?.self?.name ?? 'agent'
32+
this.name = `${LANGGRAPH.AGENT}/stream/${this.aiAgentName}`
3133
return super.handler(data, ctx)
3234
}
3335

3436
asyncEnd(data) {
35-
// Get constants and exit early if need be
36-
const { agent, logger } = this
37+
const { agent, logger, aiAgentName } = this
3738
if (!this.enabled) {
38-
logger.debug('LangGraph streaming instrumentation is disabled, not recording Llm events.')
39+
logger.debug('LangGraph instrumentation is disabled, not instrumenting stream.')
40+
return
41+
}
42+
if (!this.streamingEnabled) {
43+
logger.debug('LangGraph streaming instrumentation is disabled, not instrumenting stream.')
44+
this.agent.metrics.getOrCreateMetric(STREAMING_DISABLED).incrementCallCount()
3945
return
4046
}
4147
const ctx = agent.tracer.getContext()
@@ -44,17 +50,9 @@ class LangGraphStreamSubscriber extends AiMonitoringSubscriber {
4450
return
4551
}
4652

47-
// Extract data
4853
const { error: initialErr, result: stream } = data
49-
const aiAgentName = data?.self?.name ?? 'agent'
50-
51-
// Mark as LLM event
52-
transaction.trace.attributes.addAttribute(DESTINATIONS.TRANS_EVENT, 'llm', true)
53-
54-
// Add AI Agent component attribute
5554
segment.addSpanAttribute('subcomponent', `{"type": "APM-AI_AGENT", "name": ${aiAgentName}}`)
5655

57-
// If error already:
5856
if (initialErr) {
5957
this.recordAiAgentEvent({ aiAgentName, transaction, segment, error: initialErr })
6058
return
@@ -63,11 +61,11 @@ class LangGraphStreamSubscriber extends AiMonitoringSubscriber {
6361
// Note: as of 18.x `ReadableStream` is a global
6462
// eslint-disable-next-line n/no-unsupported-features/node-builtins
6563
if (stream instanceof ReadableStream) {
66-
this.wrapNextHandler({ stream, segment, transaction, aiAgentName })
64+
this.instrumentStream({ stream, segment, transaction, aiAgentName })
6765
}
6866
}
6967

70-
wrapNextHandler({ stream, segment, transaction, aiAgentName }) {
68+
instrumentStream({ stream, segment, transaction, aiAgentName }) {
7169
const self = this
7270
const orig = stream.getReader
7371
stream.getReader = function wrappedGetReaderLangGraph() {
@@ -84,8 +82,8 @@ class LangGraphStreamSubscriber extends AiMonitoringSubscriber {
8482
self.recordAiAgentEvent({ aiAgentName, transaction, segment, error: err })
8583
throw err
8684
} finally {
87-
// update segment duration on every stream iteration to extend
88-
// the timer
85+
// update segment duration on every stream
86+
// iteration to extend the timer
8987
segment.touch()
9088
}
9189
}

0 commit comments

Comments (0)