Skip to content

Commit 9239500

Browse files
committed
feat: add phoenix tracer
1 parent 496e793 commit 9239500

File tree

4 files changed

+858
-75
lines changed

4 files changed

+858
-75
lines changed

agentlightning/tracer/phoenix.py

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
"""Phoenix-backed tracer integration for Agent Lightning.
2+
3+
This tracer bridges Agent Lightning's tracing interface with Arize Phoenix by
4+
leveraging the ``arize-phoenix-otel`` package. It registers a Phoenix-aware
5+
``TracerProvider`` for each worker process and reuses the built-in
6+
``LightningSpanProcessor`` to capture spans so that they can be stored or
7+
inspected inside Agent Lightning.
8+
"""
9+
10+
from __future__ import annotations
11+
12+
import inspect
13+
import logging
14+
import os
15+
from collections.abc import AsyncGenerator, Iterator
16+
from contextlib import asynccontextmanager, contextmanager
17+
from typing import Any
18+
19+
from agentlightning.store.base import LightningStore
20+
from agentlightning.tracer.agentops import LightningSpanProcessor
21+
from agentlightning.tracer.base import Tracer
22+
from opentelemetry import trace as trace_api
23+
from opentelemetry.sdk.trace import ReadableSpan, TracerProvider
24+
from phoenix.otel import register as phoenix_register
25+
26+
logger = logging.getLogger(__name__)
27+
28+
29+
class PhoenixTracer(Tracer):
30+
"""Tracer implementation that sends spans to Arize Phoenix.
31+
32+
Parameters are primarily thin wrappers around ``phoenix.otel.register``. By
33+
default, configuration is read from the standard Phoenix environment
34+
variables so that existing deployments keep working without code changes.
35+
36+
Note: This tracer will set its own global OpenTelemetry TracerProvider.
37+
If you have already called ``setup_otel_tracing()`` from this module,
38+
there may be conflicts. Choose one approach:
39+
40+
- For Agent Lightning training: Use PhoenixTracer with Trainer
41+
- For general agent tracing: Use setup_otel_tracing()
42+
"""
43+
44+
def __init__(
45+
self,
46+
*,
47+
endpoint: str | None = None,
48+
project_name: str | None = None,
49+
api_key: str | None = None,
50+
auto_instrument: bool = True,
51+
use_batch_processor: bool = False,
52+
headers: dict[str, str] | None = None,
53+
register_kwargs: dict[str, Any] | None = None,
54+
) -> None:
55+
super().__init__()
56+
self.endpoint = endpoint or os.getenv("PHOENIX_ENDPOINT")
57+
self.project_name = project_name or os.getenv("PHOENIX_PROJECT_NAME")
58+
self.api_key = api_key or os.getenv("PHOENIX_API_KEY")
59+
self.auto_instrument = auto_instrument
60+
self.use_batch_processor = use_batch_processor
61+
self.headers = headers
62+
self.register_kwargs = register_kwargs.copy() if register_kwargs else {}
63+
64+
self._tracer_provider: TracerProvider | None = None
65+
self._lightning_span_processor: LightningSpanProcessor | None = None
66+
self._initialized = False
67+
68+
def init(self, *args: Any, **kwargs: Any) -> None: # noqa: D401 - hook required by interface
69+
"""Main-process initialization hook (no-op for Phoenix)."""
70+
logger.debug("PhoenixTracer main-process init invoked.")
71+
72+
def teardown(self, *args: Any, **kwargs: Any) -> None:
73+
logger.debug("PhoenixTracer main-process teardown invoked.")
74+
75+
def init_worker(self, worker_id: int, *args: Any, **kwargs: Any) -> None:
76+
super().init_worker(worker_id, *args, **kwargs)
77+
if self._initialized:
78+
logger.warning(
79+
"PhoenixTracer already initialized in worker %s; skipping re-registration.",
80+
worker_id,
81+
)
82+
return
83+
84+
logger.info("[Worker %s] Configuring Phoenix tracer provider...", worker_id)
85+
86+
register_options: dict[str, Any] = {
87+
"endpoint": self.endpoint,
88+
"project_name": self.project_name,
89+
"headers": self.headers,
90+
"batch": self.use_batch_processor,
91+
"set_global_tracer_provider": False, # Don't override existing global provider
92+
"auto_instrument": self.auto_instrument,
93+
}
94+
if self.api_key:
95+
register_options["api_key"] = self.api_key
96+
register_options.update(self.register_kwargs)
97+
98+
tracer_provider = phoenix_register(**register_options)
99+
self._tracer_provider = tracer_provider
100+
101+
# Set as global tracer provider (will override if already set)
102+
trace_api.set_tracer_provider(tracer_provider)
103+
logger.info("[Worker %s] Phoenix tracer provider set as global.", worker_id)
104+
105+
self._lightning_span_processor = LightningSpanProcessor()
106+
span_processor_kwargs: dict[str, Any] = {}
107+
parameters = inspect.signature(tracer_provider.add_span_processor).parameters
108+
if "replace_default_processor" in parameters:
109+
span_processor_kwargs["replace_default_processor"] = False
110+
tracer_provider.add_span_processor(
111+
self._lightning_span_processor, **span_processor_kwargs
112+
) # type: ignore[misc]
113+
114+
self._initialized = True
115+
logger.info("[Worker %s] Phoenix tracer provider ready.", worker_id)
116+
117+
def teardown_worker(self, worker_id: int, *args: Any, **kwargs: Any) -> None:
118+
super().teardown_worker(worker_id, *args, **kwargs)
119+
logger.info("[Worker %s] Tearing down Phoenix tracer provider...", worker_id)
120+
if self._lightning_span_processor is not None:
121+
self._lightning_span_processor.shutdown()
122+
self._lightning_span_processor = None
123+
if self._tracer_provider is not None:
124+
self._tracer_provider.shutdown()
125+
self._tracer_provider = None
126+
self._initialized = False
127+
128+
@asynccontextmanager
129+
async def trace_context(
130+
self,
131+
name: str | None = None,
132+
*,
133+
store: LightningStore | None = None,
134+
rollout_id: str | None = None,
135+
attempt_id: str | None = None,
136+
) -> AsyncGenerator[LightningSpanProcessor, None]:
137+
if not self._lightning_span_processor:
138+
raise RuntimeError(
139+
"LightningSpanProcessor is not initialized. Call init_worker() first."
140+
)
141+
142+
with self._trace_context_sync(
143+
name=name,
144+
store=store,
145+
rollout_id=rollout_id,
146+
attempt_id=attempt_id,
147+
) as processor:
148+
yield processor
149+
150+
@contextmanager
151+
def _trace_context_sync(
152+
self,
153+
name: str | None = None,
154+
*,
155+
store: LightningStore | None = None,
156+
rollout_id: str | None = None,
157+
attempt_id: str | None = None,
158+
) -> Iterator[LightningSpanProcessor]:
159+
if not self._lightning_span_processor:
160+
raise RuntimeError(
161+
"LightningSpanProcessor is not initialized. Call init_worker() first."
162+
)
163+
164+
if store is not None and rollout_id is not None and attempt_id is not None:
165+
ctx = self._lightning_span_processor.with_context(
166+
store=store, rollout_id=rollout_id, attempt_id=attempt_id
167+
)
168+
with ctx as processor:
169+
yield processor
170+
elif store is None and rollout_id is None and attempt_id is None:
171+
with self._lightning_span_processor:
172+
yield self._lightning_span_processor
173+
else:
174+
raise ValueError(
175+
"store, rollout_id, and attempt_id must be either all provided or all None"
176+
)
177+
178+
def get_last_trace(self) -> list[ReadableSpan]:
179+
if not self._lightning_span_processor:
180+
raise RuntimeError(
181+
"LightningSpanProcessor is not initialized. Call init_worker() first."
182+
)
183+
return self._lightning_span_processor.spans()
184+
185+
def get_config(self) -> dict[str, Any]:
186+
"""Expose current Phoenix configuration for debugging or tests."""
187+
return {
188+
"endpoint": self.endpoint,
189+
"project_name": self.project_name,
190+
"api_key": bool(self.api_key),
191+
"auto_instrument": self.auto_instrument,
192+
"use_batch_processor": self.use_batch_processor,
193+
"headers": self.headers,
194+
"register_kwargs": self.register_kwargs,
195+
}

0 commit comments

Comments
 (0)