Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,35 @@ async def use_mcp_tools():
result = await agent.run(context)
```

## Prompt Observability

Echo ships with a Langfuse-backed prompt observability surface that automatically records fetch start, success, and failure events for every prompt retrieval. When you configure the Langfuse environment (`LANGFUSE_PUBLIC_KEY`, `LANGFUSE_SECRET_KEY`, and optional `LANGFUSE_BASE_URL`) the SDK instantiates `LangfusePromptObservability` and emits a Langfuse span for each prompt combination.

### Out-of-the-box telemetry

- Each `LangfusePromptProvider` fetch now calls into `PromptObservability` with `PromptFetchMetadata` (prompt name, version, variables, provider) so both Langfuse spans and SDK logs capture the same payload.
- `LangfusePromptObservability` updates the span on success/failure, records duration, and logs a summary through `logging.getLogger("echo.prompts.observability")`.

### Custom instrumentation

Swap out the default hooks before you request a provider to plug your own logging/metrics system.

```python
from echo.prompts import (
PromptObservability,
set_prompt_observability,
get_prompt_provider,
)

class MyObservability(PromptObservability):
...

set_prompt_observability(MyObservability())
provider = get_prompt_provider()
```

`PromptFetchMetadata` exposes the prompt payload for each hook, and the SDK exposes `PromptObservationContext` plus `PromptTelemetryConfig` if you need to share configuration.

## Conversation Context

Manage multi-turn conversations:
Expand Down
20 changes: 20 additions & 0 deletions src/echo/prompts/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,31 @@

from .base import BasePromptProvider, FetchedPrompt, PromptFetchError
from .factory import get_prompt_provider, reset_prompt_provider
from .observability import (
LangfusePromptObservability,
NoopPromptObservability,
PromptFetchMetadata,
PromptObservationContext,
PromptObservability,
PromptTelemetryConfig,
get_prompt_observability,
reset_prompt_observability,
set_prompt_observability,
)

__all__ = [
"BasePromptProvider",
"FetchedPrompt",
"PromptFetchError",
"get_prompt_provider",
"reset_prompt_provider",
"LangfusePromptObservability",
"NoopPromptObservability",
"PromptFetchMetadata",
"PromptObservationContext",
"PromptObservability",
"PromptTelemetryConfig",
"get_prompt_observability",
"reset_prompt_observability",
"set_prompt_observability",
]
11 changes: 11 additions & 0 deletions src/echo/prompts/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

from echo.agents.config import AgentConfig

from .observability import NoopPromptObservability, PromptObservability


class FetchedPrompt(BaseModel):
"""Prompt with ready-to-use AgentConfig."""
Expand All @@ -27,6 +29,15 @@ class PromptFetchError(Exception):
class BasePromptProvider(ABC):
"""Abstract base class for prompt providers."""

def __init__(self, observability: Optional[PromptObservability] = None) -> None:
"""
Args:
observability: Hook into prompt events (start/success/failure).
"""
self.observability: PromptObservability = (
observability or NoopPromptObservability()
)

@abstractmethod
async def get_prompt(
self,
Expand Down
10 changes: 8 additions & 2 deletions src/echo/prompts/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,22 @@
from typing import Optional

from .base import BasePromptProvider
from .observability import PromptObservability, get_prompt_observability

_provider_instance: Optional[BasePromptProvider] = None


def get_prompt_provider(reset: bool = False) -> BasePromptProvider:
def get_prompt_provider(
reset: bool = False, observability: Optional[PromptObservability] = None
) -> BasePromptProvider:
"""
Get prompt provider singleton.

Provider is determined by ECHO_PROMPT_PROVIDER env var (default: langfuse).

Args:
reset: Force create new instance (useful for testing)
observability: Optional custom hooks for prompt fetch events.

Returns:
BasePromptProvider singleton instance
Expand All @@ -31,7 +35,9 @@ def get_prompt_provider(reset: bool = False) -> BasePromptProvider:
if provider == "langfuse":
from .langfuse_provider import LangfusePromptProvider

_provider_instance = LangfusePromptProvider()
_provider_instance = LangfusePromptProvider(
observability=observability or get_prompt_observability()
)
else:
raise ValueError(f"Unsupported provider: {provider}")

Expand Down
42 changes: 37 additions & 5 deletions src/echo/prompts/langfuse_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,19 @@
from echo.agents.config import AgentConfig, PersonaConfig, TaskConfig

from .base import BasePromptProvider, FetchedPrompt, PromptFetchError
from .observability import (
PromptFetchMetadata,
PromptObservationContext,
PromptObservability,
get_prompt_observability,
)


class LangfusePromptProvider(BasePromptProvider):
"""Langfuse prompt provider with lazy client initialization."""

def __init__(self):
def __init__(self, observability: Optional[PromptObservability] = None):
super().__init__(observability=observability or get_prompt_observability())
self._client = None

@property
Expand Down Expand Up @@ -61,15 +68,29 @@ async def get_prompt(
Raises:
PromptFetchError: If fetch fails
"""
prompt_variables = dict(prompt_variables or {})
metadata = PromptFetchMetadata(
prompt_name=name,
provider_name="langfuse",
version=version,
prompt_variables=prompt_variables,
)

context: Optional[PromptObservationContext] = None
try:
client = self.client
context = self.observability.on_fetch_start(
metadata, langfuse_client=client
)

kwargs: dict[str, Any] = {}
if version is not None:
kwargs["version"] = int(version)

# Langfuse SDK is sync, run in executor
loop = asyncio.get_event_loop()
langfuse_prompt = await loop.run_in_executor(
None, lambda: self.client.get_prompt(name, **kwargs)
None, lambda: client.get_prompt(name, **kwargs)
)

# Compile with variables NOW
Expand All @@ -90,11 +111,22 @@ async def get_prompt(
),
)

return FetchedPrompt(
fetched_prompt = FetchedPrompt(
name=name,
version=str(getattr(langfuse_prompt, "version", "")) or None,
agent_config=agent_config,
)

except Exception as e:
raise PromptFetchError(f"Failed to fetch '{name}': {e}")
self.observability.on_fetch_success(
context, metadata, {"version": fetched_prompt.version}
)

return fetched_prompt

except Exception as exc:
if context is None:
context = self.observability.on_fetch_start(
metadata, langfuse_client=None
)
self.observability.on_fetch_failure(context, metadata, exc)
raise PromptFetchError(f"Failed to fetch '{name}': {exc}") from exc
Loading