Skip to content

Commit c97b85b

Browse files
committed
Enhance PipelineRunner with context management and logging improvements
- Added context manager support to PipelineRunner for better resource management.
- Integrated logging level configuration from settings into the setup_logging function.
- Enhanced adapter setup to allow additional Hamilton adapters and improved logging of enabled adapters.
- Updated pipeline execution logging to include execution time using the humanize library.
- Refactored settings to include Hamilton-specific configurations for telemetry and data capture limits.
- Added humanize library as a dependency for better time formatting.
1 parent 1f05959 commit c97b85b

File tree

11 files changed

+858
-577
lines changed

11 files changed

+858
-577
lines changed

examples/hello-world/conf/project.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
name: null
1+
name: hello-world
22
worker:
33
type: apscheduler
44
backend:

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ dependencies = [
1010
'aiobotocore<2.18.0',
1111
'dill>=0.3.8',
1212
'fsspec>=2024.10.0',
13+
"humanize>=4.12.2",
1314
"msgspec>=0.19.0",
1415
'munch>=4.0.0',
1516
"orjson>=3.10.15",

src/flowerpower/cfg/pipeline/adapter.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@
22
from munch import munchify
33

44
from ..base import BaseConfig
5-
5+
from ... import settings
66

77
class HamiltonTracerConfig(BaseConfig):
88
project_id: int | None = msgspec.field(default=None)
99
dag_name: str | None = msgspec.field(default=None)
1010
tags: dict = msgspec.field(default_factory=dict)
11+
capture_data_statistics: bool = msgspec.field(default=settings.HAMILTON_CAPTURE_DATA_STATISTICS)
12+
max_list_length_capture: int = msgspec.field(default=settings.HAMILTON_MAX_LIST_LENGTH_CAPTURE)
13+
max_dict_length_capture: int = msgspec.field(default=settings.HAMILTON_MAX_DICT_LENGTH_CAPTURE)
1114

1215
def __post_init__(self):
1316
self.tags = munchify(self.tags)

src/flowerpower/cfg/pipeline/run.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from munch import munchify
55

66
from ..base import BaseConfig
7-
7+
from ... import settings
88

99
class WithAdapterConfig(BaseConfig):
1010
tracker: bool = msgspec.field(default=False)
@@ -17,9 +17,9 @@ class WithAdapterConfig(BaseConfig):
1717

1818

1919
class ExecutorConfig(BaseConfig):
20-
type: str | None = msgspec.field(default=None)
21-
max_workers: int | None = msgspec.field(default=10)
22-
num_cpus: int | None = msgspec.field(default_factory=os.cpu_count)
20+
type: str | None = msgspec.field(default=settings.FP_EXECUTOR)
21+
max_workers: int | None = msgspec.field(default=settings.FP_EXECUTOR_MAX_WORKERS)
22+
num_cpus: int | None = msgspec.field(default=settings.FP_EXECUTOR_NUM_CPUS)
2323

2424

2525
class RunConfig(BaseConfig):
@@ -31,6 +31,8 @@ class RunConfig(BaseConfig):
3131
default_factory=WithAdapterConfig
3232
)
3333
executor: ExecutorConfig | dict = msgspec.field(default_factory=ExecutorConfig)
34+
log_level: str | None = msgspec.field(default=None)
35+
3436

3537
def __post_init__(self):
3638
if isinstance(self.inputs, dict):

src/flowerpower/cfg/project/adapter.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@
22
from munch import munchify
33

44
from ..base import BaseConfig
5-
5+
from ... import settings
66

77
class HamiltonTrackerConfig(BaseConfig):
88
username: str | None = msgspec.field(default=None)
9-
api_url: str = msgspec.field(default="http://localhost:8241")
10-
ui_url: str = msgspec.field(default="http://localhost:8242")
9+
api_url: str = msgspec.field(default=settings.HAMILTON_API_URL)
10+
ui_url: str = msgspec.field(default=settings.HAMILTON_UI_URL)
1111
api_key: str | None = msgspec.field(default=None)
1212
verify: bool = msgspec.field(default=False)
1313

src/flowerpower/cfg/project/worker.py

Lines changed: 60 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import datetime as dt
22

33
import msgspec
4-
4+
from ... import settings
55
from ..base import BaseConfig
66

77
# class Worker(Enum):
@@ -30,15 +30,46 @@ class WorkerBackendConfig(BaseConfig):
3030

3131

