Skip to content

Commit 92cc9b2

Browse files
committed
Refactor pipeline configuration and update dependencies; enhance project and scheduler models, and clean up unused code
1 parent aa6e8bc commit 92cc9b2

File tree

15 files changed

+739
-343
lines changed

15 files changed

+739
-343
lines changed

examples/hello-world/conf/pipelines/hello_world.yml

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,17 @@ params:
44
spend_zero_mean:
55
offset: 0
66
run:
7+
config: {}
78
executor: null
89
final_vars:
9-
[
10-
spend,
11-
signups,
12-
avg_x_wk_spend,
13-
spend_per_signup,
14-
spend_zero_mean_unit_variance,
15-
]
10+
- spend
11+
- signups
12+
- avg_x_wk_spend
13+
- spend_per_signup
14+
- spend_zero_mean_unit_variance
1615
inputs: {}
17-
with_tracker: false
1816
with_opentelemetry: false
17+
with_tracker: false
1918
schedule:
2019
run:
2120
coalesce: latest
@@ -27,41 +26,42 @@ schedule:
2726
misfire_grace_time: null
2827
paused: false
2928
trigger:
30-
type_: cron
29+
calendar:
30+
days: null
31+
end_date: null
32+
hour: null
33+
minute: null
34+
months: null
35+
second: null
36+
start_date: null
37+
timezone: null
38+
weeks: null
39+
years: null
3140
cron:
32-
crontab: "* * * * *"
33-
year: null
34-
month: null
35-
week: null
41+
crontab: '* * * * *'
42+
day: null
3643
day_of_week: null
44+
end_time: null
3745
hour: null
3846
minute: null
47+
month: null
3948
second: null
4049
start_time: null
41-
end_time: null
4250
timezone: null
51+
week: null
52+
year: null
53+
date:
54+
run_time: null
4355
interval:
44-
weeks: null
4556
days: null
57+
end_time: null
4658
hours: null
59+
microseconds: null
4760
minutes: null
4861
seconds: null
49-
microseconds: null
5062
start_time: null
51-
end_time: null
52-
calendar:
53-
years: null
54-
months: null
5563
weeks: null
56-
days: null
57-
hour: null
58-
minute: null
59-
second: null
60-
start_date: null
61-
end_date: null
62-
timezone: null
63-
date:
64-
run_time: null
64+
type_: cron
6565
tracker:
6666
dag_name: null
6767
project_id: null

examples/hello-world/conf/project.yml

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,18 @@ name: hello-world
22
open_telemetry:
33
host: localhost
44
port: 6381
5+
tracker:
6+
api_key: null
7+
api_url: http://localhost:8241
8+
ui_url: http://localhost:8242
9+
username: null
510
worker:
611
cleanup_interval: 900
712
data_store:
813
type: sqlalchemy
914
uri: sqlite+aiosqlite:///flowerpower.db
10-
#uri: postgresql+asyncpg://postgres:password@localhost:5432/flowerpower
15+
#uri: postgresql+asyncpg://postgres:password@localhost/flowerpower
1116
event_broker:
12-
{}
13-
#host: localhost
14-
#port: 6379
15-
#type: redis
17+
type: redis
18+
uri: redis://localhost:6379
1619
max_concurrent_jobs: 100
17-
tracker:
18-
api_key: null
19-
api_url: http://localhost:8241
20-
ui_url: http://localhost:8242
21-
username: null

examples/hello-world/pipelines/hello_world.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
Path(__file__).parents[1], pipeline_name="hello_world"
1414
).pipeline.h_params
1515

16+
print(Path(__file__).parents[1])
17+
1618

1719
def spend() -> pd.Series:
1820
"""Returns a series of spend data."""

