Skip to content

Commit 0c09ea2

Browse files
committed
Bump version to 0.8.4 in pyproject.toml; add config attribute to PipelineRun and PipelineRunConfig classes; enhance MQTTClient methods to accept config parameter for improved pipeline configuration management.
1 parent 56d9216 commit 0c09ea2

File tree

8 files changed

+217
-42
lines changed

8 files changed

+217
-42
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ keywords = [
2727
name = "FlowerPower"
2828
readme = "README.md"
2929
requires-python = ">= 3.11"
30-
version = "0.8.3.3"
30+
version = "0.8.4"
3131

3232
[project.scripts]
3333
flowerpower = "flowerpower.cli:app"

src/flowerpower/cfg/pipeline/run.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ class PipelineRunConfig(BaseConfig):
88
final_vars: list[str] = Field(default_factory=list)
99
inputs: dict | Munch = Field(default_factory=dict)
1010
executor: str | None = None
11+
config: dict | Munch = Field(default_factory=dict)
1112
with_tracker: bool = False
1213
with_opentelemetry: bool = False
1314

src/flowerpower/cli/cfg.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,44 @@
1+
import typer
2+
3+
from ..cli.utils import parse_dict_or_list_param
14
from ..cfg import Config
5+
6+
app = typer.Typer(help="Config management commands")
7+
8+
9+
# @app.command()
10+
# def get_project(request) -> json:
11+
# cfg = request.app.ctx.pipeline_manager.cfg.project.to_dict()
12+
# # cfg.pop("fs")
13+
# return json({"cfg": cfg})
14+
15+
16+
# @bp.get("/pipeline/<pipeline_name>")
17+
# async def get_pipeline(request, pipeline_name) -> json:
18+
# if pipeline_name != request.app.ctx.pipeline_manager.cfg.pipeline.name:
19+
# request.app.ctx.pipeline_manager.load_config(pipeline_name)
20+
# cfg = request.app.ctx.pipeline_manager.cfg.pipeline.to_dict()
21+
# return json({"cfg": cfg})
22+
23+
24+
# @bp.post("/pipeline/<pipeline_name>")
25+
# @openapi.body({"application/json": PipelineConfig}, required=True)
26+
# @validate(json=PipelineConfig)
27+
# async def update_pipeline(request, pipeline_name, body: PipelineConfig) -> json:
28+
# data = request.json
29+
# if pipeline_name != request.app.ctx.pipeline_manager.cfg.pipeline.name:
30+
# request.app.ctx.pipeline_manager.load_config(pipeline_name)
31+
# cfg = request.app.ctx.pipeline_manager.cfg.pipeline.copy()
32+
# cfg.update(data)
33+
# try:
34+
# cfg.to_yaml(
35+
# os.path.join(
36+
# "pipelines",
37+
# pipeline_name + ".yml",
38+
# ),
39+
# fs=request.app.ctx.pipeline_manager.cfg.fs,
40+
# )
41+
# except NotImplementedError as e:
42+
# raise SanicException(f"Update failed. {e}", status_code=404)
43+
# cfg
44+
# return json({"cfg": cfg})

src/flowerpower/cli/mqtt.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,18 @@
11
import typer
22

3-
from flowerpower.cli.utils import parse_dict_or_list_param
4-
from flowerpower.mqtt import MQTTClient
3+
from ..cli.utils import parse_dict_or_list_param
4+
from ..mqtt import MQTTClient
55

66
app = typer.Typer(help="MQTT management commands")
7+
8+
9+
@app.command()
10+
def start_broker(
11+
host: str = "localhost",
12+
port: int = 1883,
13+
username: str = None,
14+
password: str = None,
15+
log_level: str = "INFO",
16+
):
17+
"""Start an MQTT broker."""
18+
MQTTClient.start_broker(host, port, username, password, log_level)

src/flowerpower/cli/pipeline.py

Lines changed: 84 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ def run(
4141
base_dir: str | None = None,
4242
inputs: str | None = None,
4343
final_vars: str | None = None,
44+
config: str | None = None,
4445
with_tracker: bool = False,
46+
with_opentelemetry: bool = False,
4547
reload: bool = False,
4648
storage_options: str | None = None,
4749
):
@@ -54,7 +56,9 @@ def run(
5456
base_dir: Base directory for the pipeline
5557
inputs: Input parameters as JSON, dict string, or key=value pairs
5658
final_vars: Final variables as JSON or list
57-
with_tracker: Enable tracking
59+
config: Config for the hamilton pipeline executor
60+
with_tracker: Enable tracking with hamilton ui
61+
with_opentelemetry: Enable OpenTelemetry tracing
5862
reload: Reload pipeline before running
5963
storage_options: Storage options as JSON, dict string, or key=value pairs
6064
@@ -75,6 +79,7 @@ def run(
7579
pipeline run my_pipeline --storage-options 'endpoint=http://localhost,use_ssl=true'
7680
"""
7781
parsed_inputs = parse_dict_or_list_param(inputs, "dict")
82+
parsed_config = parse_dict_or_list_param(config, "dict")
7883
parsed_final_vars = parse_dict_or_list_param(final_vars, "list")
7984
parsed_storage_options = parse_dict_or_list_param(storage_options, "dict")
8085

@@ -87,7 +92,9 @@ def run(
8792
executor=executor,
8893
inputs=parsed_inputs,
8994
final_vars=parsed_final_vars,
95+
config=parsed_config,
9096
with_tracker=with_tracker,
97+
with_opentelemetry=with_opentelemetry,
9198
reload=reload,
9299
)
93100

@@ -99,7 +106,9 @@ def run_job(
99106
base_dir: str | None = None,
100107
inputs: str | None = None,
101108
final_vars: str | None = None,
109+
config: str | None = None,
102110
with_tracker: bool = False,
111+
with_opentelemetry: bool = False,
103112
reload: bool = False,
104113
storage_options: str | None = None,
105114
):
@@ -112,7 +121,9 @@ def run_job(
112121
base_dir: Base directory for the pipeline
113122
inputs: Input parameters as JSON, dict string, or key=value pairs
114123
final_vars: Final variables as JSON or list
115-
with_tracker: Enable tracking
124+
config: Config for the hamilton pipeline executor
125+
with_tracker: Enable tracking with hamilton ui
126+
with_opentelemetry: Enable OpenTelemetry tracing
116127
reload: Reload pipeline before running
117128
storage_options: Storage options as JSON, dict string, or key=value pairs
118129
@@ -133,6 +144,7 @@ def run_job(
133144
pipeline run-job 123 --storage-options 'endpoint=http://localhost,use_ssl=true'
134145
"""
135146
parsed_inputs = parse_dict_or_list_param(inputs, "dict")
147+
parsed_config = parse_dict_or_list_param(config, "dict")
136148
parsed_final_vars = parse_dict_or_list_param(final_vars, "list")
137149
parsed_storage_options = parse_dict_or_list_param(storage_options, "dict")
138150

@@ -145,7 +157,9 @@ def run_job(
145157
executor=executor,
146158
inputs=parsed_inputs,
147159
final_vars=parsed_final_vars,
160+
config=parsed_config,
148161
with_tracker=with_tracker,
162+
with_opentelemetry=with_opentelemetry,
149163
reload=reload,
150164
)
151165

@@ -157,7 +171,10 @@ def add_job(
157171
base_dir: str | None = None,
158172
inputs: str | None = None,
159173
final_vars: str | None = None,
174+
config: str | None = None,
160175
with_tracker: bool = False,
176+
with_opentelemetry: bool = False,
177+
reload: bool = False,
161178
storage_options: str | None = None,
162179
):
163180
"""
@@ -169,7 +186,10 @@ def add_job(
169186
base_dir: Base directory for the pipeline
170187
inputs: Input parameters as JSON, dict string, or key=value pairs
171188
final_vars: Final variables as JSON or list
172-
with_tracker: Enable tracking
189+
config: Config for the hamilton pipeline executor
190+
with_tracker: Enable tracking with hamilton ui
191+
with_opentelemetry: Enable OpenTelemetry tracing
192+
reload: Reload pipeline before running
173193
storage_options: Storage options as JSON, dict string, or key=value pairs
174194
175195
Examples:
@@ -189,6 +209,7 @@ def add_job(
189209
pipeline add-job my_pipeline --storage-options 'endpoint=http://localhost,use_ssl=true'
190210
"""
191211
parsed_inputs = parse_dict_or_list_param(inputs, "dict")
212+
parsed_config = parse_dict_or_list_param(config, "dict")
192213
parsed_final_vars = parse_dict_or_list_param(final_vars, "list")
193214
parsed_storage_options = parse_dict_or_list_param(storage_options, "dict")
194215

@@ -203,7 +224,10 @@ def add_job(
203224
executor=executor,
204225
inputs=parsed_inputs,
205226
final_vars=parsed_final_vars,
227+
config=parsed_config,
206228
with_tracker=with_tracker,
229+
with_opentelemetry=with_opentelemetry,
230+
reload=reload,
207231
)
208232

209233

@@ -215,7 +239,9 @@ def schedule(
215239
type: str = "cron",
216240
inputs: str | None = None,
217241
final_vars: str | None = None,
242+
config: str | None = None,
218243
with_tracker: bool = False,
244+
with_opentelemetry: bool = False,
219245
paused: bool = False,
220246
coalesce: str = "latest",
221247
misfire_grace_time: float | None = None,
@@ -232,13 +258,64 @@ def schedule(
232258
"""
233259
Schedule a pipeline with various configuration options.
234260
235-
Supports flexible input parsing for inputs, final_vars, and storage_options
261+
Args:
262+
name: Name of the pipeline to schedule
263+
executor: Executor to use
264+
base_dir: Base directory for the pipeline
265+
type: Type of schedule
266+
inputs: Input parameters as JSON, dict string, or key=value pairs
267+
final_vars: Final variables as JSON or list
268+
config: Config for the hamilton pipeline executor
269+
with_tracker: Enable tracking with hamilton ui
270+
with_opentelemetry: Enable OpenTelemetry tracing
271+
paused: Start the job in paused state
272+
coalesce: Coalesce policy
273+
misfire_grace_time: Misfire grace time
274+
max_jitter: Maximum jitter
275+
max_running_jobs: Maximum running jobs
276+
conflict_policy: Conflict policy
277+
crontab: Crontab expression
278+
cron_params: Cron parameters as JSON or key=value pairs
279+
interval_params: Interval parameters as JSON or key=value pairs
280+
calendarinterval_params: Calendar interval parameters as JSON or key=value pairs
281+
date_params: Date parameters as JSON or key=value pairs
282+
storage_options: Storage options as JSON, dict string, or key=value pairs
283+
284+
Examples:
285+
# JSON inputs
286+
pipeline schedule my_pipeline --inputs '{"key": "value"}'
287+
288+
# Dict string inputs
289+
pipeline schedule my_pipeline --inputs "{'key': 'value'}"
290+
291+
# Key-value pair inputs
292+
pipeline schedule my_pipeline --inputs 'key1=value1,key2=value2'
293+
294+
# List final vars
295+
pipeline schedule my_pipeline --final-vars '["var1", "var2"]'
296+
297+
# Storage options
298+
pipeline schedule my_pipeline --storage-options 'endpoint=http://localhost,use_ssl=true'
299+
300+
# Cron schedule
301+
pipeline schedule my_pipeline --type cron --crontab '0 0 * * *'
302+
303+
# Interval schedule
304+
pipeline schedule my_pipeline --type interval --interval_params minutes=1
305+
306+
# Calendar interval schedule
307+
pipeline schedule my_pipeline --type calendarinterval --calendarinterval_params month=5
308+
309+
# Date schedule
310+
pipeline schedule my_pipeline --type date --date_params run_date='2021-01-01 12:00:01'
311+
236312
"""
237313
if get_schedule_manager is None:
238314
raise ValueError("APScheduler not installed. Please install it first.")
239315

240316
# Parse inputs
241317
parsed_inputs = parse_dict_or_list_param(inputs, "dict")
318+
parsed_config = parse_dict_or_list_param(config, "dict")
242319
parsed_final_vars = parse_dict_or_list_param(final_vars, "list")
243320
parsed_storage_options = parse_dict_or_list_param(storage_options, "dict")
244321

@@ -278,7 +355,9 @@ def schedule(
278355
type=type,
279356
inputs=parsed_inputs,
280357
final_vars=parsed_final_vars,
358+
config=parsed_config,
281359
with_tracker=with_tracker,
360+
with_opentelemetry=with_opentelemetry,
282361
paused=paused,
283362
coalesce=coalesce,
284363
misfire_grace_time=misfire_grace_time,
@@ -347,7 +426,7 @@ def delete(
347426
base_dir=base_dir,
348427
storage_options=parsed_storage_options or {},
349428
) as pipeline:
350-
pipeline.delete(cfg=cfg,module=module)
429+
pipeline.delete(cfg=cfg, module=module)
351430

352431

353432
@app.command()
@@ -448,30 +527,3 @@ def show_summary(
448527
storage_options=parsed_storage_options or {},
449528
) as manager:
450529
manager.show_summary(name=name, cfg=cfg, module=module)
451-
452-
453-
# @app.command()
454-
# def get_summary(
455-
# name: str,
456-
# config: bool = True,
457-
# module: bool = True,
458-
# base_dir: str | None = None,
459-
# storage_options: str | None = None,
460-
# ):
461-
# """
462-
# Get the summary of the specified pipeline.
463-
464-
# Args:
465-
# name: Name of the pipeline to get
466-
# base_dir: Base directory for the pipeline
467-
468-
# Examples:
469-
# pipeline get-summary my_pipeline
470-
# """
471-
# parsed_storage_options = parse_dict_or_list_param(storage_options, "dict")
472-
473-
# with PipelineManager(
474-
# base_dir=base_dir,
475-
# storage_options=parsed_storage_options or {},
476-
# ) as manager:
477-
# manager.get_summary(name=name)

src/flowerpower/http/models/pipeline.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ class PipelineRun(BaseModel):
66
inputs: dict | None = None
77
final_vars: list | None = None
88
executor: str | None = None
9+
config: dict | None = None
910
with_tracker: bool = None
1011
with_opentelemetry: bool = None
1112

src/flowerpower/mqtt.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,7 @@ def start_pipeline_listener(
256256
topic: str | None = None,
257257
inputs: dict | None = None,
258258
final_vars: list | None = None,
259+
config: dict | None = None,
259260
executor: str | None = None,
260261
with_tracker: bool | None = None,
261262
with_opentelemetry: bool | None = None,
@@ -276,6 +277,7 @@ def start_pipeline_listener(
276277
topic: MQTT topic to listen to
277278
inputs: Inputs for the pipeline
278279
final_vars: Final variables for the pipeline
280+
config: Configuration for the pipeline driver
279281
executor: Executor to use for the pipeline
280282
with_tracker: Use tracker for the pipeline
281283
with_opentelemetry: Use OpenTelemetry for the pipeline
@@ -306,6 +308,7 @@ def on_message(client, userdata, msg):
306308
inputs=inputs,
307309
final_vars=final_vars,
308310
executor=executor,
311+
config=config,
309312
with_tracker=with_tracker,
310313
with_opentelemetry=with_opentelemetry,
311314
reload=reload,
@@ -317,6 +320,7 @@ def on_message(client, userdata, msg):
317320
inputs=inputs,
318321
final_vars=final_vars,
319322
executor=executor,
323+
config=config,
320324
with_tracker=with_tracker,
321325
with_opentelemetry=with_opentelemetry,
322326
reload=reload,
@@ -372,6 +376,7 @@ def start_pipeline_listener(
372376
topic: str | None = None,
373377
inputs: dict | None = None,
374378
final_vars: list | None = None,
379+
config: dict | None = None,
375380
executor: str | None = None,
376381
with_tracker: bool | None = None,
377382
with_opentelemetry: bool | None = None,
@@ -392,6 +397,7 @@ def start_pipeline_listener(
392397
topic: MQTT topic to listen to
393398
inputs: Inputs for the pipeline
394399
final_vars: Final variables for the pipeline
400+
config: Configuration for the pipeline driver
395401
executor: Executor to use for the pipeline
396402
with_tracker: Use tracker for the pipeline
397403
with_opentelemetry: Use OpenTelemetry for the pipeline
@@ -410,6 +416,7 @@ def start_pipeline_listener(
410416
topic=topic,
411417
inputs=inputs,
412418
final_vars=final_vars,
419+
config=config,
413420
executor=executor,
414421
with_tracker=with_tracker,
415422
with_opentelemetry=with_opentelemetry,

0 commit comments

Comments
 (0)