Skip to content

Commit a336a16

Browse files
committed
feat(cfg,pipeline,cli,utils): introduce nested RetryConfig and migrate retry fields
- add RetryConfig and embed it as RunConfig.retry while keeping top-level retry_* fields for backward compatibility - normalize and convert retry_exceptions (strings → exception classes) in RunConfig.__post_init__ - update builders and RunConfigBuilder to initialize/use nested RetryConfig when setting retry-related values - adapt merge_run_config_with_kwargs to map deprecated flat retry fields into the nested retry config - update Pipeline to prefer the nested RetryConfig for setup/validation and handle exceptions as list/tuple - update CLI to construct and pass a RetryConfig when invoking runs Centralizes retry logic, keeps backward compatibility, and simplifies retry handling across components.
1 parent d5911a5 commit a336a16

File tree

5 files changed

+163
-115
lines changed

5 files changed

+163
-115
lines changed

src/flowerpower/cfg/pipeline/builder.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
from fsspec_utils import AbstractFileSystem, BaseStorageOptions, filesystem
66

7-
from .run import RunConfig
7+
from .run import RunConfig, RetryConfig
88
from ..project.adapter import AdapterConfig as ProjectAdapterConfig
99
from .builder_executor import ExecutorBuilder
1010
from .builder_adapter import AdapterBuilder
@@ -194,12 +194,14 @@ def with_retries(
194194
Returns:
195195
Self for method chaining
196196
"""
197-
self._config.max_retries = max_attempts
198-
self._config.retry_delay = delay
199-
self._config.jitter_factor = jitter
200-
197+
# Ensure nested retry exists
198+
if self._config.retry is None:
199+
self._config.retry = RetryConfig()
200+
self._config.retry.max_retries = max_attempts
201+
self._config.retry.retry_delay = delay
202+
self._config.retry.jitter_factor = jitter
201203
if exceptions:
202-
self._config.retry_exceptions = exceptions
204+
self._config.retry.retry_exceptions = exceptions
203205

204206
return self
205207

@@ -358,13 +360,14 @@ def _validate_config(self, config: RunConfig):
358360
ValueError: If configuration is invalid
359361
"""
360362
# Validate retry configuration
361-
if config.max_retries < 0:
363+
retry_cfg = config.retry or RetryConfig()
364+
if retry_cfg.max_retries < 0:
362365
raise ValueError("max_retries must be non-negative")
363366

364-
if config.retry_delay < 0:
367+
if retry_cfg.retry_delay < 0:
365368
raise ValueError("retry_delay must be non-negative")
366369

367-
if config.jitter_factor is not None and config.jitter_factor < 0:
370+
if retry_cfg.jitter_factor is not None and retry_cfg.jitter_factor < 0:
368371
raise ValueError("jitter_factor must be non-negative")
369372

370373
# Validate executor configuration

src/flowerpower/cfg/pipeline/run.py

Lines changed: 89 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -31,64 +31,18 @@ class CallbackSpec(msgspec.Struct):
3131
kwargs: dict | None = None
3232

3333

34-
class RunConfig(BaseConfig):
35-
inputs: dict | None = msgspec.field(default_factory=dict)
36-
final_vars: list[str] | None = msgspec.field(default_factory=list)
37-
config: dict | None = msgspec.field(default_factory=dict)
38-
cache: dict | bool | None = msgspec.field(default=False)
39-
with_adapter: WithAdapterConfig = msgspec.field(default_factory=WithAdapterConfig)
40-
executor: ExecutorConfig = msgspec.field(default_factory=ExecutorConfig)
41-
log_level: str | None = msgspec.field(default="INFO")
34+
class RetryConfig(BaseConfig):
35+
"""Retry configuration for pipeline execution."""
4236
max_retries: int = msgspec.field(default=3)
43-
retry_delay: int | float = msgspec.field(default=1)
37+
retry_delay: float = msgspec.field(default=1.0)
4438
jitter_factor: float | None = msgspec.field(default=0.1)
45-
retry_exceptions: list[str] = msgspec.field(default_factory=lambda: ["Exception"])
46-
# New fields for comprehensive configuration
47-
pipeline_adapter_cfg: dict | None = msgspec.field(default=None)
48-
project_adapter_cfg: dict | None = msgspec.field(default=None)
49-
adapter: dict[str, Any] | None = msgspec.field(default=None)
50-
reload: bool = msgspec.field(default=False)
51-
on_success: CallbackSpec | None = msgspec.field(default=None)
52-
on_failure: CallbackSpec | None = msgspec.field(default=None)
39+
# Accept strings or classes; will be converted to exception classes in __post_init__
40+
retry_exceptions: list[Any] = msgspec.field(default_factory=lambda: ["Exception"]) # type: ignore[assignment]
5341

5442
def __post_init__(self):
55-
# if isinstance(self.inputs, dict):
56-
# self.inputs = munchify(self.inputs)
57-
if isinstance(self.config, dict):
58-
self.config = munchify(self.config)
59-
if isinstance(self.cache, (dict)):
60-
self.cache = munchify(self.cache)
61-
if isinstance(self.with_adapter, dict):
62-
self.with_adapter = WithAdapterConfig.from_dict(self.with_adapter)
63-
if isinstance(self.executor, dict):
64-
self.executor = ExecutorConfig.from_dict(self.executor)
65-
if isinstance(self.pipeline_adapter_cfg, dict):
66-
from ..pipeline.adapter import AdapterConfig as PipelineAdapterConfig
67-
self.pipeline_adapter_cfg = PipelineAdapterConfig.from_dict(self.pipeline_adapter_cfg)
68-
if isinstance(self.project_adapter_cfg, dict):
69-
from ..project.adapter import AdapterConfig as ProjectAdapterConfig
70-
self.project_adapter_cfg = ProjectAdapterConfig.from_dict(self.project_adapter_cfg)
71-
if isinstance(self.adapter, dict):
72-
# Convert adapter instances if needed
73-
pass
7443
if isinstance(self.retry_exceptions, list):
75-
# Convert string exceptions to actual exception classes using dynamic import
7644
self.retry_exceptions = self._convert_exception_strings(self.retry_exceptions)
7745

78-
# Handle callback conversions
79-
if self.on_success is not None and not isinstance(self.on_success, CallbackSpec):
80-
if callable(self.on_success):
81-
self.on_success = CallbackSpec(func=self.on_success)
82-
elif isinstance(self.on_success, tuple) and len(self.on_success) == 3:
83-
func, args, kwargs = self.on_success
84-
self.on_success = CallbackSpec(func=func, args=args, kwargs=kwargs)
85-
else:
86-
self.on_success = None
87-
warnings.warn(
88-
"Invalid on_success format, must be Callable or (Callable, args, kwargs)",
89-
RuntimeWarning
90-
)
91-
9246
def _convert_exception_strings(self, exception_list: list) -> list:
9347
"""Convert exception strings to actual exception classes using dynamic import.
9448
@@ -99,44 +53,29 @@ def _convert_exception_strings(self, exception_list: list) -> list:
9953
List of exception classes.
10054
"""
10155
converted_exceptions = []
102-
10356
for exc in exception_list:
10457
if isinstance(exc, str):
10558
try:
106-
# Try to dynamically import the exception class
10759
exc_class = self._import_exception_class(exc)
10860
converted_exceptions.append(exc_class)
10961
except (ImportError, AttributeError) as e:
11062
warnings.warn(
11163
f"Could not import exception class '{exc}': {e}. Using Exception instead.",
112-
RuntimeWarning
64+
RuntimeWarning,
11365
)
11466
converted_exceptions.append(Exception)
11567
elif isinstance(exc, type) and issubclass(exc, BaseException):
11668
converted_exceptions.append(exc)
11769
else:
11870
warnings.warn(
11971
f"Invalid exception type: {type(exc)}. Using Exception instead.",
120-
RuntimeWarning
72+
RuntimeWarning,
12173
)
12274
converted_exceptions.append(Exception)
123-
12475
return converted_exceptions
125-
76+
12677
def _import_exception_class(self, exception_name: str) -> type:
127-
"""Dynamically import an exception class by name.
128-
129-
Args:
130-
exception_name: Name of the exception class to import.
131-
132-
Returns:
133-
The imported exception class.
134-
135-
Raises:
136-
ImportError: If the module cannot be imported.
137-
AttributeError: If the exception class is not found in the module.
138-
"""
139-
# Handle built-in exceptions first
78+
"""Dynamically import an exception class by name."""
14079
built_in_exceptions = {
14180
'Exception': Exception,
14281
'ValueError': ValueError,
@@ -149,17 +88,14 @@ def _import_exception_class(self, exception_name: str) -> type:
14988
'ImportError': ImportError,
15089
'TimeoutError': TimeoutError,
15190
}
152-
15391
if exception_name in built_in_exceptions:
15492
return built_in_exceptions[exception_name]
155-
156-
# Handle module-qualified exceptions (e.g., 'requests.exceptions.HTTPError')
93+
15794
if '.' in exception_name:
15895
module_name, class_name = exception_name.rsplit('.', 1)
15996
module = importlib.import_module(module_name)
16097
return getattr(module, class_name)
161-
162-
# Try to import from common modules
98+
16399
common_modules = [
164100
'requests.exceptions',
165101
'urllib.error',
@@ -169,18 +105,92 @@ def _import_exception_class(self, exception_name: str) -> type:
169105
'os',
170106
'io',
171107
]
172-
173108
for module_name in common_modules:
174109
try:
175110
module = importlib.import_module(module_name)
176111
if hasattr(module, exception_name):
177112
return getattr(module, exception_name)
178113
except ImportError:
179114
continue
180-
181-
# If not found in common modules, raise an error
182115
raise ImportError(f"Could not find exception class: {exception_name}")
183116

117+
118+
class RunConfig(BaseConfig):
119+
inputs: dict | None = msgspec.field(default_factory=dict)
120+
final_vars: list[str] | None = msgspec.field(default_factory=list)
121+
config: dict | None = msgspec.field(default_factory=dict)
122+
cache: dict | bool | None = msgspec.field(default=False)
123+
with_adapter: WithAdapterConfig = msgspec.field(default_factory=WithAdapterConfig)
124+
executor: ExecutorConfig = msgspec.field(default_factory=ExecutorConfig)
125+
log_level: str | None = msgspec.field(default="INFO")
126+
# New nested retry configuration
127+
retry: RetryConfig | None = msgspec.field(default=None)
128+
# Deprecated top-level retry fields (kept for backward compatibility)
129+
max_retries: int = msgspec.field(default=3)
130+
retry_delay: int | float = msgspec.field(default=1)
131+
jitter_factor: float | None = msgspec.field(default=0.1)
132+
retry_exceptions: list[str] = msgspec.field(default_factory=lambda: ["Exception"]) # type: ignore[assignment]
133+
# New fields for comprehensive configuration
134+
pipeline_adapter_cfg: dict | None = msgspec.field(default=None)
135+
project_adapter_cfg: dict | None = msgspec.field(default=None)
136+
adapter: dict[str, Any] | None = msgspec.field(default=None)
137+
reload: bool = msgspec.field(default=False)
138+
on_success: CallbackSpec | None = msgspec.field(default=None)
139+
on_failure: CallbackSpec | None = msgspec.field(default=None)
140+
141+
def __post_init__(self):
142+
# if isinstance(self.inputs, dict):
143+
# self.inputs = munchify(self.inputs)
144+
if isinstance(self.config, dict):
145+
self.config = munchify(self.config)
146+
if isinstance(self.cache, (dict)):
147+
self.cache = munchify(self.cache)
148+
if isinstance(self.with_adapter, dict):
149+
self.with_adapter = WithAdapterConfig.from_dict(self.with_adapter)
150+
if isinstance(self.executor, dict):
151+
self.executor = ExecutorConfig.from_dict(self.executor)
152+
if isinstance(self.pipeline_adapter_cfg, dict):
153+
from ..pipeline.adapter import AdapterConfig as PipelineAdapterConfig
154+
self.pipeline_adapter_cfg = PipelineAdapterConfig.from_dict(self.pipeline_adapter_cfg)
155+
if isinstance(self.project_adapter_cfg, dict):
156+
from ..project.adapter import AdapterConfig as ProjectAdapterConfig
157+
self.project_adapter_cfg = ProjectAdapterConfig.from_dict(self.project_adapter_cfg)
158+
if isinstance(self.adapter, dict):
159+
# Convert adapter instances if needed
160+
pass
161+
162+
# Normalize retry configuration (prefer nested RetryConfig)
163+
if isinstance(self.retry, dict):
164+
self.retry = RetryConfig.from_dict(self.retry)
165+
if self.retry is None:
166+
# Build nested retry from (deprecated) top-level fields
167+
self.retry = RetryConfig(
168+
max_retries=self.max_retries,
169+
retry_delay=float(self.retry_delay),
170+
jitter_factor=self.jitter_factor,
171+
retry_exceptions=self.retry_exceptions,
172+
)
173+
# Keep top-level fields in sync for backward compatibility
174+
self.max_retries = self.retry.max_retries
175+
self.retry_delay = self.retry.retry_delay
176+
self.jitter_factor = self.retry.jitter_factor
177+
# Ensure top-level exceptions reflect converted classes
178+
self.retry_exceptions = list(self.retry.retry_exceptions)
179+
180+
# Handle callback conversions
181+
if self.on_success is not None and not isinstance(self.on_success, CallbackSpec):
182+
if callable(self.on_success):
183+
self.on_success = CallbackSpec(func=self.on_success)
184+
elif isinstance(self.on_success, tuple) and len(self.on_success) == 3:
185+
func, args, kwargs = self.on_success
186+
self.on_success = CallbackSpec(func=func, args=args, kwargs=kwargs)
187+
else:
188+
self.on_success = None
189+
warnings.warn(
190+
"Invalid on_success format, must be Callable or (Callable, args, kwargs)",
191+
RuntimeWarning
192+
)
193+
# Handle on_failure callback conversions (mirror on_success behavior)
184194
if self.on_failure is not None and not isinstance(self.on_failure, CallbackSpec):
185195
if callable(self.on_failure):
186196
self.on_failure = CallbackSpec(func=self.on_failure)
@@ -191,5 +201,5 @@ def _import_exception_class(self, exception_name: str) -> type:
191201
self.on_failure = None
192202
warnings.warn(
193203
"Invalid on_failure format, must be Callable or (Callable, args, kwargs)",
194-
RuntimeWarning
204+
RuntimeWarning,
195205
)

src/flowerpower/cli/pipeline.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
from ..flowerpower import FlowerPowerProject
88
from ..pipeline.manager import HookType, PipelineManager
9-
from ..cfg.pipeline.run import RunConfig
9+
from ..cfg.pipeline.run import RunConfig, RetryConfig
1010
from ..utils.logging import setup_logging
1111
from .utils import parse_dict_or_list_param
1212

@@ -168,9 +168,11 @@ def run(
168168
config=parsed_config,
169169
cache=parsed_cache,
170170
with_adapter=with_adapter_config, # type: ignore
171-
max_retries=max_retries,
172-
retry_delay=retry_delay,
173-
jitter_factor=jitter_factor,
171+
retry=RetryConfig(
172+
max_retries=max_retries,
173+
retry_delay=retry_delay,
174+
jitter_factor=jitter_factor,
175+
),
174176
)
175177

176178
# Handle executor configuration

src/flowerpower/pipeline/pipeline.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -124,12 +124,13 @@ def run(self, run_config: RunConfig | None = None, **kwargs) -> dict[str, Any]:
124124
if run_config.reload:
125125
self._reload_module()
126126

127-
# Set up retry configuration
127+
# Set up retry configuration (use nested RetryConfig)
128+
retry_cfg = run_config.retry or self.config.run.retry
128129
retry_config = self._setup_retry_config(
129-
run_config.max_retries,
130-
run_config.retry_delay,
131-
run_config.jitter_factor,
132-
run_config.retry_exceptions,
130+
retry_cfg.max_retries,
131+
retry_cfg.retry_delay,
132+
retry_cfg.jitter_factor,
133+
tuple(retry_cfg.retry_exceptions) if isinstance(retry_cfg.retry_exceptions, (list, tuple)) else retry_cfg.retry_exceptions,
133134
)
134135
max_retries = retry_config["max_retries"]
135136
retry_delay = retry_config["retry_delay"]
@@ -151,12 +152,13 @@ def _setup_retry_config(
151152
max_retries: int | None,
152153
retry_delay: float | None,
153154
jitter_factor: float | None,
154-
retry_exceptions: tuple | None,
155+
retry_exceptions: tuple | list | None,
155156
) -> dict:
156157
"""Set up retry configuration with defaults and validation."""
157-
max_retries = max_retries or self.config.run.max_retries or 0
158-
retry_delay = retry_delay or self.config.run.retry_delay or 1.0
159-
jitter_factor = jitter_factor or self.config.run.jitter_factor or 0.1
158+
cfg = self.config.run.retry
159+
max_retries = max_retries or (cfg.max_retries if cfg else 0) or 0
160+
retry_delay = retry_delay or (cfg.retry_delay if cfg else 1.0) or 1.0
161+
jitter_factor = jitter_factor or (cfg.jitter_factor if cfg else 0.1) or 0.1
160162

161163
# Convert string exceptions to actual exception classes
162164
if retry_exceptions and isinstance(retry_exceptions, (list, tuple)):

0 commit comments

Comments
 (0)