Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion src/dalgo_mcp/tools/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,35 @@ async def dalgo_get_flow_run(flow_run_id: FlowRunId) -> str:

@app.tool()
async def dalgo_get_flow_run_logs(flow_run_id: FlowRunId) -> str:
"""Get logs for a specific flow run.
"""Get logs for a specific flow run. Large logs are truncated to avoid context overflow —
the response includes metadata showing how many lines were omitted.

Args:
flow_run_id: The Prefect flow run ID.
"""
import json

from dalgo_mcp.truncate import truncate_log_text

client = await adapt_context()
resp = await client.get(f"/api/prefect/flow_runs/{flow_run_id}/logs")

if resp.status_code < 400:
try:
data = resp.json()
# logs might be a string, a list of strings, or a dict with a 'logs' key
if isinstance(data, str):
result = truncate_log_text(data)
return json.dumps(result, indent=2)
elif isinstance(data, list):
text = "\n".join(str(line) for line in data)
result = truncate_log_text(text)
return json.dumps(result, indent=2)
elif isinstance(data, dict) and "logs" in data:
result = truncate_log_text(str(data["logs"]))
data["logs"] = result["content"]
data["_meta"] = result["_meta"]
return json.dumps(data, indent=2, default=str)
except Exception:
pass
return format_response(resp)
58 changes: 58 additions & 0 deletions src/dalgo_mcp/truncate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""Token-aware output truncation for large tool responses.

Prevents context window flooding from large API responses while giving
the model metadata to request more data when needed.
"""

MAX_LOG_LINES = 100
MAX_LIST_ITEMS = 50


def truncate_log_text(text: str, max_lines: int = MAX_LOG_LINES) -> dict:
"""Truncate log text to max_lines, keeping head and tail.

Returns a dict with 'content' and '_meta' keys.
"""
if not text:
return {"content": text, "_meta": {"truncated": False}}

lines = text.splitlines()
if len(lines) <= max_lines:
return {"content": text, "_meta": {"truncated": False, "total_lines": len(lines)}}

half = max_lines // 2
head = lines[:half]
tail = lines[-half:]
omitted = len(lines) - max_lines

truncated_text = "\n".join(head) + f"\n\n... [{omitted} lines omitted] ...\n\n" + "\n".join(tail)
return {
"content": truncated_text,
"_meta": {
"truncated": True,
"total_lines": len(lines),
"shown": max_lines,
"omitted": omitted,
"note": (f"Showing first {half} and last {half} lines. Full logs available in Dalgo UI."),
},
}


def truncate_list(items: list, max_items: int = MAX_LIST_ITEMS) -> dict:
"""Truncate a list to max_items with metadata.

Returns a dict with 'items' and '_meta' keys.
"""
if len(items) <= max_items:
return {"items": items, "_meta": {"truncated": False, "total": len(items)}}

return {
"items": items[:max_items],
"_meta": {
"truncated": True,
"total": len(items),
"shown": max_items,
"omitted": len(items) - max_items,
"note": f"Showing first {max_items} of {len(items)} items.",
},
}
Loading