docs: beef up "submit flows to dynamic infra" with realistic k8s example #20997
|
|
@@ -136,15 +136,14 @@ To use local storage, ensure that: | |
|
|
||
| To submit a flow to specific infrastructure, use the appropriate decorator for that infrastructure type. | ||
|
|
||
| Here's an example using `@kubernetes`: | ||
| Here's a basic example using `@kubernetes`: | ||
|
|
||
| ```python | ||
| from prefect import flow | ||
| from prefect_kubernetes.experimental.decorators import kubernetes | ||
|
|
||
|
|
||
| # Submit `my_remote_flow` to run in a Kubernetes job | ||
| @kubernetes(work_pool="olympic") | ||
| @kubernetes(work_pool="my-k8s-pool") | ||
| @flow | ||
| def my_remote_flow(name: str): | ||
| print(f"Hello {name}!") | ||
|
|
@@ -153,11 +152,11 @@ def my_remote_flow(name: str): | |
| def my_flow(): | ||
| my_remote_flow("Marvin") | ||
|
|
||
| # Run the flow | ||
| # my_flow runs locally; my_remote_flow runs in a Kubernetes job | ||
| my_flow() | ||
| ``` | ||
|
|
||
| When you run this code on your machine, `my_flow` will execute locally, while `my_remote_flow` will be submitted to run in a Kubernetes job. | ||
| When you run this code on your machine, `my_flow` executes locally, while `my_remote_flow` is serialized with `cloudpickle`, shipped to object storage, and executed inside a Kubernetes job managed by a worker polling `my-k8s-pool`. | ||
|
|
||
| <Note> | ||
| **Parameters must be serializable** | ||
|
|
@@ -169,25 +168,143 @@ Most Python objects can be serialized with `cloudpickle`, but objects like datab | |
|
|
||
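Whether a parameter survives the handoff can be checked ahead of time with a quick round-trip. This sketch uses `cloudpickle` directly, outside of Prefect (the helper name is illustrative, not a Prefect API):

```python
import threading

import cloudpickle  # the serializer Prefect uses for infrastructure-bound flows


def survives_roundtrip(obj) -> bool:
    """Return True if the object can be cloudpickled and restored."""
    try:
        cloudpickle.loads(cloudpickle.dumps(obj))
        return True
    except Exception:
        return False


# Plain data structures and even lambdas serialize fine
print(survives_roundtrip({"name": "Marvin", "epochs": 10}))  # True
print(survives_roundtrip(lambda x: x + 1))                   # True

# Live OS resources, such as locks or open connections, do not
print(survives_roundtrip(threading.Lock()))                  # False
```

Running parameters through a check like this locally is cheaper than discovering a serialization failure after the flow run has been submitted.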
| ## Customizing infrastructure configuration | ||
|
|
||
| You can override the default configuration by providing additional kwargs to the infrastructure decorator: | ||
| Any extra keyword arguments you pass to the infrastructure decorator override the corresponding values in the work pool's [base job template](/v3/advanced/customize-base-job-templates). This is how you control the runtime environment of your infrastructure-bound flows. | ||
|
|
||
| ```python | ||
| from prefect import flow | ||
| from prefect_kubernetes.experimental.decorators import kubernetes | ||
| For Kubernetes work pools, the available job variables include: | ||
|
|
||
| | Variable | Description | | ||
| |---|---| | ||
| | `image` | Container image to use (defaults to the standard Prefect image) | | ||
| | `namespace` | Kubernetes namespace to create jobs in | | ||
| | `service_account_name` | Kubernetes service account for the job | | ||
| | `image_pull_policy` | Image pull policy (`IfNotPresent`, `Always`, `Never`) | | ||
| | `finished_job_ttl` | Seconds to retain completed jobs before cleanup | | ||
| | `stream_output` | Whether to stream job output to your local terminal | | ||
|
|
||
| For example, to run a flow in a custom namespace: | ||
|
|
||
| ```python | ||
| @kubernetes( | ||
| work_pool="my-kubernetes-pool", | ||
| work_pool="my-k8s-pool", | ||
|
||
| namespace="custom-namespace" | ||
| ) | ||
| @flow | ||
| def custom_namespace_flow(): | ||
| pass | ||
| ``` | ||
|
|
||
| Any kwargs passed to the infrastructure decorator will override the corresponding default value in the [base job template](/v3/how-to-guides/deployment_infra/manage-work-pools#base-job-template) for the specified work pool. | ||
| ### Bringing your own dependencies with a custom image | ||
|
|
||
| The most common customization is the `image` job variable. By default, infrastructure-bound flows run in the standard Prefect image, which includes only `prefect` and its dependencies. If your flow needs additional Python packages (for ML training, data processing, etc.), you can provide a custom Docker image with those packages installed. | ||
|
|
||
| For example, say you have a flow that trains a model with PyTorch. First, build and push a Docker image that includes your dependencies: | ||
|
|
||
| ```dockerfile | ||
| FROM prefecthq/prefect:3-latest | ||
| RUN pip install torch torchvision scikit-learn | ||
| ``` | ||
|
|
||
| ```bash | ||
| docker build -t my-registry.example.com/ml-flows:latest . | ||
| docker push my-registry.example.com/ml-flows:latest | ||
| ``` | ||
|
|
||
| Then reference that image in the decorator: | ||
|
|
||
| ```python | ||
| from prefect import flow | ||
| from prefect_kubernetes.experimental.decorators import kubernetes | ||
|
|
||
|
|
||
| @kubernetes( | ||
| work_pool="my-k8s-pool", | ||
| image="my-registry.example.com/ml-flows:latest", | ||
| ) | ||
| @flow | ||
| def train_model(epochs: int, learning_rate: float): | ||
| import torch | ||
| import torch.nn as nn | ||
|
|
||
| model = nn.Linear(10, 1) | ||
| optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate) | ||
|
|
||
| for epoch in range(epochs): | ||
| # ... training logic ... | ||
| pass | ||
|
|
||
| return {"final_loss": 0.01} | ||
|
|
||
|
|
||
| @flow | ||
| def ml_pipeline(): | ||
| result = train_model(epochs=10, learning_rate=0.001) | ||
| print(f"Training complete: {result}") | ||
|
|
||
|
|
||
| ml_pipeline() | ||
| ``` | ||
|
|
||
| Here's what happens when you run `ml_pipeline()` on your laptop: | ||
|
|
||
| 1. `ml_pipeline` executes **locally** on your machine | ||
| 2. When it calls `train_model`, Prefect serializes the function and its arguments, uploads them to the object storage configured on `my-k8s-pool`, and submits a flow run to the work pool | ||
| 3. A worker polling `my-k8s-pool` picks up the run and creates a Kubernetes job using the work pool's **base job template** — a manifest that defines the shape of the `Job` spec (image, env vars, resource requests, etc.) | ||
| 4. Because `image` is passed in the decorator, it overrides the `{{ image }}` placeholder in the job template, so the pod runs **your custom image** with PyTorch installed | ||
| 5. The pod starts, downloads the serialized flow from object storage, executes it, and reports results back | ||
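Steps 2 and 5 can be approximated outside of Prefect with a `cloudpickle` round-trip; in this sketch a temporary directory stands in for the work pool's object storage, and none of the names are real Prefect APIs:

```python
import tempfile
from pathlib import Path

import cloudpickle


def train_model_stub(epochs: int) -> dict:
    # Stand-in for the real infrastructure-bound flow function
    return {"final_loss": 0.01, "epochs": epochs}


# Step 2 (submit side): serialize the function and its arguments
payload = cloudpickle.dumps((train_model_stub, (10,)))
storage = Path(tempfile.mkdtemp()) / "flow-run.pkl"
storage.write_bytes(payload)  # stands in for the object storage upload

# Step 5 (pod side): download, deserialize, and execute
fn, args = cloudpickle.loads(storage.read_bytes())
print(fn(*args))  # {'final_loss': 0.01, 'epochs': 10}
```

Because the function is pickled by value, the "pod side" needs no access to your source tree, only a compatible Python environment — which is why the custom image matters.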
|
||
|
|
||
| <Tip> | ||
| You can also set a default `image` directly on the work pool (via the UI or CLI) so that all flows submitted to that pool use it automatically, without needing to specify `image` in every decorator. | ||
| </Tip> | ||
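One way to do this from the CLI is to export the default template, set the `image` default, and push it back to the pool. These `prefect work-pool` subcommands exist in the Prefect 3 CLI, but verify the exact flags against your installed version:

```shell
# Dump the default Kubernetes base job template to a file
prefect work-pool get-default-base-job-template --type kubernetes > template.json

# Edit template.json so the "image" variable's default points at your
# registry image, then apply it to the existing pool:
prefect work-pool update "my-k8s-pool" --base-job-template template.json
```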
|
|
||
| ### Dynamic infrastructure sizing | ||
|
|
||
| You can also customize infrastructure at call time by computing job variables dynamically. If your work pool's base job template has been [customized to expose a `memory` variable](/v3/advanced/customize-base-job-templates), you could do something like: | ||
|
|
||
| ```python | ||
| from prefect import flow | ||
| from prefect_kubernetes.experimental.decorators import kubernetes | ||
|
|
||
|
|
||
| def estimate_memory(dataset_size_gb: float) -> int: | ||
| """Estimate memory needed based on dataset size.""" | ||
| return int(dataset_size_gb * 2.5 * 1024) # 2.5x dataset size in Mi | ||
|
|
||
|
|
||
| @flow | ||
| def process_pipeline(dataset_size_gb: float): | ||
| memory_needed = estimate_memory(dataset_size_gb) | ||
|
|
||
| # Dynamically create the infrastructure-bound flow with the right resources | ||
| @kubernetes( | ||
| work_pool="my-k8s-pool", | ||
| image="my-registry.example.com/data-tools:latest", | ||
| memory=memory_needed, | ||
| ) | ||
| @flow | ||
| def process_data(): | ||
| # This runs in a pod with enough memory for the dataset | ||
| import pandas as pd | ||
| # ... processing logic ... | ||
|
|
||
| process_data() | ||
|
|
||
|
|
||
| process_pipeline(dataset_size_gb=50) | ||
| ``` | ||
|
|
||
| ## How it works under the hood | ||
|
|
||
| Understanding the relationship between the decorator, the work pool, and the resulting Kubernetes job helps demystify this feature: | ||
|
|
||
| 1. **Work pool** — A work pool of type `kubernetes` stores a **base job template**: a Kubernetes `Job` manifest with `{{ placeholder }}` variables like `{{ image }}`, `{{ namespace }}`, and `{{ env }}`. You can view and customize this template in the Prefect UI or via the CLI. | ||
|
|
||
| 2. **`@kubernetes` decorator** — When you add `@kubernetes(work_pool="my-k8s-pool", image="...")` to a flow, the extra keyword arguments (`image`, `namespace`, etc.) become **job variable overrides**. These replace the corresponding `{{ placeholder }}` values in the base job template when a run is created. | ||
|
|
||
| 3. **Worker** — A worker process polling the work pool receives the flow run, renders the final job manifest by merging the base template with the job variable overrides, and creates the Kubernetes `Job`. | ||
|
||
|
|
||
| This means the work pool's base job template is the **single source of truth** for what the Kubernetes job looks like. The decorator gives you a convenient way to override specific variables per-flow without touching the template. | ||
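The merge described above amounts to simple placeholder substitution. A toy version (not Prefect's actual renderer; the template, defaults, and overrides are illustrative) makes the precedence concrete:

```python
import re

# A tiny stand-in for a work pool's base job template
base_job_template = {
    "image": "{{ image }}",
    "namespace": "{{ namespace }}",
}

# Defaults defined on the work pool, and overrides from decorator kwargs
pool_defaults = {"image": "prefecthq/prefect:3-latest", "namespace": "default"}
decorator_overrides = {"image": "my-registry.example.com/ml-flows:latest"}


def render(template: dict, defaults: dict, overrides: dict) -> dict:
    """Fill {{ placeholder }} values; decorator overrides win over pool defaults."""
    variables = {**defaults, **overrides}
    return {
        key: re.sub(r"\{\{\s*(\w+)\s*\}\}", lambda m: str(variables[m.group(1)]), value)
        for key, value in template.items()
    }


print(render(base_job_template, pool_defaults, decorator_overrides))
# {'image': 'my-registry.example.com/ml-flows:latest', 'namespace': 'default'}
```

The decorator kwargs shadow the pool defaults key by key, which is why setting `image` on the decorator changes only the image while every other part of the manifest still comes from the template.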
|
|
||
| ## Further reading | ||
|
|
||
| - [Work pools](/v3/concepts/work-pools) concept page | ||
| - [Work pools](/v3/deploy/infrastructure-concepts/work-pools) concept page | ||
| - [Customize base job templates](/v3/advanced/customize-base-job-templates) | ||
you should have added an additional example, keeping the first one simple and adding this more idiosyncratic one second. furthermore making the example more complex and adding the word "basic" makes no sense