Skip to content

Commit 7af9642

Browse files
committed
Refactor logging setup and configuration
- Introduced a centralized logging setup in `src/flowerpower/utils/logging.py` to standardize log formatting and levels across the application. - Updated `src/flowerpower/pipeline/scheduler.py`, `src/flowerpower/worker/__init__.py`, and other worker modules to utilize the new logging setup. - Created a new settings module `src/flowerpower/settings.py` to manage configuration variables, including logging levels and executor settings. - Removed unused imports and commented-out code in various files for improved code clarity. - Enhanced timezone handling in `src/flowerpower/utils/sql.py` to ensure proper error handling and timezone awareness. - Updated dependency markers in `uv.lock` for better compatibility across platforms. - Added support for `openlineage-python` package in the dependency management.
1 parent 22be4bf commit 7af9642

40 files changed

+699
-710
lines changed

examples/hello-world/pipelines/hello_world.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from pathlib import Path
77

88
import pandas as pd
9-
from hamilton.function_modifiers import parameterize, config
9+
from hamilton.function_modifiers import config, parameterize
1010
from loguru import logger
1111

1212
from flowerpower.cfg import Config
@@ -15,24 +15,28 @@
1515
Path(__file__).parents[1], pipeline_name="hello_world"
1616
).pipeline.h_params
1717

18+
1819
@config.when(range=10_000)
1920
def spend__10000() -> pd.Series:
2021
"""Returns a series of spend data."""
2122
# time.sleep(2)
2223
return pd.Series(range(10_000)) * 10
2324

25+
2426
@config.when(range=10_000)
2527
def signups__10000() -> pd.Series:
2628
"""Returns a series of signups data."""
2729
time.sleep(1)
2830
return pd.Series(range(10_000))
2931

32+
3033
@config.when(range=1_000)
3134
def spend__1000() -> pd.Series:
3235
"""Returns a series of spend data."""
3336
# time.sleep(2)
3437
return pd.Series(range(10_000)) * 10
3538

39+
3640
@config.when(range=1_000)
3741
def signups__1000() -> pd.Series:
3842
"""Returns a series of signups data."""

pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,3 +123,6 @@ package = true
123123
mlflow = [
124124
"mlflow>=2.21.3",
125125
]
126+
openlineage = [
127+
"openlineage-python>=1.31.0",
128+
]

src/flowerpower/__init__.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
import importlib.metadata
22

3-
from .flowerpower import init as init_flowerpower
3+
from .flowerpower import init as init_flowerpower # noqa: E402
44

55
__version__ = importlib.metadata.version("FlowerPower")
6+
7+
__all__ = [
8+
"__version__",
9+
"init_flowerpower",
10+
]

src/flowerpower/cfg/__init__.py

Lines changed: 2 additions & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -3,146 +3,11 @@
33
import msgspec
44
from munch import Munch
55

6-
from ..fs import get_filesystem, AbstractFileSystem
7-
#from .base import BaseConfig
6+
from ..fs import AbstractFileSystem, get_filesystem
7+
from .base import BaseConfig
88
from .pipeline import PipelineConfig
99
from .project import ProjectConfig
1010

