11 changes: 3 additions & 8 deletions docs/book/how-to/containerization/containerization.md
Original file line number Diff line number Diff line change
@@ -267,7 +267,7 @@ ZenML offers several ways to specify dependencies for your Docker containers:
By default, ZenML automatically installs all packages required by your active ZenML stack.

{% hint style="warning" %}
In future versions, if none of the `replicate_local_python_environment`, `pyproject_path` or `requirements` attributes on `DockerSettings` are specified, ZenML will try to automatically find a `requirements.txt` and `pyproject.toml` files inside your current source root and install packages from the first one it finds. You can disable this behavior by setting `disable_automatic_requirements_detection=True`. If
In future versions, if none of the `replicate_local_python_environment`, `pyproject_path` or `requirements` attributes on `DockerSettings` are specified, ZenML will try to automatically find `requirements.txt` and `pyproject.toml` files inside your current [source root](../steps-pipelines/sources.md#source-root) and install packages from the first one it finds. You can disable this behavior by setting `disable_automatic_requirements_detection=True`. If you want this automatic detection in current versions of ZenML already, set `disable_automatic_requirements_detection=False`.
{% endhint %}
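
If you prefer to make dependencies explicit rather than rely on automatic detection, you can set the `requirements` attribute of `DockerSettings`, for example in a YAML config file (the package names and versions below are illustrative):

```yaml
settings:
  docker:
    requirements:
      - scikit-learn==1.5.2
      - pandas
```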

@@ -429,12 +429,7 @@ Be cautious with handling credentials. Always use secure methods to manage and d

## Source Code Management

ZenML determines the root directory of your source files in the following order:

1. If you've initialized zenml (`zenml init`) in your current working directory or one of its parent directories, the repository root directory will be used.
2. Otherwise, the parent directory of the Python file you're executing will be the source root. For example, running `python /path/to/file.py`, the source root would be `/path/to`.

You can specify how the files inside this root directory are handled:
You can specify how the files inside your [source root directory](../steps-pipelines/sources.md#source-root) are handled for containerized steps:

```python
docker_settings = DockerSettings(
@@ -577,7 +572,7 @@ zenml integration install github

Once you have registered one or more code repositories, ZenML will check whether the files you use when running a pipeline are tracked inside one of those code repositories. This happens as follows:

* First, the source root is computed
* First, the [source root](../steps-pipelines/sources.md#source-root) is computed
* Next, ZenML checks whether this source root directory is included in a local checkout of one of the registered code repositories

#### Tracking code versions for pipeline runs
6 changes: 1 addition & 5 deletions docs/book/how-to/snapshots/snapshots.md
@@ -83,13 +83,9 @@ snapshot = my_pipeline.create_snapshot(name="<NAME>")

### Using the CLI

You can create a snapshot using the ZenML CLI:
You can create a snapshot using the ZenML CLI by passing the [source path](../steps-pipelines/sources.md#source-paths) of your pipeline:

```bash
# The <PIPELINE-SOURCE-PATH> will be `run.my_pipeline` if you defined a
# pipeline with name `my_pipeline` in a file called `run.py`. This will be either relative
# to your ZenML repository (that you created by running `zenml init`) or your current working
# directory.
zenml pipeline snapshot create <PIPELINE-SOURCE-PATH> --name=<SNAPSHOT-NAME>
```

69 changes: 69 additions & 0 deletions docs/book/how-to/steps-pipelines/sources.md
@@ -0,0 +1,69 @@
---
description: Understanding source roots and source paths
icon: folders
---

# Source Code and Imports

When ZenML interacts with your pipeline code, it needs to understand how to locate and import your code. This page explains how ZenML determines the source root directory and how to construct source paths for referencing your Python objects.

## Source Root

The **source root** is the root directory of all your local code files.

ZenML determines the source root using the following priority:

1. **ZenML Repository**: If you're in a child directory of a [ZenML repository](https://docs.zenml.io/user-guides/best-practices/set-up-your-repository) (initialized with `zenml init`), the repository directory becomes the source root. We recommend always initializing a ZenML repository to make the source root explicit.

2. **Execution Context Fallback**: If no ZenML repository exists in your current working directory or any of its parent directories, ZenML uses the parent directory of the Python file you're executing. For example, running `python /a/b/run.py` sets the source root to `/a/b`.

{% hint style="warning" %}
If you're running in a notebook or an interactive Python environment, there is no file being executed, so ZenML can't automatically infer the source root. In these cases, you need to define the source root explicitly by initializing a ZenML repository.
{% endhint %}

## Source Paths

ZenML requires source paths in various configuration contexts. These are Python-style dotted paths that reference objects in your code.

### Common Use Cases

**Step Hook Configuration**:
```yaml
success_hook_source: <SUCCESS-HOOK-SOURCE>
```

**Pipeline Deployment via CLI**:
```bash
zenml pipeline deploy <PIPELINE-SOURCE>
```

### Path Construction

Import paths must be **relative to your source root** and follow Python import syntax.

**Example**: Consider this pipeline in `/a/b/c/run.py`:
```python
from zenml import pipeline

@pipeline
def my_pipeline():
    ...
```

The source path depends on your source root:
- Source root `/a/b/c` → `run.my_pipeline`
- Source root `/a` → `b.c.run.my_pipeline`

{% hint style="info" %}
Note that a source path is not a file path: its elements are separated by dots, just like in a Python import statement.
{% endhint %}
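
To make the mapping concrete, here is a small illustrative helper (not part of ZenML) that derives a dotted source path from a file path and a source root:

```python
from pathlib import PurePosixPath

def source_path(file_path: str, source_root: str, object_name: str) -> str:
    # Illustrative only: compute the module path relative to the source
    # root, drop the ".py" suffix, and join the parts with dots.
    rel = PurePosixPath(file_path).relative_to(source_root)
    module = ".".join(rel.with_suffix("").parts)
    return f"{module}.{object_name}"

print(source_path("/a/b/c/run.py", "/a/b/c", "my_pipeline"))  # run.my_pipeline
print(source_path("/a/b/c/run.py", "/a", "my_pipeline"))      # b.c.run.my_pipeline
```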

## Containerized Step Execution

When running pipeline steps in containers, ZenML ensures your source root files are available in the container (either by including them in the image or downloading them at runtime).

To execute your step code, ZenML imports the Python module containing the step definition. **All imports of local code files must be relative to the source root** for this to work correctly.

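For example, assuming this hypothetical source root layout:

```
project/          # source root (contains .zen/)
├── utils.py      # defines helper()
└── steps.py      # imports it as `from utils import helper`
```

An import like `from project.utils import helper`, or one relative to your current working directory, would fail inside the container, because only the source root contents are available there.
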
{% hint style="info" %}
If you don't need all files inside your source root for step execution, see the [containerization guide](../containerization/containerization.md#controlling-included-files) for controlling which files are included.
{% endhint %}
1 change: 1 addition & 0 deletions docs/book/toc.md
@@ -39,6 +39,7 @@
* [Scheduling](how-to/steps-pipelines/scheduling.md)
* [Logging](how-to/steps-pipelines/logging.md)
* [YAML Configuration](how-to/steps-pipelines/yaml_configuration.md)
* [Source Code and Imports](how-to/steps-pipelines/sources.md)
* [Advanced Features](how-to/steps-pipelines/advanced_features.md)
* [Artifacts](how-to/artifacts/artifacts.md)
* [Materializers](how-to/artifacts/materializers.md)
@@ -170,7 +170,7 @@ Collect all your notebooks in one place.

#### .zen

By running `zenml init` at the root of your project, you define the project scope for ZenML. In ZenML terms, this will be called your "source root". This will be used to resolve import paths and store configurations.
By running `zenml init` at the root of your project, you define the [source root](https://docs.zenml.io/concepts/steps_and_pipelines/sources#source-root) for your project.
- When running Jupyter notebooks, it is required that you have a `.zen` directory initialized in one of the parent directories of your notebook.
- When running regular Python scripts, it is still **highly** recommended that you have a `.zen` directory initialized in the root of your project. If that is not the case, ZenML will look for a `.zen` directory in the parent directories, which might cause issues if one is found (for example, the import paths will no longer be relative to the source root). If no `.zen` directory is found, the parent directory of the Python file that you're executing will be used as the implicit source root.

56 changes: 56 additions & 0 deletions docs/book/user-guide/starter-guide/cache-previous-executions.md
@@ -112,6 +112,8 @@ ZenML generates a unique cache key for each step execution based on various fact
- **Step code**: The actual implementation of your step function
- **Step parameters**: Configuration parameters passed to the step
- **Input artifact values or IDs**: The content/data of input artifacts or their IDs
- **Additional file or source dependencies**: The file content or source code of additional dependencies that you can specify in your cache policy
- **Custom cache function value**: The value returned by a custom cache function that you can specify in your cache policy

When any of these factors change, the cache key changes, and the step will be re-executed.
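
Conceptually, the factors above are combined into a single hash, along the lines of this simplified sketch (not ZenML's actual implementation; names are illustrative):

```python
import hashlib

def compute_cache_key(step_code: str, parameters: dict, input_artifact_ids: list) -> str:
    # Simplified sketch: feed every caching-relevant factor into one hash.
    h = hashlib.sha256()
    h.update(step_code.encode())
    h.update(repr(sorted(parameters.items())).encode())
    for artifact_id in input_artifact_ids:
        h.update(str(artifact_id).encode())
    return h.hexdigest()

# Any change to the code, parameters, or inputs yields a different key:
assert compute_cache_key("v1", {"lr": 0.1}, ["a"]) != compute_cache_key("v2", {"lr": 0.1}, ["a"])
```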

@@ -159,6 +161,60 @@ Setting `include_step_code=False` can lead to unexpected behavior if you modify
artifact doesn't support generating a content hash, the artifact ID will be used as a fallback if enabled.
* `include_artifact_ids` (default: `True`): Whether to include the artifact IDs in the cache key.
* `ignored_inputs`: Allows you to exclude specific step inputs from cache key calculation.
* `file_dependencies`: Allows you to specify a list of files that your step depends on. The content of these files will be read and included in the cache key, which means a change to any of the files leads to a new cache key and therefore prevents caching from previous step executions.

{% hint style="info" %}
Files specified in this list must be relative to your [source root](https://docs.zenml.io/concepts/steps_and_pipelines/sources#source-root).
{% endhint %}

* `source_dependencies`: Allows you to specify a list of Python objects (modules, classes, functions) that your step depends on. The source code of these objects will be read and included in the cache key, which means a change to any of the objects leads to a new cache key and therefore prevents caching from previous step executions.
* `cache_func`: Allows you to specify a function (without arguments) that returns a string. This function will be called as part of the cache key computation, and the
return value will be included in the cache key.

Both the source dependencies and the cache function can be passed directly in code or as a [source](https://docs.zenml.io/concepts/steps_and_pipelines/sources#source-paths) string:
```python
from zenml.config import CachePolicy

def my_helper_function():
    ...

# pass the function directly...
cache_policy = CachePolicy(source_dependencies=[my_helper_function])
# ...or pass the function source. This also works when
# configuring the cache policy with a config file
cache_policy = CachePolicy(source_dependencies=["run.my_helper_function"])
```
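
As a sketch of what a cache function can be useful for, the following (hypothetical) helper returns the current UTC date, so the cache key changes once per day and cached results are effectively busted daily:

```python
from datetime import datetime, timezone

def daily_cache_buster() -> str:
    # Changes once per UTC day, forcing a new cache key each day.
    return datetime.now(timezone.utc).strftime("%Y-%m-%d")

# Hypothetical usage with the CachePolicy described above:
# cache_policy = CachePolicy(cache_func=daily_cache_buster)
```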

#### Cache expiration

By default, any step that executes successfully is a caching candidate for future step runs: any step with the same [cache key](#understanding-cache-keys) that runs afterwards can reuse the output artifacts produced by the caching candidate instead of actually executing the step code. In some cases, however, you might want to limit how long a step run remains a valid cache candidate for future steps. You can do that by configuring an expiration time for your step runs:

```python
from zenml.config import CachePolicy
from zenml import step

# Expire the cache after 24 hours
custom_cache_policy = CachePolicy(expires_after=60*60*24)

@step(cache_policy=custom_cache_policy)
def my_step():
    ...
```

{% hint style="info" %}
If you want to manually expire one of your step runs as a cache candidate, you can do so by setting its cache expiration date (in UTC):
```python
from zenml import Client
from datetime import datetime, timezone

now = datetime.now(timezone.utc)
Client().update_step_run(<STEP-RUN-ID>, cache_expires_at=now)
```
{% endhint %}

## Code Example

28 changes: 28 additions & 0 deletions src/zenml/client.py
@@ -4807,6 +4807,8 @@ def list_run_steps(
        updated: Optional[Union[datetime, str]] = None,
        name: Optional[str] = None,
        cache_key: Optional[str] = None,
        cache_expires_at: Optional[Union[datetime, str]] = None,
        cache_expired: Optional[bool] = None,
        code_hash: Optional[str] = None,
        status: Optional[str] = None,
        start_time: Optional[Union[datetime, str]] = None,
@@ -4843,6 +4845,10 @@
            model: Filter by model name/ID.
            name: The name of the step run to filter by.
            cache_key: The cache key of the step run to filter by.
            cache_expires_at: The cache expiration time of the step run to
                filter by.
            cache_expired: Whether the cache expiration time of the step run
                has passed.
            code_hash: The code hash of the step run to filter by.
            status: The name of the run to filter by.
            run_metadata: Filter by run metadata.
@@ -4860,6 +4866,8 @@
            logical_operator=logical_operator,
            id=id,
            cache_key=cache_key,
            cache_expires_at=cache_expires_at,
            cache_expired=cache_expired,
            code_hash=code_hash,
            pipeline_run_id=pipeline_run_id,
            snapshot_id=snapshot_id,
@@ -4882,6 +4890,26 @@
            hydrate=hydrate,
        )

    def update_step_run(
        self,
        step_run_id: UUID,
        cache_expires_at: Optional[datetime] = None,
    ) -> StepRunResponse:
        """Update a step run.

        Args:
            step_run_id: The ID of the step run to update.
            cache_expires_at: The time at which this step run should not be
                used for cached results anymore.

        Returns:
            The updated step run.
        """
        update = StepRunUpdate(cache_expires_at=cache_expires_at)
        return self.zen_store.update_run_step(
            step_run_id=step_run_id, step_run_update=update
        )

    # ------------------------------- Artifacts -------------------------------

    def get_artifact(
53 changes: 52 additions & 1 deletion src/zenml/config/cache_policy.py
@@ -15,9 +15,10 @@

from typing import Any, List, Optional, Union

from pydantic import BaseModel, BeforeValidator, Field
from pydantic import BaseModel, BeforeValidator, Field, field_validator
from typing_extensions import Annotated

from zenml.config.source import Source, SourceWithValidator
from zenml.logger import get_logger

logger = get_logger(__name__)
@@ -48,6 +49,56 @@ class CachePolicy(BaseModel):
        default=None,
        description="List of input names to ignore in the cache key.",
    )
    file_dependencies: Optional[List[str]] = Field(
        default=None,
        description="List of file paths. The contents of these files will be "
        "included in the cache key. Only relative paths within the source root "
        "are allowed.",
    )
    source_dependencies: Optional[List[SourceWithValidator]] = Field(
        default=None,
        description="List of Python objects (modules, classes, functions). "
        "The source code of these objects will be included in the cache key.",
    )
    cache_func: Optional[SourceWithValidator] = Field(
        default=None,
        description="Function without arguments that returns a string. The "
        "returned value will be included in the cache key.",
    )
    expires_after: Optional[int] = Field(
        default=None,
        description="The number of seconds after which the cached result by a "
        "step with this cache policy will expire. If not set, the result "
        "will never expire.",
    )

    @field_validator("source_dependencies", mode="before")
    def _validate_source_dependencies(
        cls, v: Optional[List[Any]]
    ) -> Optional[List[Any]]:
        from zenml.utils import source_utils

        if v is None:
            return None

        result = []
        for obj in v:
            if isinstance(obj, (str, Source, dict)):
                result.append(obj)
            else:
                result.append(source_utils.resolve(obj))
        return result

    @field_validator("cache_func", mode="before")
    def _validate_cache_func(cls, v: Optional[Any]) -> Optional[Any]:
        from zenml.utils import source_utils

        if v is None or isinstance(v, (str, Source, dict)):
            return v
        elif callable(v):
            return source_utils.resolve(v)
        else:
            raise ValueError(f"Invalid cache function: {v}")

    @classmethod
    def default(cls) -> "CachePolicy":