Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ otel = [
"opentelemetry-exporter-otlp>=1.27.0,<2.0.0",
"opentelemetry-instrumentation>=0.48b0,<1.0.0",
"opentelemetry-instrumentation-logging>=0.48b0,<1.0.0",
"opentelemetry-instrumentation-system-metrics>=0.48b0,<1.0.0",
"opentelemetry-test-utils>=0.48b0,<1.0.0",
]

Expand Down Expand Up @@ -178,6 +179,7 @@ dev = [
"opentelemetry-exporter-otlp>=1.27.0,<2.0.0",
"opentelemetry-instrumentation>=0.48b0,<1.0.0",
"opentelemetry-instrumentation-logging>=0.48b0,<1.0.0",
"opentelemetry-instrumentation-system-metrics>=0.48b0,<1.0.0",
"opentelemetry-test-utils>=0.48b0,<1.0.0",
]

Expand Down
12 changes: 8 additions & 4 deletions src/prefect/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def my_flow():
load_flow_run,
run_flow,
)
from prefect.telemetry.metrics import RunMetrics

flow_run: "FlowRun" = load_flow_run(flow_run_id=flow_run_id)
run_logger: "LoggingAdapter" = flow_run_logger(flow_run=flow_run)
Expand All @@ -118,10 +119,13 @@ def my_flow():
raise

# run the flow
if flow.isasync:
run_coro_as_sync(run_flow(flow, flow_run=flow_run, error_logger=run_logger))
else:
run_flow(flow, flow_run=flow_run, error_logger=run_logger)
with RunMetrics(flow_run, flow):
if flow.isasync:
run_coro_as_sync(
run_flow(flow, flow_run=flow_run, error_logger=run_logger)
)
else:
run_flow(flow, flow_run=flow_run, error_logger=run_logger)
Comment on lines +122 to +128
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😮‍💨



__getattr__: Callable[[str], Any] = getattr_migration(__name__)
6 changes: 6 additions & 0 deletions src/prefect/settings/models/root.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from .results import ResultsSettings
from .runner import RunnerSettings
from .server import ServerSettings
from .telemetry import TelemetrySettings

if TYPE_CHECKING:
from prefect.settings.legacy import Setting
Expand Down Expand Up @@ -137,6 +138,11 @@ class Settings(PrefectBaseSettings):
description="Settings for controlling task behavior",
)

telemetry: TelemetrySettings = Field(
default_factory=TelemetrySettings,
description="Settings for configuring telemetry collection",
)