pyproject.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,11 @@ opentelemetry = [
5252
ray = ["ray>=2.34.0"]
5353
redis = ["redis>=5.0.4"]
5454
scheduler = [
55-
"aiosqlite>=0.20.0",
5655
"greenlet>=3.0.3",
5756
"asyncpg>=0.29.0",
5857
"sqlalchemy>=2.0.30",
5958
"apscheduler>=4.0.0a5", #"apscheduler @ git+https://github.com/agronholm/apscheduler",
59+
"aiosqlite>=0.21.0",
6060
]
6161
tui = ["textual>=0.85.2"]
6262
ui = ["sf-hamilton-ui>=0.0.11"]
@@ -109,5 +109,8 @@ dev-dependencies = [
109109
"obstore>=0.3.0",
110110
"pytest>=8.3.4",
111111
"mocker>=1.1.1",
112+
"marimo>=0.10.19",
113+
"panel>=1.6.0",
114+
"ipywidgets>=8.1.5",
112115
]
113116
package = true

src/flowerpower/cli/pipeline.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,7 @@ def schedule(
254254
calendarinterval_params: str | None = None,
255255
date_params: str | None = None,
256256
storage_options: str | None = None,
257+
overwrite: bool = False,
257258
):
258259
"""
259260
Schedule a pipeline with various configuration options.
@@ -280,6 +281,7 @@ def schedule(
280281
calendarinterval_params: Calendar interval parameters as JSON or key=value pairs
281282
date_params: Date parameters as JSON or key=value pairs
282283
storage_options: Storage options as JSON, dict string, or key=value pairs
284+
overwrite: Overwrite existing schedule
283285
284286
Examples:
285287
# JSON inputs
@@ -364,12 +366,98 @@ def schedule(
364366
max_jitter=max_jitter,
365367
max_running_jobs=max_running_jobs,
366368
conflict_policy=conflict_policy,
369+
overwrite=overwrite,
367370
**kwargs,
368371
)
369372

370373
logger.info(f"Job {id_} scheduled.")
371374

372375

376+
@app.command()
377+
def schedule_all(
378+
executor: str = "local",
379+
base_dir: str | None = None,
380+
type: str = "cron",
381+
inputs: str | None = None,
382+
final_vars: str | None = None,
383+
config: str | None = None,
384+
with_tracker: bool = False,
385+
with_opentelemetry: bool = False,
386+
paused: bool = False,
387+
coalesce: str = "latest",
388+
misfire_grace_time: float | None = None,
389+
max_jitter: float | None = None,
390+
max_running_jobs: int | None = None,
391+
conflict_policy: str = "do_nothing",
392+
crontab: str | None = None,
393+
cron_params: str | None = None,
394+
interval_params: str | None = None,
395+
calendarinterval_params: str | None = None,
396+
date_params: str | None = None,
397+
storage_options: str | None = None,
398+
overwrite: bool = False,
399+
):
400+
"""
401+
Schedule all pipelines using the pipeline specific configurations (`conf/pipelines/<name>.yml`).
402+
403+
Args:
404+
executor: Executor to use
405+
base_dir: Base directory for the pipeline
406+
type: Type of schedule
407+
inputs: Input parameters as JSON, dict string, or key=value pairs
408+
final_vars: Final variables as JSON or list
409+
config: Config for the hamilton pipeline executor
410+
with_tracker: Enable tracking with hamilton ui
411+
with_opentelemetry: Enable OpenTelemetry tracing
412+
paused: Start the job in paused state
413+
coalesce: Coalesce policy
414+
misfire_grace_time: Misfire grace time
415+
max_jitter: Maximum jitter
416+
max_running_jobs: Maximum running jobs
417+
conflict_policy: Conflict policy
418+
crontab: Crontab expression
419+
cron_params: Cron parameters as JSON or key=value pairs
420+
interval_params: Interval parameters as JSON or key=value pairs
421+
calendarinterval_params: Calendar interval parameters as JSON or key=value pairs
422+
date_params: Date parameters as JSON or key=value pairs
423+
storage_options: Storage options as JSON, dict string, or key=value pairs
424+
overwrite: Overwrite existing schedule
425+
426+
Examples:
427+
pipeline schedule-all
428+
"""
429+
if get_schedule_manager is None:
430+
raise ValueError("APScheduler not installed. Please install it first.")
431+
432+
parsed_storage_options = parse_dict_or_list_param(storage_options, "dict")
433+
434+
with PipelineManager(
435+
base_dir=base_dir,
436+
storage_options=parsed_storage_options or {},
437+
) as manager:
438+
manager.schedule_all(
439+
executor=executor,
440+
type=type,
441+
inputs=inputs,
442+
final_vars=final_vars,
443+
config=config,
444+
with_tracker=with_tracker,
445+
with_opentelemetry=with_opentelemetry,
446+
paused=paused,
447+
coalesce=coalesce,
448+
misfire_grace_time=misfire_grace_time,
449+
max_jitter=max_jitter,
450+
max_running_jobs=max_running_jobs,
451+
conflict_policy=conflict_policy,
452+
overwrite=overwrite,
453+
crontab=crontab,
454+
cron_params=cron_params,
455+
interval_params=interval_params,
456+
calendarinterval_params=calendarinterval_params,
457+
date_params=date_params,
458+
)
459+
460+
373461
@app.command()
374462
def new(
375463
name: str,

src/flowerpower/filesystem/base.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -285,10 +285,6 @@ def get_filesystem(
285285

286286
pp = infer_storage_options(str(path) if isinstance(path, Path) else path)
287287
protocol = pp.get("protocol")
288-
host = pp.get("host", "")
289-
path = pp.get("path", "")
290-
if host and host not in path:
291-
path = posixpath.join(host, path)
292288

293289
if protocol == "file" or protocol == "local":
294290
fs = filesystem(protocol)
@@ -298,6 +294,11 @@ def get_filesystem(
298294
fs.is_cache_fs = False
299295
return fs
300296

297+
host = pp.get("host", "")
298+
path = pp.get("path", "").lstrip("/")
299+
if len(host) and host not in path:
300+
path = posixpath.join(host, path)
301+
301302
if isinstance(storage_options, dict):
302303
storage_options = storage_options_from_dict(protocol, storage_options)
303304

src/flowerpower/http/api/cfg.py

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from sanic.response import json
66
from sanic_ext import openapi, validate
77

8-
from ...cfg import PipelineConfig
8+
from ...cfg import PipelineConfig, ProjectConfig
99

1010
bp = Blueprint("api_flowerpower_cfg", url_prefix="api/cfg")
1111

@@ -17,6 +17,18 @@ async def get_project(request) -> json:
1717
return json({"cfg": cfg})
1818

1919

20+
@bp.post("/project")
21+
@validate(json=ProjectConfig)
22+
async def update_project(request, body: ProjectConfig) -> json:
23+
cfg = request.app.ctx.pipeline_manager.cfg.copy()
24+
cfg.project.update(body.model_dump())
25+
try:
26+
cfg.save()
27+
except NotImplementedError as e:
28+
raise SanicException(f"Update failed. {e}", status_code=404)
29+
return json({"cfg": cfg})
30+
31+
2032
@bp.get("/pipeline/<pipeline_name>")
2133
async def get_pipeline(request, pipeline_name) -> json:
2234
if pipeline_name != request.app.ctx.pipeline_manager.cfg.pipeline.name:
@@ -26,23 +38,15 @@ async def get_pipeline(request, pipeline_name) -> json:
2638

2739

2840
@bp.post("/pipeline/<pipeline_name>")
29-
@openapi.body({"application/json": PipelineConfig}, required=True)
3041
@validate(json=PipelineConfig)
3142
async def update_pipeline(request, pipeline_name, body: PipelineConfig) -> json:
32-
data = request.json
3343
if pipeline_name != request.app.ctx.pipeline_manager.cfg.pipeline.name:
3444
request.app.ctx.pipeline_manager.load_config(pipeline_name)
35-
cfg = request.app.ctx.pipeline_manager.cfg.pipeline.copy()
36-
cfg.update(data)
45+
cfg = request.app.ctx.pipeline_manager.cfg.copy()
46+
cfg.pipeline.update(body.model_dump())
3747
try:
38-
cfg.to_yaml(
39-
posixpath.join(
40-
"pipelines",
41-
pipeline_name + ".yml",
42-
),
43-
fs=request.app.ctx.pipeline_manager.cfg.fs,
44-
)
48+
cfg.save()
4549
except NotImplementedError as e:
4650
raise SanicException(f"Update failed. {e}", status_code=404)
47-
cfg
51+
4852
return json({"cfg": cfg})

0 commit comments

Comments
 (0)