Skip to content
57 changes: 56 additions & 1 deletion examples/weather_agent/pipelines/weather_agent.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Weather Agent Pipeline."""

import os
import time

from pipelines.hooks import (
InitConfig,
Expand All @@ -10,9 +11,10 @@
from steps import analyze_weather_with_llm, get_weather

from zenml import pipeline
from zenml.config import DockerSettings
from zenml.config import DeploymentSettings, DockerSettings

# Import enums for type-safe capture mode configuration
from zenml.config.deployment_settings import MiddlewareSpec
from zenml.config.docker_settings import PythonPackageInstaller
from zenml.config.resource_settings import ResourceSettings

Expand All @@ -22,6 +24,58 @@
python_package_installer=PythonPackageInstaller.UV,
)


class RequestTimingMiddleware:
"""ASGI middleware to measure request processing time.

Uses the standard ASGI interface (scope, receive, send) which works
across all ASGI frameworks: FastAPI, Django, Starlette, Quart, etc.
"""

def __init__(self, app):
self.app = app

async def __call__(self, scope, receive, send):
"""Process ASGI request with timing measurement.

Args:
scope: ASGI connection scope (contains request info).
receive: Async callable to receive ASGI events.
send: Async callable to send ASGI events.
"""
if scope["type"] != "http":
return await self.app(scope, receive, send)

start_time = time.time()

async def send_wrapper(message):
"""Intercept response to add timing header."""
if message["type"] == "http.response.start":
process_time = (time.time() - start_time) * 1000
headers = list(message.get("headers", []))
headers.append(
(
b"x-process-time-ms",
str(process_time).encode(),
)
)
message = {**message, "headers": headers}

await send(message)

await self.app(scope, receive, send_wrapper)


deployment_settings = DeploymentSettings(
custom_middlewares=[
MiddlewareSpec(
middleware=RequestTimingMiddleware,
order=10,
native=True,
),
],
)

environment = {}
if os.getenv("OPENAI_API_KEY"):
environment["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY")
Expand All @@ -34,6 +88,7 @@
on_cleanup=cleanup_hook,
settings={
"docker": docker_settings,
"deployment": deployment_settings,
"deployer.gcp": {
"allow_unauthenticated": True,
# "location": "us-central1",
Expand Down
2 changes: 2 additions & 0 deletions src/zenml/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# or implied. See the License for the specific language governing
# permissions and limitations under the License.
"""Config classes."""
from zenml.config.deployment_settings import DeploymentSettings
from zenml.config.docker_settings import (
DockerSettings,
PythonPackageInstaller,
Expand All @@ -24,6 +25,7 @@
from zenml.config.cache_policy import CachePolicy

__all__ = [
"DeploymentSettings",
"DockerSettings",
"PythonPackageInstaller",
"PythonEnvironmentExportMethod",
Expand Down
1 change: 1 addition & 0 deletions src/zenml/config/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@

DOCKER_SETTINGS_KEY = "docker"
RESOURCE_SETTINGS_KEY = "resources"
DEPLOYMENT_SETTINGS_KEY = "deployment"
Loading
Loading