testing: TestingSettings = Field(
default_factory=TestingSettings,
description="Settings used during testing",
Expand Down
24 changes: 24 additions & 0 deletions src/prefect/settings/models/telemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from typing import ClassVar

from pydantic import Field
from pydantic_settings import SettingsConfigDict

from prefect.settings.base import PrefectBaseSettings, build_settings_config


class TelemetrySettings(PrefectBaseSettings):
    """
    Settings for configuring Prefect telemetry
    """

    # Settings config built for the ("telemetry",) path. The tests accompanying
    # this model override its fields through PREFECT_TELEMETRY_* environment
    # variables.
    model_config: ClassVar[SettingsConfigDict] = build_settings_config(("telemetry",))

    # Master switch read by prefect.telemetry.metrics.RunMetrics: when False,
    # RunMetrics yields immediately without resolving an endpoint.
    enable_resource_metrics: bool = Field(
        default=True,
        description="Whether to enable OS-level resource metric collection in flow run subprocesses.",
    )

    # RunMetrics multiplies this by 1000 and passes it as the metric reader's
    # export interval in milliseconds.
    # NOTE(review): no lower bound is declared, so 0 or negative values are
    # accepted by this model — confirm how the metric reader treats them.
    resource_metrics_interval_seconds: int = Field(
        default=10,
        description="Interval in seconds between resource metric collections.",
    )
125 changes: 125 additions & 0 deletions src/prefect/telemetry/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
from __future__ import annotations

import logging
import os
from contextlib import contextmanager
from typing import TYPE_CHECKING, Generator, Optional

from prefect.logging.loggers import get_logger

if TYPE_CHECKING:
from prefect.client.schemas.objects import FlowRun
from prefect.flows import Flow

logger: logging.Logger = get_logger("prefect.telemetry.metrics")


def _resolve_metrics_endpoint() -> Optional[str]:
    """Resolve the OTLP metrics endpoint.

    Priority:
    1. OTEL_EXPORTER_OTLP_METRICS_ENDPOINT env var (user override), returned
       exactly as configured
    2. Auto-derived from Cloud API URL: {api_url}/telemetry/v1/metrics
    3. None if neither available

    Returns:
        The endpoint URL to export metrics to, or None when the env var is
        unset/empty and the current settings have no API URL or are not
        connected to Cloud.
    """
    explicit = os.environ.get("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT")
    if explicit:
        return explicit

    # Settings are only loaded when no explicit endpoint is configured.
    import prefect.settings

    settings = prefect.settings.get_current_settings()
    api_url = settings.api.url
    if api_url and settings.connected_to_cloud:
        # Strip trailing slashes so an API URL configured as ".../workspaces/x/"
        # does not produce ".../workspaces/x//telemetry/v1/metrics".
        base_url = str(api_url).rstrip("/")
        return f"{base_url}/telemetry/v1/metrics"

    return None


@contextmanager
def RunMetrics(
    flow_run: FlowRun,
    flow: Flow,
) -> Generator[None, None, None]:
    """Context manager that collects OS-level resource metrics during flow run execution.

    Starts an OpenTelemetry MeterProvider with SystemMetricsInstrumentor, filtered to
    process CPU and memory metrics. Exports via OTLP HTTP.

    Collection is best-effort: the wrapped block always runs exactly once, and any
    exception it raises propagates unchanged after cleanup.

    Becomes a no-op if:
    - Resource metrics are disabled in settings
    - No OTLP endpoint is available
    - The opentelemetry-instrumentation-system-metrics package is not installed
    - Building the exporter/reader/provider or instrumenting raises

    Args:
        flow_run: The flow run being executed. Its id is always attached as a
            resource attribute; deployment id and work pool name are attached
            when set.
        flow: The flow being executed; its name is attached as a resource
            attribute.

    Yields:
        None. Metrics collection is active for the duration of the ``with``
        block unless one of the no-op conditions above applies.
    """
    import prefect.settings

    settings = prefect.settings.get_current_settings()

    if not settings.telemetry.enable_resource_metrics:
        yield
        return

    endpoint = _resolve_metrics_endpoint()
    if not endpoint:
        yield
        return

    try:
        from opentelemetry.exporter.otlp.proto.http.metric_exporter import (
            OTLPMetricExporter,
        )
        from opentelemetry.instrumentation.system_metrics import (
            SystemMetricsInstrumentor,
        )
        from opentelemetry.sdk.metrics import MeterProvider
        from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
        from opentelemetry.sdk.resources import Resource
    except ImportError:
        logger.debug(
            "opentelemetry instrumentation packages not available, "
            "skipping resource metric collection"
        )
        otel_available = False
    else:
        otel_available = True

    # Yield outside the except block so an exception from the wrapped block is
    # not raised while the ImportError above is still being handled.
    if not otel_available:
        yield
        return

    meter_provider = None
    collecting = False
    try:
        resource_attributes: dict[str, str] = {
            "prefect.flow-run.id": str(flow_run.id),
            "prefect.flow.name": flow.name,
        }
        if flow_run.deployment_id:
            resource_attributes["prefect.deployment.id"] = str(flow_run.deployment_id)
        if flow_run.work_pool_name:
            resource_attributes["prefect.work-pool.name"] = flow_run.work_pool_name

        resource = Resource.create(resource_attributes)

        exporter = OTLPMetricExporter(
            endpoint=endpoint,
        )
        reader = PeriodicExportingMetricReader(
            exporter,
            # The setting is expressed in seconds; the reader takes milliseconds.
            export_interval_millis=settings.telemetry.resource_metrics_interval_seconds
            * 1000,
        )
        meter_provider = MeterProvider(resource=resource, metric_readers=[reader])

        # Only the listed metric names are configured for collection.
        instrumentor = SystemMetricsInstrumentor(
            config={
                "process.cpu.utilization": None,
                "process.memory.usage": None,
                "process.memory.virtual": None,
            },
        )
        instrumentor.instrument(meter_provider=meter_provider)
        collecting = True
    except Exception:
        # Telemetry must never prevent the flow run itself from executing.
        logger.debug(
            "Error starting resource metric collection, skipping",
            exc_info=True,
        )

    if not collecting:
        # Release a provider that was created before setup failed.
        if meter_provider is not None:
            try:
                meter_provider.shutdown()
            except Exception:
                logger.debug("Error shutting down meter provider", exc_info=True)
        yield
        return

    try:
        yield
    finally:
        # Each teardown step is isolated so a failure in one does not skip the
        # other or mask an exception raised by the wrapped block.
        try:
            instrumentor.uninstrument()
        except Exception:
            logger.debug("Error uninstrumenting system metrics", exc_info=True)
        try:
            meter_provider.shutdown()
        except Exception:
            logger.debug("Error shutting down meter provider", exc_info=True)
171 changes: 171 additions & 0 deletions tests/telemetry/test_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
from __future__ import annotations

import builtins
from unittest.mock import MagicMock, patch
from uuid import uuid4

import pytest

from prefect.settings.models.telemetry import TelemetrySettings
from prefect.telemetry.metrics import RunMetrics, _resolve_metrics_endpoint


class TestTelemetrySettings:
    """Checks TelemetrySettings defaults and environment-variable overrides."""

    def test_defaults(self):
        telemetry = TelemetrySettings()
        assert telemetry.enable_resource_metrics is True
        assert telemetry.resource_metrics_interval_seconds == 10

    def test_env_var_override(self, monkeypatch):
        overrides = {
            "PREFECT_TELEMETRY_ENABLE_RESOURCE_METRICS": "false",
            "PREFECT_TELEMETRY_RESOURCE_METRICS_INTERVAL_SECONDS": "30",
        }
        for env_name, env_value in overrides.items():
            monkeypatch.setenv(env_name, env_value)

        telemetry = TelemetrySettings()
        assert telemetry.enable_resource_metrics is False
        assert telemetry.resource_metrics_interval_seconds == 30


class TestResolveMetricsEndpoint:
    """Covers the resolution order: explicit env var, Cloud API URL, then None."""

    @staticmethod
    def _resolve_with_settings(api_url, connected_to_cloud):
        """Run the resolver against fake settings with the given API URL/cloud flag."""
        fake_settings = MagicMock()
        fake_settings.api.url = api_url
        fake_settings.connected_to_cloud = connected_to_cloud
        with patch("prefect.settings.get_current_settings", return_value=fake_settings):
            return _resolve_metrics_endpoint()

    def test_explicit_env_var_takes_priority(self, monkeypatch):
        custom_endpoint = "http://custom:4318/v1/metrics"
        monkeypatch.setenv("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", custom_endpoint)
        assert _resolve_metrics_endpoint() == "http://custom:4318/v1/metrics"

    def test_derives_from_cloud_api_url(self, monkeypatch):
        monkeypatch.delenv("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", raising=False)
        resolved = self._resolve_with_settings(
            "https://api.prefect.cloud/api/accounts/abc/workspaces/def",
            True,
        )
        expected = "https://api.prefect.cloud/api/accounts/abc/workspaces/def/telemetry/v1/metrics"
        assert resolved == expected

    def test_returns_none_when_not_connected_to_cloud(self, monkeypatch):
        monkeypatch.delenv("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", raising=False)
        resolved = self._resolve_with_settings("http://localhost:4200/api", False)
        assert resolved is None

    def test_returns_none_when_no_api_url(self, monkeypatch):
        monkeypatch.delenv("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", raising=False)
        resolved = self._resolve_with_settings(None, False)
        assert resolved is None


class TestRunMetrics:
    """Exercises the no-op paths and the instrument/teardown path of RunMetrics."""

    @pytest.fixture
    def flow_run(self) -> MagicMock:
        fake_run = MagicMock()
        fake_run.id = uuid4()
        fake_run.deployment_id = uuid4()
        fake_run.work_pool_name = "my-pool"
        return fake_run

    @pytest.fixture
    def flow(self) -> MagicMock:
        fake_flow = MagicMock()
        fake_flow.name = "my-flow"
        return fake_flow

    @staticmethod
    def _fake_settings(enabled: bool) -> MagicMock:
        """Build stand-in settings with resource metrics switched on or off."""
        fake = MagicMock()
        fake.telemetry.enable_resource_metrics = enabled
        return fake

    def test_noop_when_disabled(self, flow_run: MagicMock, flow: MagicMock):
        disabled = self._fake_settings(False)
        with patch("prefect.settings.get_current_settings", return_value=disabled):
            with RunMetrics(flow_run, flow):
                pass

    def test_noop_when_no_endpoint(self, flow_run: MagicMock, flow: MagicMock):
        enabled = self._fake_settings(True)
        with (
            patch("prefect.settings.get_current_settings", return_value=enabled),
            patch(
                "prefect.telemetry.metrics._resolve_metrics_endpoint",
                return_value=None,
            ),
        ):
            with RunMetrics(flow_run, flow):
                pass

    def test_noop_when_import_fails(self, flow_run: MagicMock, flow: MagicMock):
        enabled = self._fake_settings(True)
        real_import = builtins.__import__

        def failing_import(name: str, *args: object, **kwargs: object) -> object:
            # Simulate the optional OpenTelemetry packages being absent.
            if any(marker in name for marker in ("system_metrics", "otlp")):
                raise ImportError("not installed")
            return real_import(name, *args, **kwargs)

        with (
            patch("prefect.settings.get_current_settings", return_value=enabled),
            patch(
                "prefect.telemetry.metrics._resolve_metrics_endpoint",
                return_value="http://localhost:4318/v1/metrics",
            ),
            patch.object(builtins, "__import__", side_effect=failing_import),
        ):
            with RunMetrics(flow_run, flow):
                pass

    def test_instruments_and_shuts_down(self, flow_run: MagicMock, flow: MagicMock):
        enabled = self._fake_settings(True)
        enabled.telemetry.resource_metrics_interval_seconds = 10

        fake_instrumentor = MagicMock()
        fake_provider = MagicMock()
        fake_resource = MagicMock()

        with (
            patch("prefect.settings.get_current_settings", return_value=enabled),
            patch(
                "prefect.telemetry.metrics._resolve_metrics_endpoint",
                return_value="http://localhost:4318/v1/metrics",
            ),
            patch(
                "opentelemetry.instrumentation.system_metrics.SystemMetricsInstrumentor",
                return_value=fake_instrumentor,
            ),
            patch(
                "opentelemetry.sdk.metrics.MeterProvider",
                return_value=fake_provider,
            ),
            patch(
                "opentelemetry.exporter.otlp.proto.http.metric_exporter.OTLPMetricExporter"
            ),
            patch("opentelemetry.sdk.metrics.export.PeriodicExportingMetricReader"),
            patch(
                "opentelemetry.sdk.resources.Resource.create",
                return_value=fake_resource,
            ),
        ):
            with RunMetrics(flow_run, flow):
                # Instrumentation must already be active inside the block.
                fake_instrumentor.instrument.assert_called_once_with(
                    meter_provider=fake_provider
                )

            # Leaving the block tears down both the instrumentor and provider.
            fake_instrumentor.uninstrument.assert_called_once()
            fake_provider.shutdown.assert_called_once()


class TestEngineIntegration:
    """Source-level check that the engine module is wired to RunMetrics."""

    def test_engine_imports_run_metrics(self):
        """Verify engine.py references RunMetrics."""
        import inspect

        import prefect.engine

        engine_source = inspect.getsource(prefect.engine)
        expected_snippets = (
            "RunMetrics",
            "from prefect.telemetry.metrics import RunMetrics",
        )
        for snippet in expected_snippets:
            assert snippet in engine_source
2 changes: 2 additions & 0 deletions tests/test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,8 @@
"test_value": timedelta(seconds=10),
"legacy": True,
},
"PREFECT_TELEMETRY_ENABLE_RESOURCE_METRICS": {"test_value": False},
"PREFECT_TELEMETRY_RESOURCE_METRICS_INTERVAL_SECONDS": {"test_value": 30},
"PREFECT_TESTING_TEST_MODE": {"test_value": True},
"PREFECT_TESTING_TEST_SETTING": {"test_value": "bar"},
"PREFECT_TESTING_UNIT_TEST_LOOP_DEBUG": {"test_value": True},
Expand Down
Loading
Loading