Skip to content

Commit c4133eb

Browse files
committed
feat: Enhance configuration management and CLI for FlowerPower
- Added detailed docstrings and examples to Config, PipelineConfig, and ProjectConfig classes for better usability. - Introduced init_config, init_pipeline_config, and init_project_config functions to streamline configuration initialization. - Refactored the CLI to replace the scheduler commands with worker commands, consolidating functionality under a new worker.py module. - Removed the deprecated scheduler.py file and its associated commands. - Implemented hook management in PipelineManager, allowing users to add hooks to pipelines. - Updated Worker class to dynamically determine the worker type from ProjectConfig. - Enhanced logging setup across the CLI commands for better traceability. - Adjusted filesystem caching behavior in load methods for ProjectConfig and PipelineConfig. - Updated settings to use EXECUTOR_MAX_WORKERS for worker count configuration. - Cleaned up unused template code in templates.py.
1 parent 54d26fd commit c4133eb

File tree

14 files changed

+678
-486
lines changed

14 files changed

+678
-486
lines changed

examples/hello-world/conf/project.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
name: null
22
worker:
33
type: apscheduler
4+
45
backend:
56
data_store:
67
type: postgresql
@@ -33,7 +34,7 @@ worker:
3334
cleanup_interval: 300
3435
max_concurrent_jobs: 10
3536
default_job_executor: threadpool
36-
num_workers: 10
37+
num_workers: 100
3738
adapter:
3839
hamilton_tracker:
3940
username: null

src/flowerpower/cfg/__init__.py

Lines changed: 151 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,38 @@
55

66
from ..fs import AbstractFileSystem, get_filesystem
77
from .base import BaseConfig
8-
from .pipeline import PipelineConfig
9-
from .project import ProjectConfig
8+
from .pipeline import PipelineConfig, init_pipeline_config
9+
from .project import ProjectConfig, init_project_config
1010

1111

