---
title: How to submit flows directly to dynamic infrastructure
sidebarTitle: Submit Flows Directly to Dynamic Infrastructure
description: Submit flows directly to different infrastructure types without a deployment
---
**Beta Feature**

This feature is currently in beta. While we encourage you to try it out and provide feedback, please be aware that the API may change in future releases, potentially including breaking changes.

Prefect allows you to submit workflows directly to different infrastructure types without requiring a deployment. This enables you to dynamically choose where your workflows run based on their requirements, such as:

  • Training machine learning models that require GPUs
  • Processing large datasets that need significant memory
  • Running lightweight tasks that can use minimal resources

## Benefits

Submitting workflows directly to dynamic infrastructure provides several advantages:

  • Dynamic resource allocation: Choose infrastructure based on workflow requirements at runtime
  • Cost efficiency: Use expensive infrastructure only when needed
  • Consistency: Ensure workflows always run on the appropriate infrastructure type
  • Simplified workflow management: No need to create and maintain deployments for different infrastructure types

## Supported infrastructure

Direct submission of workflows is currently supported for the following infrastructures:

| Infrastructure | Required Package | Decorator |
|---|---|---|
| Docker | `prefect-docker` | `@docker` |
| Kubernetes | `prefect-kubernetes` | `@kubernetes` |
| AWS ECS | `prefect-aws` | `@ecs` |
| Google Cloud Run | `prefect-gcp` | `@cloud_run` |
| Google Vertex AI | `prefect-gcp` | `@vertex_ai` |
| Azure Container Instances | `prefect-azure` | `@azure_container_instance` |

Each package can be installed using pip, for example:

```bash
pip install prefect-docker
```

## Prerequisites

Before submitting workflows to specific infrastructure, you'll need:

  1. A work pool for each infrastructure type you want to use
  2. Object storage to associate with your work pool(s)

## Setting up work pools and storage

### Creating a work pool

Create work pools for each infrastructure type using the Prefect CLI:

```bash
prefect work-pool create NAME --type WORK_POOL_TYPE
```

For detailed information on creating and configuring work pools, refer to the work pools documentation.

### Configuring work pool storage

To enable Prefect to run workflows in remote infrastructure, work pools need an associated storage location to store serialized versions of submitted workflows and results from workflow runs.

Configure storage for your work pools using one of the supported storage types:

```bash
# S3
prefect work-pool storage configure s3 WORK_POOL_NAME \
    --bucket BUCKET_NAME \
    --aws-credentials-block-name BLOCK_NAME
```

```bash
# GCS
prefect work-pool storage configure gcs WORK_POOL_NAME \
    --bucket BUCKET_NAME \
    --gcp-credentials-block-name BLOCK_NAME
```

```bash
# Azure Blob Storage
prefect work-pool storage configure azure-blob-storage WORK_POOL_NAME \
    --container CONTAINER_NAME \
    --azure-blob-storage-credentials-block-name BLOCK_NAME
```

To allow Prefect to upload and download serialized workflows, you can create a block containing credentials with permission to access your configured storage location.

If a credentials block is not provided, Prefect will use the default credentials (e.g., a local profile or an IAM role) as determined by the corresponding cloud provider.

You can inspect your storage configuration using:

```bash
prefect work-pool storage inspect WORK_POOL_NAME
```

**Local storage for `@docker`**

When using the @docker decorator with a local Docker engine, you can use volume mounts to share data between your Docker container and host machine.

Here's an example:

```python
from prefect import flow
from prefect.filesystems import LocalFileSystem
from prefect_docker.experimental import docker


result_storage = LocalFileSystem(basepath="/tmp/results")
result_storage.save("result-storage", overwrite=True)


@docker(
    work_pool="above-ground",
    volumes=["/tmp/results:/tmp/results"],
)
@flow(result_storage=result_storage)
def run_in_docker(name: str):
    return f"Hello, {name}!"


print(run_in_docker("world"))  # prints "Hello, world!"
```

To use local storage, ensure that:

  1. The volume mount path is identical on both the host and container side
  2. The LocalFileSystem block's basepath matches the path specified in the volume mount
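
These two conditions can be checked mechanically. The helper below is purely illustrative (it is not part of Prefect) and assumes Docker-style `host:container` volume strings:

```python
def check_local_storage(volumes: list[str], basepath: str) -> bool:
    """Check that some volume mounts `basepath` to the identical path in the container."""
    for volume in volumes:
        host_path, _, container_path = volume.partition(":")
        if host_path == container_path == basepath:
            return True
    return False


# Matches the example above: /tmp/results is mounted to the same path on both sides
print(check_local_storage(["/tmp/results:/tmp/results"], "/tmp/results"))  # True
# A mismatched mount would break result retrieval
print(check_local_storage(["/data:/tmp/results"], "/tmp/results"))  # False
```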

## Submitting workflows to specific infrastructure

To submit a flow to specific infrastructure, use the appropriate decorator for that infrastructure type.

Here's a basic example using @kubernetes:

```python
from prefect import flow
from prefect_kubernetes.experimental.decorators import kubernetes


@kubernetes(work_pool="my-k8s-pool")
@flow
def my_remote_flow(name: str):
    print(f"Hello {name}!")


@flow
def my_flow():
    my_remote_flow("Marvin")


# my_flow runs locally; my_remote_flow runs in a Kubernetes job
my_flow()
```

When you run this code on your machine, `my_flow` executes locally, while `my_remote_flow` is serialized with cloudpickle, shipped to object storage, and executed inside a Kubernetes job managed by a worker polling `my-k8s-pool`.

**Parameters must be serializable**

Parameters passed to infrastructure-bound flows are serialized with cloudpickle to allow them to be transported to the destination infrastructure.

Most Python objects can be serialized with cloudpickle, but objects like database connections cannot be serialized. For parameters that cannot be serialized, you'll need to create the object inside your infrastructure-bound workflow.
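
A quick way to catch this before submitting is a pickle round-trip. The sketch below uses the standard library's `pickle` as a stand-in for cloudpickle (cloudpickle accepts strictly more objects, such as lambdas, but neither can serialize live connections):

```python
import pickle
import sqlite3


def is_serializable(obj) -> bool:
    """Roughly check whether a parameter would survive pickling for transport."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except Exception:
        return False


print(is_serializable({"epochs": 10, "lr": 0.001}))  # plain data: True
print(is_serializable(sqlite3.connect(":memory:")))  # live DB connection: False
```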

## Customizing infrastructure configuration

Any extra keyword arguments you pass to the infrastructure decorator override the corresponding values in the work pool's base job template. This is how you control the runtime environment of your infrastructure-bound flows.

For Kubernetes work pools, the available job variables include:

| Variable | Description |
|---|---|
| `image` | Container image to use (defaults to the standard Prefect image) |
| `namespace` | Kubernetes namespace to create jobs in |
| `service_account_name` | Kubernetes service account for the job |
| `image_pull_policy` | Image pull policy (`IfNotPresent`, `Always`, `Never`) |
| `finished_job_ttl` | Seconds to retain completed jobs before cleanup |
| `stream_output` | Whether to stream job output to your local terminal |

For example, to run a flow in a custom namespace:

```python
@kubernetes(
    work_pool="my-k8s-pool",
    namespace="custom-namespace"
)
@flow
def custom_namespace_flow():
    pass
```

### Bringing your own dependencies with a custom image

The most common customization is the image job variable. By default, infrastructure-bound flows run in the standard Prefect image, which only includes prefect and its dependencies. If your flow needs additional Python packages (for ML training, data processing, etc.), you provide a custom Docker image that has those packages installed.

For example, say you have a flow that trains a model with PyTorch. First, build and push a Docker image that includes your dependencies:

```dockerfile
FROM prefecthq/prefect:3-latest
RUN pip install torch torchvision scikit-learn
```

```bash
docker build -t my-registry.example.com/ml-flows:latest .
docker push my-registry.example.com/ml-flows:latest
```

Then reference that image in the decorator:

```python
from prefect import flow
from prefect_kubernetes.experimental.decorators import kubernetes


@kubernetes(
    work_pool="my-k8s-pool",
    image="my-registry.example.com/ml-flows:latest",
)
@flow
def train_model(epochs: int, learning_rate: float):
    import torch
    import torch.nn as nn

    model = nn.Linear(10, 1)
    optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

    for epoch in range(epochs):
        # ... training logic ...
        pass

    return {"final_loss": 0.01}


@flow
def ml_pipeline():
    result = train_model(epochs=10, learning_rate=0.001)
    print(f"Training complete: {result}")


ml_pipeline()
```

Here's what happens when you run ml_pipeline() on your laptop:

  1. ml_pipeline executes locally on your machine
  2. When it calls train_model, Prefect serializes the function and its arguments, uploads them to the object storage configured on my-k8s-pool, and submits a flow run to the work pool
  3. A worker polling my-k8s-pool picks up the run and creates a Kubernetes job using the work pool's base job template — a manifest that defines the shape of the Job spec (image, env vars, resource requests, etc.)
  4. Because image is passed in the decorator, it overrides the {{ image }} placeholder in the job template, so the pod runs your custom image with PyTorch installed
  5. The pod starts, downloads the serialized flow from object storage, executes it, and reports results back

You can also set a default `image` directly on the work pool (via the UI or CLI) so that all flows submitted to that pool use it automatically, without needing to specify `image` in every decorator.
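
Steps 1-5 can be sketched end to end in plain Python. This is an illustrative stand-in, not Prefect's implementation: stdlib `pickle` replaces cloudpickle, and a temporary directory stands in for the work pool's object storage:

```python
import pickle
import tempfile
from pathlib import Path


def train_model(epochs: int) -> dict:
    # Stand-in for the infrastructure-bound flow
    return {"final_loss": 0.01, "epochs": epochs}


with tempfile.TemporaryDirectory() as bucket:
    # "Submitter" side: serialize the function and its arguments, upload to storage
    blob = Path(bucket) / "flow-run.pkl"
    blob.write_bytes(pickle.dumps((train_model, (10,), {})))

    # "Pod" side: download, deserialize, execute, and report the result
    fn, args, kwargs = pickle.loads(blob.read_bytes())
    result = fn(*args, **kwargs)

print(result)  # {'final_loss': 0.01, 'epochs': 10}
```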

### Dynamic infrastructure sizing

You can also customize infrastructure at call time by computing job variables dynamically. If your work pool's base job template has been customized to expose a memory variable, you could do something like:

```python
from prefect import flow
from prefect_kubernetes.experimental.decorators import kubernetes


def estimate_memory(dataset_size_gb: float) -> int:
    """Estimate memory needed based on dataset size."""
    return int(dataset_size_gb * 2.5 * 1024)  # 2.5x dataset size in Mi


@flow
def process_pipeline(dataset_size_gb: float):
    memory_needed = estimate_memory(dataset_size_gb)

    # Dynamically create the infrastructure-bound flow with the right resources
    @kubernetes(
        work_pool="my-k8s-pool",
        image="my-registry.example.com/data-tools:latest",
        memory=memory_needed,
    )
    @flow
    def process_data():
        # This runs in a pod with enough memory for the dataset
        import pandas as pd
        # ... processing logic ...

    process_data()


process_pipeline(dataset_size_gb=50)
```

## How it works under the hood

Understanding the relationship between the decorator, the work pool, and the resulting Kubernetes job helps demystify this feature:

  1. Work pool — A work pool of type kubernetes stores a base job template: a Kubernetes Job manifest with {{ placeholder }} variables like {{ image }}, {{ namespace }}, and {{ env }}. You can view and customize this template in the Prefect UI or via the CLI.

  2. @kubernetes decorator — When you add @kubernetes(work_pool="my-k8s-pool", image="...") to a flow, the extra keyword arguments (image, namespace, etc.) become job variable overrides. These replace the corresponding {{ placeholder }} values in the base job template when a run is created.

  3. Worker — A worker process polling the work pool receives the flow run, renders the final job manifest by merging the base template with the job variable overrides, and creates the Kubernetes Job.

This means the work pool's base job template is the single source of truth for what the Kubernetes job looks like. The decorator gives you a convenient way to override specific variables per-flow without touching the template.
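
A minimal sketch of that merge-and-render step, assuming a flat template with `{{ var }}` placeholders (real base job templates are nested Kubernetes manifests, but the principle is the same):

```python
import re

# Simplified stand-in for a Kubernetes work pool's base job template
base_job_template = {
    "image": "{{ image }}",
    "namespace": "{{ namespace }}",
}
pool_defaults = {"image": "prefecthq/prefect:3-latest", "namespace": "default"}
decorator_overrides = {"image": "my-registry.example.com/ml-flows:latest"}


def render(template: dict, variables: dict) -> dict:
    """Replace each {{ var }} placeholder with its resolved value."""
    pattern = re.compile(r"\{\{\s*(\w+)\s*\}\}")
    return {
        key: pattern.sub(lambda m: str(variables[m.group(1)]), value)
        for key, value in template.items()
    }


# Decorator overrides win where given; pool defaults fill the rest
manifest = render(base_job_template, {**pool_defaults, **decorator_overrides})
print(manifest)
```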

## Further reading