Skip to content

Commit 4dfb8bb

Browse files
authored
feat: [ECO-515] - Add traces to MCP server (#215)
* feat: [ECO-515] - Add traces to MCP server * fix: add tests & improve search history * fix: tests * fix: format
1 parent 8ce040e commit 4dfb8bb

11 files changed

Lines changed: 2545 additions & 58 deletions

File tree

src/deepset_mcp/api/search_history/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
"""Search history API module."""
66

7-
from .models import SearchHistoryEntry
7+
from .models import HaystackTraceV1, PipelineTraceEntry, SearchHistoryEntry
88
from .resource import SearchHistoryResource
99

10-
__all__ = ["SearchHistoryEntry", "SearchHistoryResource"]
10+
__all__ = ["HaystackTraceV1", "PipelineTraceEntry", "SearchHistoryEntry", "SearchHistoryResource"]

src/deepset_mcp/api/search_history/models.py

Lines changed: 134 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,77 @@
66

77
from typing import Any
88

9-
from pydantic import BaseModel, Field, field_validator
9+
from pydantic import BaseModel, Field, field_validator, model_validator
1010

1111

1212
class SearchHistoryEntry(BaseModel):
13-
"""A single search history entry from the deepset platform.
13+
"""A single search history entry from the deepset platform (v1 API format).
1414
15-
Contains query, answers, prompts, feedback, and other metadata.
15+
The v1 search history API returns entries with the following top-level fields.
16+
Key nested data:
17+
- The search query text is at ``request.query``.
18+
- The timestamp is at ``time`` (also aliased to ``created_at`` for convenience).
19+
- The pipeline name is at ``pipeline.name``.
20+
- Search results are in ``response`` (list of result entries).
1621
"""
1722

1823
model_config = {"extra": "allow"}
1924

20-
query: str | None = Field(default=None, description="The search query that was executed")
21-
answer: str | None = Field(default=None, description="The answer returned by the pipeline")
22-
created_at: str | None = Field(default=None, description="When the search was performed")
23-
pipeline_name: str | None = Field(default=None, description="Name of the pipeline used")
24-
feedback: list[dict[str, Any]] | None = Field(default=None, description="User feedback on the search")
25+
# -- Record identifiers --
26+
search_history_id: str | None = Field(default=None, description="Unique identifier for this search history record")
27+
session_id: str | None = Field(default=None, description="Session identifier grouping related searches")
28+
29+
# -- The search request --
30+
request: dict[str, Any] | None = Field(
31+
default=None,
32+
description="The original search request (contains 'query', 'filters', 'params', etc.)",
33+
)
34+
# Convenience field — extracted from request.query via model_validator
35+
query: str | None = Field(default=None, description="The search query text (extracted from request.query)")
36+
37+
# -- The search response --
38+
response: list[dict[str, Any]] | None = Field(
39+
default=None, description="List of search result entries returned by the pipeline"
40+
)
41+
42+
# -- Timing & status --
43+
time: str | None = Field(default=None, description="ISO-8601 timestamp when the search was performed")
44+
# Convenience alias — populated from `time` via model_validator
45+
created_at: str | None = Field(default=None, description="Alias for `time` — when the search was performed")
46+
duration: float | None = Field(default=None, description="End-to-end query duration in seconds")
47+
status: str | None = Field(default=None, description="Run status: 'success' or 'failed'")
48+
49+
# -- Pipeline & version --
50+
pipeline: dict[str, Any] | None = Field(
51+
default=None, description="Pipeline metadata (contains 'name' and other pipeline info)"
52+
)
53+
pipeline_version_id: str | None = Field(default=None, description="UUID of the pipeline version used")
54+
client_source_path: str | None = Field(default=None, description="Client path that initiated the query")
55+
56+
# -- User & auth --
57+
user: dict[str, Any] | None = Field(
58+
default=None, description="User who ran the search (contains 'id', 'given_name', 'family_name')"
59+
)
60+
api_key: dict[str, Any] | None = Field(default=None, description="API key used (contains 'id', 'name')")
61+
62+
# -- Feedback, labels, notes --
63+
feedback: list[dict[str, Any]] | None = Field(default=None, description="User feedback on this search")
64+
labels: list[str] = Field(default_factory=list, description="Labels assigned to this search history record")
65+
note: str | None = Field(default=None, description="Free-text note attached to this search history record")
66+
67+
@model_validator(mode="before")
68+
@classmethod
69+
def _normalize(cls, data: Any) -> Any:
70+
"""Extract convenience fields from nested API response structure."""
71+
if not isinstance(data, dict):
72+
return data
73+
# Populate `query` from request.query if not already set at top level
74+
if not data.get("query") and isinstance(data.get("request"), dict):
75+
data["query"] = data["request"].get("query")
76+
# Populate `created_at` from `time` if not already set
77+
if not data.get("created_at") and data.get("time"):
78+
data["created_at"] = data["time"]
79+
return data
2580

2681
@field_validator("feedback", mode="before")
2782
@classmethod
@@ -33,3 +88,74 @@ def _feedback_to_list(cls, v: Any) -> list[dict[str, Any]] | None:
3388
if isinstance(v, dict):
3489
return [v]
3590
return None
91+
92+
93+
class HaystackTraceSpan(BaseModel):
94+
"""A single span in a Haystack pipeline trace."""
95+
96+
model_config = {"extra": "allow"}
97+
98+
span_id: str = Field(description="Unique identifier for this span")
99+
parent_span_id: str | None = Field(default=None, description="Parent span ID, if this is a child span")
100+
operation_name: str = Field(description="Name of the operation performed in this span")
101+
component: str | None = Field(default=None, description="Haystack component that produced this span")
102+
start_time: str = Field(description="ISO-8601 timestamp when the span started")
103+
end_time: str | None = Field(default=None, description="ISO-8601 timestamp when the span ended")
104+
duration_ms: float | None = Field(default=None, description="Duration of the span in milliseconds")
105+
tags: dict[str, Any] = Field(default_factory=dict, description="Arbitrary key-value tags attached to the span")
106+
107+
108+
class HaystackTraceLog(BaseModel):
109+
"""A log entry emitted during a Haystack pipeline run."""
110+
111+
model_config = {"extra": "allow"}
112+
113+
logger: str = Field(description="Logger name that emitted this entry")
114+
level: str = Field(description="Log level (e.g. INFO, WARNING, ERROR)")
115+
message: str = Field(description="Log message text")
116+
timestamp: str = Field(description="ISO-8601 timestamp of the log entry")
117+
extra_fields: dict[str, Any] = Field(default_factory=dict, description="Additional structured fields")
118+
119+
120+
class HaystackTraceFailure(BaseModel):
121+
"""Failure information for a Haystack pipeline run."""
122+
123+
type: str = Field(description="Exception type name")
124+
message: str = Field(description="Exception message")
125+
stacktrace: list[str] = Field(description="Stack trace lines")
126+
127+
128+
class HaystackTraceV1(BaseModel):
129+
"""Full Haystack pipeline run trace (schema version haystack-trace/v1)."""
130+
131+
model_config = {"extra": "allow"}
132+
133+
schema_version: str = Field(description="Trace schema version identifier")
134+
run_id: str = Field(description="Unique run identifier")
135+
started_at: str = Field(description="ISO-8601 timestamp when the run started")
136+
finished_at: str | None = Field(default=None, description="ISO-8601 timestamp when the run finished")
137+
duration_ms: float | None = Field(default=None, description="Total run duration in milliseconds")
138+
status: str | None = Field(default=None, description="Run status: 'success' or 'failed'")
139+
traces: list[HaystackTraceSpan] = Field(default_factory=list, description="Ordered list of trace spans")
140+
logs: list[HaystackTraceLog] = Field(default_factory=list, description="Log entries emitted during the run")
141+
failure: HaystackTraceFailure | None = Field(default=None, description="Failure details if the run failed")
142+
143+
144+
class PipelineTraceEntry(BaseModel):
145+
"""A single pipeline trace response from the deepset platform (v2 API format)."""
146+
147+
model_config = {"extra": "allow"}
148+
149+
query_id: str = Field(description="Unique identifier for the search query")
150+
query: str = Field(description="The search query text that was executed")
151+
status: str | None = Field(default=None, description="Run status: 'success' or 'failed'")
152+
duration_s: float = Field(description="End-to-end query duration in seconds")
153+
created_at: str = Field(description="ISO-8601 timestamp when the query was executed")
154+
client_source_path: str | None = Field(default=None, description="Client path that initiated the query")
155+
pipeline_version_id: str | None = Field(default=None, description="UUID of the pipeline version used")
156+
pipeline_version_name: str | None = Field(default=None, description="Name of the pipeline version used")
157+
pipeline_version_number: int | None = Field(default=None, description="Number of the pipeline version used")
158+
api_key: dict[str, Any] | None = Field(default=None, description="API key metadata (id, name)")
159+
feedback: dict[str, Any] | None = Field(default=None, description="Aggregated user feedback for this query")
160+
user_id: str | None = Field(default=None, description="UUID of the user who ran the query")
161+
haystack_trace: HaystackTraceV1 | None = Field(default=None, description="Full Haystack pipeline run trace")

src/deepset_mcp/api/search_history/protocols.py

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,36 +4,85 @@
44

55
"""Protocols for search history resources."""
66

7-
from typing import Protocol
7+
from typing import Literal, Protocol
88

9-
from deepset_mcp.api.search_history.models import SearchHistoryEntry
9+
from deepset_mcp.api.search_history.models import PipelineTraceEntry, SearchHistoryEntry
1010
from deepset_mcp.api.shared_models import PaginatedResponse
1111

1212

1313
class SearchHistoryResourceProtocol(Protocol):
1414
"""Protocol defining the interface for search history resources."""
1515

1616
async def list(
17-
self, limit: int = 10, after: str | None = None, query_filter: str | None = None
17+
self,
18+
limit: int = 10,
19+
after: str | None = None,
20+
query_filter: str | None = None,
21+
sort_field: Literal["created_at", "query", "duration", "feedbacks/score"] = "created_at",
22+
sort_order: Literal["ASC", "DESC"] = "DESC",
1823
) -> PaginatedResponse[SearchHistoryEntry]:
1924
"""List search history entries in the workspace.
2025
2126
:param limit: Maximum number of entries to return per page.
2227
:param after: Cursor to fetch the next page of results.
2328
:param query_filter: OData filter expression to narrow results.
29+
:param sort_field: Field to sort by.
30+
:param sort_order: Sort direction (ASC or DESC).
2431
:returns: Paginated response of search history entries.
2532
"""
2633
...
2734

2835
async def list_pipeline(
29-
self, pipeline_name: str, limit: int = 10, after: str | None = None, query_filter: str | None = None
36+
self,
37+
pipeline_name: str,
38+
limit: int = 10,
39+
after: str | None = None,
40+
query_filter: str | None = None,
41+
sort_field: Literal["created_at", "query", "duration", "feedbacks/score"] = "created_at",
42+
sort_order: Literal["ASC", "DESC"] = "DESC",
3043
) -> PaginatedResponse[SearchHistoryEntry]:
3144
"""List search history entries for a specific pipeline with pagination.
3245
3346
:param pipeline_name: Name of the pipeline.
3447
:param limit: Maximum number of entries to return per page.
3548
:param after: Cursor to fetch the next page of results.
3649
:param query_filter: OData filter expression to narrow results.
50+
:param sort_field: Field to sort by.
51+
:param sort_order: Sort direction (ASC or DESC).
3752
:returns: Paginated response of search history entries (most recent first).
3853
"""
3954
...
55+
56+
async def list_pipeline_traces(
57+
self,
58+
pipeline_name: str,
59+
limit: int = 10,
60+
after: str | None = None,
61+
query_filter: str | None = None,
62+
sort_field: Literal["created_at", "query", "duration", "feedbacks/score"] = "created_at",
63+
sort_order: Literal["ASC", "DESC"] = "DESC",
64+
) -> PaginatedResponse[PipelineTraceEntry]:
65+
"""List Haystack pipeline run traces with pagination, filtering, and sorting.
66+
67+
:param pipeline_name: Name of the pipeline.
68+
:param limit: Maximum number of trace entries to return per page.
69+
:param after: Cursor to fetch the next page of results.
70+
:param query_filter: OData filter expression to narrow results.
71+
:param sort_field: Field to sort by.
72+
:param sort_order: Sort direction (ASC or DESC).
73+
:returns: Paginated response of pipeline trace entries.
74+
"""
75+
...
76+
77+
async def get_pipeline_trace(
78+
self,
79+
pipeline_name: str,
80+
query_id: str,
81+
) -> PipelineTraceEntry | None:
82+
"""Get the Haystack pipeline run trace for a single search history record.
83+
84+
:param pipeline_name: Name of the pipeline.
85+
:param query_id: UUID of the search history query.
86+
:returns: The trace entry, or None if not found.
87+
"""
88+
...

0 commit comments

Comments
 (0)