diff --git a/README.md b/README.md index 6f37967..50183f0 100644 --- a/README.md +++ b/README.md @@ -410,6 +410,35 @@ async def use_mcp_tools(): result = await agent.run(context) ``` +## Prompt Observability + +Echo ships with a Langfuse-backed prompt observability surface that automatically records fetch start, success, and failure events for every prompt retrieval. When you configure the Langfuse environment (`LANGFUSE_PUBLIC_KEY`, `LANGFUSE_SECRET_KEY`, and optional `LANGFUSE_BASE_URL`) the SDK instantiates `LangfusePromptObservability` and emits a Langfuse span for each prompt combination. + +### Out-of-the-box telemetry + +- Each `LangfusePromptProvider` fetch now calls into `PromptObservability` with `PromptFetchMetadata` (prompt name, version, variables, provider) so both Langfuse spans and SDK logs capture the same payload. +- `LangfusePromptObservability` updates the span on success/failure, records duration, and logs a summary through `logging.getLogger("echo.prompts.observability")`. + +### Custom instrumentation + +Swap out the default hooks before you request a provider to plug your own logging/metrics system. + +```python +from echo.prompts import ( + PromptObservability, + set_prompt_observability, + get_prompt_provider, +) + +class MyObservability(PromptObservability): + ... + +set_prompt_observability(MyObservability()) +provider = get_prompt_provider() +``` + +`PromptFetchMetadata` exposes the prompt payload for each hook, and the SDK exposes `PromptObservationContext` plus `PromptTelemetryConfig` if you need to share configuration. + ## Conversation Context Manage multi-turn conversations: diff --git a/src/echo/prompts/__init__.py b/src/echo/prompts/__init__.py index 7e885a8..cc61e1b 100644 --- a/src/echo/prompts/__init__.py +++ b/src/echo/prompts/__init__.py @@ -2,6 +2,17 @@ from .base import BasePromptProvider, FetchedPrompt, PromptFetchError from .factory import get_prompt_provider, reset_prompt_provider +from .observability import ( + LangfusePromptObservability, + NoopPromptObservability, + PromptFetchMetadata, + PromptObservationContext, + PromptObservability, + PromptTelemetryConfig, + get_prompt_observability, + reset_prompt_observability, + set_prompt_observability, +) __all__ = [ "BasePromptProvider", @@ -9,4 +20,13 @@ "PromptFetchError", "get_prompt_provider", "reset_prompt_provider", + "LangfusePromptObservability", + "NoopPromptObservability", + "PromptFetchMetadata", + "PromptObservationContext", + "PromptObservability", + "PromptTelemetryConfig", + "get_prompt_observability", + "reset_prompt_observability", + "set_prompt_observability", ] diff --git a/src/echo/prompts/base.py b/src/echo/prompts/base.py index 6ce9d79..ad1732d 100644 --- a/src/echo/prompts/base.py +++ b/src/echo/prompts/base.py @@ -7,6 +7,8 @@ from echo.agents.config import AgentConfig +from .observability import NoopPromptObservability, PromptObservability + class FetchedPrompt(BaseModel): """Prompt with ready-to-use AgentConfig.""" @@ -27,6 +29,15 @@ class PromptFetchError(Exception): class BasePromptProvider(ABC): """Abstract base class for prompt providers.""" + def __init__(self, observability: Optional[PromptObservability] = None) -> None: + """ + Args: + observability: Hook into prompt events (start/success/failure). + """ + self.observability: PromptObservability = ( + observability or NoopPromptObservability() + ) + @abstractmethod async def get_prompt( self, diff --git a/src/echo/prompts/factory.py b/src/echo/prompts/factory.py index f8c5b5a..c4233df 100644 --- a/src/echo/prompts/factory.py +++ b/src/echo/prompts/factory.py @@ -4,11 +4,14 @@ from typing import Optional from .base import BasePromptProvider +from .observability import PromptObservability, get_prompt_observability _provider_instance: Optional[BasePromptProvider] = None -def get_prompt_provider(reset: bool = False) -> BasePromptProvider: +def get_prompt_provider( + reset: bool = False, observability: Optional[PromptObservability] = None +) -> BasePromptProvider: """ Get prompt provider singleton. @@ -16,6 +19,7 @@ def get_prompt_provider(reset: bool = False) -> BasePromptProvider: Args: reset: Force create new instance (useful for testing) + observability: Optional custom hooks for prompt fetch events. Returns: BasePromptProvider singleton instance @@ -31,7 +35,9 @@ def get_prompt_provider(reset: bool = False) -> BasePromptProvider: if provider == "langfuse": from .langfuse_provider import LangfusePromptProvider - _provider_instance = LangfusePromptProvider() + _provider_instance = LangfusePromptProvider( + observability=observability or get_prompt_observability() + ) else: raise ValueError(f"Unsupported provider: {provider}") diff --git a/src/echo/prompts/langfuse_provider.py b/src/echo/prompts/langfuse_provider.py index 7aafbfc..de440e5 100644 --- a/src/echo/prompts/langfuse_provider.py +++ b/src/echo/prompts/langfuse_provider.py @@ -7,12 +7,19 @@ from echo.agents.config import AgentConfig, PersonaConfig, TaskConfig from .base import BasePromptProvider, FetchedPrompt, PromptFetchError +from .observability import ( + PromptFetchMetadata, + PromptObservationContext, + PromptObservability, + get_prompt_observability, +) class LangfusePromptProvider(BasePromptProvider): """Langfuse prompt provider with lazy client initialization.""" - def __init__(self): + def __init__(self, observability: Optional[PromptObservability] = None): + super().__init__(observability=observability or get_prompt_observability()) self._client = None @property @@ -61,7 +68,21 @@ async def get_prompt( Raises: PromptFetchError: If fetch fails """ + prompt_variables = dict(prompt_variables or {}) + metadata = PromptFetchMetadata( + prompt_name=name, + provider_name="langfuse", + version=version, + prompt_variables=prompt_variables, + ) + + context: Optional[PromptObservationContext] = None try: + client = self.client + context = self.observability.on_fetch_start( + metadata, langfuse_client=client + ) + kwargs: dict[str, Any] = {} if version is not None: kwargs["version"] = int(version) @@ -69,7 +90,7 @@ async def get_prompt( # Langfuse SDK is sync, run in executor loop = asyncio.get_event_loop() langfuse_prompt = await loop.run_in_executor( - None, lambda: self.client.get_prompt(name, **kwargs) + None, lambda: client.get_prompt(name, **kwargs) ) # Compile with variables NOW @@ -90,11 +111,22 @@ async def get_prompt( ), ) - return FetchedPrompt( + fetched_prompt = FetchedPrompt( name=name, version=str(getattr(langfuse_prompt, "version", "")) or None, agent_config=agent_config, ) - except Exception as e: - raise PromptFetchError(f"Failed to fetch '{name}': {e}") + self.observability.on_fetch_success( + context, metadata, {"version": fetched_prompt.version} + ) + + return fetched_prompt + + except Exception as exc: + if context is None: + context = self.observability.on_fetch_start( + metadata, langfuse_client=None + ) + self.observability.on_fetch_failure(context, metadata, exc) + raise PromptFetchError(f"Failed to fetch '{name}': {exc}") from exc diff --git a/src/echo/prompts/observability.py b/src/echo/prompts/observability.py new file mode 100644 index 0000000..93e6863 --- /dev/null +++ b/src/echo/prompts/observability.py @@ -0,0 +1,275 @@ +"""Observability helpers for prompt providers.""" + +from __future__ import annotations + +import logging +import time +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from typing import Any, Dict, Optional + + +@dataclass +class PromptFetchMetadata: + """Information emitted to observability hooks for a prompt fetch.""" + + prompt_name: str + provider_name: str + version: Optional[str] = None + prompt_variables: Dict[str, Any] = field(default_factory=dict) + + +@dataclass +class PromptObservationContext: + """Captures runtime state for a prompt observability session.""" + + start_time: float + langfuse_observation: Optional[Any] = None + + +@dataclass +class PromptTelemetryConfig: + """Configuration applied to prompt observability helpers.""" + + logger: logging.Logger = logging.getLogger("echo.prompts.observability") + log_on_success: bool = True + log_on_failure: bool = True + success_log_level: int = logging.INFO + failure_log_level: int = logging.ERROR + span_name_template: str = "prompt.fetch.{name}" + span_type: str = "generation" + + +class PromptObservability(ABC): + """Abstract observability surface for prompt providers.""" + + @abstractmethod + def on_fetch_start( + self, + metadata: PromptFetchMetadata, + *, + langfuse_client: Optional[Any] = None, + ) -> PromptObservationContext: + """Begin a prompt fetch observation.""" + + @abstractmethod + def on_fetch_success( + self, + context: PromptObservationContext, + metadata: PromptFetchMetadata, + result: Dict[str, Any], + ) -> None: + """Record a successful prompt fetch.""" + + @abstractmethod + def on_fetch_failure( + self, + context: PromptObservationContext, + metadata: PromptFetchMetadata, + error: Exception, + ) -> None: + """Record a failed prompt fetch.""" + + +class NoopPromptObservability(PromptObservability): + """No-op implementation used when instrumentation is disabled.""" + + def on_fetch_start( + self, + metadata: PromptFetchMetadata, + *, + langfuse_client: Optional[Any] = None, + ) -> PromptObservationContext: + del metadata, langfuse_client + return PromptObservationContext(start_time=time.monotonic()) + + def on_fetch_success( + self, + context: PromptObservationContext, + metadata: PromptFetchMetadata, + result: Dict[str, Any], + ) -> None: + del context, metadata, result + + def on_fetch_failure( + self, + context: PromptObservationContext, + metadata: PromptFetchMetadata, + error: Exception, + ) -> None: + del context, metadata, error + + +class LangfusePromptObservability(PromptObservability): + """Langfuse-aware observability implementation.""" + + def __init__(self, config: Optional[PromptTelemetryConfig] = None) -> None: + self.config = config or PromptTelemetryConfig() + + def _span_name(self, metadata: PromptFetchMetadata) -> str: + return self.config.span_name_template.format( + name=metadata.prompt_name, + provider=metadata.provider_name, + ) + + def _start_langfuse_observation( + self, + client: Any, + metadata: PromptFetchMetadata, + ) -> Optional[Any]: + if client is None: + return None + + try: + return client.start_observation( + name=self._span_name(metadata), + as_type=self.config.span_type, + input={ + "prompt_name": metadata.prompt_name, + "version": metadata.version, + "variables": metadata.prompt_variables, + }, + metadata={"provider": metadata.provider_name}, + ) + except AttributeError as attribute_error: + self.config.logger.debug( + "Langfuse client missing observation helpers: %s", attribute_error + ) + except Exception as error: + self.config.logger.debug("Failed to start Langfuse span: %s", error) + + return None + + def on_fetch_start( + self, + metadata: PromptFetchMetadata, + *, + langfuse_client: Optional[Any] = None, + ) -> PromptObservationContext: + context = PromptObservationContext(start_time=time.monotonic()) + context.langfuse_observation = self._start_langfuse_observation( + langfuse_client, metadata + ) + return context + + def _update_span_for_success( + self, + context: PromptObservationContext, + metadata: PromptFetchMetadata, + duration_ms: float, + version: Optional[str], + ) -> None: + span = context.langfuse_observation + if not span: + return + + try: + span.update( + output={"version": version}, + metadata={"status": "success", "duration_ms": duration_ms}, + ) + except Exception as exc: # pragma: no cover - best effort logging + self.config.logger.debug("Failed to update Langfuse span: %s", exc) + finally: + span.end() + context.langfuse_observation = None + + def _update_span_for_failure( + self, + context: PromptObservationContext, + metadata: PromptFetchMetadata, + duration_ms: float, + error: Exception, + ) -> None: + span = context.langfuse_observation + if not span: + return + + try: + span.update( + metadata={ + "status": "failure", + "duration_ms": duration_ms, + "error": str(error), + }, + ) + except Exception as exc: # pragma: no cover - best effort logging + self.config.logger.debug("Failed to update Langfuse span: %s", exc) + finally: + span.end() + context.langfuse_observation = None + + def on_fetch_success( + self, + context: PromptObservationContext, + metadata: PromptFetchMetadata, + result: Dict[str, Any], + ) -> None: + duration_ms = (time.monotonic() - context.start_time) * 1e3 + version = result.get("version") + self._update_span_for_success(context, metadata, duration_ms, version) + + if self.config.log_on_success: + self.config.logger.log( + self.config.success_log_level, + "Prompt fetch succeeded: %s (version=%s, provider=%s) in %.2fms", + metadata.prompt_name, + version, + metadata.provider_name, + duration_ms, + ) + + def on_fetch_failure( + self, + context: PromptObservationContext, + metadata: PromptFetchMetadata, + error: Exception, + ) -> None: + duration_ms = (time.monotonic() - context.start_time) * 1e3 + self._update_span_for_failure(context, metadata, duration_ms, error) + + if self.config.log_on_failure: + self.config.logger.log( + self.config.failure_log_level, + "Prompt fetch failed: %s (provider=%s) after %.2fms: %s", + metadata.prompt_name, + metadata.provider_name, + duration_ms, + error, + ) + + +_default_observability: Optional[PromptObservability] = None + + +def get_prompt_observability(reset: bool = False) -> PromptObservability: + """Return the SDK default prompt observability implementation.""" + global _default_observability + if _default_observability is None or reset: + _default_observability = LangfusePromptObservability() + return _default_observability + + +def set_prompt_observability(observability: PromptObservability) -> None: + """Override the SDK default prompt observability singleton.""" + global _default_observability + _default_observability = observability + + +def reset_prompt_observability() -> None: + """Clear the cached SDK prompt observability instance (for tests).""" + global _default_observability + _default_observability = None + + +__all__ = [ + "LangfusePromptObservability", + "NoopPromptObservability", + "PromptFetchMetadata", + "PromptObservationContext", + "PromptObservability", + "PromptTelemetryConfig", + "get_prompt_observability", + "reset_prompt_observability", + "set_prompt_observability", +] diff --git a/tests/test_prompts.py b/tests/test_prompts.py index eb0d9cc..a848d7f 100644 --- a/tests/test_prompts.py +++ b/tests/test_prompts.py @@ -5,6 +5,9 @@ functionality of the prompt management system. """ +import time +from typing import Any, Dict, Optional + import pytest from echo.agents.config import AgentConfig, PersonaConfig, TaskConfig @@ -14,6 +17,13 @@ PromptFetchError, get_prompt_provider, reset_prompt_provider, + reset_prompt_observability, + set_prompt_observability, +) +from echo.prompts.observability import ( + PromptFetchMetadata, + PromptObservationContext, + PromptObservability, ) @@ -122,7 +132,10 @@ def test_default_persona(self): class MockPromptProvider(BasePromptProvider): """Mock provider for testing base class functionality.""" - def __init__(self): + def __init__( + self, observability: Optional[PromptObservability] = None + ): + super().__init__(observability=observability) self.fetch_count = 0 async def get_prompt(self, name, version=None, prompt_variables=None, **kwargs): @@ -141,6 +154,94 @@ async def get_prompt(self, name, version=None, prompt_variables=None, **kwargs): ) +class RecordingObservability(PromptObservability): + """Helper observability implementation used in tests.""" + + def __init__(self) -> None: + self.starts: list[PromptFetchMetadata] = [] + self.successes: list[tuple[PromptFetchMetadata, Dict[str, Any]]] = [] + self.failures: list[tuple[PromptFetchMetadata, Exception]] = [] + + def on_fetch_start( + self, + metadata: PromptFetchMetadata, + *, + langfuse_client: Optional[Any] = None, + ) -> PromptObservationContext: + del langfuse_client + self.starts.append(metadata) + return PromptObservationContext(start_time=time.monotonic()) + + def on_fetch_success( + self, + context: PromptObservationContext, + metadata: PromptFetchMetadata, + result: Dict[str, Any], + ) -> None: + del context + self.successes.append((metadata, result)) + + def on_fetch_failure( + self, + context: PromptObservationContext, + metadata: PromptFetchMetadata, + error: Exception, + ) -> None: + del context + self.failures.append((metadata, error)) + + +class InstrumentedPromptProvider(BasePromptProvider): + """Simple provider that exercises the observability hooks.""" + + def __init__( + self, + fail: bool = False, + observability: Optional[PromptObservability] = None, + ): + super().__init__(observability=observability) + self.fail = fail + + async def get_prompt( + self, + name: str, + version: Optional[str] = None, + prompt_variables: Optional[Dict[str, Any]] = None, + **kwargs: Any, + ) -> FetchedPrompt: + del kwargs, prompt_variables + + metadata = PromptFetchMetadata( + prompt_name=name, + provider_name="instrumented", + version=version, + prompt_variables={}, + ) + + context = self.observability.on_fetch_start(metadata, langfuse_client=None) + + try: + if self.fail: + raise RuntimeError("forced failure") + + prompt = FetchedPrompt( + name=name, + version=version, + agent_config=AgentConfig( + task=TaskConfig(description="desc", expected_output="ok") + ), + ) + + self.observability.on_fetch_success( + context, metadata, {"version": prompt.version} + ) + + return prompt + except Exception as exc: + self.observability.on_fetch_failure(context, metadata, exc) + raise + + class TestBasePromptProvider: """Tests for BasePromptProvider.""" @@ -182,6 +283,72 @@ async def test_get_prompt_with_variables(self): assert "John" in prompt.agent_config.task.description +class TestPromptObservabilityHooks: + """Tests that the observability hooks are invoked.""" + + @pytest.mark.asyncio + async def test_success_records_events(self): + observer = RecordingObservability() + provider = InstrumentedPromptProvider(observability=observer) + + prompt = await provider.get_prompt("obs-success") + + assert len(observer.starts) == 1 + assert len(observer.successes) == 1 + assert observer.successes[0][0].prompt_name == "obs-success" + assert observer.successes[0][1]["version"] == prompt.version + assert not observer.failures + + @pytest.mark.asyncio + async def test_failure_records_events(self): + observer = RecordingObservability() + provider = InstrumentedPromptProvider(observability=observer, fail=True) + + with pytest.raises(RuntimeError): + await provider.get_prompt("obs-failure") + + assert len(observer.starts) == 1 + assert len(observer.failures) == 1 + assert observer.failures[0][0].prompt_name == "obs-failure" + + +class TestPromptObservabilityFactory: + """Ensure the prompt factory reuses the shared observability instance.""" + + def test_factory_reuses_shared_observability( + self, monkeypatch: pytest.MonkeyPatch + ): + observer = RecordingObservability() + set_prompt_observability(observer) + reset_prompt_provider() + + class DummyPromptProvider(BasePromptProvider): + def __init__(self, observability: Optional[PromptObservability] = None): + super().__init__(observability=observability) + + async def get_prompt(self, name, **kwargs): + return FetchedPrompt( + name=name, + agent_config=AgentConfig( + task=TaskConfig(description="dummy", expected_output="ok") + ), + ) + + monkeypatch.setattr( + "echo.prompts.factory.LangfusePromptProvider", + DummyPromptProvider, + ) + + provider_one = get_prompt_provider() + provider_two = get_prompt_provider() + + assert provider_one is provider_two + assert provider_one.observability is observer + + reset_prompt_provider() + reset_prompt_observability() + + class TestSingletonPattern: """Tests for the singleton pattern in factory."""