Skip to content

Commit cb36637

Browse files
mingjerli and claude committed
feat: Add Prefect orchestrator support
Add Prefect 3.x integration with Pipeline class:
- Add PrefectOrchestrator class with to_flow() and to_deployment() methods
- Add Pipeline.to_prefect_flow() for generating Prefect flows with tasks
- Add Pipeline.to_prefect_deployment() for creating scheduled deployments
- Add comprehensive test suite (17 tests) for Prefect integration

Features:
- Automatic task generation from SQL queries with proper dependencies
- Support for retries, timeouts, and tags
- Concurrent execution of independent tasks using wait_for
- Deployment support with cron schedules

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 469b643 commit cb36637

4 files changed

Lines changed: 745 additions & 1 deletion

File tree

src/clgraph/orchestrators/__init__.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,11 @@
77
Supported orchestrators:
88
- Airflow (2.x and 3.x)
99
- Dagster (1.x)
10+
- Prefect (2.x and 3.x)
1011
1112
Example:
1213
from clgraph import Pipeline
13-
from clgraph.orchestrators import AirflowOrchestrator, DagsterOrchestrator
14+
from clgraph.orchestrators import AirflowOrchestrator, DagsterOrchestrator, PrefectOrchestrator
1415
1516
pipeline = Pipeline.from_sql_files("queries/", dialect="bigquery")
1617
@@ -21,14 +22,20 @@
2122
# Generate Dagster assets
2223
dagster = DagsterOrchestrator(pipeline)
2324
assets = dagster.to_assets(executor=execute_sql, group_name="analytics")
25+
26+
# Generate Prefect flow
27+
prefect = PrefectOrchestrator(pipeline)
28+
flow = prefect.to_flow(executor=execute_sql, flow_name="my_pipeline")
2429
"""
2530

2631
from .airflow import AirflowOrchestrator
2732
from .base import BaseOrchestrator
2833
from .dagster import DagsterOrchestrator
34+
from .prefect import PrefectOrchestrator
2935

3036
__all__ = [
3137
"BaseOrchestrator",
3238
"AirflowOrchestrator",
3339
"DagsterOrchestrator",
40+
"PrefectOrchestrator",
3441
]
Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
"""
2+
Prefect orchestrator integration for clgraph.
3+
4+
Converts clgraph pipelines to Prefect flows and deployments.
5+
Supports Prefect 2.x and 3.x.
6+
"""
7+
8+
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional
9+
10+
from .base import BaseOrchestrator
11+
12+
if TYPE_CHECKING:
13+
pass
14+
15+
16+
class PrefectOrchestrator(BaseOrchestrator):
    """
    Converts clgraph pipelines to Prefect flows.

    Uses the Prefect @flow and @task decorators which are compatible
    across both Prefect 2.x and 3.x versions.

    Example:
        from clgraph.orchestrators import PrefectOrchestrator

        orchestrator = PrefectOrchestrator(pipeline)
        flow_fn = orchestrator.to_flow(
            executor=execute_sql,
            flow_name="my_pipeline",
        )

        # Run the flow
        flow_fn()
    """

    def to_flow(
        self,
        executor: Callable[[str], None],
        flow_name: str,
        description: Optional[str] = None,
        retries: int = 2,
        retry_delay_seconds: int = 60,
        timeout_seconds: Optional[int] = None,
        tags: Optional[List[str]] = None,
        **flow_kwargs,
    ):
        """
        Create Prefect Flow from the pipeline.

        Converts the pipeline's table dependency graph into a Prefect flow
        where each SQL query becomes a task with proper dependencies.

        The execution order and the upstream dependencies of every query are
        captured once, when this method is called. Every run of the returned
        flow therefore uses exactly the tasks (and SQL text) that were built
        here, even if the pipeline graph is modified afterwards.

        Args:
            executor: Function that executes SQL (takes sql string)
            flow_name: Name for the Prefect flow
            description: Optional flow description (auto-generated if not provided)
            retries: Number of retries for failed tasks (default: 2)
            retry_delay_seconds: Delay between retries in seconds (default: 60)
            timeout_seconds: Optional task timeout in seconds
            tags: Optional list of tags applied to every generated task
                (default: ["clgraph"] when None is passed)
            **flow_kwargs: Additional flow parameters (version, task_runner, etc.)

        Returns:
            Prefect Flow function. When run, it returns a dict mapping each
            query id to the result of its task (the query id itself).

        Raises:
            ImportError: If Prefect is not installed.

        Examples:
            # Basic usage
            flow_fn = orchestrator.to_flow(
                executor=execute_sql,
                flow_name="my_pipeline"
            )

            # Run the flow
            flow_fn()

            # Advanced usage with all parameters
            flow_fn = orchestrator.to_flow(
                executor=execute_sql,
                flow_name="my_pipeline",
                description="Daily analytics pipeline",
                retries=3,
                retry_delay_seconds=120,
                tags=["analytics", "daily"],
            )

        Note:
            - Requires Prefect 2.x or 3.x: pip install 'prefect>=2.0'
            - Tasks are automatically wired based on table dependencies
            - Use to_deployment() for scheduled execution
        """
        try:
            from prefect import flow, task
        except ImportError as e:
            raise ImportError(
                "Prefect is required for flow generation. "
                "Install it with: pip install 'prefect>=2.0'"
            ) from e

        table_graph = self.table_graph

        # Auto-generate description
        if description is None:
            query_count = len(table_graph.queries)
            table_count = len(table_graph.tables)
            description = (
                f"Pipeline with {query_count} queries operating on "
                f"{table_count} tables. Generated by clgraph."
            )

        # Default tags
        task_tags = tags if tags is not None else ["clgraph"]

        # Create task factory with closure pattern: each call binds its own
        # query_id / sql / exec_fn, avoiding late-binding of loop variables.
        def make_task(query_id: str, sql: str, exec_fn: Callable):
            task_name = self._sanitize_name(query_id)

            @task(
                name=task_name,
                retries=retries,
                retry_delay_seconds=retry_delay_seconds,
                timeout_seconds=timeout_seconds,
                tags=task_tags,
            )
            def sql_task():
                """Execute SQL query."""
                exec_fn(sql)
                return query_id

            return sql_task

        # Snapshot the execution order once so task construction and every
        # flow run iterate over the very same sequence of queries.
        execution_order: List[str] = list(table_graph.topological_sort())

        # Build the task callable and the upstream producer list per query.
        task_callables: Dict[str, Any] = {}
        upstream_ids: Dict[str, List[str]] = {}
        for query_id in execution_order:
            query = table_graph.queries[query_id]

            # Dict keys give an ordered, de-duplicated set of producers, so a
            # query reading several tables from one producer waits on it once.
            producers: Dict[str, None] = {}
            for source_table in query.source_tables:
                if source_table not in table_graph.tables:
                    continue
                producer = table_graph.tables[source_table].created_by
                # Only producers that appear earlier in the execution order
                # count; the current query is registered below, after this
                # loop, so it can never depend on itself.
                if producer and producer in task_callables:
                    producers[producer] = None

            upstream_ids[query_id] = list(producers)
            task_callables[query_id] = make_task(query_id, query.sql, executor)

        # Create flow with dependencies
        @flow(
            name=flow_name,
            description=description,
            **flow_kwargs,
        )
        def pipeline_flow():
            """Generated pipeline flow."""
            task_futures: Dict[str, Any] = {}

            for query_id in execution_order:
                wait_for = [task_futures[dep] for dep in upstream_ids[query_id]]

                # Submit task with dependencies
                if wait_for:
                    task_futures[query_id] = task_callables[query_id].submit(wait_for=wait_for)
                else:
                    task_futures[query_id] = task_callables[query_id].submit()

            # Wait for all tasks to complete and collect results
            return {qid: future.result() for qid, future in task_futures.items()}

        return pipeline_flow

    def to_deployment(
        self,
        executor: Callable[[str], None],
        flow_name: str,
        deployment_name: str,
        cron: Optional[str] = None,
        interval_seconds: Optional[int] = None,
        work_pool_name: Optional[str] = None,
        tags: Optional[List[str]] = None,
        **kwargs,
    ):
        """
        Create Prefect Deployment from the pipeline for scheduled execution.

        Builds the flow with to_flow() and then calls the flow's
        to_deployment() with the schedule, work pool and tags given here.

        Args:
            executor: Function that executes SQL (takes sql string)
            flow_name: Name for the Prefect flow
            deployment_name: Name for the deployment
            cron: Cron schedule (e.g., "0 0 * * *" for daily at midnight)
            interval_seconds: Interval schedule in seconds (alternative to cron).
                Ignored when cron is also provided: cron takes precedence.
            work_pool_name: Work pool to use for execution
            tags: Optional list of tags for the deployment. These are set on
                the deployment only; they are not forwarded to to_flow().
            **kwargs: Additional parameters passed to to_flow()

        Returns:
            The object returned by the flow's to_deployment()
            (a RunnerDeployment instance on Prefect 3.x)

        Raises:
            ImportError: If Prefect is not installed.

        Examples:
            # Create deployment with cron schedule
            deployment = orchestrator.to_deployment(
                executor=execute_sql,
                flow_name="my_pipeline",
                deployment_name="daily_run",
                cron="0 0 * * *",  # Daily at midnight
            )

            # For Prefect 3.x, use serve() to run locally or deploy() to push to server
            # deployment.apply() for Prefect 2.x compatibility

        Note:
            - Requires Prefect 2.x or 3.x: pip install 'prefect>=2.0'
            - Prefect 3.x: Returns RunnerDeployment, use with serve() or deploy()
            - Use work_pool_name to specify execution environment
        """
        # Import only to fail early with a deployment-specific message when
        # Prefect is missing; the imported name itself is not used.
        try:
            from prefect import flow as flow_decorator  # noqa: F401
        except ImportError as e:
            raise ImportError(
                "Prefect is required for deployment generation. "
                "Install it with: pip install 'prefect>=2.0'"
            ) from e

        # Create flow
        flow_fn = self.to_flow(executor=executor, flow_name=flow_name, **kwargs)

        # Build deployment configuration for Prefect 3.x using flow.to_deployment().
        # Only options that were actually supplied are forwarded.
        deployment_config: Dict[str, Any] = {
            "name": deployment_name,
        }

        # Exactly one schedule is forwarded; cron wins over interval_seconds.
        if cron:
            deployment_config["cron"] = cron
        elif interval_seconds:
            deployment_config["interval"] = interval_seconds

        if work_pool_name:
            deployment_config["work_pool_name"] = work_pool_name

        if tags:
            deployment_config["tags"] = tags

        # Use flow.to_deployment() for Prefect 3.x
        return flow_fn.to_deployment(**deployment_config)
245+
246+
247+
__all__ = ["PrefectOrchestrator"]

0 commit comments

Comments
 (0)