1212
class Config(BaseConfig):
13+
"""Main configuration class for FlowerPower, combining project and pipeline settings.
14+
15+
This class serves as the central configuration manager, handling both project-wide
16+
and pipeline-specific settings. It provides functionality for loading and saving
17+
configurations using various filesystem abstractions.
18+
19+
Attributes:
20+
pipeline (PipelineConfig): Configuration for the pipeline.
21+
project (ProjectConfig): Configuration for the project.
22+
fs (AbstractFileSystem | None): Filesystem abstraction for I/O operations.
23+
base_dir (str | Path | None): Base directory for the configuration.
24+
storage_options (dict | Munch): Options for filesystem operations.
25+
26+
Example:
27+
```python
28+
# Load configuration
29+
config = Config.load(
30+
base_dir="my_project",
31+
name="project1",
32+
pipeline_name="data-pipeline"
33+
)
34+
35+
# Save configuration
36+
config.save(project=True, pipeline=True)
37+
```
38+
"""
39+
1340
pipeline: PipelineConfig = msgspec.field(default_factory=PipelineConfig)
1441
project: ProjectConfig = msgspec.field(default_factory=ProjectConfig)
1542
fs: AbstractFileSystem | None = None
@@ -26,6 +53,29 @@ def load(
2653
fs: AbstractFileSystem | None = None,
2754
storage_options: dict | Munch = Munch(),
2855
):
56+
"""Load both project and pipeline configurations.
57+
58+
Args:
59+
base_dir (str, optional): Base directory for configurations. Defaults to ".".
60+
name (str | None, optional): Project name. Defaults to None.
61+
pipeline_name (str | None, optional): Pipeline name. Defaults to None.
62+
worker_type (str | None, optional): Type of worker to use. Defaults to None.
63+
fs (AbstractFileSystem | None, optional): Filesystem to use. Defaults to None.
64+
storage_options (dict | Munch, optional): Options for filesystem. Defaults to empty Munch.
65+
66+
Returns:
67+
Config: Combined configuration instance.
68+
69+
Example:
70+
```python
71+
config = Config.load(
72+
base_dir="my_project",
73+
name="test_project",
74+
pipeline_name="etl",
75+
worker_type="local"
76+
)
77+
```
78+
"""
2979
if fs is None:
3080
fs = get_filesystem(base_dir, cached=True, dirfs=True, **storage_options)
3181
project = ProjectConfig.load(
@@ -57,6 +107,19 @@ def save(
57107
fs: AbstractFileSystem | None = None,
58108
storage_options: dict | Munch = Munch(),
59109
):
110+
"""Save project and/or pipeline configurations.
111+
112+
Args:
113+
project (bool, optional): Whether to save project config. Defaults to False.
114+
pipeline (bool, optional): Whether to save pipeline config. Defaults to True.
115+
fs (AbstractFileSystem | None, optional): Filesystem to use. Defaults to None.
116+
storage_options (dict | Munch, optional): Options for filesystem. Defaults to empty Munch.
117+
118+
Example:
119+
```python
120+
config.save(project=True, pipeline=True)
121+
```
122+
"""
60123
if fs is None and self.fs is None:
61124
self.fs = get_filesystem(
62125
self.base_dir, cached=True, dirfs=True, **storage_options
@@ -84,6 +147,25 @@ def load(
84147
storage_options: dict | Munch = Munch(),
85148
fs: AbstractFileSystem | None = None,
86149
):
150+
"""Helper function to load configuration.
151+
152+
This is a convenience wrapper around Config.load().
153+
154+
Args:
155+
base_dir (str): Base directory for configurations.
156+
name (str | None, optional): Project name. Defaults to None.
157+
pipeline_name (str | None, optional): Pipeline name. Defaults to None.
158+
storage_options (dict | Munch, optional): Options for filesystem. Defaults to empty Munch.
159+
fs (AbstractFileSystem | None, optional): Filesystem to use. Defaults to None.
160+
161+
Returns:
162+
Config: Combined configuration instance.
163+
164+
Example:
165+
```python
166+
config = load(base_dir="my_project", name="test", pipeline_name="etl")
167+
```
168+
"""
87169
return Config.load(
88170
name=name,
89171
pipeline_name=pipeline_name,
@@ -100,6 +182,73 @@ def save(
100182
fs: AbstractFileSystem | None = None,
101183
storage_options: dict | Munch = Munch(),
102184
):
185+
"""Helper function to save configuration.
186+
187+
This is a convenience wrapper around Config.save().
188+
189+
Args:
190+
config (Config): Configuration instance to save.
191+
project (bool, optional): Whether to save project config. Defaults to False.
192+
pipeline (bool, optional): Whether to save pipeline config. Defaults to True.
193+
fs (AbstractFileSystem | None, optional): Filesystem to use. Defaults to None.
194+
storage_options (dict | Munch, optional): Options for filesystem. Defaults to empty Munch.
195+
196+
Example:
197+
```python
198+
config = load(base_dir="my_project")
199+
save(config, project=True, pipeline=True)
200+
```
201+
"""
103202
config.save(
104203
project=project, pipeline=pipeline, fs=fs, storage_options=storage_options
105204
)
205+
206+
207+
def init_config(
208+
base_dir: str = ".",
209+
name: str | None = None,
210+
pipeline_name: str | None = None,
211+
worker_type: str | None = None,
212+
fs: AbstractFileSystem | None = None,
213+
storage_options: dict | Munch = Munch(),
214+
):
215+
"""Initialize a new configuration with both project and pipeline settings.
216+
217+
This function creates and initializes both project and pipeline configurations,
218+
combining them into a single Config instance.
219+
220+
Args:
221+
base_dir (str, optional): Base directory for configurations. Defaults to ".".
222+
name (str | None, optional): Project name. Defaults to None.
223+
pipeline_name (str | None, optional): Pipeline name. Defaults to None.
224+
worker_type (str | None, optional): Type of worker to use. Defaults to None.
225+
fs (AbstractFileSystem | None, optional): Filesystem to use. Defaults to None.
226+
storage_options (dict | Munch, optional): Options for filesystem. Defaults to empty Munch.
227+
228+
Returns:
229+
Config: The initialized configuration instance.
230+
231+
Example:
232+
```python
233+
config = init_config(
234+
base_dir="my_project",
235+
name="test_project",
236+
pipeline_name="data-pipeline",
237+
worker_type="local"
238+
)
239+
```
240+
"""
241+
pipeline_cfg = init_pipeline_config(
242+
base_dir=base_dir,
243+
name=pipeline_name,
244+
fs=fs,
245+
storage_options=storage_options,
246+
)
247+
project_cfg = init_project_config(
248+
base_dir=base_dir,
249+
name=name,
250+
worker_type=worker_type,
251+
fs=fs,
252+
storage_options=storage_options,
253+
)
254+
return Config(pipeline=pipeline_cfg, project=project_cfg, fs=fs, base_dir=base_dir)

src/flowerpower/cfg/pipeline/__init__.py

Lines changed: 120 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,36 @@
1111

1212

1313
class PipelineConfig(BaseConfig):
14+
"""Configuration class for managing pipeline settings in FlowerPower.
15+
16+
This class handles pipeline-specific configuration including run settings, scheduling,
17+
parameters, and adapter settings. It supports Hamilton-style parameter configuration
18+
and YAML serialization.
19+
20+
Attributes:
21+
name (str | None): The name of the pipeline.
22+
run (RunConfig): Configuration for pipeline execution.
23+
schedule (ScheduleConfig): Configuration for pipeline scheduling.
24+
params (dict): Pipeline parameters.
25+
adapter (AdapterConfig): Configuration for the pipeline adapter.
26+
h_params (dict): Hamilton-formatted parameters.
27+
28+
Example:
29+
```python
30+
# Create a new pipeline config
31+
pipeline = PipelineConfig(name="data-transform")
32+
33+
# Set parameters
34+
pipeline.params = {
35+
"input_path": "data/input",
36+
"batch_size": 100
37+
}
38+
39+
# Save configuration
40+
pipeline.save(name="data-transform")
41+
```
42+
"""
43+
1444
name: str | None = msgspec.field(default=None)
1545
run: RunConfig = msgspec.field(default_factory=RunConfig)
1646
schedule: ScheduleConfig = msgspec.field(default_factory=ScheduleConfig)
@@ -62,7 +92,26 @@ def update(self, d: dict | Munch):
6292

6393
@staticmethod
6494
def to_h_params(d: dict) -> dict:
65-
"""Converts a dictionary of function arguments to Hamilton function parameters"""
95+
"""Convert a dictionary of parameters to Hamilton-compatible format.
96+
97+
This method transforms regular parameter dictionaries into Hamilton's function parameter
98+
format, supporting nested parameters and source/value decorators.
99+
100+
Args:
101+
d (dict): The input parameter dictionary.
102+
103+
Returns:
104+
dict: Hamilton-formatted parameter dictionary.
105+
106+
Example:
107+
```python
108+
params = {
109+
"batch_size": 100,
110+
"paths": {"input": "data/in", "output": "data/out"}
111+
}
112+
h_params = PipelineConfig.to_h_params(params)
113+
```
114+
"""
66115

67116
def transform_recursive(val, original_dict, depth=1):
68117
if isinstance(val, dict):
@@ -97,8 +146,27 @@ def load(
97146
fs: AbstractFileSystem | None = None,
98147
storage_options: dict | Munch = Munch(),
99148
):
149+
"""Load pipeline configuration from a YAML file.
150+
151+
Args:
152+
base_dir (str, optional): Base directory for the pipeline. Defaults to ".".
153+
name (str | None, optional): Pipeline name. Defaults to None.
154+
fs (AbstractFileSystem | None, optional): Filesystem to use. Defaults to None.
155+
storage_options (dict | Munch, optional): Options for filesystem. Defaults to empty Munch.
156+
157+
Returns:
158+
PipelineConfig: Loaded pipeline configuration.
159+
160+
Example:
161+
```python
162+
pipeline = PipelineConfig.load(
163+
base_dir="my_project",
164+
name="data-pipeline"
165+
)
166+
```
167+
"""
100168
if fs is None:
101-
fs = get_filesystem(base_dir, cached=True, dirfs=True, **storage_options)
169+
fs = get_filesystem(base_dir, cached=False, dirfs=True, **storage_options)
102170
if fs.exists("conf/pipelines"):
103171
if name is not None:
104172
pipeline = PipelineConfig.from_yaml(
@@ -120,6 +188,22 @@ def save(
120188
fs: AbstractFileSystem | None = None,
121189
storage_options: dict | Munch = Munch(),
122190
):
191+
"""Save pipeline configuration to a YAML file.
192+
193+
Args:
194+
name (str | None, optional): Pipeline name. Defaults to None.
195+
base_dir (str, optional): Base directory for the pipeline. Defaults to ".".
196+
fs (AbstractFileSystem | None, optional): Filesystem to use. Defaults to None.
197+
storage_options (dict | Munch, optional): Options for filesystem. Defaults to empty Munch.
198+
199+
Raises:
200+
ValueError: If pipeline name is not set.
201+
202+
Example:
203+
```python
204+
pipeline_config.save(name="data-pipeline", base_dir="my_project")
205+
```
206+
"""
123207
if fs is None:
124208
fs = get_filesystem(base_dir, cached=True, dirfs=True, **storage_options)
125209

@@ -134,3 +218,37 @@ def save(
134218
self.to_yaml(path=f"conf/pipelines/{self.name}.yml", fs=fs)
135219

136220
setattr(self, "h_params", h_params)
221+
222+
223+
def init_pipeline_config(
224+
base_dir: str = ".",
225+
name: str | None = None,
226+
fs: AbstractFileSystem | None = None,
227+
storage_options: dict | Munch = Munch(),
228+
):
229+
"""Initialize a new pipeline configuration.
230+
231+
This function creates a new pipeline configuration and saves it to disk.
232+
233+
Args:
234+
base_dir (str, optional): Base directory for the pipeline. Defaults to ".".
235+
name (str | None, optional): Pipeline name. Defaults to None.
236+
fs (AbstractFileSystem | None, optional): Filesystem to use. Defaults to None.
237+
storage_options (dict | Munch, optional): Options for filesystem. Defaults to empty Munch.
238+
239+
Returns:
240+
PipelineConfig: The initialized pipeline configuration.
241+
242+
Example:
243+
```python
244+
pipeline = init_pipeline_config(
245+
base_dir="my_project",
246+
name="etl-pipeline"
247+
)
248+
```
249+
"""
250+
pipeline = PipelineConfig.load(
251+
base_dir=base_dir, name=name, fs=fs, storage_options=storage_options
252+
)
253+
pipeline.save(name=name, base_dir=base_dir, fs=fs, storage_options=storage_options)
254+
return pipeline

0 commit comments

Comments
 (0)