-
Notifications
You must be signed in to change notification settings - Fork 385
Expand file tree
/
Copy pathspan_processor.js
More file actions
327 lines (273 loc) · 9.63 KB
/
span_processor.js
File metadata and controls
327 lines (273 loc) · 9.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
'use strict'
const util = require('node:util')
const tracerVersion = require('../../../../package.json').version
const logger = require('../log')
const {
ERROR_MESSAGE,
ERROR_TYPE,
ERROR_STACK,
} = require('../constants')
const PublicSpan = require('../opentracing/public/span')
const {
SPAN_KIND,
MODEL_NAME,
MODEL_PROVIDER,
METADATA,
INPUT_MESSAGES,
INPUT_VALUE,
INTEGRATION,
OUTPUT_MESSAGES,
INPUT_DOCUMENTS,
OUTPUT_DOCUMENTS,
OUTPUT_VALUE,
METRICS,
ML_APP,
TAGS,
PARENT_ID_KEY,
SESSION_ID,
NAME,
INPUT_PROMPT,
ROUTING_API_KEY,
ROUTING_SITE,
} = require('./constants/tags')
const { UNSERIALIZABLE_VALUE_TEXT } = require('./constants/text')
const telemetry = require('./telemetry')
const LLMObsTagger = require('./tagger')
/**
 * Finds the key under which `span` is registered in the LLM Observability tagger's tagMap.
 * Plugin-instrumented spans are registered with raw Span instances,
 * while SDK spans are registered with PublicSpan wrappers, so both are tried in that order.
 *
 * NOTE(review): the wrapper lookup can only succeed if `new PublicSpan(span)` yields the same
 * instance that was registered (e.g. via a cache in the PublicSpan constructor) — confirm in
 * ../opentracing/public/span.
 *
 * @param {object} span - raw span published on the finish channel
 * @returns {object | undefined} the registered key, or `undefined` when the span is not tracked
 */
function resolveTagMapKey (span) {
  const { tagMap } = LLMObsTagger
  if (tagMap.has(span)) return span

  const publicSpan = new PublicSpan(span)
  return tagMap.has(publicSpan) ? publicSpan : undefined
}
/**
 * Mutable view of an LLM Observability span that is handed to the user span processor.
 *
 * - `input` / `output`: lists of message-like entries (the span processor wraps plain values as
 *   `[{ role: '', content }]`); both start out empty.
 * - `_tags`: the span's tags keyed by tag name; starts out empty.
 */
class LLMObservabilitySpan {
  input = []
  output = []
  _tags = {}

  /**
   * Reads a single tag value.
   * @param {string} key - tag name
   * @returns {*} the value stored under `key` in `_tags` (plain property lookup)
   */
  getTag (key) {
    const tags = this._tags
    return tags[key]
  }
}
class LLMObsSpanProcessor {
  /** @type {import('../config')} */
  #config

  /** @type {((span: LLMObservabilitySpan) => LLMObservabilitySpan | null) | null} */
  #userSpanProcessor

  /** @type {import('./writers/spans')} */
  #writer

  /**
   * @param {import('../config')} config - tracer config; `llmobs.enabled`, `parsedDdTags`, `version`,
   *   `env` and `service` are read while processing spans
   */
  constructor (config) {
    this.#config = config
  }

  /**
   * Sets the optional user callback that may edit a span's input/output, or drop the span by returning `null`.
   * @param {((span: LLMObservabilitySpan) => LLMObservabilitySpan | null) | null} userSpanProcessor
   */
  setUserSpanProcessor (userSpanProcessor) {
    this.#userSpanProcessor = userSpanProcessor
  }

  /**
   * @param {import('./writers/spans')} writer - receives formatted span events via `append(event, routing)`
   */
  setWriter (writer) {
    this.#writer = writer
  }

