62 changes: 62 additions & 0 deletions docs/beta_todos.md
@@ -0,0 +1,62 @@
# Beta Hardening TODOs

This is a living checklist for post‑beta hardening. All beta blockers are already implemented; the items below are for production readiness and scale.

## Serving Runtime & Publishing

- Multi‑worker scaling per process
- Option A (threads): Add a small ThreadPoolExecutor consuming the existing bounded queue; preserve backpressure and flush semantics.
- Option B (async): Introduce an asyncio loop + asyncio.Queue + async workers, once client/publish calls have async variants and we opt into async serving.
  - Keep the bounded queue, inline fallback on `Full`, and orderly shutdown (join/cancel with timeout); a sketch follows this list.
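
A minimal sketch of Option A under these constraints; `_publish` is a hypothetical stand-in for the existing client/publish call, not a ZenML API:

```python
# Illustrative only: bounded queue + thread workers with inline fallback.
import queue
import threading
from concurrent.futures import ThreadPoolExecutor

event_queue: "queue.Queue[dict]" = queue.Queue(maxsize=1024)  # bounded: preserves backpressure

def _publish(event: dict) -> None:
    ...  # the existing synchronous publish call goes here

def publish_event(event: dict) -> None:
    """Enqueue an event; publish inline when the queue is full."""
    try:
        event_queue.put_nowait(event)
    except queue.Full:
        _publish(event)  # inline fallback keeps delivery semantics under load

_stop = threading.Event()

def _worker() -> None:
    # Drain until stop is requested and the queue is empty (flush semantics).
    while not _stop.is_set() or not event_queue.empty():
        try:
            event = event_queue.get(timeout=0.1)
        except queue.Empty:
            continue
        try:
            _publish(event)
        finally:
            event_queue.task_done()

_pool = ThreadPoolExecutor(max_workers=4)
for _ in range(4):
    _pool.submit(_worker)

def shutdown() -> None:
    """Orderly shutdown: wait for in-flight events, then stop the workers."""
    event_queue.join()  # a real implementation would bound this wait with a timeout
    _stop.set()
    _pool.shutdown(wait=True)
```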

- Backpressure & batching
- Tune queue maxsize defaults; expose env knob `ZENML_RT_QUEUE_MAXSIZE`.
  - Optional: micro‑batch compatible events for fewer round‑trips; see the sketch after this list.
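
A sketch of the env knob plus a micro‑batch drain; the default and batch size are illustrative:

```python
# Illustrative only: read the queue-size knob and drain events in batches.
import os
import queue

maxsize = int(os.getenv("ZENML_RT_QUEUE_MAXSIZE", "1024"))  # env knob with assumed default
event_queue: "queue.Queue[dict]" = queue.Queue(maxsize=maxsize)

def drain_batch(max_batch: int = 32) -> list:
    """Pull up to `max_batch` already-queued events so compatible ones
    can be published in a single round-trip."""
    batch = []
    while len(batch) < max_batch:
        try:
            batch.append(event_queue.get_nowait())
        except queue.Empty:
            break
    return batch
```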

- Circuit breaker refinements
- Distinguish network vs. logical errors for better decisions.
  - Add optional cool‑down logs with remediation guidance; a sketch follows this list.
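
A sketch of breaker-oriented error classification and a cool‑down log; the taxonomy and message are assumptions, not existing ZenML behavior:

```python
import logging

logger = logging.getLogger(__name__)

def classify(exc: Exception) -> str:
    """Transient network failures should trip the breaker; logical errors should not."""
    if isinstance(exc, (ConnectionError, TimeoutError, OSError)):
        return "network"
    return "logical"  # e.g. validation errors: surface immediately, do not trip

def on_breaker_open(cooldown_s: float) -> None:
    logger.warning(
        "Publish circuit breaker opened; pausing publishes for %.0fs. "
        "If this persists, check server connectivity and queue sizing.",
        cooldown_s,
    )
```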

## Artifact Write Semantics

- Server‑side atomicity / compensation
- Align with server to provide atomic batch create or server‑side compensation.
  - Client: switch from best‑effort retries to idempotent, category‑aware retries once server semantics are defined (see the sketch after this list).
- Document consistency guarantees and failure behavior.
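
A sketch of idempotent, category‑aware retries; `PublishError` and the category names are hypothetical until server semantics are defined:

```python
import time

class PublishError(Exception):
    """Hypothetical error carrying a server-assigned category."""
    def __init__(self, category: str):
        super().__init__(category)
        self.category = category

RETRYABLE = {"network", "rate_limited"}  # assumed retryable categories

def create_batch_with_retry(publish, artifacts, idempotency_key, attempts=3):
    """Retries are safe because the server deduplicates on the idempotency key."""
    for attempt in range(attempts):
        try:
            publish(artifacts, idempotency_key=idempotency_key)
            return
        except PublishError as exc:
            if exc.category not in RETRYABLE or attempt == attempts - 1:
                raise
            time.sleep(2**attempt)  # exponential backoff between attempts
```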

## Request Parameter Schema & Safety

- Parameter schema from entrypoint annotations
- Generate/derive expected types from pipeline entrypoint annotations (or compiled schema) rather than inferring from defaults.
- Add total payload size cap; add per‑type caps (e.g., list length, dict depth).
  - Optional: strict mode that rejects unknown params rather than dropping them; a sketch follows this list.
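
A sketch of annotation-derived schemas with caps and strict mode; the cap values are illustrative:

```python
import json
import typing

MAX_PAYLOAD_BYTES = 1_000_000  # total payload cap (assumed value)
MAX_LIST_LEN = 10_000          # per-type cap (assumed value)

def derive_schema(entrypoint) -> dict:
    """Expected types come from the entrypoint annotations, not the defaults."""
    hints = typing.get_type_hints(entrypoint)
    hints.pop("return", None)
    return hints

def validate(params: dict, schema: dict, strict: bool = False) -> dict:
    if len(json.dumps(params)) > MAX_PAYLOAD_BYTES:
        raise ValueError("payload too large")
    accepted = {}
    for name, value in params.items():
        if name not in schema:
            if strict:
                raise ValueError(f"unknown parameter: {name}")
            continue  # non-strict mode drops unknown params
        if isinstance(value, list) and len(value) > MAX_LIST_LEN:
            raise ValueError(f"list too long: {name}")
        accepted[name] = value
    return accepted
```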

## Monitoring, Metrics, Health

- Metrics enrichment
  - Export runtime metrics to Prometheus (queue depth, cache hit rate, error rate, op latency histograms); sketched after this list.
- Add per‑worker metrics if multi‑worker is enabled.
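
A sketch using `prometheus_client` (an assumed dependency); metric names are illustrative, not an existing ZenML contract:

```python
from prometheus_client import Counter, Gauge, Histogram, start_http_server

QUEUE_DEPTH = Gauge("zenml_rt_queue_depth", "Events waiting in the publish queue")
CACHE_HITS = Counter("zenml_rt_cache_hits_total", "Runtime cache hits")
ERRORS = Counter("zenml_rt_publish_errors_total", "Failed publish operations")
OP_LATENCY = Histogram("zenml_rt_op_latency_seconds", "Publish operation latency")

start_http_server(9100)  # exposes a scrape endpoint on :9100

with OP_LATENCY.time():  # records one operation's latency
    ...  # publish call goes here
```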

- Health/liveness
- Expose background worker liveness/health via the service.
  - Add simple self‑check endpoints and document alerts; a sketch follows this list.
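
A sketch of heartbeat-based worker liveness; the staleness threshold is an assumption:

```python
import time

class Heartbeat:
    """Workers call beat() each loop; a /health-style self-check calls alive()."""
    def __init__(self, stale_after_s: float = 30.0):
        self.last_beat = time.monotonic()
        self.stale_after_s = stale_after_s

    def beat(self) -> None:
        self.last_beat = time.monotonic()

    def alive(self) -> bool:
        return time.monotonic() - self.last_beat < self.stale_after_s
```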

## Memory & Resource Management

- Process memory monitoring / limits
  - Add a process memory watchdog that logs warnings; document recommended container limits (see the sketch after this list).
- Add a user‑facing docs note about caching large artifacts and tuning `max_entries` accordingly.
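
A sketch of a memory watchdog; `psutil` is an assumed dependency and the soft limit is illustrative:

```python
import logging
import threading
import time

import psutil

logger = logging.getLogger(__name__)
SOFT_LIMIT_BYTES = 2 * 1024**3  # warn above ~2 GiB resident memory (assumed)

def watch_memory(interval_s: float = 30.0) -> None:
    proc = psutil.Process()
    while True:
        rss = proc.memory_info().rss
        if rss > SOFT_LIMIT_BYTES:
            logger.warning(
                "Resident memory %.1f GiB exceeds the soft limit; consider "
                "lowering ZENML_RT_CACHE_MAX_ENTRIES or raising container limits.",
                rss / 1024**3,
            )
        time.sleep(interval_s)

threading.Thread(target=watch_memory, daemon=True).start()
```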

## Operational Docs & UX

- Serving docs
- Add a prominent warning about memory usage for large cached artifacts and sizing `ZENML_RT_CACHE_MAX_ENTRIES`.
- Add examples for scaling processes/replicas and interpreting metrics.

## Notes (Implemented in Beta)

- Request param allowlist / type coercion / size caps
- Memory‑only isolation (instance‑scoped) and cleanup
- Bounded queue with inline fallback; race‑free cache sweep
- Graceful shutdown with timeout and final metrics
- Defensive artifact write behavior with minimal retries and response validation
222 changes: 44 additions & 178 deletions docs/book/how-to/serving/serving.md
@@ -1,42 +1,25 @@
---
title: Serving Pipelines
description: Millisecond-class pipeline execution over HTTP with intelligent run-only optimization and streaming.
description: Run pipelines as fast HTTP services with async serving by default and optional memory-only execution.
---

# Serving Pipelines

ZenML Serving runs pipelines as ultra-fast FastAPI services, achieving millisecond-class latency through intelligent run-only execution. Perfect for real-time inference, AI agents, and interactive workflows.
ZenML Serving exposes a pipeline as a FastAPI service. In serving, execution uses a Realtime runtime with async server updates by default for low latency. You can optionally run memory-only for maximum speed.

## Why Serving vs. Orchestrators

- **Performance**: Millisecond-class latency with run-only execution (no DB/FS writes in fast mode)
- **Simplicity**: Call your pipeline via HTTP; get results or stream progress
- **Intelligence**: Automatically switches between tracking and run-only modes based on capture settings
- **Flexibility**: Optional run/step tracking with fine-grained capture policies
- Performance: Async serving with in-process caching for low latency.
- Simplicity: Invoke your pipeline over HTTP; get results or stream progress.
- Control: Single, typed `Capture` config to tune observability or enable memory-only.

Use orchestrators for scheduled, long-running, reproducible workflows; use Serving for real-time request/response.

## How It Works

**Run-Only Architecture** (for millisecond latency):
- **ServingOverrides**: Per-request parameter injection using ContextVar isolation
- **ServingBuffer**: In-memory step output handoff with no persistence
- **Effective Config**: Runtime configuration merging without model mutations
- **Skip I/O**: Bypasses all database writes and filesystem operations
- **Input Injection**: Upstream step outputs automatically injected as parameters

**Full Tracking Mode** (when capture enabled):
- Traditional ZenML tracking with runs, steps, artifacts, and metadata
- Orchestrator-based execution with full observability

The service automatically chooses the optimal execution mode based on your capture settings.
Use orchestrators for scheduled, reproducible workflows. Use Serving for request/response inference.

## Quickstart

Prerequisites

- A deployed pipeline; note its deployment UUID as `ZENML_PIPELINE_DEPLOYMENT_ID`.
- Python env with dev deps (as per CONTRIBUTING).

Start the service

@@ -47,73 +30,61 @@
```bash
export ZENML_SERVICE_PORT=8001
python -m zenml.deployers.serving.app
```

Synchronous invocation
Invoke (sync)

```bash
curl -s -X POST "http://localhost:8001/invoke" \
-H "Content-Type: application/json" \
-d '{"parameters": {"your_param": "value"}}'
```

## Performance Modes
## Capture (typed-only)

ZenML Serving automatically chooses the optimal execution mode:

### Run-Only Mode (Millisecond Latency)

Activated when `capture="none"` or no capture settings specified:
Configure capture on the pipeline decorator with a single typed `Capture` object:

```python
@pipeline(settings={"capture": "none"})
def fast_pipeline(x: int) -> int:
return x * 2
```
from zenml import pipeline
from zenml.capture.config import Capture

**Optimizations**:
- ✅ Zero database writes
- ✅ Zero filesystem operations
- ✅ In-memory step output handoff
- ✅ Per-request parameter injection
- ✅ Effective configuration merging
- ✅ Multi-worker safe (ContextVar isolation)
@pipeline(capture=Capture()) # serving async by default
def serve_pipeline(...):
...

**Use for**: Real-time inference, AI agents, interactive demos

### Full Tracking Mode
@pipeline(capture=Capture(memory_only=True)) # serving only
def max_speed_pipeline(...):
...
```

Activated when capture settings specify tracking:
Options (observability only; they do not affect dataflow; a usage sketch follows this list):
- `code`: include code/source/docstrings in metadata (default True)
- `logs`: persist step logs (default True)
- `metadata`: publish run/step metadata (default True)
- `visualizations`: persist visualizations (default True)
- `metrics`: emit runtime metrics (default True)
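
For example, a small sketch that turns off two of the toggles above (field names as listed; all defaults remain `True`):

```python
from zenml import pipeline
from zenml.capture.config import Capture

# Keep metadata but skip log persistence and visualizations for lighter tracking.
@pipeline(capture=Capture(logs=False, visualizations=False))
def lean_pipeline(x: int) -> int:
    return x * 2
```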

```python
@pipeline(settings={"capture": "full"})
def tracked_pipeline(x: int) -> int:
return x * 2
```
Notes
- Serving is async by default; there is no `flush_on_step_end` knob.
- `memory_only=True` is ignored outside serving with a warning.

**Features**:
- Complete run/step tracking
- Artifact persistence
- Metadata collection
- Dashboard integration
## Request Parameters

**Use for**: Experimentation, debugging, audit trails
Request JSON under `parameters` is merged into the effective step config in serving. The service logs which parameter keys were applied.
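
A minimal client-side sketch; the `requests` library and the parameter name are illustrative:

```python
import requests

resp = requests.post(
    "http://localhost:8001/invoke",
    json={"parameters": {"your_param": "value"}},  # placeholder parameter
    timeout=30,
)
resp.raise_for_status()
print(resp.json())  # declared pipeline outputs
```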

## Execution Modes

- **Sync**: `POST /invoke` waits for completion; returns results or error.
- **Async**: `POST /invoke?mode=async` returns a `job_id`; poll `GET /jobs/{job_id}`.
- **Streaming**: `GET /stream/{job_id}` (SSE) or `WebSocket /stream` to receive progress and completion events in real time.
- Sync: `POST /invoke` waits for completion; returns results or error.
- Async: `POST /invoke?mode=async` returns a `job_id`; poll `GET /jobs/{job_id}`.
- Streaming: `GET /stream/{job_id}` (SSE) or `WebSocket /stream` to stream progress.

Async example

```bash
# Submit
JOB_ID=$(curl -s -X POST "http://localhost:8001/invoke?mode=async" -H "Content-Type: application/json" -d '{"parameters":{}}' | jq -r .job_id)

# Poll
JOB_ID=$(curl -s -X POST "http://localhost:8001/invoke?mode=async" \
-H "Content-Type: application/json" -d '{"parameters":{}}' | jq -r .job_id)
curl -s "http://localhost:8001/jobs/$JOB_ID"
```

SSE example
SSE

```bash
curl -N -H "Accept: text/event-stream" "http://localhost:8001/stream/$JOB_ID"
```
@@ -123,124 +94,19 @@

- `/health`: Service health and uptime.
- `/info`: Pipeline name, steps, parameter schema, deployment info.
- `/metrics`: Execution statistics (counts, averages).
- `/metrics`: Execution statistics (queue depth, cache hit rate, and latencies when metrics are enabled).
- `/status`: Service configuration snapshot.
- `/invoke`: Execute (sync/async) with optional parameter overrides.
- `/invoke`: Execute (sync/async) with optional parameters.
- `/jobs`, `/jobs/{id}`, `/jobs/{id}/cancel`: Manage async jobs.
- `/stream/{id}`: Server‑Sent Events stream for a job; `WebSocket /stream` for bidirectional.

## Configuration

Key environment variables

- `ZENML_PIPELINE_DEPLOYMENT_ID`: Deployment UUID (required).
- `ZENML_SERVING_CAPTURE_DEFAULT`: Default capture mode (`none` for run-only, `full` for tracking).
- `ZENML_SERVICE_HOST` (default: `0.0.0.0`), `ZENML_SERVICE_PORT` (default: `8001`).
- `ZENML_LOG_LEVEL`: Logging verbosity.

## Capture Policies

Control what gets tracked per invocation:

- **`none`**: Run-only mode, millisecond latency, no persistence
- **`metadata`**: Track runs/steps, no payload data
- **`full`**: Complete tracking with artifacts and metadata
- **`sampled`**: Probabilistic tracking for cost control
- **`errors_only`**: Track only failed executions

Configuration locations:
- **Pipeline-level**: `@pipeline(settings={"capture": "none"})`
- **Request-level**: `{"capture_override": {"mode": "full"}}`
- **Environment**: `ZENML_SERVING_CAPTURE_DEFAULT=none`

Precedence: Request > Pipeline > Environment > Default

## Advanced Features

### Input/Output Contracts

Pipelines automatically expose their signature:

```python
@pipeline
def my_pipeline(city: str, temperature: float) -> str:
return process_weather(city, temperature)

# Automatic parameter schema:
# {"city": {"type": "str", "required": true},
# "temperature": {"type": "float", "required": true}}
```

### Multi-Step Pipelines

Step outputs automatically injected as inputs:

```python
@step
def fetch_data(city: str) -> dict:
return {"weather": "sunny", "temp": 25}

@step
def analyze_data(weather_data: dict) -> str:
return f"Analysis: {weather_data}"

@pipeline
def weather_pipeline(city: str) -> str:
data = fetch_data(city)
return analyze_data(data) # weather_data auto-injected
```

### Response Building

Only declared pipeline outputs returned:

```python
@pipeline
def multi_output_pipeline(x: int) -> tuple[int, str]:
return x * 2, f"Result: {x}"

# Response: {"outputs": {"output_0": 4, "output_1": "Result: 2"}}
```

## Testing & Local Dev

Exercise endpoints locally:

```bash
# Health check
curl http://localhost:8001/health

# Pipeline info
curl http://localhost:8001/info

# Execute with parameters
curl -X POST http://localhost:8001/invoke \
-H "Content-Type: application/json" \
-d '{"parameters": {"city": "Paris"}}'

# Override capture mode
curl -X POST http://localhost:8001/invoke \
-H "Content-Type: application/json" \
-d '{"parameters": {"city": "Tokyo"}, "capture_override": {"mode": "full"}}'
```
- `/stream/{id}`: Server‑Sent Events stream; `WebSocket /stream` for bidirectional.

## Troubleshooting

- **Missing deployment ID**: set `ZENML_PIPELINE_DEPLOYMENT_ID`.
- **Slow performance**: ensure `capture="none"` for run-only mode.
- **Import errors**: run-only mode bypasses some ZenML integrations that aren't needed for serving.
- **Memory leaks**: serving contexts are automatically cleared per request.
- **Multi-worker issues**: ContextVar isolation ensures thread safety.

## Architecture Comparison
- Missing deployment ID: set `ZENML_PIPELINE_DEPLOYMENT_ID`.
- Slow responses: confirm the pipeline is running under serving (async updates are the default), or consider `Capture(memory_only=True)` for prototypes.
- Multi-worker/safety: Serving isolates request state; taps are cleared per request.

| Feature | Run-Only Mode | Full Tracking |
|---------|---------------|---------------|
| **Latency** | Milliseconds | Seconds |
| **DB Writes** | None | Full tracking |
| **FS Writes** | None | Artifacts |
| **Memory** | Minimal | Standard |
| **Debugging** | Limited | Complete |
| **Production** | ✅ Optimal | For experimentation |
## See Also

Choose run-only for production serving, full tracking for development and debugging.
- Capture & Runtimes (advanced): serving defaults, toggles, memory-only behavior.
- Realtime Tuning: cache TTL/size, error reporting, and circuit breaker knobs.