-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpipelines.py
More file actions
131 lines (108 loc) · 4.77 KB
/
Copy pathpipelines.py
File metadata and controls
131 lines (108 loc) · 4.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
from mcp.server.fastmcp import FastMCP
from dalgo_mcp.client import format_response
from dalgo_mcp.context import adapt_context
from dalgo_mcp.params import DeploymentId, FlowRunId, Limit
def register(app: FastMCP):
@app.tool()
async def dalgo_list_pipelines() -> str:
"""List all orchestration pipelines (Prefect deployments) in the organization."""
client = await adapt_context()
resp = await client.get("/api/prefect/v1/flows/")
return format_response(resp)
@app.tool()
async def dalgo_get_pipeline(deployment_id: DeploymentId) -> str:
"""Get details of a specific pipeline by its deployment ID.
Args:
deployment_id: The Prefect deployment ID.
"""
client = await adapt_context()
resp = await client.get(f"/api/prefect/v1/flows/{deployment_id}")
return format_response(resp)
@app.tool()
async def dalgo_create_pipeline(pipeline_data: dict) -> str:
"""Create a new orchestration pipeline.
Args:
pipeline_data: Pipeline configuration dict with connection_id, cron schedule, and transform settings.
"""
client = await adapt_context()
resp = await client.post("/api/prefect/v1/flows/", json=pipeline_data)
return format_response(resp)
@app.tool()
async def dalgo_update_pipeline(deployment_id: DeploymentId, pipeline_data: dict) -> str:
"""Update an existing pipeline's configuration.
Args:
pipeline_data: Updated pipeline configuration dict.
"""
client = await adapt_context()
resp = await client.put(f"/api/prefect/v1/flows/{deployment_id}", json=pipeline_data)
return format_response(resp)
@app.tool()
async def dalgo_delete_pipeline(deployment_id: DeploymentId) -> str:
"""Delete a pipeline by its deployment ID.
Args:
deployment_id: The Prefect deployment ID.
"""
client = await adapt_context()
resp = await client.delete(f"/api/prefect/v1/flows/{deployment_id}")
return format_response(resp)
@app.tool()
async def dalgo_trigger_pipeline_run(deployment_id: DeploymentId) -> str:
"""Trigger an immediate run of a pipeline.
Args:
deployment_id: The Prefect deployment ID.
"""
client = await adapt_context()
resp = await client.post(f"/api/prefect/v1/flows/{deployment_id}/flow_run/")
return format_response(resp)
@app.tool()
async def dalgo_get_pipeline_run_history(deployment_id: DeploymentId, limit: Limit = 10) -> str:
"""Get the run history for a specific pipeline.
Args:
deployment_id: The Prefect deployment ID.
limit: Maximum number of runs to return (default 10).
"""
client = await adapt_context()
resp = await client.get(
f"/api/prefect/v1/flows/{deployment_id}/flow_runs/history",
params={"limit": limit},
)
return format_response(resp)
@app.tool()
async def dalgo_get_flow_run(flow_run_id: FlowRunId) -> str:
"""Get details of a specific flow run.
Args:
flow_run_id: The Prefect flow run ID.
"""
client = await adapt_context()
resp = await client.get(f"/api/prefect/flow_runs/{flow_run_id}")
return format_response(resp)
@app.tool()
async def dalgo_get_flow_run_logs(flow_run_id: FlowRunId) -> str:
"""Get logs for a specific flow run. Large logs are truncated to avoid context overflow —
the response includes metadata showing how many lines were omitted.
Args:
flow_run_id: The Prefect flow run ID.
"""
import json
from dalgo_mcp.truncate import truncate_log_text
client = await adapt_context()
resp = await client.get(f"/api/prefect/flow_runs/{flow_run_id}/logs")
if resp.status_code < 400:
try:
data = resp.json()
# logs might be a string, a list of strings, or a dict with a 'logs' key
if isinstance(data, str):
result = truncate_log_text(data)
return json.dumps(result, indent=2)
elif isinstance(data, list):
text = "\n".join(str(line) for line in data)
result = truncate_log_text(text)
return json.dumps(result, indent=2)
elif isinstance(data, dict) and "logs" in data:
result = truncate_log_text(str(data["logs"]))
data["logs"] = result["content"]
data["_meta"] = result["_meta"]
return json.dumps(data, indent=2, default=str)
except Exception:
pass
return format_response(resp)