3232
class APSDataStoreConfig(WorkerBackendConfig):
33-
type: str = msgspec.field(default_factory=lambda: "postgresql")
34-
host: str = msgspec.field(default_factory=lambda: "localhost")
35-
port: int = msgspec.field(default_factory=lambda: 5432)
33+
type: str = msgspec.field(default=settings.FP_APS_WORKER_BACKEND_DS)
34+
host: str = msgspec.field(
35+
default=settings.BACKEND_PROPERTIES[settings.FP_APS_WORKER_BACKEND_DS][
36+
"default_host"
37+
]
38+
)
39+
port: int = msgspec.field(
40+
default=settings.BACKEND_PROPERTIES[settings.FP_APS_WORKER_BACKEND_DS][
41+
"default_port"
42+
]
43+
)
3644
schema: str | None = msgspec.field(default="flowerpower")
37-
username: str = msgspec.field(default_factory=lambda: "postgres")
45+
username: str = msgspec.field(
46+
default=settings.BACKEND_PROPERTIES[settings.FP_APS_WORKER_BACKEND_DS][
47+
"default_username"
48+
]
49+
)
3850

3951

4052
class APSEventBrokerConfig(WorkerBackendConfig):
41-
from_ds_sqla: bool = msgspec.field(default=True)
53+
type: str = msgspec.field(default=settings.FP_APS_WORKER_BACKEND_EB)
54+
host: str = msgspec.field(
55+
default=settings.BACKEND_PROPERTIES[settings.FP_APS_WORKER_BACKEND_EB][
56+
"default_host"
57+
]
58+
)
59+
port: int = msgspec.field(
60+
default=settings.BACKEND_PROPERTIES[settings.FP_APS_WORKER_BACKEND_EB][
61+
"default_port"
62+
]
63+
)
64+
username: str = msgspec.field(
65+
default=settings.BACKEND_PROPERTIES[settings.FP_APS_WORKER_BACKEND_EB][
66+
"default_username"
67+
]
68+
)
69+
from_ds_sqla: bool = msgspec.field(
70+
default_factory=lambda: settings.FP_APS_WORKER_BACKEND_EB == "postgresql"
71+
and settings.FP_APS_WORKER_BACKEND_DS == "postgresql"
72+
)
4273

4374

4475
class APSBackendConfig(BaseConfig):
@@ -47,18 +78,33 @@ class APSBackendConfig(BaseConfig):
4778
default_factory=APSEventBrokerConfig
4879
)
4980
cleanup_interval: int | float | dt.timedelta = msgspec.field(
50-
default=300
81+
default=settings.FP_APS_WORKER_CLEANUP_INTERVAL
5182
) # int in seconds
52-
max_concurrent_jobs: int = msgspec.field(default=10)
53-
default_job_executor: str | None = msgspec.field(default="threadpool")
54-
num_workers: int | None = msgspec.field(default=None)
83+
max_concurrent_jobs: int = msgspec.field(
84+
default=settings.FP_APS_WORKER_MAX_CONCURRENT_JOBS
85+
)
86+
default_job_executor: str | None = msgspec.field(
87+
default=settings.FP_EXECUTOR
88+
)
89+
num_workers: int | None = msgspec.field(
90+
default=settings.FP_EXECUTOR_NUM_CPUS
91+
)
5592

5693

5794
class RQBackendConfig(WorkerBackendConfig):
58-
type: str = msgspec.field(default_factory=lambda: "redis")
59-
host: str = msgspec.field(default_factory=lambda: "localhost")
60-
port: int = msgspec.field(default_factory=lambda: 6379)
61-
queues: str | list[str] = msgspec.field(default_factory=lambda: ["default"])
95+
type: str = msgspec.field(default="redis")
96+
host: str = msgspec.field(
97+
default=settings.BACKEND_PROPERTIES["redis"]["default_host"]
98+
)
99+
port: int = msgspec.field(
100+
default=settings.BACKEND_PROPERTIES["redis"]["default_port"]
101+
)
102+
database: int = msgspec.field(
103+
default=settings.BACKEND_PROPERTIES["redis"]["default_database"]
104+
)
105+
queues: str | list[str] = msgspec.field(
106+
default_factory=lambda: ["low-prio", "default", "high-prio"]
107+
)
62108

63109

64110
class HueyBackendConfig(WorkerBackendConfig):

src/flowerpower/pipeline/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import importlib
22
import posixpath
33
import sys
4-
from traceback import TracebackType
4+
from types import TracebackType
55

66
from loguru import logger
77
from munch import Munch

0 commit comments

Comments
 (0)