-
Notifications
You must be signed in to change notification settings - Fork 3.2k
fix(tracing): truncate oversized span input/output for OpenAI ingest #2459
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
f52658e
aba2a66
3f5a86c
0cddf1c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,6 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import json | ||
| import os | ||
| import queue | ||
| import random | ||
|
|
@@ -29,6 +30,8 @@ def export(self, items: list[Trace | Span[Any]]) -> None: | |
|
|
||
| class BackendSpanExporter(TracingExporter): | ||
| _OPENAI_TRACING_INGEST_ENDPOINT = "https://api.openai.com/v1/traces/ingest" | ||
| _OPENAI_TRACING_MAX_FIELD_BYTES = 100_000 | ||
| _OPENAI_TRACING_STRING_TRUNCATION_SUFFIX = "... [truncated]" | ||
| _OPENAI_TRACING_ALLOWED_USAGE_KEYS = frozenset( | ||
| { | ||
| "input_tokens", | ||
|
|
@@ -182,32 +185,85 @@ def _should_sanitize_for_openai_tracing_api(self) -> bool: | |
| return self.endpoint.rstrip("/") == self._OPENAI_TRACING_INGEST_ENDPOINT.rstrip("/") | ||
|
|
||
def _sanitize_for_openai_tracing_api(self, payload_item: dict[str, Any]) -> dict[str, Any]:
    """Drop or truncate fields known to be rejected by OpenAI tracing ingestion.

    Two adjustments are applied to ``payload_item["span_data"]``:

    * ``input`` and ``output`` values are passed through
      ``_truncate_span_field_value`` for every span type.
    * For ``generation`` spans, ``usage`` keeps only the keys listed in
      ``_OPENAI_TRACING_ALLOWED_USAGE_KEYS``.

    The incoming dictionaries are never mutated. When nothing has to change,
    the very same ``payload_item`` object is returned; otherwise shallow
    copies of the payload and of its ``span_data`` are returned with the
    affected fields replaced.

    Args:
        payload_item: One exported trace/span payload dictionary.

    Returns:
        ``payload_item`` itself, or a sanitized shallow copy of it.
    """
    span_data = payload_item.get("span_data")
    if not isinstance(span_data, dict):
        return payload_item

    # Replacement values keyed by span_data field name. It stays empty when the
    # span is already acceptable, which lets us return the input untouched.
    replacements: dict[str, Any] = {}

    for field_name in ("input", "output"):
        if field_name not in span_data:
            continue
        truncated_field = self._truncate_span_field_value(span_data[field_name])
        if truncated_field != span_data[field_name]:
            replacements[field_name] = truncated_field

    if span_data.get("type") == "generation":
        usage = span_data.get("usage")
        if isinstance(usage, dict):
            filtered_usage = {
                key: value
                for key, value in usage.items()
                if key in self._OPENAI_TRACING_ALLOWED_USAGE_KEYS
            }
            if filtered_usage != usage:
                replacements["usage"] = filtered_usage

    if not replacements:
        return payload_item

    # Every replaced key already exists in span_data, so key order is preserved.
    return {**payload_item, "span_data": {**span_data, **replacements}}
|
|
||
def _value_json_size_bytes(self, value: Any) -> int:
    """Return the UTF-8 byte length of ``value`` serialized as compact JSON.

    Serialization uses ``ensure_ascii=False`` and separators without spaces.

    A value that cannot be measured is reported as one byte larger than
    ``_OPENAI_TRACING_MAX_FIELD_BYTES`` instead of raising, so callers treat
    it as oversized and fall back to their truncation path.

    Args:
        value: Any object; it does not have to be JSON-serializable.

    Returns:
        The encoded size in bytes, or ``_OPENAI_TRACING_MAX_FIELD_BYTES + 1``
        when the value cannot be serialized or encoded.
    """
    try:
        encoded = json.dumps(value, ensure_ascii=False, separators=(",", ":")).encode("utf-8")
    except (TypeError, ValueError):
        # TypeError: object is not JSON-serializable.
        # ValueError: circular reference, or UnicodeEncodeError (a ValueError
        # subclass) raised by encode() for lone surrogate code points.
        return self._OPENAI_TRACING_MAX_FIELD_BYTES + 1
    return len(encoded)
|
|
||
def _truncate_string_for_json_limit(self, value: str, max_bytes: int) -> str:
    """Shorten ``value`` so that its JSON encoding fits in ``max_bytes``.

    Sizes are measured with ``_value_json_size_bytes``. A string that already
    fits is returned unchanged. Otherwise the longest prefix of ``value``
    that still fits once ``_OPENAI_TRACING_STRING_TRUNCATION_SUFFIX`` is
    appended is found by binary search.

    Args:
        value: The string to shorten.
        max_bytes: Maximum allowed size of the JSON-encoded result, in bytes.

    Returns:
        ``value`` itself, a ``prefix + suffix`` string, or ``""`` when not
        even the suffix alone fits within ``max_bytes``.
    """
    suffix = self._OPENAI_TRACING_STRING_TRUNCATION_SUFFIX
    if self._value_json_size_bytes(value) <= max_bytes:
        return value
    suffix_size = self._value_json_size_bytes(suffix)
    if suffix_size > max_bytes:
        return ""

    # Every character costs at least one byte once JSON-encoded, so a prefix
    # longer than the bytes left after the suffix can never fit. Capping the
    # search window keeps each probe small even for multi-megabyte inputs and
    # does not change which prefix is selected.
    low = 0
    high = min(len(value), max_bytes - suffix_size)
    best = suffix
    while low <= high:
        mid = (low + high) // 2
        candidate = value[:mid] + suffix
        if self._value_json_size_bytes(candidate) <= max_bytes:
            best = candidate
            low = mid + 1
        else:
            high = mid - 1
    return best
|
|
||
| def _truncate_span_field_value(self, value: Any) -> Any: | |
| """Return ``value`` reduced to fit ``_OPENAI_TRACING_MAX_FIELD_BYTES``. | |
| | |
| Values whose size (per ``_value_json_size_bytes``) is within the limit are | |
| returned unchanged. Oversized strings are shortened with | |
| ``_truncate_string_for_json_limit``. Any other oversized value is replaced | |
| by a small dict holding a ``truncated`` flag, the original type name and a | |
| ``str()`` preview of at most 512 characters plus the truncation suffix. | |
| """ | |
| max_bytes = self._OPENAI_TRACING_MAX_FIELD_BYTES | |
| if self._value_json_size_bytes(value) <= max_bytes: | |
| return value | |
|
|
||
| # Strings can be cut at a character boundary and remain strings. | |
| if isinstance(value, str): | |
| return self._truncate_string_for_json_limit(value, max_bytes) | |
|
|
||
| # Non-string values are not cut structurally; only a short textual preview is kept. | |
| preview = str(value) | |
| if len(preview) > 512: | |
| preview = preview[:512] + self._OPENAI_TRACING_STRING_TRUNCATION_SUFFIX | |
|
||
| return { | |
| "truncated": True, | |
| "original_type": type(value).__name__, | |
| "preview": preview, | |
| } | |
|
|
||
| def close(self) -> None: | |
| """Close the underlying HTTP client.""" | |
| # NOTE(review): self._client is created outside this view; presumably the HTTP client used for export - confirm. | |
| self._client.close() | |
|
|
||
Uh oh!
There was an error while loading. Please reload this page.