Skip to content

Commit 6e74051

Browse files
committed
refactor(pipeline): extract PipelineRunner, RetryManager & ExecutionContext; simplify Pipeline
- Introduce PipelineRunner to encapsulate pipeline execution flow (sync & async). - Add RetryManager to centralize retry/backoff logic for runs (sync + async). - Add ExecutionContextBuilder to create executors, shutdown hooks and adapters. - Add telemetry helper (initialize_telemetry) and ensure logging initialization helper. - Simplify Pipeline: delegate run/run_async to PipelineRunner, expose adapter_manager & executor_factory, lazily create Runner. - Extend RunConfig handling: add executor_override_raw and utilities to resolve/merge executor overrides. - Harden RunConfig/RetryConfig serialization and exception string handling. - Add examples for hello-world (pipelines, parallel pipeline, project config, python pipeline modules). - Add tests for runner and updated pipeline tests to assert runner delegation and async parity. - Remove docs cleanup plan/tasks (moved/removed) and update .gitignore to ignore docs/refactoring and cleanup plan. - Bump versions: pyproject -> 0.32.6; uv.lock -> 0.32.5
1 parent 19cac12 commit 6e74051

File tree

22 files changed

+1065
-810
lines changed

22 files changed

+1065
-810
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,3 +82,6 @@ flowerpower_llm_context7.md
8282
flowerpower_llm_doc_1.md
8383
flowerpower_llm_doc_2.md
8484
flowerpower_llm_doc_3.md
85+
docs/refactoring/
86+
docs/cleanup-refactor-plan.md
87+
.gitignore

docs/cleanup-refactor-plan.md

Lines changed: 0 additions & 64 deletions
This file was deleted.

docs/cleanup-tasks.md

