77 changes: 10 additions & 67 deletions docs/book/how-to/steps-pipelines/advanced_features.md
@@ -281,78 +281,21 @@ Pipeline composition allows you to build complex workflows from simpler, well-te

### Fan-out and Fan-in

The fan-out/fan-in pattern is a common pipeline architecture where a single step splits into multiple parallel operations (fan-out) and then consolidates the results back into a single step (fan-in). This pattern is useful for parallel processing, distributed workloads, or when you need to process data through different transformations and then aggregate the results. For example, you might want to process different chunks of data in parallel and then aggregate the results:

```python
from zenml import step, get_step_context, pipeline
from zenml.client import Client


@step
def load_step() -> str:
    return "Hello from ZenML!"


@step
def process_step(input_data: str) -> str:
    return input_data


@step
def combine_step(step_prefix: str, output_name: str) -> None:
    run_name = get_step_context().pipeline_run.name
    run = Client().get_pipeline_run(run_name)

    # Fetch all results from parallel processing steps
    processed_results = {}
    for step_name, step_info in run.steps.items():
        if step_name.startswith(step_prefix):
            output = step_info.outputs[output_name][0]
            processed_results[step_info.name] = output.load()

    # Combine all results
    print(",".join([f"{k}: {v}" for k, v in processed_results.items()]))


@pipeline(enable_cache=False)
def fan_out_fan_in_pipeline(parallel_count: int) -> None:
    # Initial step (source)
    input_data = load_step()

    # Fan out: process data in parallel branches
    after = []
    for i in range(parallel_count):
        artifact = process_step(input_data, id=f"process_{i}")
        after.append(artifact)

    # Fan in: combine results from all parallel branches
    combine_step(step_prefix="process_", output_name="output", after=after)


fan_out_fan_in_pipeline(parallel_count=8)
```

The fan-out pattern allows for parallel processing and better resource utilization, while the fan-in pattern enables aggregation and consolidation of results. This is particularly useful for:

- Parallel data processing
- Distributed model training
- Ensemble methods
- Batch processing
- Data validation across multiple sources
- Hyperparameter tuning

Note that when implementing the fan-in step, you need to use the ZenML Client to query the results of the previous parallel steps, as shown in the example above; you can't pass the outputs of the parallel branches into the fan-in step directly as inputs.
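Independent of ZenML's API, the control-flow shape of fan-out/fan-in can be sketched with just the Python standard library. The following is a minimal analogy using `concurrent.futures` (all function names here are illustrative, not part of ZenML):

```python
from concurrent.futures import ThreadPoolExecutor


def load() -> str:
    # Source step: produce the shared input
    return "Hello!"


def process(input_data: str, i: int) -> str:
    # One parallel branch: transform its share of the work
    return f"process_{i}: {input_data}"


def combine(results: list) -> str:
    # Fan in: aggregate the branch outputs
    return ",".join(results)


def fan_out_fan_in(parallel_count: int) -> str:
    data = load()
    # Fan out: run the branches concurrently
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(process, data, i) for i in range(parallel_count)]
        results = [f.result() for f in futures]
    # Fan in: consolidate
    return combine(results)


print(fan_out_fan_in(3))
```

In a real ZenML pipeline the orchestrator, not a thread pool, is responsible for scheduling the parallel branches, which is why the fan-in step queries results through the Client rather than receiving them as direct inputs.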
> **Reviewer comment (Contributor):** Dynamic pipelines are still experimental, I wouldn't recommend them to anyone yet. But even once they're ready, don't you think it makes sense to keep the code sample on how to do it with static pipelines as well, just for the users that want to/must use static pipelines for some reason or another? We can make the fan-in much nicer now even, as in static pipelines we also allow lists of input artifacts, which gets rid of this whole code in the combine step. The only real limitation of static pipelines is that the fan-out amount is fixed, but in some cases this might not even be a problem for users.

{% hint style="info" %}
**For within-pipeline fan-out/fan-in patterns**, we recommend using [dynamic pipelines](../dynamic_pipelines.md) with the built-in `map/reduce` pattern. This provides a cleaner API, better performance, and eliminates the need to manually query results using the Client API. See the [Map/Reduce over collections](../dynamic_pipelines.md#mapreduce-over-collections) section for details.
{% endhint %}
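In the abstract, the map/reduce pattern the hint refers to is simply: apply one function over each item of a collection, then reduce the collected results to a single value. A framework-agnostic sketch in plain Python (this is the conceptual shape only, not ZenML's dynamic-pipeline API; the chunk data and function names are made up):

```python
from functools import reduce


def score_chunk(chunk: list) -> int:
    # "Map" step: process one chunk independently (here, a toy sum)
    return sum(chunk)


def aggregate(scores: list) -> int:
    # "Reduce" step: consolidate all branch results
    return reduce(lambda a, b: a + b, scores, 0)


chunks = [[1, 2], [3, 4], [5, 6]]
partial = [score_chunk(c) for c in chunks]  # fan out over the collection
total = aggregate(partial)                  # fan in
print(total)
```

With a built-in map/reduce, the framework can derive the fan-out width from the collection at runtime instead of requiring it to be fixed when the pipeline is defined.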

{% hint style="warning" %}
The fan-out/fan-in pattern has the following limitations:

1. Steps run sequentially rather than in parallel if the underlying orchestrator does not support parallel step runs (e.g. with the local orchestrator).
2. The number of steps needs to be known ahead of time, and ZenML does not yet support dynamically creating steps on the fly in static pipelines.
{% endhint %}

### Cross-Pipeline Fan-out/Fan-in with Snapshots

For scenarios where you need to trigger multiple **separate pipeline runs** dynamically (e.g., based on database queries or external events), you can use [snapshots](https://docs.zenml.io/user-guides/tutorial/trigger-pipelines-from-external-systems) to create a cross-pipeline fan-out/fan-in pattern. This approach allows you to trigger multiple pipeline runs dynamically and then aggregate their results.

The snapshot-based approach is useful when you need to:

- Trigger completely separate pipeline runs (not just steps)
- Coordinate workflows across different pipeline definitions
- Handle scenarios where each parallel operation needs its own isolated pipeline execution context

```python
from typing import List, Optional