11-
import copy
12-
13-
from typing import Any, Self
14-
15-
from fsspec import filesystem
16-
17-
18-
class BaseConfig(msgspec.Struct, kw_only=True):
19-
def to_dict(self) -> dict[str, Any]:
20-
return msgspec.to_builtins(self)
21-
22-
def to_yaml(self, path: str, fs: AbstractFileSystem | None = None) -> None:
23-
"""
24-
Converts the instance to a YAML file.
25-
26-
Args:
27-
path: The path to the YAML file.
28-
fs: An optional filesystem instance to use for file operations.
29-
30-
Raises:
31-
NotImplementedError: If the filesystem does not support writing files.
32-
"""
33-
if fs is None:
34-
fs = filesystem("file")
35-
try:
36-
with fs.open(path, "wb") as f:
37-
f.write(msgspec.yaml.encode(self, order="deterministic"))
38-
# yaml.dump(self.to_dict(), f, default_flow_style=False)
39-
except NotImplementedError:
40-
raise NotImplementedError("The filesystem does not support writing files.")
41-
42-
@classmethod
43-
def from_dict(cls, data: dict[str, Any]) -> "BaseConfig":
44-
"""
45-
Converts a dictionary to an instance of the class.
46-
Args:
47-
data: The dictionary to convert.
48-
49-
Returns:
50-
An instance of the class with the values from the dictionary.
51-
"""
52-
return msgspec.convert(data, cls)
53-
54-
@classmethod
55-
def from_yaml(cls, path: str, fs: AbstractFileSystem | None = None) -> "BaseConfig":
56-
"""
57-
Loads a YAML file and converts it to an instance of the class.
58-
59-
Args:
60-
path: The path to the YAML file.
61-
fs: An optional filesystem instance to use for file operations.
62-
63-
Returns:
64-
An instance of the class with the values from the YAML file.
65-
66-
"""
67-
if fs is None:
68-
fs = filesystem("file")
69-
with fs.open(path) as f:
70-
# data = yaml.full_load(f)
71-
# return cls.from_dict(data)
72-
return msgspec.yaml.decode(f.read(), type=cls, strict=False)
73-
74-
def update(self, d: dict[str, Any]) -> None:
75-
for k, v in d.items():
76-
if hasattr(self, k):
77-
current_value = getattr(self, k)
78-
if isinstance(current_value, dict) and isinstance(v, dict):
79-
current_value.update(v)
80-
else:
81-
setattr(self, k, v)
82-
else:
83-
setattr(self, k, v)
84-
85-
def merge_dict(self, d: dict[str, Any]) -> Self:
86-
"""
87-
Creates a copy of this instance and updates the copy with values
88-
from the provided dictionary, only if the dictionary field's value is not
89-
its default value. The original instance (self) is not modified.
90-
91-
Args:
92-
d: The dictionary to get values from.
93-
94-
Returns:
95-
A new instance of the struct with updated values.
96-
"""
97-
self_copy = copy.copy(self)
98-
for k, v in d.items():
99-
if hasattr(self_copy, k):
100-
current_value = getattr(self_copy, k)
101-
if isinstance(current_value, dict) and isinstance(v, dict):
102-
current_value.update(v)
103-
else:
104-
setattr(self_copy, k, v)
105-
else:
106-
setattr(self_copy, k, v)
107-
return self_copy
108-
109-
def merge(self, source: Self) -> Self:
110-
"""
111-
Creates a copy of this instance and updates the copy with values
112-
from the source struct, only if the source field's value is not
113-
its default value. The original instance (self) is not modified.
114-
115-
Args:
116-
source: The msgspec.Struct instance of the same type to get values from.
117-
118-
Returns:
119-
A new instance of the struct with updated values.
120-
121-
Raises:
122-
TypeError: If source is not of the same type as self.
123-
"""
124-
if type(self) is not type(source):
125-
raise TypeError(f"Source must be an instance of {type(self).__name__}, not {type(source).__name__}")
126-
127-
updated_instance = copy.copy(self)
128-
129-
# Get default values if they exist
130-
defaults = getattr(source, '__struct_defaults__', {})
131-
132-
for field in source.__struct_fields__:
133-
source_value = getattr(source, field)
134-
has_explicit_default = field in defaults
135-
is_default_value = False
136-
137-
if has_explicit_default:
138-
is_default_value = (source_value == defaults[field])
139-
else:
140-
is_default_value = (source_value is None)
141-
142-
if not is_default_value:
143-
setattr(updated_instance, field, source_value)
144-
145-
return updated_instance
14611

14712
class Config(BaseConfig):
14813
pipeline: PipelineConfig = msgspec.field(default_factory=PipelineConfig)

src/flowerpower/cfg/_base.py

Lines changed: 0 additions & 36 deletions
This file was deleted.

