-
Notifications
You must be signed in to change notification settings - Fork 2.2k
Expand file tree
/
Copy pathtest_metrics.py
More file actions
171 lines (144 loc) · 6.57 KB
/
test_metrics.py
File metadata and controls
171 lines (144 loc) · 6.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
from __future__ import annotations
import builtins
from unittest.mock import MagicMock, patch
from uuid import uuid4
import pytest
from prefect.settings.models.telemetry import TelemetrySettings
from prefect.telemetry.metrics import RunMetrics, _resolve_metrics_endpoint
class TestTelemetrySettings:
def test_defaults(self):
settings = TelemetrySettings()
assert settings.enable_resource_metrics is True
assert settings.resource_metrics_interval_seconds == 10
def test_env_var_override(self, monkeypatch):
monkeypatch.setenv("PREFECT_TELEMETRY_ENABLE_RESOURCE_METRICS", "false")
monkeypatch.setenv("PREFECT_TELEMETRY_RESOURCE_METRICS_INTERVAL_SECONDS", "30")
settings = TelemetrySettings()
assert settings.enable_resource_metrics is False
assert settings.resource_metrics_interval_seconds == 30
class TestResolveMetricsEndpoint:
def test_explicit_env_var_takes_priority(self, monkeypatch):
monkeypatch.setenv(
"OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", "http://custom:4318/v1/metrics"
)
assert _resolve_metrics_endpoint() == "http://custom:4318/v1/metrics"
def test_derives_from_cloud_api_url(self, monkeypatch):
monkeypatch.delenv("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", raising=False)
mock_settings = MagicMock()
mock_settings.api.url = (
"https://api.prefect.cloud/api/accounts/abc/workspaces/def"
)
mock_settings.connected_to_cloud = True
with patch("prefect.settings.get_current_settings", return_value=mock_settings):
endpoint = _resolve_metrics_endpoint()
assert (
endpoint
== "https://api.prefect.cloud/api/accounts/abc/workspaces/def/telemetry/v1/metrics"
)
def test_returns_none_when_not_connected_to_cloud(self, monkeypatch):
monkeypatch.delenv("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", raising=False)
mock_settings = MagicMock()
mock_settings.api.url = "http://localhost:4200/api"
mock_settings.connected_to_cloud = False
with patch("prefect.settings.get_current_settings", return_value=mock_settings):
assert _resolve_metrics_endpoint() is None
def test_returns_none_when_no_api_url(self, monkeypatch):
monkeypatch.delenv("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", raising=False)
mock_settings = MagicMock()
mock_settings.api.url = None
mock_settings.connected_to_cloud = False
with patch("prefect.settings.get_current_settings", return_value=mock_settings):
assert _resolve_metrics_endpoint() is None
class TestRunMetrics:
@pytest.fixture
def flow_run(self) -> MagicMock:
run = MagicMock()
run.id = uuid4()
run.deployment_id = uuid4()
run.work_pool_name = "my-pool"
return run
@pytest.fixture
def flow(self) -> MagicMock:
f = MagicMock()
f.name = "my-flow"
return f
def test_noop_when_disabled(self, flow_run: MagicMock, flow: MagicMock):
mock_settings = MagicMock()
mock_settings.telemetry.enable_resource_metrics = False
with patch("prefect.settings.get_current_settings", return_value=mock_settings):
with RunMetrics(flow_run, flow):
pass
def test_noop_when_no_endpoint(self, flow_run: MagicMock, flow: MagicMock):
mock_settings = MagicMock()
mock_settings.telemetry.enable_resource_metrics = True
with (
patch("prefect.settings.get_current_settings", return_value=mock_settings),
patch(
"prefect.telemetry.metrics._resolve_metrics_endpoint",
return_value=None,
),
):
with RunMetrics(flow_run, flow):
pass
def test_noop_when_import_fails(self, flow_run: MagicMock, flow: MagicMock):
mock_settings = MagicMock()
mock_settings.telemetry.enable_resource_metrics = True
original_import = builtins.__import__
def mock_import(name: str, *args: object, **kwargs: object) -> object:
if "system_metrics" in name or "otlp" in name:
raise ImportError("not installed")
return original_import(name, *args, **kwargs)
with (
patch("prefect.settings.get_current_settings", return_value=mock_settings),
patch(
"prefect.telemetry.metrics._resolve_metrics_endpoint",
return_value="http://localhost:4318/v1/metrics",
),
patch.object(builtins, "__import__", side_effect=mock_import),
):
with RunMetrics(flow_run, flow):
pass
def test_instruments_and_shuts_down(self, flow_run: MagicMock, flow: MagicMock):
mock_settings = MagicMock()
mock_settings.telemetry.enable_resource_metrics = True
mock_settings.telemetry.resource_metrics_interval_seconds = 10
mock_instrumentor = MagicMock()
mock_meter_provider = MagicMock()
mock_resource = MagicMock()
with (
patch("prefect.settings.get_current_settings", return_value=mock_settings),
patch(
"prefect.telemetry.metrics._resolve_metrics_endpoint",
return_value="http://localhost:4318/v1/metrics",
),
patch(
"opentelemetry.instrumentation.system_metrics.SystemMetricsInstrumentor",
return_value=mock_instrumentor,
),
patch(
"opentelemetry.sdk.metrics.MeterProvider",
return_value=mock_meter_provider,
),
patch(
"opentelemetry.exporter.otlp.proto.http.metric_exporter.OTLPMetricExporter"
),
patch("opentelemetry.sdk.metrics.export.PeriodicExportingMetricReader"),
patch(
"opentelemetry.sdk.resources.Resource.create",
return_value=mock_resource,
),
):
with RunMetrics(flow_run, flow):
mock_instrumentor.instrument.assert_called_once_with(
meter_provider=mock_meter_provider
)
mock_instrumentor.uninstrument.assert_called_once()
mock_meter_provider.shutdown.assert_called_once()
class TestEngineIntegration:
def test_engine_imports_run_metrics(self):
"""Verify engine.py references RunMetrics."""
import inspect
import prefect.engine
source = inspect.getsource(prefect.engine)
assert "RunMetrics" in source
assert "from prefect.telemetry.metrics import RunMetrics" in source