-
Notifications
You must be signed in to change notification settings - Fork 2.2k
Expand file tree
/
Copy pathmetrics.py
More file actions
125 lines (102 loc) · 3.72 KB
/
metrics.py
File metadata and controls
125 lines (102 loc) · 3.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
from __future__ import annotations
import logging
import os
from contextlib import contextmanager
from typing import TYPE_CHECKING, Generator, Optional
from prefect.logging.loggers import get_logger
if TYPE_CHECKING:
from prefect.client.schemas.objects import FlowRun
from prefect.flows import Flow
# Module-level logger for this telemetry module, obtained through Prefect's
# `get_logger` under the name "prefect.telemetry.metrics".
logger: logging.Logger = get_logger("prefect.telemetry.metrics")
def _resolve_metrics_endpoint() -> Optional[str]:
    """Resolve the OTLP metrics endpoint.

    Priority:
        1. ``OTEL_EXPORTER_OTLP_METRICS_ENDPOINT`` env var (user override),
           returned exactly as configured.
        2. Auto-derived from the Cloud API URL:
           ``{api_url}/telemetry/v1/metrics``.
        3. ``None`` if neither is available.

    Returns:
        The endpoint URL to export metrics to, or ``None`` when no endpoint
        can be determined.
    """
    explicit = os.environ.get("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT")
    if explicit:
        return explicit

    # Settings are only consulted when there is no explicit override.
    import prefect.settings

    settings = prefect.settings.get_current_settings()
    api_url = settings.api.url
    if api_url and settings.connected_to_cloud:
        # The configured API URL may end with "/"; strip it so the derived
        # endpoint does not contain a double slash before "telemetry".
        base_url = str(api_url).rstrip("/")
        return f"{base_url}/telemetry/v1/metrics"
    return None
@contextmanager
def RunMetrics(
    flow_run: FlowRun,
    flow: Flow,
) -> Generator[None, None, None]:
    """Context manager that collects OS-level resource metrics during flow run execution.

    Starts an OpenTelemetry MeterProvider with SystemMetricsInstrumentor, filtered to
    process CPU and memory metrics. Exports via OTLP HTTP.

    Becomes a no-op if:
        - Resource metrics are disabled in settings
        - No OTLP endpoint is available
        - The opentelemetry-instrumentation-system-metrics package is not installed
        - Setting up the metrics pipeline raises an error (logged at debug level)

    Args:
        flow_run: The flow run being executed; its id, deployment id and work
            pool name are attached to the exported metrics as resource
            attributes.
        flow: The flow being executed; its name is attached as a resource
            attribute.

    Yields:
        None. The wrapped block runs while metric collection is active (or
        while nothing is active, in the no-op cases).
    """
    import prefect.settings

    settings = prefect.settings.get_current_settings()
    if not settings.telemetry.enable_resource_metrics:
        yield
        return

    endpoint = _resolve_metrics_endpoint()
    if not endpoint:
        yield
        return

    try:
        from opentelemetry.exporter.otlp.proto.http.metric_exporter import (
            OTLPMetricExporter,
        )
        from opentelemetry.instrumentation.system_metrics import (
            SystemMetricsInstrumentor,
        )
        from opentelemetry.sdk.metrics import MeterProvider
        from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
        from opentelemetry.sdk.resources import Resource
    except ImportError:
        logger.debug(
            "opentelemetry instrumentation packages not available, "
            "skipping resource metric collection"
        )
        yield
        return

    resource_attributes: dict[str, str] = {
        "prefect.flow-run.id": str(flow_run.id),
        "prefect.flow.name": flow.name,
    }
    # Deployment and work pool are only attached when the run has them.
    if flow_run.deployment_id:
        resource_attributes["prefect.deployment.id"] = str(flow_run.deployment_id)
    if flow_run.work_pool_name:
        resource_attributes["prefect.work-pool.name"] = flow_run.work_pool_name

    # Both stay None until the corresponding setup step has succeeded, so the
    # cleanup below only touches what was actually created.
    meter_provider = None
    instrumentor = None
    try:
        resource = Resource.create(resource_attributes)
        exporter = OTLPMetricExporter(
            endpoint=endpoint,
        )
        reader = PeriodicExportingMetricReader(
            exporter,
            export_interval_millis=settings.telemetry.resource_metrics_interval_seconds
            * 1000,
        )
        meter_provider = MeterProvider(resource=resource, metric_readers=[reader])

        # Only the process-level CPU and memory metrics named here are configured.
        pending_instrumentor = SystemMetricsInstrumentor(
            config={
                "process.cpu.utilization": None,
                "process.memory.usage": None,
                "process.memory.virtual": None,
            },
        )
        pending_instrumentor.instrument(meter_provider=meter_provider)
        instrumentor = pending_instrumentor
    except Exception:
        # Metric collection is best-effort, matching the teardown handling
        # below: a setup failure must not abort the flow run it wraps.
        logger.debug("Error starting resource metric collection", exc_info=True)

    try:
        yield
    finally:
        if instrumentor is not None:
            try:
                instrumentor.uninstrument()
            except Exception:
                logger.debug("Error uninstrumenting system metrics", exc_info=True)
        # Also reached when setup failed after the provider was created, so a
        # partially built pipeline is still released.
        if meter_provider is not None:
            try:
                meter_provider.shutdown()
            except Exception:
                logger.debug("Error shutting down meter provider", exc_info=True)