@stefannica stefannica commented Oct 15, 2025

Framework-Agnostic Deployment App Factory

Overview

This PR implements a framework-agnostic ASGI app factory system for deployments. It allows customizing all aspects of the ASGI web application that powers pipeline deployments:

  • app factory abstraction compatible with any ASGI framework (e.g. FastAPI, Django, Flask, Falcon, Quart, BlackSheep). Only the FastAPI version is currently implemented.
  • unified endpoint/middleware specifications make it possible to add endpoints and middleware via configuration, independently of the ASGI framework
  • app extensions make it possible to contribute framework-specific code to the process of building the ASGI application (e.g. add middleware, endpoints, routes, advanced security mechanisms)

If necessary, the core components of the deployment server (the app factory, aka app runner, and the deployment service classes) can be extended, and custom implementations can be used in place of the built-in ones via configuration options.

Key Components Implemented

zenml.utils.source_utils.SourceOrObject

A hybrid type that can hold either a source string OR a loaded object:

  • Accepts strings, Source objects, or actual importable objects (types, functions, global variables).
  • Lazy loading via load() method
  • Automatic serialization to source strings
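
The behaviour listed above can be illustrated with a small stand-in. This is a sketch only, not ZenML's SourceOrObject: the class name `LazyRef` and the `module:attribute` path format are assumptions made for illustration, and resolution uses the standard library's importlib.

```python
import importlib
from typing import Any, Optional, Union


class LazyRef:
    """Hypothetical stand-in: holds an import path or a loaded object."""

    def __init__(self, value: Union[str, Any]) -> None:
        if isinstance(value, str):
            # A string is treated as an unresolved "module:attribute" path.
            self._source: Optional[str] = value
            self._object: Any = None
            self._loaded = False
        else:
            self._source = None
            self._object = value
            self._loaded = True

    def load(self) -> Any:
        """Import the object on first use and cache it."""
        if not self._loaded:
            module_name, _, attr = self._source.partition(":")
            self._object = getattr(importlib.import_module(module_name), attr)
            self._loaded = True
        return self._object

    def to_source_string(self) -> str:
        """Serialize to an import path, deriving it from the object if needed."""
        if self._source is None:
            self._source = f"{self._object.__module__}:{self._object.__qualname__}"
        return self._source


ref = LazyRef("json:dumps")   # nothing is imported yet
dumps = ref.load()            # resolved on demand, then cached
round_trip = LazyRef(dumps).to_source_string()
```

Deferring the import until `load()` is called keeps configuration objects cheap to construct and lets them be serialized without importing user code.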

zenml.config.deployment_settings.DeploymentSettings

Similar to DockerSettings, can be used to customize the configuration and behavior of the ASGI application that is run by the pipeline deployment server:

  • Contains a rich set of settings, from descriptive metadata and URL paths to custom endpoints, middleware and extensions
  • Relies on SourceOrObject to encode the location of classes and functions used as values
  • Can only be used at pipeline level

zenml.deployers.server.app.BaseDeploymentAppRunner

This is the abstract factory used to build and run the pipeline deployment ASGI application according to the specifications in the DeploymentSettings. This ASGI application is just a wrapper around the core service that implements the pipeline deployment operations (see next point). The responsibilities of this component are:

  • Construct an ASGI application using one of the supported frameworks (e.g. FastAPI, Django, Flask, Falcon, Quart, BlackSheep, etc.) and according to the configuration values in the DeploymentSettings.
  • Provide a unified, framework agnostic API that can be used to extend the ASGI application with custom endpoints and middleware.
  • Implement the REST API specific logic around the core deployment service.

The only built-in implementation provided for this class is the one for FastAPI: zenml.deployers.server.fastapi.app.FastAPIDeploymentAppRunner.

Users can implement this abstract class to support any other ASGI capable framework.
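
The abstract-factory idea can be sketched without any web framework. Everything below is hypothetical and not taken from the PR: `AppRunner`, `PlainASGIAppRunner`, the `(method, path, handler)` triples and the `call` helper are illustrative names. The toy implementation targets the raw ASGI callable interface, which is the common denominator a framework-specific runner would also have to produce.

```python
import asyncio
import json
from abc import ABC, abstractmethod
from typing import Any, Awaitable, Callable, Dict, List, Tuple

Handler = Callable[[], Awaitable[Dict[str, Any]]]


class AppRunner(ABC):
    """Hypothetical base: collects endpoint specs, defers app construction."""

    def __init__(self, endpoints: List[Tuple[str, str, Handler]]) -> None:
        self.endpoints = endpoints  # (method, path, handler) triples

    @abstractmethod
    def build(self) -> Callable[..., Awaitable[None]]:
        """Return an ASGI application for the configured endpoints."""


class PlainASGIAppRunner(AppRunner):
    """Toy implementation that targets the raw ASGI interface."""

    def build(self) -> Callable[..., Awaitable[None]]:
        routes = {(m, p): h for m, p, h in self.endpoints}

        async def app(scope, receive, send):
            handler = routes.get((scope["method"], scope["path"]))
            status = 200 if handler else 404
            payload = await handler() if handler else {"detail": "not found"}
            await send({"type": "http.response.start", "status": status,
                        "headers": [(b"content-type", b"application/json")]})
            await send({"type": "http.response.body",
                        "body": json.dumps(payload).encode()})

        return app


async def ping() -> Dict[str, Any]:
    return {"status": "ok"}


async def call(app, method: str, path: str) -> List[dict]:
    """Drive the app once with a hand-built HTTP scope and collect messages."""
    sent: List[dict] = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    await app({"type": "http", "method": method, "path": path}, receive, send)
    return sent


app = PlainASGIAppRunner([("GET", "/ping", ping)]).build()
messages = asyncio.run(call(app, "GET", "/ping"))
```

Because the endpoint list is plain data, a second subclass could translate the same triples into routes for a different framework without touching the handlers.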

zenml.deployers.server.service.BasePipelineDeploymentService

This is the base class for the service that runs the core pipeline deployment logic. The responsibilities of this component are:

  • Initialize and clean up the global state shared by all pipeline deployment invocations (e.g. run the init hooks, configure the orchestrator).
  • Provide information about the input and output deployment schemas.
  • Run one or more actual pipelines according to an invocation request (i.e. input parameters) and return the output.
  • Implement some rudimentary health check, status and metrics.
  • This code needs to be ASGI / HTTP agnostic.

The only built-in implementation provided for this class is the one that uses the local orchestrator to run pipelines: zenml.deployers.server.service.PipelineDeploymentService.

Users can implement this abstract class to provide their own custom implementations of running one or more pipelines.
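
A minimal sketch of what an ASGI/HTTP-agnostic service could look like follows. The names `DeploymentService`, `SumService`, `invoke` and `health` are assumptions made for this example and do not mirror the actual BasePipelineDeploymentService interface. No request or response types appear anywhere, so the class can be exercised directly.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict


class DeploymentService(ABC):
    """Hypothetical base: no HTTP or ASGI types appear anywhere."""

    def __init__(self) -> None:
        self.ready = False
        self.invocations = 0

    def initialize(self) -> None:
        """Set up state shared by all invocations (e.g. run init hooks)."""
        self.ready = True

    def cleanup(self) -> None:
        """Release the shared state."""
        self.ready = False

    @abstractmethod
    def run(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Execute the pipeline(s) for one request."""

    def invoke(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Guard, count and delegate a single invocation."""
        if not self.ready:
            raise RuntimeError("service is not initialized")
        self.invocations += 1
        return self.run(parameters)

    def health(self) -> Dict[str, Any]:
        """Rudimentary status and metrics."""
        return {"ready": self.ready, "invocations": self.invocations}


class SumService(DeploymentService):
    """Toy implementation: the 'pipeline' adds up its inputs."""

    def run(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
        return {"total": sum(parameters["values"])}


service = SumService()
service.initialize()
result = service.invoke({"values": [1, 2, 3]})
status = service.health()
service.cleanup()
```

Keeping transport concerns out of the service means the same object can sit behind a REST endpoint, a test, or a different protocol.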

Examples

1. Basic Configuration Examples

Configure URL Paths

from zenml import pipeline
from zenml.config.deployment_settings import DeploymentSettings

deployment_settings = DeploymentSettings(
    # Customize endpoint paths
    invoke_url_path="/api/v1/predict",
    health_url_path="/healthz",
    metrics_url_path="/api/metrics",
    docs_url_path="/api-docs",
    
    # App metadata
    app_title="Customer Churn Prediction Service",
    app_description="ML service for predicting customer churn",
    app_version="1.2.0",
)

@pipeline(settings={"deployment": deployment_settings})
def my_pipeline():
    ...

Configure CORS

from zenml.config.deployment_settings import (
    DeploymentSettings,
    CORSConfig,
)

deployment_settings = DeploymentSettings(
    cors=CORSConfig(
        allow_origins=[
            "https://myapp.example.com",
            "https://admin.example.com",
        ],
        allow_methods=["GET", "POST", "OPTIONS"],
        allow_headers=["Content-Type", "Authorization"],
        allow_credentials=True,
    ),
)

Configure Security Headers

from zenml.config.deployment_settings import (
    DeploymentSettings,
    SecureHeadersConfig,
)

deployment_settings = DeploymentSettings(
    secure_headers=SecureHeadersConfig(
        hsts="max-age=31536000; includeSubDomains",
        xfo="DENY",
        csp=(
            "default-src 'self'; "
            "script-src 'self' 'unsafe-inline'; "
            "style-src 'self' 'unsafe-inline'; "
            "img-src 'self' data: https:;"
        ),
        referrer="strict-origin-when-cross-origin",
        cache="no-store",
    ),
)

2. Custom Endpoints

Framework-Agnostic: Simple Endpoint Function

from typing import Any, Dict

from zenml.config.deployment_settings import (
    DeploymentSettings,
    EndpointSpec,
    EndpointMethod,
)

async def health_detailed() -> Dict[str, Any]:
    """Detailed health check with system metrics."""
    import psutil
    
    return {
        "status": "healthy",
        "cpu_percent": psutil.cpu_percent(),
        "memory_percent": psutil.virtual_memory().percent,
        "disk_percent": psutil.disk_usage('/').percent,
    }

deployment_settings = DeploymentSettings(
    custom_endpoints=[
        EndpointSpec(
            path="/health/detailed",
            method=EndpointMethod.GET,
            handler=health_detailed,
            auth_required=False,
        ),
    ],
)

Framework-Agnostic: Endpoint Builder Function

from typing import Callable

from pydantic import BaseModel
from zenml.config.deployment_settings import (
    DeploymentSettings,
    EndpointSpec,
    EndpointMethod,
)
from zenml.deployers.server import BaseDeploymentAppRunner

class PredictionRequest(BaseModel):
    features: list[float]

class PredictionResponse(BaseModel):
    prediction: float
    confidence: float
    model_version: str

def create_custom_predict_endpoint(
    app_runner: BaseDeploymentAppRunner,
    model_path: str,
) -> Callable:
    """Builder function that creates a custom prediction endpoint.
    
    This pattern allows you to access the app_runner and inject
    configuration at build time.
    """
    # Load model during app build time (once)
    import joblib
    model = joblib.load(model_path)
    
    async def predict_endpoint(
        request: PredictionRequest,
    ) -> PredictionResponse:
        """The actual endpoint implementation."""
        prediction = model.predict([request.features])[0]
        confidence = model.predict_proba([request.features]).max()
        
        return PredictionResponse(
            prediction=float(prediction),
            confidence=float(confidence),
            model_version=app_runner.deployment.name,
        )
    
    return predict_endpoint

deployment_settings = DeploymentSettings(
    custom_endpoints=[
        EndpointSpec(
            path="/predict/custom",
            method=EndpointMethod.POST,
            handler=create_custom_predict_endpoint,
            init_kwargs={"model_path": "/models/custom_model.pkl"},
            auth_required=True,
        ),
    ],
)
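
The builder pattern relies on a closure: expensive work runs once when the builder is called, and the returned handler reuses the result on every request. The sketch below shows that split with stand-ins only. `load_model`, `create_endpoint` and the averaging "model" are hypothetical and use no ZenML or joblib APIs.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List

load_calls: List[str] = []


def load_model(path: str) -> Callable[[List[float]], float]:
    """Hypothetical loader standing in for joblib.load; records each call."""
    load_calls.append(path)
    return lambda features: sum(features) / len(features)


def create_endpoint(
    model_path: str,
) -> Callable[[List[float]], Awaitable[Dict[str, float]]]:
    """Builder: runs once and returns the handler that runs per request."""
    model = load_model(model_path)  # build-time work

    async def endpoint(features: List[float]) -> Dict[str, float]:
        return {"prediction": model(features)}  # request-time work

    return endpoint


endpoint = create_endpoint("/models/custom_model.pkl")
first = asyncio.run(endpoint([1.0, 2.0, 3.0]))
second = asyncio.run(endpoint([4.0, 6.0]))
```

After two requests `load_calls` still holds a single entry, which is the property that makes the builder form preferable to loading the model inside the handler.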

FastAPI-Specific: Native FastAPI Router

from fastapi import APIRouter
from zenml.config.deployment_settings import (
    DeploymentSettings,
    EndpointSpec,
    EndpointMethod,
)

# Create a native FastAPI router with all FastAPI features
admin_router = APIRouter(prefix="/admin", tags=["admin"])

@admin_router.get("/stats")
async def get_stats():
    """Get deployment statistics."""
    return {"total_requests": 1000, "uptime_hours": 72}

@admin_router.post("/reload")
async def reload_model():
    """Reload the model."""
    # Implementation here
    return {"status": "reloaded"}

@admin_router.delete("/cache")
async def clear_cache():
    """Clear the cache."""
    # Implementation here
    return {"status": "cleared"}

# Register as native endpoint
deployment_settings = DeploymentSettings(
    custom_endpoints=[
        EndpointSpec(
            path="",  # Router has its own prefix
            method=EndpointMethod.GET,
            handler=admin_router,
            native=True,  # Treat as native FastAPI object
            auth_required=True,
        ),
    ],
)

3. Custom Middleware

Framework-Agnostic: Simple Middleware Class

from zenml.config.deployment_settings import (
    DeploymentSettings,
    MiddlewareSpec,
)
import time

class RequestTimingMiddleware:
    """ASGI middleware to measure request processing time.
    
    Uses the standard ASGI interface (scope, receive, send) which works
    across all ASGI frameworks: FastAPI, Django, Starlette, Quart, etc.
    """
    
    def __init__(self, app):
        self.app = app
    
    async def __call__(self, scope, receive, send):
        """Process ASGI request with timing measurement.
        
        Args:
            scope: ASGI connection scope (contains request info).
            receive: Async callable to receive ASGI events.
            send: Async callable to send ASGI events.
        """
        if scope["type"] != "http":
            return await self.app(scope, receive, send)
        
        # perf_counter is monotonic, so the measurement is unaffected by clock changes
        start_time = time.perf_counter()
        
        async def send_wrapper(message):
            """Intercept response to add timing header."""
            if message["type"] == "http.response.start":
                process_time = (time.perf_counter() - start_time) * 1000
                headers = list(message.get("headers", []))
                headers.append((
                    b"x-process-time-ms",
                    str(process_time).encode(),
                ))
                message = {**message, "headers": headers}
            
            await send(message)
        
        await self.app(scope, receive, send_wrapper)

deployment_settings = DeploymentSettings(
    custom_middlewares=[
        MiddlewareSpec(
            middleware=RequestTimingMiddleware,
            order=10,
        ),
    ],
)
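
Pure ASGI middleware of this kind can be exercised without a server or a framework by calling it with a hand-built scope and collecting what it sends. The sketch below repeats a compact copy of the timing middleware so that it is self-contained. `hello_app` and `run_once` are hypothetical helpers introduced only for this example.

```python
import asyncio
import time


class TimingMiddleware:
    """Compact copy of the timing middleware, repeated for self-containment."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            return await self.app(scope, receive, send)
        start = time.perf_counter()

        async def send_wrapper(message):
            # Only the response-start message carries headers.
            if message["type"] == "http.response.start":
                elapsed_ms = (time.perf_counter() - start) * 1000
                headers = list(message.get("headers", []))
                headers.append((b"x-process-time-ms", str(elapsed_ms).encode()))
                message = {**message, "headers": headers}
            await send(message)

        await self.app(scope, receive, send_wrapper)


async def hello_app(scope, receive, send):
    """Minimal ASGI app used as the wrapped application."""
    await send({"type": "http.response.start", "status": 200, "headers": []})
    await send({"type": "http.response.body", "body": b"hello"})


async def run_once(app):
    """Send one fake GET request through the app and return its messages."""
    sent = []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    await app({"type": "http", "method": "GET", "path": "/"}, receive, send)
    return sent


messages = asyncio.run(run_once(TimingMiddleware(hello_app)))
header_names = [name for name, _ in messages[0]["headers"]]
```

The same harness works for any middleware that follows the `(scope, receive, send)` convention, which is what makes this style portable across ASGI frameworks.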

FastAPI-Specific: Native FastAPI Middleware

from fastapi.middleware.gzip import GZipMiddleware
from zenml.config.deployment_settings import (
    DeploymentSettings,
    MiddlewareSpec,
)

deployment_settings = DeploymentSettings(
    custom_middlewares=[
        # Use native FastAPI GZip middleware
        MiddlewareSpec(
            middleware=GZipMiddleware,
            native=True,
            init_kwargs={"minimum_size": 1000},
            order=100,  # Run late in the chain
        ),
    ],
)

4. App Extensions

Simple Extension Function

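# AppExtensionSpec is assumed to live alongside the other spec classes
from zenml.config.deployment_settings import (
    AppExtensionSpec,
    DeploymentSettings,
)
from zenml.deployers.server import BaseDeploymentAppRunner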

def add_monitoring_extension(
    app_runner: BaseDeploymentAppRunner,
    prometheus_path: str = "/prometheus",
):
    """Simple extension that adds Prometheus metrics endpoint."""
    from prometheus_client import (
        make_asgi_app,
        Counter,
        Histogram,
    )
    
    # Create Prometheus metrics
    request_counter = Counter(
        'deployment_requests_total',
        'Total requests',
    )
    
    # Mount Prometheus metrics app
    metrics_app = make_asgi_app()
    app_runner.asgi_app.mount(prometheus_path, metrics_app)
    
    print(f"✅ Prometheus metrics available at {prometheus_path}")

deployment_settings = DeploymentSettings(
    app_extensions=[
        AppExtensionSpec(
            extension=add_monitoring_extension,
            extension_kwargs={"prometheus_path": "/metrics/prometheus"},
        ),
    ],
)

Pre-requisites

Please ensure you have done the following:

  • I have read the CONTRIBUTING.md document.
  • I have added tests to cover my changes.
  • I have based my new branch on develop and the open PR is targeting develop. If your branch wasn't based on develop read Contribution guide on rebasing branch to develop.
  • IMPORTANT: I made sure that my changes are reflected properly in the following resources:
    • ZenML Docs
    • Dashboard: Needs to be communicated to the frontend team.
    • Templates: Might need adjustments (that are not reflected in the template tests) in case of non-breaking changes and deprecations.
    • Projects: Depending on the version dependencies, different projects might get affected.

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)
  • Other (add details above)

@github-actions bot added the internal (To filter out internal PRs and issues) and enhancement (New feature or request) labels Oct 15, 2025
@stefannica force-pushed the feature/customizable-deployment-servers branch from dd63272 to 55719bb October 15, 2025 13:26
@stefannica marked this pull request as ready for review October 15, 2025 21:23
@stefannica force-pushed the feature/customizable-deployment-servers branch from 45049ac to 65c4de8 October 15, 2025 21:46
self._source = resolve(self._object)
return self._source

def to_source_string(self) -> str:
Contributor

Any reason you're serializing this to a string? The Source object actually contains more information and is used in all other places where we serialize sources

Contributor Author

I don't think I understand this question. I need to serialize this to a JSON-able data type (dict, string, list, etc.). This is only used in serialization, so I can't return a Source object here instead. It needs to be something that json.dump(...) can work with.
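
The constraint under discussion can be shown with the standard library alone. In this sketch `FakeSource` is a hypothetical dataclass, not ZenML's Source model. It shows that json.dumps rejects an arbitrary object, while both a dict form (which keeps the structured fields) and a plain string form are accepted.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class FakeSource:
    """Hypothetical structured source; not ZenML's Source model."""

    module: str
    attribute: str


source = FakeSource(module="my_package.handlers", attribute="health_detailed")

try:
    json.dumps(source)  # arbitrary objects are rejected by the encoder
    error = None
except TypeError as exc:
    error = str(exc)

# Structured form: every field survives serialization.
as_dict = json.dumps(asdict(source))

# Compact form: a single import-path string.
as_string = json.dumps(f"{source.module}.{source.attribute}")
```

Either form satisfies the encoder. The trade-off is between retaining the extra fields and keeping the serialized value compact.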

socket-security bot commented Oct 17, 2025

Review the following changes in direct dependencies.

| Diff | Package | Supply Chain Security | Vulnerability | Quality | Maintenance | License |
|---|---|---|---|---|---|---|
| Updated | secure@0.3.0 ⏵ 1.0.1 | 100 +1 | 100 | 100 | 100 | 100 |

Contributor

github-actions bot commented Oct 17, 2025

ZenML CLI Performance Comparison (Threshold: 1.0s, Timeout: 60s, Slow: 5s)

❌ Failed Commands on Current Branch (feature/customizable-deployment-servers)

  • zenml stack list: Command failed on run 1 (exit code: 1)
  • zenml pipeline list: Command failed on run 1 (exit code: 1)
  • zenml model list: Command failed on run 1 (exit code: 1)

🚨 New Failures Introduced

The following commands fail on your branch but worked on the target branch:

  • zenml stack list
  • zenml pipeline list
  • zenml model list

Performance Comparison

| Command | develop Time (s) | feature/customizable-deployment-servers Time (s) | Difference | Status |
|---|---|---|---|---|
| zenml --help | 1.326077 ± 0.012042 | 1.354361 ± 0.014177 | +0.028s | ✓ No significant change |
| zenml model list | Not tested | Failed | N/A | ❌ Broken in current branch |
| zenml pipeline list | Not tested | Failed | N/A | ❌ Broken in current branch |
| zenml stack --help | 1.309553 ± 0.015330 | 1.343594 ± 0.006245 | +0.034s | ✓ No significant change |
| zenml stack list | Not tested | Failed | N/A | ❌ Broken in current branch |

Summary

  • Total commands analyzed: 5
  • Commands compared for timing: 2
  • Commands improved: 0 (0.0% of compared)
  • Commands degraded: 0 (0.0% of compared)
  • Commands unchanged: 2 (100.0% of compared)
  • Failed commands: 3 (NEW FAILURES INTRODUCED)
  • Timed out commands: 0
  • Slow commands: 0

Environment Info

  • Target branch: Linux 6.14.0-1012-azure
  • Current branch: Linux 6.14.0-1012-azure
  • Test timestamp: 2025-10-17T21:27:23Z
  • Timeout: 60 seconds
  • Slow threshold: 5 seconds

Contributor

github-actions bot commented Oct 17, 2025

Documentation Link Check Results

Absolute links check failed
There are broken absolute links in the documentation. See workflow logs for details
Relative links check passed
Last checked: 2025-10-17 21:26:58 UTC
