Commit a1c243c

feat(pipeline): allow composing multiple modules via RunConfig.additional_modules

- Add RunConfig.additional_modules with normalization and to_dict serialization
- PipelineRunner: resolve/import module names or objects, pass all modules to driver.Builder.with_modules, support reload for all modules, and handle the async path
- PipelineVisualizer: accept additional_modules to render composite DAGs
- Utils/config: merge and dedupe additional_modules, add RunConfigBuilder.with_additional_modules, and plumb the merge into merge_run_config_with_kwargs
- Docs/examples/specs/tasks/openspec: document the feature and add README usage plus an example pipeline setup
- Add examples/hello-world/pipelines/setup.py and update the hello_world example to reference the composition
- Tests: add coverage for import, async path, reload, and merge/dedupe behavior
- Bump pyproject version to 0.34.0; update the CHANGELOG entry and uv.lock
1 parent f18fc40 commit a1c243c

File tree

18 files changed: +741 −193 lines

CHANGELOG.md

Lines changed: 5 additions & 1 deletion

@@ -1,5 +1,10 @@
 # Changelog
 
+## [0.34.0] - 2025-11-03
+
+### Added
+- Allow `PipelineManager.run`/`RunConfig` to load `additional_modules`, enabling multi-module pipeline execution and visualization.
+
 ## [0.31.0] - 2025-09-26
 
 ### Changes
@@ -992,4 +997,3 @@
 
 
 
-

examples/hello-world/pipelines/hello_world.py

Lines changed: 25 additions & 29 deletions
@@ -17,40 +17,38 @@
 ).pipeline.h_params
 
 
-@config.when(range=10_000)
-def spend__10000() -> pd.Series:
-    """Returns a series of spend data."""
-    # time.sleep(2)
-    return pd.Series(range(10_000)) * 10
+# @config.when(range=10_000)
+# def spend__10000() -> pd.Series:
+#     """Returns a series of spend data."""
+#     # time.sleep(2)
+#     return pd.Series(range(10_000)) * 10
 
 
-@config.when(range=10_000)
-def signups__10000() -> pd.Series:
-    """Returns a series of signups data."""
-    time.sleep(1)
-    print(10_000)
-    return pd.Series(range(10_000))
+# @config.when(range=10_000)
+# def signups__10000() -> pd.Series:
+#     """Returns a series of signups data."""
#     time.sleep(1)
+#     print(10_000)
+#     return pd.Series(range(10_000))
 
 
-@config.when(range=1_000)
-def spend__1000() -> pd.Series:
-    """Returns a series of spend data."""
-    # time.sleep(2)
-    print(1_000)
-    return pd.Series(range(10_000)) * 10
+# @config.when(range=1_000)
+# def spend__1000() -> pd.Series:
+#     """Returns a series of spend data."""
+#     # time.sleep(2)
+#     print(1_000)
+#     return pd.Series(range(10_000)) * 10
 
 
-@config.when(range=1_000)
-def signups__1000() -> pd.Series:
-    """Returns a series of signups data."""
-    time.sleep(1)
-    print(1_000)
-    return pd.Series(range(10_000))
+# @config.when(range=1_000)
+# def signups__1000() -> pd.Series:
+#     """Returns a series of signups data."""
+#     time.sleep(1)
+#     print(1_000)
+#     return pd.Series(range(10_000))
 
 
-@parameterize(
-    **PARAMS.avg_x_wk_spend
-)  # (avg_x_wk_spend={"rolling": value(3)}) #
+@parameterize(**PARAMS.avg_x_wk_spend)  # (avg_x_wk_spend={"rolling": value(3)}) #
 def avg_x_wk_spend(spend: pd.Series, rolling: int) -> pd.Series:
     """Rolling x week average spend."""
     # time.sleep(2)
@@ -68,9 +66,7 @@ def spend_mean(spend: pd.Series) -> float:
     return spend.mean()
 
 
-@parameterize(
-    **PARAMS.spend_zero_mean
-)  # (spend_zero_mean={"offset": value(0)}) #
+@parameterize(**PARAMS.spend_zero_mean)  # (spend_zero_mean={"offset": value(0)}) #
 def spend_zero_mean(spend: pd.Series, spend_mean: float, offset: int) -> pd.Series:
     """Shows function that takes a scalar. In this case to zero mean spend."""
     return spend - spend_mean + offset
examples/hello-world/pipelines/setup.py

Lines changed: 40 additions & 0 deletions

@@ -0,0 +1,40 @@
+import sys
+import time
+from pathlib import Path
+
+import pandas as pd
+from hamilton.function_modifiers import config, parameterize
+from loguru import logger
+
+from flowerpower.cfg import Config
+
+
+@config.when(range=10_000)
+def spend__10000() -> pd.Series:
+    """Returns a series of spend data."""
+    # time.sleep(2)
+    return pd.Series(range(10_000)) * 10
+
+
+@config.when(range=10_000)
+def signups__10000() -> pd.Series:
+    """Returns a series of signups data."""
+    time.sleep(1)
+    print(10_000)
+    return pd.Series(range(10_000))
+
+
+@config.when(range=1_000)
+def spend__1000() -> pd.Series:
+    """Returns a series of spend data."""
+    # time.sleep(2)
+    print(1_000)
+    return pd.Series(range(10_000)) * 10
+
+
+@config.when(range=1_000)
+def signups__1000() -> pd.Series:
+    """Returns a series of signups data."""
+    time.sleep(1)
+    print(1_000)
+    return pd.Series(range(10_000))
openspec/changes/update-run-allow-additional-modules/proposal.md

Lines changed: 33 additions & 0 deletions

@@ -0,0 +1,33 @@
+## Why
+Running a single pipeline module limits composition. Many FlowerPower projects benefit from shared setup/teardown steps (e.g., DB connections, secrets bootstrapping) and auxiliary pipelines. Hamilton natively supports composing multiple modules via `driver.Builder.with_modules(...)`. FlowerPower should expose this ability so users can run `pm.run("pipeline_1", additional_modules=["setup"])` without custom wiring.
+
+## What Changes
+- Add runtime support for specifying additional pipeline modules to load alongside the main pipeline.
+- Extend `RunConfig` with `additional_modules` (list of strings or modules at runtime) and plumb it through to the runner.
+- Update the runner to import and pass all modules to `driver.Builder.with_modules(*modules)`.
+- Ensure `reload=True` reloads all loaded modules.
+- Support both sync and async execution paths.
+
+Non-breaking: defaults preserve the current single-module behavior.
+
+## Impact
+- Affected specs: `pipeline-execution`
+- Affected code:
+  - `src/flowerpower/cfg/pipeline/run.py`
+  - `src/flowerpower/utils/config.py`
+  - `src/flowerpower/pipeline/runner.py`
+  - `src/flowerpower/pipeline/manager.py` (docs / passthrough)
+  - Tests under `tests/pipeline/` and `tests/cfg/`
+
+## Out of Scope
+- Changes to persistence, scheduling, or adapter behavior beyond module loading.
+- DAG conflict resolution beyond standard Hamilton rules (last definition wins).
+
+## Risks / Mitigations
+- Import errors for missing modules → raise a clear ImportError with actionable path/name tips.
+- Conflicting node names across modules → document precedence (order) and suggest namespacing.
+
+## Rollout
+- Add tests for sync/async, reload, and error handling.
+- Update README usage and CHANGELOG.
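The runner behavior the proposal describes — accept strings or module objects, import the strings, and fail with actionable hints — can be sketched roughly as below. `resolve_modules` is a hypothetical helper for illustration, not FlowerPower's actual implementation:

```python
import importlib
import math
from types import ModuleType


def resolve_modules(additional_modules: list) -> list[ModuleType]:
    """Resolve each entry to a module object.

    Entries may already be module objects (used as-is) or strings
    (imported). A failed import is re-raised with the actionable
    hints the proposal calls for.
    """
    resolved = []
    for entry in additional_modules:
        if isinstance(entry, ModuleType):
            resolved.append(entry)
            continue
        try:
            resolved.append(importlib.import_module(entry))
        except ImportError as exc:
            hint = entry.replace("-", "_")
            raise ImportError(
                f"Could not import additional module {entry!r}. "
                f"Try a dotted path such as 'pipelines.{hint}', and note "
                f"that '-' must be written as '_' ({hint!r})."
            ) from exc
    return resolved


# Module objects and importable names mix freely.
mods = resolve_modules([math, "json"])
print([m.__name__ for m in mods])  # ['math', 'json']
```

The resolved list would then be splatted into `driver.Builder().with_modules(*mods)`.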
openspec/changes/update-run-allow-additional-modules/specs/pipeline-execution/spec.md

Lines changed: 36 additions & 0 deletions

@@ -0,0 +1,36 @@
+## ADDED Requirements
+
+### Requirement: Run With Additional Modules
+The system SHALL support executing a pipeline while loading additional Python modules into the Hamilton driver so that DAG nodes can be composed across modules.
+
+#### Scenario: Execute with string module names
+- WHEN a user runs `pm.run("pipeline_1", additional_modules=["setup"])`
+- THEN the system imports both the `setup` and `pipeline_1` modules and executes the composed DAG
+- AND functions in all loaded modules are available for dependency resolution
+
+#### Scenario: Execute with module objects
+- WHEN a user passes actual module objects in `additional_modules`
+- THEN the system uses these module objects directly without re-importing
+- AND execution succeeds identically to using string names
+
+#### Scenario: Missing module raises helpful error
+- WHEN any `additional_modules` entry cannot be imported
+- THEN the system raises an ImportError that indicates the attempted name and suggests `pipelines.<name>` and `'-'→'_'` formatting
+
+#### Scenario: Precedence order determines node override
+- WHEN multiple modules define the same node name
+- THEN the last module in the `.with_modules(*modules)` order SHALL take precedence (Hamilton standard behavior)
+
+#### Scenario: Reload reloads all loaded modules
+- GIVEN `reload=True` is set
+- WHEN execution starts
+- THEN the system reloads the main pipeline module and each additional module
+
+#### Scenario: Works for async execution
+- WHEN running with `run_async(...)` and `additional_modules` provided
+- THEN the system composes the same set of modules and executes asynchronously
+
+#### Scenario: Cross-module final_vars
+- WHEN `final_vars` references nodes defined in any loaded module
+- THEN the system resolves and returns those nodes successfully
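The reload scenario above can be sketched with stdlib modules standing in for pipeline modules (`string` as the main module, `json` as an additional one); `reload_all` is illustrative, not FlowerPower's actual code:

```python
import importlib
import json
import string


def reload_all(main_module, additional_modules):
    """Reload the main pipeline module and every additional module.

    Mirrors the reload=True behavior the spec describes: each loaded
    module object is passed through importlib.reload so edits on disk
    take effect before the driver is built.
    """
    return [importlib.reload(mod) for mod in [main_module, *additional_modules]]


reloaded = reload_all(string, [json])
print([m.__name__ for m in reloaded])  # ['string', 'json']
```

`importlib.reload` re-executes a module in place and returns the same module object, so references held elsewhere stay valid.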
openspec/changes/update-run-allow-additional-modules/tasks.md

Lines changed: 19 additions & 0 deletions

@@ -0,0 +1,19 @@
+## 1. Implementation
+- [x] 1.1 Add `additional_modules` to `RunConfig` (list[str] | None)
+- [x] 1.2 Extend kwargs merge to support `additional_modules` (merge + dedupe)
+- [x] 1.3 Runner: resolve additional modules (string or module) and call `.with_modules(*modules)`
+- [x] 1.4 Reload logic: reload all additional modules when `reload=True`
+- [x] 1.5 PipelineManager.run docs: document `additional_modules`
+- [x] 1.6 (Optional) Visualizer: accept `additional_modules` to render a composite DAG
+
+## 2. Tests
+- [x] 2.1 Runner passes multiple modules to Builder (sync)
+- [x] 2.2 Runner passes multiple modules to Builder (async)
+- [x] 2.3 Import resolution: string success and a helpful ImportError on failure
+- [x] 2.4 Reload reloads all modules
+- [x] 2.5 Config merge: kwargs + config `additional_modules` merge and dedupe
+
+## 3. Docs & Release
+- [x] 3.1 README usage example (`pm.run("pipeline_1", additional_modules=["setup"])`)
+- [x] 3.2 CHANGELOG entry under Added
+- [ ] 3.3 Version bump as needed
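Task 1.2's merge-and-dedupe step can be sketched as follows — an assumed order-preserving merge for illustration, not the actual `merge_run_config_with_kwargs` code:

```python
def merge_additional_modules(config_modules, kwarg_modules):
    """Merge config- and kwargs-supplied module lists.

    Keeps first-seen order and drops duplicates, so each module is
    loaded exactly once regardless of where it was specified.
    """
    merged = []
    for entry in (config_modules or []) + (kwarg_modules or []):
        if entry not in merged:
            merged.append(entry)
    return merged


print(merge_additional_modules(["setup", "shared"], ["shared", "metrics"]))
# ['setup', 'shared', 'metrics']
```

Preserving order matters here because module order later determines node precedence in `.with_modules(*modules)`.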

openspec/changes/enhance-executor-overrides/proposal.md

Lines changed: 0 additions & 23 deletions
This file was deleted.

openspec/changes/enhance-executor-overrides/tasks.md

Lines changed: 0 additions & 5 deletions
This file was deleted.
openspec/specs/pipeline-execution/spec.md

Lines changed: 39 additions & 0 deletions

@@ -0,0 +1,39 @@
+# pipeline-execution Specification
+
+## Purpose
+TBD - created by archiving change update-run-allow-additional-modules. Update Purpose after archive.
+## Requirements
+### Requirement: Run With Additional Modules
+The system SHALL support executing a pipeline while loading additional Python modules into the Hamilton driver so that DAG nodes can be composed across modules.
+
+#### Scenario: Execute with string module names
+- WHEN a user runs `pm.run("pipeline_1", additional_modules=["setup"])`
+- THEN the system imports both the `setup` and `pipeline_1` modules and executes the composed DAG
+- AND functions in all loaded modules are available for dependency resolution
+
+#### Scenario: Execute with module objects
+- WHEN a user passes actual module objects in `additional_modules`
+- THEN the system uses these module objects directly without re-importing
+- AND execution succeeds identically to using string names
+
+#### Scenario: Missing module raises helpful error
+- WHEN any `additional_modules` entry cannot be imported
+- THEN the system raises an ImportError that indicates the attempted name and suggests `pipelines.<name>` and `'-'→'_'` formatting
+
+#### Scenario: Precedence order determines node override
+- WHEN multiple modules define the same node name
+- THEN the last module in the `.with_modules(*modules)` order SHALL take precedence (Hamilton standard behavior)
+
+#### Scenario: Reload reloads all loaded modules
+- GIVEN `reload=True` is set
+- WHEN execution starts
+- THEN the system reloads the main pipeline module and each additional module
+
+#### Scenario: Works for async execution
+- WHEN running with `run_async(...)` and `additional_modules` provided
+- THEN the system composes the same set of modules and executes asynchronously
+
+#### Scenario: Cross-module final_vars
+- WHEN `final_vars` references nodes defined in any loaded module
+- THEN the system resolves and returns those nodes successfully
pyproject.toml

Lines changed: 1 addition & 1 deletion

@@ -4,7 +4,7 @@ description = "A simple workflow framework for building and managing data proces
 authors = [{ name = "Volker L.", email = "[email protected]" }]
 readme = "README.md"
 requires-python = ">= 3.11"
-version = "0.33.1"
+version = "0.34.0"
 keywords = ["hamilton", "workflow", "pipeline", "scheduler", "dask", "ray"]
 
 dependencies = [
