diff --git a/docs/book/how-to/steps-pipelines/advanced_features.md b/docs/book/how-to/steps-pipelines/advanced_features.md
index 8590d089343..aa5f8e577ea 100644
--- a/docs/book/how-to/steps-pipelines/advanced_features.md
+++ b/docs/book/how-to/steps-pipelines/advanced_features.md
@@ -281,78 +281,21 @@ Pipeline composition allows you to build complex workflows from simpler, well-te
 
 ### Fan-out and Fan-in
 
-The fan-out/fan-in pattern is a common pipeline architecture where a single step splits into multiple parallel operations (fan-out) and then consolidates the results back into a single step (fan-in). This pattern is particularly useful for parallel processing, distributed workloads, or when you need to process data through different transformations and then aggregate the results. For example, you might want to process different chunks of data in parallel and then aggregate the results:
+The fan-out/fan-in pattern is a common pipeline architecture where a single step splits into multiple parallel operations (fan-out) and then consolidates the results back into a single step (fan-in). This pattern is useful for parallel processing, distributed workloads, or when you need to process data through different transformations and then aggregate the results.
 
-```python
-from zenml import step, get_step_context, pipeline
-from zenml.client import Client
-
-
-@step
-def load_step() -> str:
-    return "Hello from ZenML!"
-
-
-@step
-def process_step(input_data: str) -> str:
-    return input_data
-
-
-@step
-def combine_step(step_prefix: str, output_name: str) -> None:
-    run_name = get_step_context().pipeline_run.name
-    run = Client().get_pipeline_run(run_name)
-
-    # Fetch all results from parallel processing steps
-    processed_results = {}
-    for step_name, step_info in run.steps.items():
-        if step_name.startswith(step_prefix):
-            output = step_info.outputs[output_name][0]
-            processed_results[step_info.name] = output.load()
-
-    # Combine all results
-    print(",".join([f"{k}: {v}" for k, v in processed_results.items()]))
-
-
-@pipeline(enable_cache=False)
-def fan_out_fan_in_pipeline(parallel_count: int) -> None:
-    # Initial step (source)
-    input_data = load_step()
-
-    # Fan out: Process data in parallel branches
-    after = []
-    for i in range(parallel_count):
-        artifact = process_step(input_data, id=f"process_{i}")
-        after.append(artifact)
-
-    # Fan in: Combine results from all parallel branches
-    combine_step(step_prefix="process_", output_name="output", after=after)
-
-
-fan_out_fan_in_pipeline(parallel_count=8)
-```
-
-The fan-out pattern allows for parallel processing and better resource utilization, while the fan-in pattern enables aggregation and consolidation of results. This is particularly useful for:
-
-- Parallel data processing
-- Distributed model training
-- Ensemble methods
-- Batch processing
-- Data validation across multiple sources
-- Hyperparameter tuning
-
-Note that when implementing the fan-in step, you'll need to use the ZenML Client to query the results from previous parallel steps, as shown in the example above, and you can't pass in the result directly.
+{% hint style="info" %}
+**For within-pipeline fan-out/fan-in patterns**, we recommend using [dynamic pipelines](../dynamic_pipelines.md) with the built-in `map/reduce` pattern. This provides a cleaner API, better performance, and eliminates the need to manually query results using the Client API. See the [Map/Reduce over collections](../dynamic_pipelines.md#mapreduce-over-collections) section for details.
+{% endhint %}
 
-{% hint style="warning" %}
-The fan-in, fan-out method has the following limitations:
+### Cross-Pipeline Fan-out/Fan-in with Snapshots
 
-1. Steps run sequentially rather than in parallel if the underlying orchestrator does not support parallel step runs (e.g. with the local orchestrator)
-2. The number of steps need to be known ahead-of-time, and ZenML does not yet support the ability to dynamically create steps on the fly.
-{% endhint %}
+For scenarios where you need to trigger multiple **separate pipeline runs** dynamically (e.g., based on database queries or external events), you can use [snapshots](https://docs.zenml.io/user-guides/tutorial/trigger-pipelines-from-external-systems) to create a cross-pipeline fan-out/fan-in pattern. This approach allows you to trigger multiple pipeline runs dynamically and then aggregate their results.
 
-### Dynamic Fan-out/Fan-in with Snapshots
+The snapshot-based approach is useful when you need to:
+- Trigger completely separate pipeline runs (not just steps)
+- Coordinate workflows across different pipeline definitions
+- Handle scenarios where each parallel operation needs its own isolated pipeline execution context
 
-For scenarios where you need to determine the number of parallel operations at runtime (e.g., based on database queries or dynamic data), you can use [snapshots](https://docs.zenml.io/user-guides/tutorial/trigger-pipelines-from-external-systems) to create a more flexible fan-out/fan-in pattern. This approach allows you to trigger multiple pipeline runs dynamically and then aggregate their results.
 
 ```python
 from typing import List, Optional