55
66'use strict'
77
8- const { geminiApiKey } = require ( '../../../lib/symbols' )
98const {
109 LlmChatCompletionMessage,
1110 LlmChatCompletionSummary,
@@ -17,6 +16,7 @@ const { extractLlmContext } = require('../../util/llm-utils')
1716
1817const { AI } = require ( '../../../lib/metrics/names' )
1918const { GEMINI } = AI
19+ const { DESTINATIONS } = require ( '../../config/attribute-filter' )
2020let TRACKING_METRIC = GEMINI . TRACKING_PREFIX
2121
2222/**
@@ -158,8 +158,6 @@ module.exports = function initialize(agent, googleGenAi, moduleName, shim) {
158158 /**
159159 * Instruments chat completion creation
160160 * and creates the LLM events
161- *
162- * **Note**: Currently only for promises. streams will come later
163161 */
164162 shim . record ( models . prototype , 'generateContentInternal' ,
165163 function wrapGenerateContent ( shim , func , name , args ) {
@@ -185,54 +183,76 @@ module.exports = function initialize(agent, googleGenAi, moduleName, shim) {
185183 }
186184 )
187185
188- shim . record ( models . prototype , 'generateContentStreamInternal' , function wrapGenerateContentStream ( shim , func , name , args ) {
189- if ( ! agent . config . ai_monitoring . streaming . enabled ) {
190- shim . logger . warn (
191- '`ai_monitoring.streaming.enabled` is set to `false`, stream will not be instrumented.'
192- )
193- agent . metrics . getOrCreateMetric ( AI . STREAMING_DISABLED ) . incrementCallCount ( )
194- return
195- }
186+ /*
187+ * Chat completions create can return a stream once promise resolves
188+ * This wraps the iterator which is a generator function
189+ * We will call the original iterator, intercept chunks and yield
190+ * to the original. On complete we will construct the new message object
191+ * with what we have seen in the stream and create the chat completion
192+ * messages
193+ */
194+
195+ // TODO: might need to instrument processAfcStream too
196+ // https://github.com/googleapis/js-genai/blob/cd0454862b4a0251d2606eeca8500b3b76004944/src/models.ts#L183
197+ shim . record ( models . prototype , 'generateContentStreamInternal' ,
198+ function wrapGenerateContentStream ( shim , func , name , args ) {
199+ if ( ! agent . config . ai_monitoring . streaming . enabled ) {
200+ shim . logger . warn (
201+ '`ai_monitoring.streaming.enabled` is set to `false`, stream will not be instrumented.'
202+ )
203+ agent . metrics . getOrCreateMetric ( AI . STREAMING_DISABLED ) . incrementCallCount ( )
204+ return
205+ }
206+ const [ request ] = args
196207
197- shim . wrap ( response , 'iterator' , function wrapIterator ( shim , orig ) {
198- return async function * wrappedIterator ( ) {
199- let content = ''
200- let role = ''
201- let chunk
202- let err
203- try {
204- const iterator = orig . apply ( this , arguments )
205-
206- for await ( chunk of iterator ) {
207- if ( chunk . choices ?. [ 0 ] ?. delta ?. role ) {
208- role = chunk . choices [ 0 ] . delta . role
208+ return new RecorderSpec ( {
209+ name : GEMINI . COMPLETION ,
210+ promise : true ,
211+ after ( { error : err , result : response , segment, transaction } ) {
212+ // Symbol.asyncIterator
213+ // FIXME: it's causing recursion
214+ shim . wrap ( response , Symbol . asyncIterator , function wrapIterator ( shim , orig ) {
215+ const originalAsyncIterator = orig
216+ return async function * wrappedIterator ( ) {
217+ let content = ''
218+ let role = ''
219+ let chunk
220+ let err
221+ try {
222+ const iterator = originalAsyncIterator . apply ( this , arguments )
223+ for await ( chunk of iterator ) {
224+ if ( chunk . choices ?. [ 0 ] ?. delta ?. role ) {
225+ role = chunk . choices [ 0 ] . delta . role
226+ }
227+
228+ content += chunk . choices ?. [ 0 ] ?. delta ?. content ?? ''
229+ yield chunk
230+ }
231+ } catch ( streamErr ) {
232+ err = streamErr
233+ } finally {
234+ chunk . choices [ 0 ] . message = { role, content }
235+ // update segment duration since we want to extend the time it took to
236+ // handle the stream
237+ segment . touch ( )
238+
239+ recordChatCompletionMessages ( {
240+ agent : shim . agent ,
241+ shim,
242+ segment,
243+ transaction,
244+ request,
245+ response : chunk ,
246+ err
247+ } )
248+
249+ addLlmMeta ( { agent, transaction } )
250+ }
209251 }
210-
211- content += chunk . choices ?. [ 0 ] ?. delta ?. content ?? ''
212- yield chunk
213- }
214- } catch ( streamErr ) {
215- err = streamErr
216- throw err
217- } finally {
218- chunk . choices [ 0 ] . message = { role, content }
219- // update segment duration since we want to extend the time it took to
220- // handle the stream
221- segment . touch ( )
222-
223- recordChatCompletionMessages ( {
224- agent : shim . agent ,
225- shim,
226- segment,
227- transaction,
228- request,
229- response : chunk ,
230- err
231252 } )
232253 }
233- }
254+ } )
234255 } )
235- } )
236256
237257 /**
238258 * Instruments embedding creation
@@ -282,6 +302,4 @@ module.exports = function initialize(agent, googleGenAi, moduleName, shim) {
282302 } )
283303 }
284304 )
285-
286- // TODO: shim.record generateContentStreamInternal
287305}
0 commit comments