Lines changed: 0 additions & 29 deletions
This file was deleted.
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
adapter:
2+
hamilton_tracker:
3+
capture_data_statistics: true
4+
dag_name: null
5+
max_dict_length_capture: 10
6+
max_list_length_capture: 50
7+
project_id: null
8+
tags: {}
9+
mlflow:
10+
experiment_description: null
11+
experiment_name: null
12+
experiment_tags: {}
13+
run_description: null
14+
run_id: null
15+
run_name: null
16+
run_tags: {}
17+
params:
18+
avg_x_wk_spend:
19+
rolling: 3
20+
spend_zero_mean:
21+
offset: 0
22+
run:
23+
cache: false
24+
config:
25+
range: 10_000
26+
executor:
27+
max_workers: 40
28+
num_cpus: 8
29+
type: local
30+
final_vars:
31+
- spend
32+
- signups
33+
- avg_x_wk_spend
34+
- spend_per_signup
35+
- spend_zero_mean_unit_variance
36+
inputs: {}
37+
log_level: null
38+
with_adapter:
39+
future: false
40+
mlflow: false
41+
opentelemetry: false
42+
progressbar: false
43+
ray: false
44+
tracker: false
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
adapter:
2+
hamilton_tracker:
3+
capture_data_statistics: true
4+
dag_name: null
5+
max_dict_length_capture: 10
6+
max_list_length_capture: 50
7+
project_id: null
8+
tags: {}
9+
mlflow:
10+
experiment_description: null
11+
experiment_name: null
12+
experiment_tags: {}
13+
run_description: null
14+
run_id: null
15+
run_name: null
16+
run_tags: {}
17+
params: {}
18+
run:
19+
adapter: null
20+
cache: false
21+
config: {}
22+
executor:
23+
max_workers: 60
24+
num_cpus: 12
25+
type: threadpool
26+
final_vars: []
27+
inputs: {}
28+
jitter_factor: 0.1
29+
log_level: INFO
30+
max_retries: 3
31+
on_failure: null
32+
on_success: null
33+
pipeline_adapter_cfg: null
34+
project_adapter_cfg: null
35+
reload: false
36+
retry_delay: 1
37+
retry_exceptions:
38+
- <class 'Exception'>
39+
with_adapter:
40+
future: false
41+
hamilton_tracker: false
42+
mlflow: false
43+
opentelemetry: false
44+
progressbar: false
45+
ray: false
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
name: hello-world
2+
adapter:
3+
hamilton_tracker:
4+
username: null
5+
api_url: http://localhost:8241
6+
ui_url: http://localhost:8242
7+
api_key: null
8+
verify: false
9+
mlflow:
10+
tracking_uri: null
11+
registry_uri: null
12+
artifact_location: null
13+
ray:
14+
ray_init_config: null
15+
shutdown_ray_on_completion: false
16+
opentelemetry:
17+
host: localhost
18+
port: 6831
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
# FlowerPower pipeline hello_world.py
2+
# Created on 2024-10-26 12:44:27
3+
4+
5+
import sys
6+
import time
7+
from pathlib import Path
8+
9+
import pandas as pd
10+
from hamilton.function_modifiers import config, parameterize
11+
from loguru import logger
12+
13+
from flowerpower.cfg import Config
14+
15+
PARAMS = Config.load(
16+
Path(__file__).parents[1], pipeline_name="hello_world"
17+
).pipeline.h_params
18+
19+
20+
@config.when(range=10_000)
21+
def spend__10000() -> pd.Series:
22+
"""Returns a series of spend data."""
23+
# time.sleep(2)
24+
return pd.Series(range(10_000)) * 10
25+
26+
27+
@config.when(range=10_000)
28+
def signups__10000() -> pd.Series:
29+
"""Returns a series of signups data."""
30+
time.sleep(1)
31+
print(10_000)
32+
return pd.Series(range(10_000))
33+
34+
35+
@config.when(range=1_000)
36+
def spend__1000() -> pd.Series:
37+
"""Returns a series of spend data."""
38+
# time.sleep(2)
39+
print(1_000)
40+
return pd.Series(range(10_000)) * 10
41+
42+
43+
@config.when(range=1_000)
44+
def signups__1000() -> pd.Series:
45+
"""Returns a series of signups data."""
46+
time.sleep(1)
47+
print(1_000)
48+
return pd.Series(range(10_000))
49+
50+
51+
@parameterize(
52+
**PARAMS.avg_x_wk_spend
53+
) # (avg_x_wk_spend={"rolling": value(3)}) #
54+
def avg_x_wk_spend(spend: pd.Series, rolling: int) -> pd.Series:
55+
"""Rolling x week average spend."""
56+
# time.sleep(2)
57+
return spend.rolling(rolling).mean()
58+
59+
60+
def spend_per_signup(spend: pd.Series, signups: pd.Series) -> pd.Series:
61+
"""The cost per signup in relation to spend."""
62+
time.sleep(1)
63+
return spend / signups
64+
65+
66+
def spend_mean(spend: pd.Series) -> float:
67+
"""Shows function creating a scalar. In this case it computes the mean of the entire column."""
68+
return spend.mean()
69+
70+
71+
@parameterize(
72+
**PARAMS.spend_zero_mean
73+
) # (spend_zero_mean={"offset": value(0)}) #
74+
def spend_zero_mean(spend: pd.Series, spend_mean: float, offset: int) -> pd.Series:
75+
"""Shows function that takes a scalar. In this case to zero mean spend."""
76+
return spend - spend_mean + offset
77+
78+
79+
def spend_std_dev(spend: pd.Series) -> float:
80+
"""Function that computes the standard deviation of the spend column."""
81+
return spend.std()
82+
83+
84+
def spend_zero_mean_unit_variance(
85+
spend_zero_mean: pd.Series, spend_std_dev: float, verbose: bool = False
86+
) -> pd.Series:
87+
"""Function showing one way to make spend have zero mean and unit variance."""
88+
if verbose:
89+
logger.info(f"spend_zero_mean_unit_variance {spend_zero_mean / spend_std_dev}")
90+
return spend_zero_mean / spend_std_dev
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
# FlowerPower pipeline hello_world_parallel.py
2+
# Created on 2025-10-14 02:39:22
3+
4+
####################################################################################################
5+
# Import necessary libraries
6+
# NOTE: Remove or comment out imports that are not used in the pipeline
7+
8+
from hamilton.function_modifiers import parameterize, dataloader, datasaver
9+
from hamilton.htypes import Parallelizable, Collect
10+
11+
from pathlib import Path
12+
13+
from flowerpower.cfg import Config
14+
import requests
15+
16+
####################################################################################################
17+
# Load pipeline parameters. Do not modify this section.
18+
19+
PARAMS = Config.load(
20+
Path(__file__).parents[1], pipeline_name="hello_world_parallel"
21+
).pipeline.h_params
22+
23+
24+
####################################################################################################
25+
# Helper functions.
26+
# This functions have to start with an underscore (_).
27+
28+
29+
def _list_all_urls() -> list[str]:
30+
return [
31+
"https://www.gutenberg.org/files/1342/1342-0.txt", # Pride and Prejudice
32+
"https://www.gutenberg.org/files/11/11-0.txt", # Alice's Adventures in Wonderland
33+
"https://www.gutenberg.org/files/84/84-0.txt", # Frankenstein
34+
"https://www.gutenberg.org/files/98/98-0.txt", # A Tale of Two Cities
35+
"https://www.gutenberg.org/files/2701/2701-0.txt", #
36+
"https://www.gutenberg.org/files/1232/1232-0.txt", # The Prince
37+
"https://www.gutenberg.org/files/74/74-0.txt", # The Adventures of Tom Sawyer
38+
"https://www.gutenberg.org/files/5200/5200-0.txt", # Metamorphosis
39+
"https://www.gutenberg.org/files/16328/16328-0.txt", # Beowulf
40+
"https://www.gutenberg.org/files/55/55-0.txt", # The Wonderful Wizard of Oz
41+
"https://www.gutenberg.org/files/1080/1080-0.txt", # A Modest Proposal
42+
"https://www.gutenberg.org/files/345/345-0.txt", # Dracula by Bram Stoker
43+
"https://www.gutenberg.org/files/174/174-0.txt", # The Picture of Dorian Gray
44+
"https://www.gutenberg.org/files/23/23-0.txt", # The Scarlet Letter
45+
"https://www.gutenberg.org/files/768/768-0.txt", # Wuthering Heights by Emily Brontë
46+
"https://www.gutenberg.org/files/1260/1260-0.txt", # Jane Eyre by Charlotte Brontë
47+
"https://www.gutenberg.org/files/1399/1399-0.txt", # The Iliad by Homer
48+
"https://www.gutenberg.org/files/135/135-0.txt", # The Odyssey by Homer
49+
"https://www.gutenberg.org/files/author/1342.txt", # The Complete Works of William Shakespeare
50+
"https://www.gutenberg.org/files/author/11.txt", # The Complete Works of Lewis Carroll
51+
"https://www.gutenberg.org/files/author/84.txt", # The Complete Works of Mary Shelley
52+
"https://www.gutenberg.org/files/author/98.txt", # The Complete Works of Charles Dickens
53+
"https://www.gutenberg.org/files/author/2701.txt", # The Complete Works of Herman Melville
54+
"https://www.gutenberg.org/files/author/1232.txt", # The Complete Works of Niccolò Machiavelli
55+
"https://www.gutenberg.org/files/author/74.txt", # The Complete Works of Mark Twain
56+
"https://www.gutenberg.org/files/author/5200.txt", # The Complete Works of Franz Kafka
57+
"https://www.gutenberg.org/files/author/16328.txt", # The Complete Works of Anonymous
58+
"https://www.gutenberg.org/files/author/55.txt", # The Complete Works of L. Frank Baum
59+
"https://www.gutenberg.org/files/author/1080.txt", # The Complete Works of Jonathan Swift
60+
"https://www.gutenberg.org/files/author/345.txt", # The Complete Works of Bram Stoker
61+
"https://www.gutenberg.org/files/author/174.txt", # The Complete Works of Oscar Wilde
62+
"https://www.gutenberg.org/files/author/23.txt", # The Complete Works of Nathaniel Hawthorne
63+
"https://www.gutenberg.org/files/author/768.txt", # The Complete Works of Emily Brontë
64+
"https://www.gutenberg.org/files/author/1260.txt", # The Complete Works of Charlotte Brontë
65+
]
66+
67+
68+
def _load(url: str) -> str:
69+
response = requests.get(url)
70+
response.raise_for_status()
71+
return response.text
72+
73+
74+
####################################################################################################
75+
# Pipeline functions
76+
77+
78+
def url() -> Parallelizable[str]:
79+
for url_ in _list_all_urls():
80+
yield url_
81+
82+
83+
def url_loaded(url: str) -> str:
84+
return _load(url)
85+
86+
87+
def counts(url_loaded: str) -> int:
88+
return len(url_loaded.split(" "))
89+
90+
91+
def total_words(counts: Collect[int]) -> int:
92+
return sum(counts)

0 commit comments

Comments
 (0)