src/flowerpower/cfg/base.py

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
import copy
2+
from typing import Any, Self
3+
4+
import msgspec
5+
from fsspec import AbstractFileSystem, filesystem
6+
7+
8+
class BaseConfig(msgspec.Struct, kw_only=True):
9+
def to_dict(self) -> dict[str, Any]:
10+
return msgspec.to_builtins(self)
11+
12+
def to_yaml(self, path: str, fs: AbstractFileSystem | None = None) -> None:
13+
"""
14+
Converts the instance to a YAML file.
15+
16+
Args:
17+
path: The path to the YAML file.
18+
fs: An optional filesystem instance to use for file operations.
19+
20+
Raises:
21+
NotImplementedError: If the filesystem does not support writing files.
22+
"""
23+
if fs is None:
24+
fs = filesystem("file")
25+
try:
26+
with fs.open(path, "wb") as f:
27+
f.write(msgspec.yaml.encode(self, order="deterministic"))
28+
# yaml.dump(self.to_dict(), f, default_flow_style=False)
29+
except NotImplementedError:
30+
raise NotImplementedError("The filesystem does not support writing files.")
31+
32+
@classmethod
33+
def from_dict(cls, data: dict[str, Any]) -> "BaseConfig":
34+
"""
35+
Converts a dictionary to an instance of the class.
36+
Args:
37+
data: The dictionary to convert.
38+
39+
Returns:
40+
An instance of the class with the values from the dictionary.
41+
"""
42+
return msgspec.convert(data, cls)
43+
44+
@classmethod
45+
def from_yaml(cls, path: str, fs: AbstractFileSystem | None = None) -> "BaseConfig":
46+
"""
47+
Loads a YAML file and converts it to an instance of the class.
48+
49+
Args:
50+
path: The path to the YAML file.
51+
fs: An optional filesystem instance to use for file operations.
52+
53+
Returns:
54+
An instance of the class with the values from the YAML file.
55+
56+
"""
57+
if fs is None:
58+
fs = filesystem("file")
59+
with fs.open(path) as f:
60+
# data = yaml.full_load(f)
61+
# return cls.from_dict(data)
62+
return msgspec.yaml.decode(f.read(), type=cls, strict=False)
63+
64+
def update(self, d: dict[str, Any]) -> None:
65+
for k, v in d.items():
66+
if hasattr(self, k):
67+
current_value = getattr(self, k)
68+
if isinstance(current_value, dict) and isinstance(v, dict):
69+
current_value.update(v)
70+
else:
71+
setattr(self, k, v)
72+
else:
73+
setattr(self, k, v)
74+
75+
def merge_dict(self, d: dict[str, Any]) -> Self:
76+
"""
77+
Creates a copy of this instance and updates the copy with values
78+
from the provided dictionary, only if the dictionary field's value is not
79+
its default value. The original instance (self) is not modified.
80+
81+
Args:
82+
d: The dictionary to get values from.
83+
84+
Returns:
85+
A new instance of the struct with updated values.
86+
"""
87+
self_copy = copy.copy(self)
88+
for k, v in d.items():
89+
if hasattr(self_copy, k):
90+
current_value = getattr(self_copy, k)
91+
if isinstance(current_value, dict) and isinstance(v, dict):
92+
current_value.update(v)
93+
else:
94+
setattr(self_copy, k, v)
95+
else:
96+
setattr(self_copy, k, v)
97+
return self_copy
98+
99+
def merge(self, source: Self) -> Self:
100+
"""
101+
Creates a copy of this instance and updates the copy with values
102+
from the source struct, only if the source field's value is not
103+
its default value. The original instance (self) is not modified.
104+
105+
Args:
106+
source: The msgspec.Struct instance of the same type to get values from.
107+
108+
Returns:
109+
A new instance of the struct with updated values.
110+
111+
Raises:
112+
TypeError: If source is not of the same type as self.
113+
"""
114+
if type(self) is not type(source):
115+
raise TypeError(
116+
f"Source must be an instance of {type(self).__name__}, not {type(source).__name__}"
117+
)
118+
119+
updated_instance = copy.copy(self)
120+
121+
# Get default values if they exist
122+
defaults = getattr(source, "__struct_defaults__", {})
123+
124+
for field in source.__struct_fields__:
125+
source_value = getattr(source, field)
126+
has_explicit_default = field in defaults
127+
is_default_value = False
128+
129+
if has_explicit_default:
130+
is_default_value = source_value == defaults[field]
131+
else:
132+
is_default_value = source_value is None
133+
134+
if not is_default_value:
135+
setattr(updated_instance, field, source_value)
136+
137+
return updated_instance

0 commit comments

Comments
 (0)