33 * SPDX-License-Identifier: Apache-2.0
44 */
55
6- const LangGraphSubscriber = require ( './base' )
6+ const AiMonitoringSubscriber = require ( '../ai-monitoring /base' )
77const { DESTINATIONS } = require ( '#agentlib/config/attribute-filter.js' )
88const LlmErrorMessage = require ( '#agentlib/llm-events/error-message.js' )
99const LangGraphAgentEvent = require ( '#agentlib/llm-events/langgraph/agent.js' )
1010const { AI : { LANGGRAPH } } = require ( '#agentlib/metrics/names.js' )
1111
12- class LangGraphStreamSubscriber extends LangGraphSubscriber {
12+ class LangGraphStreamSubscriber extends AiMonitoringSubscriber {
1313 constructor ( { agent, logger } ) {
14- super ( { agent, logger, channelName : 'nr_stream' } )
14+ super ( { agent,
15+ logger,
16+ packageName : '@langchain/langgraph' ,
17+ channelName : 'nr_stream' ,
18+ name : `${ LANGGRAPH . AGENT } /stream` ,
19+ trackingPrefix : LANGGRAPH . TRACKING_PREFIX } )
20+ this . events = [ 'asyncEnd' ]
1521 }
1622
1723 get enabled ( ) {
1824 return super . enabled && this . agent . config . ai_monitoring . streaming . enabled
1925 }
2026
2127 handler ( data , ctx ) {
22- if ( ! this . enabled ) {
23- this . logger . debug ( 'LangGraph streaming instrumentation is disabled, not creating segment.' )
24- return ctx
25- }
26-
27- // Create segment and return the context with it.
28- const agentName = data ?. self ?. name ?? 'agent'
29- return this . createSegment ( {
30- name : `${ LANGGRAPH . AGENT } /stream/${ agentName } ` ,
31- ctx
32- } )
28+ // Update segment name with LangGraph agent name.
29+ const aiAgentName = data ?. self ?. name ?? 'agent'
30+ this . name = `${ LANGGRAPH . AGENT } /stream/${ aiAgentName } `
31+ return super . handler ( data , ctx )
3332 }
3433
3534 asyncEnd ( data ) {
@@ -44,29 +43,31 @@ class LangGraphStreamSubscriber extends LangGraphSubscriber {
4443 if ( transaction ?. isActive ( ) !== true ) {
4544 return
4645 }
47- const { moduleVersion : pkgVersion , error : initialErr , result : stream } = data
48- const name = data ?. self ?. name ?? 'agent'
46+
47+ // Extract data
48+ const { error : initialErr , result : stream } = data
49+ const aiAgentName = data ?. self ?. name ?? 'agent'
4950
5051 // Mark as LLM event
5152 transaction . trace . attributes . addAttribute ( DESTINATIONS . TRANS_EVENT , 'llm' , true )
5253
5354 // Add AI Agent component attribute
54- segment . addSpanAttribute ( 'subcomponent' , `{"type": "APM-AI_AGENT", "name": ${ name } }` )
55+ segment . addSpanAttribute ( 'subcomponent' , `{"type": "APM-AI_AGENT", "name": ${ aiAgentName } }` )
5556
5657 // If error already:
5758 if ( initialErr ) {
58- this . recordAgentEvent ( { name , transaction, segment, pkgVersion , error : initialErr } )
59+ this . recordAiAgentEvent ( { aiAgentName , transaction, segment, error : initialErr } )
5960 return
6061 }
6162
6263 // Note: as of 18.x `ReadableStream` is a global
6364 // eslint-disable-next-line n/no-unsupported-features/node-builtins
6465 if ( stream instanceof ReadableStream ) {
65- this . wrapNextHandler ( { stream, segment, transaction, pkgVersion , name } )
66+ this . wrapNextHandler ( { stream, segment, transaction, aiAgentName } )
6667 }
6768 }
6869
69- wrapNextHandler ( { stream, segment, transaction, pkgVersion , name } ) {
70+ wrapNextHandler ( { stream, segment, transaction, aiAgentName } ) {
7071 const self = this
7172 const orig = stream . getReader
7273 stream . getReader = function wrappedGetReaderLangGraph ( ) {
@@ -76,11 +77,11 @@ class LangGraphStreamSubscriber extends LangGraphSubscriber {
7677 try {
7778 const result = await origRead . apply ( this , args )
7879 if ( result ?. done ) {
79- self . recordAgentEvent ( { name , transaction, segment, pkgVersion , error : false } )
80+ self . recordAiAgentEvent ( { aiAgentName , transaction, segment, error : false } )
8081 }
8182 return result
8283 } catch ( err ) {
83- self . recordAgentEvent ( { name , transaction, segment, pkgVersion , error : err } )
84+ self . recordAiAgentEvent ( { aiAgentName , transaction, segment, error : err } )
8485 throw err
8586 } finally {
8687 // update segment duration on every stream iteration to extend
@@ -92,18 +93,28 @@ class LangGraphStreamSubscriber extends LangGraphSubscriber {
9293 }
9394 }
9495
95- recordAgentEvent ( { name, transaction, segment, pkgVersion, error } ) {
96+ /**
97+ * Records a LangGraph `LlmAgent` event and a `LlmErrorMessage` event
98+ * if an error occurred.
99+ *
100+ * @param {object } params function parameters
101+ * @param {string } params.aiAgentName LangGraph AI agent name
102+ * @param {object } params.transaction current transaction
103+ * @param {object } params.segment current segment
104+ * @param {object } [params.error] an error if it occurred
105+ */
106+ recordAiAgentEvent ( { aiAgentName, transaction, segment, error } ) {
96107 const { agent } = this
97108 segment . end ( )
98109 const agentEvent = new LangGraphAgentEvent ( {
99110 agent,
100- name,
111+ name : aiAgentName ,
101112 transaction,
102113 segment,
103- error : error !== null
114+ error : ! ! error
104115 } )
105116
106- this . recordEvent ( { type : 'LlmAgent' , pkgVersion , msg : agentEvent } )
117+ this . recordEvent ( { type : 'LlmAgent' , msg : agentEvent } )
107118
108119 if ( error ) {
109120 agent . errors . add (
@@ -112,7 +123,7 @@ class LangGraphStreamSubscriber extends LangGraphSubscriber {
112123 new LlmErrorMessage ( {
113124 response : { } ,
114125 cause : error ,
115- agent : agentEvent
126+ aiAgent : agentEvent
116127 } )
117128 )
118129 }
0 commit comments