Skip to content

Commit 92edd3b

Browse files
committed
fix(executor): treat "local" as synchronous and add debug logs
- Normalize executor type "local" -> "synchronous" so local configs use the synchronous executor.
- Add debug logging for executor selection (synchronous/local, threadpool, processpool, ray, dask).
- Minor formatting and whitespace cleanup across pipeline and executor modules.
- chore(pyproject): bump version to 0.32.3

1 parent: 28df992 · commit: 92edd3b

File tree

4 files changed

+49
-36
lines changed

4 files changed

+49
-36
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ description = "A simple workflow framework for building and managing data proces
44
authors = [{ name = "Volker L.", email = "[email protected]" }]
55
readme = "README.md"
66
requires-python = ">= 3.11"
7-
version = "0.32.2"
7+
version = "0.32.3"
88
keywords = ["hamilton", "workflow", "pipeline", "scheduler", "dask", "ray"]
99

1010
dependencies = [

src/flowerpower/pipeline/executor.py

Lines changed: 25 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -13,22 +13,22 @@
1313

1414
class PipelineExecutor:
1515
"""Handles pipeline execution with comprehensive parameter handling.
16-
16+
1717
This class is responsible for:
1818
- Executing pipelines with various configurations
1919
- Merging runtime parameters with pipeline defaults
2020
- Setting up execution environment (logging, etc.)
2121
- Delegating to Pipeline objects for actual execution
2222
"""
23-
23+
2424
def __init__(
2525
self,
2626
config_manager: "PipelineConfigManager",
2727
registry: "PipelineRegistry",
28-
project_context: Optional[Any] = None
28+
project_context: Optional[Any] = None,
2929
):
3030
"""Initialize the pipeline executor.
31-
31+
3232
Args:
3333
config_manager: Configuration manager for accessing pipeline configs
3434
registry: Pipeline registry for accessing pipeline objects
@@ -37,89 +37,83 @@ def __init__(
3737
self._config_manager = config_manager
3838
self._registry = registry
3939
self._project_context = project_context
40-
40+
4141
def run(
42-
self,
43-
name: str,
44-
run_config: Optional[RunConfig] = None,
45-
**kwargs
42+
self, name: str, run_config: Optional[RunConfig] = None, **kwargs
4643
) -> dict[str, Any]:
4744
"""Execute a pipeline synchronously and return its results.
48-
45+
4946
This is the main method for running pipelines directly. It handles configuration
5047
loading, adapter setup, and execution via Pipeline objects.
51-
48+
5249
Args:
5350
name: Name of the pipeline to run. Must be a valid identifier.
5451
run_config: Run configuration object containing all execution parameters.
5552
If None, the default configuration from the pipeline will be used.
5653
**kwargs: Additional parameters to override the run_config.
57-
54+
5855
Returns:
5956
dict[str, Any]: Results of pipeline execution
60-
57+
6158
Raises:
6259
ValueError: If pipeline configuration cannot be loaded
6360
Exception: If pipeline execution fails
6461
"""
6562
# Load pipeline configuration
6663
pipeline_config = self._config_manager.load_pipeline_config(name=name)
67-
64+
6865
# Initialize run_config with pipeline defaults if not provided
6966
run_config = run_config or pipeline_config.run
70-
67+
7168
# Merge kwargs into run_config
7269
if kwargs:
7370
run_config = merge_run_config_with_kwargs(run_config, kwargs)
74-
71+
7572
# Set up logging for this specific run if log_level is provided
7673
if run_config.log_level is not None:
7774
setup_logging(level=run_config.log_level)
78-
75+
7976
# Get the pipeline object from registry
8077
pipeline = self._registry.get_pipeline(
8178
name=name,
8279
project_context=self._project_context,
8380
)
84-
81+
8582
# Execute the pipeline
8683
return pipeline.run(run_config=run_config)
87-
84+
8885
async def run_async(
89-
self,
90-
name: str,
91-
run_config: Optional[RunConfig] = None,
92-
**kwargs
86+
self, name: str, run_config: Optional[RunConfig] = None, **kwargs
9387
) -> dict[str, Any]:
9488
"""Execute a pipeline asynchronously and return its results.
95-
89+
9690
Args:
9791
name: Name of the pipeline to run
9892
run_config: Run configuration object
9993
**kwargs: Additional parameters to override the run_config
100-
94+
10195
Returns:
10296
dict[str, Any]: Results of pipeline execution
10397
"""
10498
# Load pipeline configuration
10599
pipeline_config = self._config_manager.load_pipeline_config(name=name)
106-
100+
107101
# Initialize run_config with pipeline defaults if not provided
108102
run_config = run_config or pipeline_config.run
109-
103+
110104
# Merge kwargs into run_config
111105
if kwargs:
112106
run_config = merge_run_config_with_kwargs(run_config, kwargs)
113-
107+
114108
# Set up logging for this specific run if log_level is provided
115109
if run_config.log_level is not None:
116110
setup_logging(level=run_config.log_level)
117-
111+
118112
# Get the pipeline object from registry
119113
pipeline = self._registry.get_pipeline(
120114
name=name,
121115
project_context=self._project_context,
122116
)
123-
117+
124118
# Execute the pipeline asynchronously
125-
return await pipeline.run_async(run_config=run_config)
119+
return await pipeline.run_async(run_config=run_config)

src/flowerpower/pipeline/pipeline.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,9 @@ def run(self, run_config: RunConfig | None = None, **kwargs) -> dict[str, Any]:
125125
retry_cfg.max_retries,
126126
retry_cfg.retry_delay,
127127
retry_cfg.jitter_factor,
128-
tuple(retry_cfg.retry_exceptions) if isinstance(retry_cfg.retry_exceptions, (list, tuple)) else retry_cfg.retry_exceptions,
128+
tuple(retry_cfg.retry_exceptions)
129+
if isinstance(retry_cfg.retry_exceptions, (list, tuple))
130+
else retry_cfg.retry_exceptions,
129131
)
130132
max_retries = retry_config["max_retries"]
131133
retry_delay = retry_config["retry_delay"]
@@ -151,9 +153,19 @@ def _setup_retry_config(
151153
) -> dict:
152154
"""Set up retry configuration with defaults and validation."""
153155
cfg = self.config.run.retry
154-
max_retries = max_retries if max_retries is not None else (cfg.max_retries if cfg else 0)
155-
retry_delay = retry_delay if retry_delay is not None else (cfg.retry_delay if cfg else 1.0)
156-
jitter_factor = jitter_factor if jitter_factor is not None else (cfg.jitter_factor if cfg else 0.1)
156+
max_retries = (
157+
max_retries if max_retries is not None else (cfg.max_retries if cfg else 0)
158+
)
159+
retry_delay = (
160+
retry_delay
161+
if retry_delay is not None
162+
else (cfg.retry_delay if cfg else 1.0)
163+
)
164+
jitter_factor = (
165+
jitter_factor
166+
if jitter_factor is not None
167+
else (cfg.jitter_factor if cfg else 0.1)
168+
)
157169

158170
# Convert string exceptions to actual exception classes
159171
if retry_exceptions and isinstance(retry_exceptions, (list, tuple)):

src/flowerpower/utils/executor.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,16 +73,23 @@ def _normalize_config(
7373
def _create_executor_by_type(self, executor_cfg: Any) -> Any:
7474
"""Create executor based on type."""
7575
executor_type = executor_cfg.type or "synchronous"
76+
if executor_type == "local":
77+
executor_type = "synchronous"
7678

7779
if executor_type in ("synchronous", None):
80+
logger.debug("Using synchronous/local executor.")
7881
return self._create_synchronous_executor()
7982
elif executor_type == "threadpool":
83+
logger.debug("Using thread pool executor.")
8084
return self._create_threadpool_executor(executor_cfg)
8185
elif executor_type == "processpool":
86+
logger.debug("Using process pool executor.")
8287
return self._create_processpool_executor(executor_cfg)
8388
elif executor_type == "ray":
89+
logger.debug("Using Ray executor.")
8490
return self._create_ray_executor(executor_cfg)
8591
elif executor_type == "dask":
92+
logger.debug("Using Dask executor.")
8693
return self._create_dask_executor(executor_cfg)
8794
else:
8895
logger.warning(

0 commit comments

Comments (0)