  // TODO: instead of relying on the tagger's weakmap registry, can we use some namespaced storage correlation?
  /**
   * Formats a finished span and appends it to the writer, provided the span is registered in the
   * LLM Observability tagger's tagMap. Does nothing when LLM Observability is disabled.
   *
   * The finished-span telemetry counter is incremented even when the user span processor drops the span.
   * Any error thrown while formatting or appending is logged as a warning and not rethrown.
   * @param {object} span - raw span published on the finish channel
   */
  process (span) {
    if (!this.#config.llmobs.enabled) return

    // The finish channel always publishes raw Span instances, but the tagMap
    // may be keyed by either a raw Span (plugin-instrumented) or a PublicSpan
    // wrapper (SDK-instrumented). Resolve to whichever key is present.
    const tagMapKey = resolveTagMapKey(span)

    // if the span is not in our private tagger map, it is not an llmobs span
    if (!tagMapKey) return

    try {
      const formattedEvent = this.format(span, tagMapKey)
      telemetry.incrementLLMObsSpanFinishedCount(tagMapKey)
      if (formattedEvent == null) return

      // Routing values come straight from the span's LLM Observability tags (undefined when unset).
      const mlObsTags = LLMObsTagger.tagMap.get(tagMapKey)
      const routing = {
        apiKey: mlObsTags[ROUTING_API_KEY],
        site: mlObsTags[ROUTING_SITE],
      }

      this.#writer.append(formattedEvent, routing)
    } catch (e) {
      // this should be a rare case
      // we protect against unserializable properties in the format function, and in
      // safeguards in the tagger
      logger.warn(`
Failed to append span to LLM Observability writer, likely due to an unserializable property.
Span won't be sent to LLM Observability: ${e.message}
`)
    }
  }

  /**
   * Builds the LLM Observability event payload for a finished span.
   *
   * Input/output messages and values are exposed to the user span processor through an
   * LLMObservabilitySpan and copied into `meta` from whatever the processor returns; documents are
   * copied directly and never pass through the processor.
   *
   * @param {object} span - raw finished span (context tags/ids, `_name`, `_startTime`, `_duration` are read)
   * @param {object} tagMapKey - key of the span's entry in the tagger's tagMap
   * @returns {object | null} the event to write, or `null` when the user span processor dropped the span
   */
  format (span, tagMapKey) {
    const llmObsSpan = new LLMObservabilitySpan()
    let inputType, outputType

    const spanTags = span.context()._tags
    const mlObsTags = LLMObsTagger.tagMap.get(tagMapKey)

    const spanKind = mlObsTags[SPAN_KIND]
    const input = {}
    const output = {}
    const meta = { 'span.kind': spanKind, input, output }

    if (['llm', 'embedding'].includes(spanKind)) {
      meta.model_name = mlObsTags[MODEL_NAME] || 'custom'
      meta.model_provider = (mlObsTags[MODEL_PROVIDER] || 'custom').toLowerCase()
    }

    if (mlObsTags[METADATA]) {
      this.#addObject(mlObsTags[METADATA], meta.metadata = {})
    }

    if (spanKind === 'llm' && mlObsTags[INPUT_MESSAGES]) {
      llmObsSpan.input = mlObsTags[INPUT_MESSAGES]
      inputType = 'messages'
    } else if (spanKind === 'embedding' && mlObsTags[INPUT_DOCUMENTS]) {
      input.documents = mlObsTags[INPUT_DOCUMENTS]
    } else if (mlObsTags[INPUT_VALUE]) {
      llmObsSpan.input = [{ role: '', content: mlObsTags[INPUT_VALUE] }]
      inputType = 'value'
    }

    if (spanKind === 'llm' && mlObsTags[OUTPUT_MESSAGES]) {
      llmObsSpan.output = mlObsTags[OUTPUT_MESSAGES]
      outputType = 'messages'
    } else if (spanKind === 'retrieval' && mlObsTags[OUTPUT_DOCUMENTS]) {
      output.documents = mlObsTags[OUTPUT_DOCUMENTS]
    } else if (mlObsTags[OUTPUT_VALUE]) {
      llmObsSpan.output = [{ role: '', content: mlObsTags[OUTPUT_VALUE] }]
      outputType = 'value'
    }

    const error = spanTags.error || spanTags[ERROR_TYPE]
    if (error) {
      meta[ERROR_MESSAGE] = spanTags[ERROR_MESSAGE] || error.message || error.code
      meta[ERROR_TYPE] = spanTags[ERROR_TYPE] || error.name
      meta[ERROR_STACK] = spanTags[ERROR_STACK] || error.stack
    }

    const metrics = mlObsTags[METRICS] || {}

    const mlApp = mlObsTags[ML_APP]
    const sessionId = mlObsTags[SESSION_ID]
    const parentId = mlObsTags[PARENT_ID_KEY]

    const name = mlObsTags[NAME] || span._name

    const tags = this.#getTags(span, tagMapKey, mlApp, sessionId, error)
    llmObsSpan._tags = tags

    const processedSpan = this.#runProcessor(llmObsSpan)
    if (processedSpan === undefined) return null

    // A user processor may empty `input`/`output` (e.g. to redact them). `?.` keeps an empty array
    // from throwing here, which would otherwise drop the whole span.
    if (processedSpan.input) {
      if (inputType === 'messages') {
        input.messages = processedSpan.input
      } else if (inputType === 'value') {
        input.value = processedSpan.input[0]?.content
      }
    }

    if (processedSpan.output) {
      if (outputType === 'messages') {
        output.messages = processedSpan.output
      } else if (outputType === 'value') {
        output.value = processedSpan.output[0]?.content
      }
    }

    const prompt = mlObsTags[INPUT_PROMPT]
    if (prompt && spanKind === 'llm') {
      // by this point, we should have logged a warning if the span kind was not llm
      meta.input.prompt = prompt
    }

    const llmObsSpanEvent = {
      trace_id: span.context().toTraceId(true),
      span_id: span.context().toSpanId(),
      parent_id: parentId,
      name,
      tags: this.#objectTagsToStringArrayTags(tags),
      // span times are multiplied by 1e6 and rounded to integers
      start_ns: Math.round(span._startTime * 1e6),
      duration: Math.round(span._duration * 1e6),
      status: error ? 'error' : 'ok',
      meta,
      metrics,
      _dd: {
        span_id: span.context().toSpanId(),
        trace_id: span.context().toTraceId(true),
      },
    }

    if (sessionId) llmObsSpanEvent.session_id = sessionId

    return llmObsSpanEvent
  }

  /**
   * Copies `obj`'s own enumerable properties into `carrier`, recursing into nested objects/arrays,
   * and replaces values that cannot be serialized with UNSERIALIZABLE_VALUE_TEXT (logging a warning).
   *
   * For now, this only applies to metadata, as we let users annotate this field with any object.
   * Unserializable values are BigInts and circular references, i.e. an object that contains itself
   * somewhere below. The same object reached twice through sibling keys is not circular and is copied
   * normally. Messages, Documents, and Metrics are safeguarded in `llmobs/tagger.js`.
   *
   * @param {object} obj - user-provided object to copy
   * @param {object} carrier - destination object, mutated in place
   */
  #addObject (obj, carrier) {
    // Objects on the current recursion path. The root is an ancestor of everything.
    const ancestors = new WeakSet([obj])

    const add = (source, target) => {
      for (const key of Object.keys(source)) {
        const value = source[key]
        const isObject = value !== null && typeof value === 'object'

        if (typeof value === 'bigint' || (isObject && ancestors.has(value))) {
          // mark as unserializable instead of dropping
          logger.warn(`Unserializable property found in metadata: ${key}`)
          target[key] = UNSERIALIZABLE_VALUE_TEXT
          continue
        }

        if (isObject) {
          target[key] = Array.isArray(value) ? [] : {}
          ancestors.add(value)
          add(value, target[key])
          // leaving this branch: a later sibling reusing `value` is a shared reference, not a cycle
          ancestors.delete(value)
        } else {
          target[key] = value
        }
      }
    }

    add(obj, carrier)
  }

