
Commit 1d7128c

feat(pipeline,executor,examples): add hello_world_parallel and improve run/executor logic

- Add example pipeline hello_world_parallel (YAML + Python)
- Improve pipeline execution:
  - initialize setup_logging earlier
  - enable dynamic execution and configure the local/remote executor correctly
  - handle retry exceptions and error logging more robustly
- Adjust ExecutorFactory:
  - use the hamilton.execution.executors MultiThreading/MultiProcessing executor API
  - simplify config normalization and its return value
- Bump versions in pyproject.toml and uv.lock
1 parent 51f0c9f commit 1d7128c

File tree

6 files changed (+145, -55 lines)

Lines changed: 45 additions & 0 deletions

@@ -0,0 +1,45 @@
+adapter:
+  hamilton_tracker:
+    capture_data_statistics: true
+    dag_name: null
+    max_dict_length_capture: 10
+    max_list_length_capture: 50
+    project_id: null
+    tags: !munch.Munch {}
+  mlflow:
+    experiment_description: null
+    experiment_name: null
+    experiment_tags: !munch.Munch {}
+    run_description: null
+    run_id: null
+    run_name: null
+    run_tags: !munch.Munch {}
+params: !munch.Munch {}
+run:
+  adapter: null
+  cache: false
+  config: !munch.Munch {}
+  executor:
+    max_workers: 60
+    num_cpus: 12
+    type: threadpool
+  final_vars: []
+  inputs: {}
+  jitter_factor: 0.1
+  log_level: INFO
+  max_retries: 3
+  on_failure: null
+  on_success: null
+  pipeline_adapter_cfg: null
+  project_adapter_cfg: null
+  reload: false
+  retry_delay: 1
+  retry_exceptions:
+  - <class 'Exception'>
+  with_adapter:
+    future: false
+    hamilton_tracker: false
+    mlflow: false
+    opentelemetry: false
+    progressbar: false
+    ray: false
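
This new YAML is the generated default config for the example pipeline; its run.executor block (type: threadpool, max_workers: 60) is what the reworked ExecutorFactory further down consumes. FlowerPower loads it through Config.load with munch objects, but a minimal sketch of inspecting that block outside FlowerPower with plain PyYAML, treating the !munch.Munch tags as dicts, looks like this (the file path is an assumption):

import yaml

def _as_dict(loader, node):
    # Read !munch.Munch nodes as plain dicts, enough for inspection
    return loader.construct_mapping(node)

yaml.SafeLoader.add_constructor("!munch.Munch", _as_dict)

with open("conf/pipelines/hello_world_parallel.yml") as f:  # assumed location
    cfg = yaml.safe_load(f)

executor = cfg["run"]["executor"]
print(executor["type"], executor["max_workers"], executor["num_cpus"])
# threadpool 60 12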
Lines changed: 30 additions & 0 deletions

@@ -0,0 +1,30 @@
+# FlowerPower pipeline hello_world_parallel.py
+# Created on 2025-10-14 02:39:22
+
+####################################################################################################
+# Import necessary libraries
+# NOTE: Remove or comment out imports that are not used in the pipeline
+
+from hamilton.function_modifiers import parameterize, dataloader, datasaver
+from hamilton.htypes import Parallelizable, Collect
+
+from pathlib import Path
+
+from flowerpower.cfg import Config
+
+####################################################################################################
+# Load pipeline parameters. Do not modify this section.
+
+PARAMS = Config.load(
+    Path(__file__).parents[1], pipeline_name="hello_world_parallel"
+).pipeline.h_params
+
+
+####################################################################################################
+# Helper functions.
+# These functions have to start with an underscore (_).
+
+
+####################################################################################################
+# Pipeline functions
+
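The generated module imports Parallelizable and Collect but leaves the "Pipeline functions" section empty. A minimal sketch of what that section could contain, using Hamilton's fan-out/fan-in pattern; the function names and inputs are illustrative, not part of the commit:

from hamilton.htypes import Collect, Parallelizable

def greeting_target(targets: list[str]) -> Parallelizable[str]:
    # Fan out: each yielded item becomes an independent task branch
    for target in targets:
        yield target

def greeting(greeting_target: str) -> str:
    # Runs once per branch, on the remote executor
    return f"Hello, {greeting_target}!"

def greetings(greeting: Collect[str]) -> list[str]:
    # Fan in: collect the per-branch results
    return list(greeting)

Executing such a pipeline requires the dynamic-execution driver set up in pipeline.py below.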

pyproject.toml

Lines changed: 1 addition & 1 deletion

@@ -4,7 +4,7 @@ description = "A simple workflow framework for building and managing data proces
 authors = [{ name = "Volker L.", email = "[email protected]" }]
 readme = "README.md"
 requires-python = ">= 3.11"
-version = "0.31.3"
+version = "0.31.4"
 keywords = ["hamilton", "workflow", "pipeline", "scheduler", "dask", "ray"]

 dependencies = [

src/flowerpower/pipeline/pipeline.py

Lines changed: 56 additions & 40 deletions

@@ -9,10 +9,11 @@
 import random
 import time
 from typing import TYPE_CHECKING, Any, Callable
-from requests.exceptions import HTTPError, ConnectionError, Timeout # Example exception
+from requests.exceptions import HTTPError, ConnectionError, Timeout  # Example exception

 import humanize
 import msgspec
+from loguru import logger
 from hamilton import driver
 from hamilton.execution import executors
 from hamilton.registry import disable_autoload
@@ -23,6 +24,8 @@
 from .. import settings
 from ..utils.adapter import create_adapter_manager
 from ..utils.executor import create_executor_factory
+from ..utils.logging import setup_logging
+

 if importlib.util.find_spec("opentelemetry"):
     from hamilton.plugins import h_opentelemetry
@@ -37,11 +40,6 @@
 else:
     h_mlflow = None

-from hamilton.plugins import h_rich
-from hamilton.plugins.h_threadpool import FutureAdapter
-from hamilton_sdk.adapters import HamiltonTracker
-from hamilton_sdk.tracking import constants
-from loguru import logger

 if importlib.util.find_spec("distributed"):
     from dask import distributed
@@ -58,7 +56,7 @@
     ray = None
     h_ray = None

-from ..cfg import PipelineConfig, ProjectConfig
+from ..cfg import PipelineConfig
 from ..cfg.pipeline.adapter import AdapterConfig as PipelineAdapterConfig
 from ..cfg.pipeline.run import ExecutorConfig, RunConfig
 from ..cfg.project.adapter import AdapterConfig as ProjectAdapterConfig
@@ -67,6 +65,8 @@
 if TYPE_CHECKING:
     from ..flowerpower import FlowerPowerProject

+setup_logging(level=settings.LOG_LEVEL)
+

 class Pipeline(msgspec.Struct):
     """Active pipeline object that encapsulates its own execution logic.
@@ -100,12 +100,7 @@ def __post_init__(self):
         self._adapter_manager = create_adapter_manager()
         self._executor_factory = create_executor_factory()

-
-    def run(
-        self,
-        run_config: RunConfig | None = None,
-        **kwargs
-    ) -> dict[str, Any]:
+    def run(self, run_config: RunConfig | None = None, **kwargs) -> dict[str, Any]:
         """Execute the pipeline with the given parameters.

         Args:
@@ -120,7 +115,7 @@ def run(

         # Initialize run_config with pipeline defaults if not provided
         run_config = run_config or self.config.run
-
+
         # Merge kwargs into the run_config
         if kwargs:
             run_config = merge_run_config_with_kwargs(run_config, kwargs)
@@ -131,7 +126,10 @@

         # Set up retry configuration
         retry_config = self._setup_retry_config(
-            run_config.max_retries, run_config.retry_delay, run_config.jitter_factor, run_config.retry_exceptions
+            run_config.max_retries,
+            run_config.retry_delay,
+            run_config.jitter_factor,
+            run_config.retry_exceptions,
         )
         max_retries = retry_config["max_retries"]
         retry_delay = retry_config["retry_delay"]
@@ -165,22 +163,22 @@ def _setup_retry_config(
         converted_exceptions = []
         # Safe mapping of exception names to classes
         exception_mapping = {
-            'Exception': Exception,
-            'ValueError': ValueError,
-            'TypeError': TypeError,
-            'RuntimeError': RuntimeError,
-            'FileNotFoundError': FileNotFoundError,
-            'PermissionError': PermissionError,
-            'ConnectionError': ConnectionError,
-            'TimeoutError': TimeoutError,
-            'KeyError': KeyError,
-            'AttributeError': AttributeError,
-            'ImportError': ImportError,
-            'OSError': OSError,
-            'IOError': IOError,
-            'HTTPError': HTTPError,
-            'ConnectionError': ConnectionError,
-            'Timeout': Timeout,
+            "Exception": Exception,
+            "ValueError": ValueError,
+            "TypeError": TypeError,
+            "RuntimeError": RuntimeError,
+            "FileNotFoundError": FileNotFoundError,
+            "PermissionError": PermissionError,
+            "ConnectionError": ConnectionError,
+            "TimeoutError": TimeoutError,
+            "KeyError": KeyError,
+            "AttributeError": AttributeError,
+            "ImportError": ImportError,
+            "OSError": OSError,
+            "IOError": IOError,
+            "HTTPError": HTTPError,
+            "ConnectionError": ConnectionError,
+            "Timeout": Timeout,
         }
         for exc in retry_exceptions:
             if isinstance(exc, str):
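
An aside on the hunk above: the literal mapping lists "ConnectionError" twice; duplicate dict keys are legal Python, so the second entry silently wins, and both resolve to the requests ConnectionError imported at the top of the file, which shadows the builtin of the same name. The mapping exists because retry_exceptions arrives from YAML as strings (see retry_exceptions in the example config). A simplified, hypothetical sketch of the conversion the surrounding loop performs, assuming unknown names fall back to Exception; this is not the literal method body:

def _convert_retry_exceptions(retry_exceptions: list, mapping: dict) -> tuple:
    converted = []
    for exc in retry_exceptions:
        if isinstance(exc, str):
            # Resolve "ValueError" -> ValueError; unknown names fall back to Exception
            converted.append(mapping.get(exc, Exception))
        elif isinstance(exc, type) and issubclass(exc, BaseException):
            converted.append(exc)  # already an exception class, keep as-is
    return tuple(converted)

# e.g. _convert_retry_exceptions(["TimeoutError", ValueError], {"TimeoutError": TimeoutError})
# -> (TimeoutError, ValueError)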
@@ -296,17 +294,33 @@ def _execute_pipeline(
     ) -> dict[str, Any]:
         """Execute the pipeline with Hamilton."""
         # Set up execution context
-        executor, shutdown_func, adapters = self._setup_execution_context(run_config=run_config)
-
+        executor, shutdown_func, adapters = self._setup_execution_context(
+            run_config=run_config
+        )
+        if (
+            run_config.executor.type != "synchronous"
+            or run_config.executor.type == "local"
+        ):
+            allow_experimental_mode = True
+            synchronous_executor = False
+        else:
+            allow_experimental_mode = False
         try:
             # Create Hamilton driver
             dr = (
                 driver.Builder()
-                .with_config(run_config.config)
                 .with_modules(self.module)
+                .with_config(run_config.config)
                 .with_adapters(*adapters)
-                .build()
+                .enable_dynamic_execution(
+                    allow_experimental_mode=allow_experimental_mode
+                )
+                .with_local_executor(executors.SynchronousLocalTaskExecutor())
             )
+            if not synchronous_executor:
+                dr = dr.with_remote_executor(executor)
+
+            dr = dr.build()

             # Execute the pipeline
             result = dr.execute(
@@ -352,11 +366,11 @@ def _get_executor(
         cleanup_fn = None
         if executor_cfg.type == "ray" and h_ray:
             # Handle temporary case where project_context is PipelineManager
-            project_cfg = getattr(
-                self.project_context, "project_cfg", None
-            ) or getattr(self.project_context, "_project_cfg", None)
+            project_cfg = getattr(self.project_context, "project_cfg", None) or getattr(
+                self.project_context, "_project_cfg", None
+            )

-            if project_cfg and hasattr(project_cfg.adapter, 'ray'):
+            if project_cfg and hasattr(project_cfg.adapter, "ray"):
                 cleanup_fn = (
                     ray.shutdown
                     if project_cfg.adapter.ray.shutdown_ray_on_completion
@@ -427,5 +441,7 @@ def _reload_module(self):
             logger.error(f"Failed to reload module for pipeline '{self.name}': {e}")
             raise
         except Exception as e:
-            logger.error(f"Unexpected error reloading module for pipeline '{self.name}': {e}")
+            logger.error(
+                f"Unexpected error reloading module for pipeline '{self.name}': {e}"
+            )
             raise
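
Two details in the _execute_pipeline hunk above are worth flagging: the or clause makes the condition equivalent to run_config.executor.type != "synchronous" (the == "local" test can never change the outcome), and synchronous_executor is assigned only in the if branch, so a run with type "synchronous" would hit a NameError at if not synchronous_executor. A sketch of the driver construction the new code performs, with both flags set explicitly; the module, the hard-coded executor, and the final_vars/inputs (reusing the hypothetical fan-out pipeline sketched earlier) are assumptions:

from hamilton import driver
from hamilton.execution import executors

import my_pipeline_module  # hypothetical stand-in for self.module

executor_type = "threadpool"  # would come from run_config.executor.type
synchronous = executor_type in ("synchronous", "local")

builder = (
    driver.Builder()
    .with_modules(my_pipeline_module)
    .enable_dynamic_execution(allow_experimental_mode=not synchronous)
    .with_local_executor(executors.SynchronousLocalTaskExecutor())
)
if not synchronous:
    # e.g. what the factory returns for type "threadpool" (see executor.py below)
    builder = builder.with_remote_executor(
        executors.MultiThreadingExecutor(max_tasks=60)
    )
dr = builder.build()

result = dr.execute(final_vars=["greetings"], inputs={"targets": ["world"]})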

src/flowerpower/utils/executor.py

Lines changed: 12 additions & 13 deletions

@@ -25,8 +25,7 @@ def __init__(self):
         self._executor_cache: Dict[str, Any] = {}

     def create_executor(
-        self,
-        executor_cfg: Union[str, Dict[str, Any], Any, None]
+        self, executor_cfg: Union[str, Dict[str, Any], Any, None]
     ) -> Any:
         """
         Create an executor instance based on configuration.
@@ -52,8 +51,7 @@ def create_executor(
         return executor

     def _normalize_config(
-        self,
-        executor_cfg: Union[str, Dict[str, Any], Any, None]
+        self, executor_cfg: Union[str, Dict[str, Any], Any, None]
     ) -> Any:
         """Normalize executor configuration to ExecutorConfig instance."""
         from ..cfg.pipeline.run import ExecutorConfig
@@ -95,17 +93,18 @@ def _create_executor_by_type(self, executor_cfg: Any) -> Any:
     def _create_synchronous_executor(self) -> Any:
         """Create synchronous/local executor."""
         from hamilton.execution.executors import SynchronousLocalTaskExecutor
+
         return SynchronousLocalTaskExecutor()

     def _create_threadpool_executor(self, executor_cfg: Any) -> Any:
         """Create thread pool executor."""
         try:
-            from hamilton.plugins.h_threadpool import ThreadPoolExecutor
+            from hamilton.execution.executors import MultiThreadingExecutor

             # Extract max workers from config
             if executor_cfg.max_workers is not None:
-                return ThreadPoolExecutor(max_workers=executor_cfg.max_workers)
-            return ThreadPoolExecutor()
+                return MultiThreadingExecutor(max_tasks=executor_cfg.max_workers)
+            return MultiThreadingExecutor()
         except ImportError:
             logger.warning(
                 "ThreadPool executor dependencies not installed. Using local executor."
@@ -115,12 +114,12 @@ def _create_threadpool_executor(self, executor_cfg: Any) -> Any:
     def _create_processpool_executor(self, executor_cfg: Any) -> Any:
         """Create process pool executor."""
         try:
-            from hamilton.execution.executors import ProcessPoolExecutor
+            from hamilton.execution.executors import MultiProcessingExecutor

             # Extract max workers from config
             if executor_cfg.max_workers is not None:
-                return ProcessPoolExecutor(max_workers=executor_cfg.max_workers)
-            return ProcessPoolExecutor()
+                return MultiProcessingExecutor(max_tasks=executor_cfg.max_workers)
+            return MultiProcessingExecutor()
         except ImportError:
             logger.warning(
                 "ProcessPool executor dependencies not installed. Using local executor."
@@ -135,7 +134,7 @@ def _create_ray_executor(self, executor_cfg: Any) -> Any:
         # Extract configuration
         config = {}
         if executor_cfg.num_cpus is not None:
-            config['num_cpus'] = executor_cfg.num_cpus
+            config["num_cpus"] = executor_cfg.num_cpus
         if config:
             return RayTaskExecutor(**config)
         return RayTaskExecutor()
@@ -153,7 +152,7 @@ def _create_dask_executor(self, executor_cfg: Any) -> Any:
         # Extract configuration
         config = {}
         if executor_cfg.num_cpus is not None:
-            config['num_cpus'] = executor_cfg.num_cpus
+            config["num_cpus"] = executor_cfg.num_cpus
         if config:
             return DaskExecutor(**config)
         return DaskExecutor()
@@ -175,4 +174,4 @@ def create_executor_factory() -> ExecutorFactory:
     Returns:
         ExecutorFactory: Configured factory instance
     """
-    return ExecutorFactory()
+    return ExecutorFactory()
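
Two notes on the executor swap above: hamilton.plugins.h_threadpool exposes a FutureAdapter rather than a ThreadPoolExecutor, so the old import most likely always fell into the ImportError fallback; and the task-based executors in hamilton.execution.executors take max_tasks, not max_workers, hence the renamed keyword. A sketch of what the factory now effectively produces for the executor block in the example YAML; the helper below is illustrative, not the factory's actual code:

from hamilton.execution.executors import (
    MultiProcessingExecutor,
    MultiThreadingExecutor,
    SynchronousLocalTaskExecutor,
)

def build_executor(executor_type: str, max_workers: int | None = None):
    # Illustrative stand-in for ExecutorFactory._create_executor_by_type
    if executor_type == "threadpool":
        return MultiThreadingExecutor(max_tasks=max_workers) if max_workers else MultiThreadingExecutor()
    if executor_type == "processpool":
        return MultiProcessingExecutor(max_tasks=max_workers) if max_workers else MultiProcessingExecutor()
    return SynchronousLocalTaskExecutor()  # "synchronous"/"local" and fallback

remote = build_executor("threadpool", max_workers=60)  # matches max_workers: 60 above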

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default.
