@@ -16,37 +16,39 @@ class LangchainRunnableStreamSubscriber extends LangchainRunnableSubscriber {
1616 this . logger . debug ( '`ai_monitoring.enabled` is set to false, stream will not be instrumented.' )
1717 return
1818 }
19-
2019 if ( ! this . streamingEnabled ) {
2120 this . logger . debug ( '`ai_monitoring.streaming.enabled` is set to false, stream will not be instrumented.' )
2221 this . agent . metrics . getOrCreateMetric ( STREAMING_DISABLED ) . incrementCallCount ( )
2322 return
2423 }
2524
2625 const ctx = this . agent . tracer . getContext ( )
27-
2826 const { transaction } = ctx
2927 if ( transaction ?. isActive ( ) !== true ) {
3028 return
3129 }
3230
33- // Extract data.
3431 const request = data ?. arguments ?. [ 0 ]
32+ // Requests via LangGraph API have the `messages` property with the
33+ // information we need, otherwise it just lives on the `request`
34+ // object directly.
35+ const userRequest = request ?. messages ? request . messages ?. [ 0 ] : request
3536 const params = data ?. arguments ?. [ 1 ] || { }
3637 const metadata = params ?. metadata ?? { }
3738 const tags = params ?. tags ?? [ ]
3839 const { result : response , error : err } = data
3940
40- // Instrument stream.
41- if ( response ?. next ) {
42- this . wrapNextHandler ( { response, ctx, request, metadata, tags } )
41+ // Note: as of 18.x `ReadableStream` is a global
42+ // eslint-disable-next-line n/no-unsupported-features/node-builtins
43+ if ( response instanceof ReadableStream ) {
44+ this . instrumentStream ( { response, ctx, request : userRequest , metadata, tags } )
4345 } else {
4446 // Input error occurred which means a stream was not created.
4547 // Skip instrumenting streaming and create Llm Events from
4648 // the data we have
4749 this . recordChatCompletionEvents ( {
4850 ctx,
49- request,
51+ request : userRequest ,
5052 err,
5153 metadata,
5254 tags
@@ -55,7 +57,7 @@ class LangchainRunnableStreamSubscriber extends LangchainRunnableSubscriber {
5557 }
5658
5759 /**
58- * Wraps the next method on the IterableReadableStream . It will also record the Llm
60+ * Wraps `read` method on the ReadableStream reader . It will also record the Llm
5961 * events when the stream is done processing.
6062 *
6163 * @param {object } params function params
@@ -65,56 +67,82 @@ class LangchainRunnableStreamSubscriber extends LangchainRunnableSubscriber {
6567 * @param {object } params.metadata metadata for the call
6668 * @param {Array } params.tags tags for the call
6769 */
68- wrapNextHandler ( { ctx, response, request, metadata, tags } ) {
70+ instrumentStream ( { ctx, response, request, metadata, tags } ) {
6971 const self = this
70- const orig = response . next
71- let content = ''
72- const { segment } = ctx
73-
74- async function wrappedIterator ( ...args ) {
75- try {
76- const result = await orig . apply ( this , args )
77- // only create Llm events when stream iteration is done
78- if ( result ?. done ) {
72+ const orig = response . getReader
73+ response . getReader = function wrapedGetReader ( ) {
74+ const reader = orig . apply ( this , arguments )
75+ const origRead = reader . read
76+ let responseContent = ''
77+ reader . read = async function wrappedRead ( ...args ) {
78+ try {
79+ const result = await origRead . apply ( this , args )
80+ if ( result ?. done ) {
81+ // only create Llm events when stream iteration is done
82+ self . recordChatCompletionEvents ( {
83+ ctx,
84+ response : responseContent ,
85+ request,
86+ metadata,
87+ tags
88+ } )
89+ } else {
90+ // Concat the streamed content
91+ responseContent = self . concatResponseContent ( result , responseContent )
92+ }
93+ return result
94+ } catch ( error ) {
7995 self . recordChatCompletionEvents ( {
8096 ctx,
8197 request,
82- response : content ,
98+ response : responseContent ,
8399 metadata,
84- tags
100+ tags,
101+ err : error
85102 } )
86- } else {
87- // Concat the streamed content
88- if ( typeof result ?. value ?. content === 'string' ) {
89- // LangChain BaseMessageChunk case
90- content += result . value . content
91- } else if ( typeof result ?. value === 'string' ) {
92- // Base LangChain case
93- content += result . value
94- } else if ( typeof result ?. value ?. [ 0 ] === 'string' ) {
95- // Array parser case
96- content += result . value [ 0 ]
97- }
103+ throw error
104+ } finally {
105+ // update segment duration on every stream
106+ // iteration to extend the timer
107+ ctx . segment . touch ( )
98108 }
99- return result
100- } catch ( error ) {
101- self . recordChatCompletionEvents ( {
102- ctx,
103- request,
104- response : content ,
105- metadata,
106- tags,
107- err : error
108- } )
109- throw error
110- } finally {
111- // update segment duration on every stream iteration to extend
112- // the timer
113- segment . touch ( )
114109 }
110+ return reader
111+ }
112+ }
113+
114+ /**
115+ * Concats streamed content from various LangChain/LangGraph result formats.
116+ *
117+ * @param {object } result the stream result chunk
118+ * @param {string|object } content the response so far
119+ * @returns {string|object } updated response content. For LangGraph, it will return an object
120+ * (e.g. AIMessage), so we have more info if we need to drop this response if it is incomplete
121+ * (e.g outgoing tool call).
122+ */
123+ concatResponseContent ( result , content ) {
124+ if ( result ?. value ?. messages || result ?. value ?. agent ?. messages ) {
125+ // LangGraph case:
126+ // The result.value.%messages field contains all messages,
127+ // request and response, and appends new events at the
128+ // end of the array. Therefore, the last message is the
129+ // relevant response object.
130+ const langgraphMessages = result ?. value ?. messages ?? result ?. value ?. agent ?. messages
131+ if ( langgraphMessages . length > 0 ) {
132+ content = langgraphMessages [ langgraphMessages . length - 1 ]
133+ }
134+ } else if ( typeof result ?. value ?. content === 'string' ) {
135+ // LangChain MessageChunk case
136+ content += result . value . content
137+ } else if ( typeof result ?. value === 'string' ) {
138+ // Base LangChain case
139+ content += result . value
140+ } else if ( typeof result ?. value ?. [ 0 ] === 'string' ) {
141+ // LangChain array parser case
142+ content += result . value [ 0 ]
115143 }
116144
117- response . next = this . agent . tracer . bindFunction ( wrappedIterator , ctx , false )
145+ return content
118146 }
119147}
120148
0 commit comments