  /**
   * Builds the span's tag object.
   *
   * Later entries override earlier ones:
   * 1. parsed DD tags;
   * 2. version/env/service/source/ml_app/ddtrace.version/error/language;
   * 3. error_type, session_id and integration when present;
   * 4. user-annotated TAGS.
   *
   * @param {object} span - raw span (its context tags are read for the error type)
   * @param {object} tagMapKey - key of the span's entry in the tagger's tagMap
   * @param {*} mlApp - ML app name
   * @param {*} sessionId - session id, added only when truthy
   * @param {*} error - truthy when the span errored
   * @returns {Record<string, *>} tag name to value
   */
  #getTags (span, tagMapKey, mlApp, sessionId, error) {
    const mlObsTags = LLMObsTagger.tagMap.get(tagMapKey)

    const tags = {
      ...this.#config.parsedDdTags,
      version: this.#config.version,
      env: this.#config.env,
      service: this.#config.service,
      source: 'integration',
      ml_app: mlApp,
      'ddtrace.version': tracerVersion,
      error: Number(Boolean(error)),
      language: 'javascript',
    }

    const errType = span.context()._tags[ERROR_TYPE] || error?.name
    if (errType) tags.error_type = errType

    if (sessionId) tags.session_id = sessionId

    const integration = mlObsTags?.[INTEGRATION]
    if (integration) tags.integration = integration

    // spreading a missing value is a no-op, so absent user tags leave `tags` unchanged
    return { ...tags, ...mlObsTags?.[TAGS] }
  }

  /**
   * Converts a tag object into `key:value` strings; `null`/`undefined` values become an empty value.
   * @param {Record<string, *>} tags
   * @returns {string[]}
   */
  #objectTagsToStringArrayTags (tags) {
    return Object.entries(tags).map(([key, value]) => `${key}:${value ?? ''}`)
  }

  /**
   * Runs the user span processor, emitting telemetry and adding some guardrails against invalid return types.
   * Returns the span unchanged when no processor is set. A `null` return, a non-LLMObservabilitySpan return,
   * or a thrown error all yield `undefined`; the latter two are logged and reported as errors in telemetry.
   * @param {LLMObservabilitySpan} span
   * @returns {LLMObservabilitySpan | undefined}
   */
  #runProcessor (span) {
    const processor = this.#userSpanProcessor
    if (!processor) return span

    let error = false

    try {
      const processedLLMObsSpan = processor(span)
      if (processedLLMObsSpan === null) return

      if (!(processedLLMObsSpan instanceof LLMObservabilitySpan)) {
        error = true
        logger.warn('User span processor must return an instance of an LLMObservabilitySpan or null, dropping span.')
        return
      }

      return processedLLMObsSpan
    } catch (e) {
      logger.error(`[LLMObs] Error in LLMObs span processor (${util.inspect(processor)}): ${util.inspect(e)}`)
      error = true
    } finally {
      telemetry.recordLLMObsUserProcessorCalled(error)
    }
  }
}
module.exports = LLMObsSpanProcessor