diff --git a/src/aiperf/common/models/telemetry_models.py b/src/aiperf/common/models/telemetry_models.py index cdeac36be..233703582 100644 --- a/src/aiperf/common/models/telemetry_models.py +++ b/src/aiperf/common/models/telemetry_models.py @@ -1,13 +1,15 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 import numpy as np +from numpy.typing import NDArray from pydantic import ConfigDict, Field from aiperf.common.exceptions import NoMetricValue from aiperf.common.models.base_models import AIPerfBaseModel from aiperf.common.models.export_models import TelemetryExportData from aiperf.common.models.record_models import MetricResult +from aiperf.common.models.server_metrics_models import TimeRangeFilter class TelemetryMetrics(AIPerfBaseModel): @@ -236,6 +238,9 @@ def to_metric_result( arr, [1, 5, 10, 25, 50, 75, 90, 95, 99] ) + # Use sample std (ddof=1) for unbiased estimate; 0 for single sample + std_dev = float(np.std(arr, ddof=1)) if len(arr) > 1 else 0.0 + return MetricResult( tag=tag, header=header, @@ -243,7 +248,7 @@ def to_metric_result( min=float(np.min(arr)), max=float(np.max(arr)), avg=float(np.mean(arr)), - std=float(np.std(arr)), + std=std_dev, count=len(arr), current=float(arr[-1]), p1=p1, @@ -257,6 +262,148 @@ def to_metric_result( p99=p99, ) + def get_time_mask(self, time_filter: TimeRangeFilter | None) -> NDArray[np.bool_]: + """Get boolean mask for points within time range. + + Uses np.searchsorted for O(log n) binary search on sorted timestamps, + then slice assignment for mask creation (10-100x faster than element-wise + boolean comparisons for large arrays). + + Args: + time_filter: Time range filter specifying start_ns and/or end_ns bounds. + None returns all-True mask. + + Returns: + Boolean mask array where True indicates timestamp within range + """ + if time_filter is None: + return np.ones(self._size, dtype=bool) + + timestamps = self.timestamps + first_idx = 0 + last_idx = self._size + + # O(log n) binary search for range boundaries + if time_filter.start_ns is not None: + first_idx = int( + np.searchsorted(timestamps, time_filter.start_ns, side="left") + ) + if time_filter.end_ns is not None: + last_idx = int( + np.searchsorted(timestamps, time_filter.end_ns, side="right") + ) + + # Single allocation + slice assignment + mask = np.zeros(self._size, dtype=bool) + mask[first_idx:last_idx] = True + return mask + + def get_reference_idx(self, time_filter: TimeRangeFilter | None) -> int | None: + """Get index of last point BEFORE time filter start (for delta calculation). + + Uses np.searchsorted for O(log n) lookup. Returns None if no baseline exists + (i.e., time_filter is None, start_ns is None, or no data before start_ns). + + Args: + time_filter: Time range filter. Reference point is found before start_ns. 
+ + Returns: + Index of last timestamp before start_ns, or None if no baseline exists + """ + if time_filter is None or time_filter.start_ns is None: + return None + insert_pos = int( + np.searchsorted(self.timestamps, time_filter.start_ns, side="left") + ) + return insert_pos - 1 if insert_pos > 0 else None + + def to_metric_result_filtered( + self, + metric_name: str, + tag: str, + header: str, + unit: str, + time_filter: TimeRangeFilter | None = None, + is_counter: bool = False, + ) -> MetricResult: + """Compute stats with time filtering and optional delta for counters. + + For gauges: Uses vectorized NumPy on filtered array (np.mean, np.std, np.percentile) + For counters: Computes delta from reference point before profiling start + + Args: + metric_name: Name of the metric to analyze + tag: Unique identifier for this metric + header: Human-readable name for display + unit: Unit of measurement + time_filter: Optional time range filter to exclude warmup/cooldown periods + is_counter: If True, compute delta from baseline instead of statistics + + Returns: + MetricResult with min/max/avg/percentiles for gauges, or delta for counters + + Raises: + NoMetricValue: If no data for this metric or no data in filtered range + """ + arr = self.get_metric_array(metric_name) + if arr is None or len(arr) == 0: + raise NoMetricValue( + f"No telemetry data available for metric '{metric_name}'" + ) + + # Common: apply time filter + time_mask = self.get_time_mask(time_filter) + filtered = arr[time_mask] + if len(filtered) == 0: + raise NoMetricValue(f"No data in time range for metric '{metric_name}'") + + if is_counter: + # Counter: compute delta from baseline + reference_idx = self.get_reference_idx(time_filter) + reference_value = ( + arr[reference_idx] if reference_idx is not None else filtered[0] + ) + raw_delta = float(filtered[-1] - reference_value) + + # Handle counter resets (e.g., DCGM restart) by clamping to 0 + delta = max(raw_delta, 0.0) + + # Counters report a single delta value, not a distribution + return MetricResult( + tag=tag, + header=header, + unit=unit, + avg=delta, + ) + + # Gauge: vectorized stats on filtered data + p1, p5, p10, p25, p50, p75, p90, p95, p99 = np.percentile( + filtered, [1, 5, 10, 25, 50, 75, 90, 95, 99] + ) + + # Use sample std (ddof=1) for unbiased estimate; 0 for single sample + std_dev = float(np.std(filtered, ddof=1)) if len(filtered) > 1 else 0.0 + + return MetricResult( + tag=tag, + header=header, + unit=unit, + min=float(np.min(filtered)), + max=float(np.max(filtered)), + avg=float(np.mean(filtered)), + std=std_dev, + count=len(filtered), + p1=p1, + p5=p5, + p10=p10, + p25=p25, + p50=p50, + p75=p75, + p90=p90, + p95=p95, + p99=p99, + ) + def __len__(self) -> int: """Return the number of snapshots in the time series.""" return self._size @@ -292,19 +439,31 @@ def add_record(self, record: TelemetryRecord) -> None: self.time_series.append_snapshot(valid_metrics, record.timestamp_ns) def get_metric_result( - self, metric_name: str, tag: str, header: str, unit: str + self, + metric_name: str, + tag: str, + header: str, + unit: str, + time_filter: TimeRangeFilter | None = None, + is_counter: bool = False, ) -> MetricResult: - """Get MetricResult for a specific metric. + """Get MetricResult for a specific metric with optional time filtering. 
Args: metric_name: Name of the metric to analyze tag: Unique identifier for this metric header: Human-readable name for display unit: Unit of measurement + time_filter: Optional time range filter to exclude warmup/cooldown periods + is_counter: If True, compute delta from baseline instead of statistics Returns: MetricResult with statistical summary for the specified metric """ + if time_filter is not None or is_counter: + return self.time_series.to_metric_result_filtered( + metric_name, tag, header, unit, time_filter, is_counter + ) return self.time_series.to_metric_result(metric_name, tag, header, unit) diff --git a/src/aiperf/gpu_telemetry/__init__.py b/src/aiperf/gpu_telemetry/__init__.py index 80ebbfd9b..8c6863b20 100644 --- a/src/aiperf/gpu_telemetry/__init__.py +++ b/src/aiperf/gpu_telemetry/__init__.py @@ -18,6 +18,7 @@ ) from aiperf.gpu_telemetry.constants import ( DCGM_TO_FIELD_MAPPING, + GPU_TELEMETRY_COUNTER_METRICS, GPU_TELEMETRY_METRICS_CONFIG, SCALING_FACTORS, get_gpu_telemetry_metrics_config, @@ -41,6 +42,7 @@ "GPUTelemetryDataCollector", "GPUTelemetryJSONLWriter", "GPUTelemetryManager", + "GPU_TELEMETRY_COUNTER_METRICS", "GPU_TELEMETRY_METRICS_CONFIG", "MetricsConfigLoader", "SCALING_FACTORS", diff --git a/src/aiperf/gpu_telemetry/accumulator.py b/src/aiperf/gpu_telemetry/accumulator.py index 726238d8a..c63de653f 100644 --- a/src/aiperf/gpu_telemetry/accumulator.py +++ b/src/aiperf/gpu_telemetry/accumulator.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 import asyncio @@ -22,10 +22,14 @@ TelemetryExportData, TelemetrySummary, ) +from aiperf.common.models.server_metrics_models import TimeRangeFilter from aiperf.common.models.telemetry_models import TelemetryHierarchy, TelemetryRecord from aiperf.common.protocols import GPUTelemetryAccumulatorProtocol, PubClientProtocol from aiperf.exporters.display_units_utils import normalize_endpoint_display -from aiperf.gpu_telemetry.constants import get_gpu_telemetry_metrics_config +from aiperf.gpu_telemetry.constants import ( + GPU_TELEMETRY_COUNTER_METRICS, + get_gpu_telemetry_metrics_config, +) from aiperf.post_processors.base_metrics_processor import BaseMetricsProcessor @@ -176,10 +180,8 @@ async def summarize(self) -> list[MetricResult]: header = f"{metric_display} | {endpoint_display} | GPU {gpu_index} | {model_name}" - unit = unit_enum.value - result = telemetry_data.get_metric_result( - metric_name, tag, header, unit + metric_name, tag, header, unit_enum ) results.append(result) except NoMetricValue: @@ -197,8 +199,8 @@ async def summarize(self) -> list[MetricResult]: def export_results( self, - start_ns: int, - end_ns: int, + start_ns: int | None = None, + end_ns: int | None = None, error_summary: list[ErrorDetailsCount] | None = None, ) -> "TelemetryExportData | None": """Export accumulated telemetry data as a TelemetryExportData object. @@ -206,21 +208,42 @@ def export_results( Transforms the internal numpy-backed telemetry hierarchy into a serializable format with pre-computed metric statistics for each GPU. 
+ Time filtering is applied to exclude warmup periods from statistics: + - Gauge metrics (power, utilization, etc.): Stats computed on filtered data only + - Counter metrics (energy, errors): Delta computed from baseline before start_ns + Args: - start_ns: Start time of collection in nanoseconds - end_ns: End time of collection in nanoseconds + start_ns: Start time of profiling phase in nanoseconds (excludes warmup). + If None, includes all data from beginning. + end_ns: End time of profiling phase in nanoseconds. If None, includes all + data after start_ns (including final scrape after profiling completes). error_summary: Optional list of error counts Returns: TelemetryExportData object with pre-computed metrics for each GPU """ + # Create time filter for warmup exclusion + # Note: end_ns is typically None to include the final telemetry scrape + # that occurs after PROFILE_COMPLETE but before export + time_filter = TimeRangeFilter(start_ns=start_ns, end_ns=end_ns) # Build summary + # When start_ns/end_ns is None, use current time as the timestamp + start_time = ( + datetime.fromtimestamp(start_ns / NANOS_PER_SECOND) + if start_ns is not None + else datetime.now() + ) + end_time = ( + datetime.fromtimestamp(end_ns / NANOS_PER_SECOND) + if end_ns is not None + else datetime.now() + ) summary = TelemetrySummary( endpoints_configured=list(self._hierarchy.dcgm_endpoints.keys()), endpoints_successful=list(self._hierarchy.dcgm_endpoints.keys()), - start_time=datetime.fromtimestamp(start_ns / NANOS_PER_SECOND), - end_time=datetime.fromtimestamp(end_ns / NANOS_PER_SECOND), + start_time=start_time, + end_time=end_time, ) # Build endpoints dict with pre-computed metrics @@ -238,16 +261,27 @@ def export_results( metrics_dict = {} for ( - _metric_display, + metric_display, metric_key, - unit, + unit_enum, ) in get_gpu_telemetry_metrics_config(): try: + is_counter = metric_key in GPU_TELEMETRY_COUNTER_METRICS metric_result = gpu_data.get_metric_result( - metric_key, metric_key, metric_key, unit + metric_key, + metric_key, + metric_display, + unit_enum, + time_filter=time_filter, + is_counter=is_counter, ) metrics_dict[metric_key] = metric_result.to_json_result() - except Exception: + except NoMetricValue: + continue + except Exception as e: + self.warning( + f"Failed to compute metric '{metric_key}' for GPU {gpu_uuid[:12]}: {e}" + ) continue gpu_summary = GpuSummary( diff --git a/src/aiperf/gpu_telemetry/constants.py b/src/aiperf/gpu_telemetry/constants.py index 49a486feb..94da99e57 100644 --- a/src/aiperf/gpu_telemetry/constants.py +++ b/src/aiperf/gpu_telemetry/constants.py @@ -1,4 +1,4 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 """Constants specific to GPU telemetry collection.""" @@ -45,6 +45,15 @@ ("Power Violation", "power_violation", MetricTimeUnit.MICROSECONDS), ] +# Metrics that are cumulative counters (need delta calculation). +# These metrics accumulate over time (e.g., total energy consumed since boot), +# so we compute the delta between baseline and final values rather than statistics. 
+GPU_TELEMETRY_COUNTER_METRICS: set[str] = { + "energy_consumption", + "xid_errors", + "power_violation", +} + def get_gpu_telemetry_metrics_config() -> list[tuple[str, str, MetricUnitT]]: """Get the current GPU telemetry metrics configuration.""" diff --git a/src/aiperf/gpu_telemetry/manager.py b/src/aiperf/gpu_telemetry/manager.py index c19b61e9a..2ddb15e1a 100644 --- a/src/aiperf/gpu_telemetry/manager.py +++ b/src/aiperf/gpu_telemetry/manager.py @@ -16,6 +16,7 @@ from aiperf.common.hooks import on_command, on_init, on_stop from aiperf.common.messages import ( ProfileCancelCommand, + ProfileCompleteCommand, ProfileConfigureCommand, TelemetryRecordsMessage, TelemetryStatusMessage, @@ -175,6 +176,8 @@ async def _profile_configure_command( self._collectors.clear() self._collector_id_to_url.clear() + + # Phase 1: Test reachability for all endpoints for dcgm_url in self._dcgm_endpoints: self.debug(f"GPU Telemetry: Testing reachability of {dcgm_url}") collector_id = f"collector_{dcgm_url.replace(':', '_').replace('/', '_')}" @@ -218,6 +221,18 @@ async def _profile_configure_command( ) return + # Phase 2: Capture baseline metrics before profiling starts + self.info("GPU Telemetry: Capturing baseline metrics...") + for dcgm_url, collector in self._collectors.items(): + try: + await collector.initialize() + await collector.collect_and_process_metrics() + self.debug(f"GPU Telemetry: Captured baseline from {dcgm_url}") + except Exception as e: + self.warning( + f"GPU Telemetry: Failed to capture baseline from {dcgm_url}: {e}" + ) + await self._send_telemetry_status( enabled=True, reason=None, @@ -274,6 +289,36 @@ async def _handle_profile_cancel_command( """ await self._stop_all_collectors() + @on_command(CommandType.PROFILE_COMPLETE) + async def _handle_profile_complete_command( + self, message: ProfileCompleteCommand + ) -> None: + """Trigger final scrape when profiling completes. + + Ensures GPU telemetry captures final state for accurate counter deltas. + This final scrape provides the end-point values needed for metrics like + energy_consumption which are computed as (final - baseline). + + Args: + message: Profile complete command from SystemController + """ + if not self._collectors: + self.debug("GPU Telemetry: Already stopped, skipping final scrape") + return + + self.info("GPU Telemetry: Profiling complete, capturing final metrics...") + + for dcgm_url, collector in list(self._collectors.items()): + try: + await collector.collect_and_process_metrics() + self.debug(f"GPU Telemetry: Captured final state from {dcgm_url}") + except Exception as e: + self.warning( + f"GPU Telemetry: Failed to capture final state from {dcgm_url}: {e}" + ) + + await self._stop_all_collectors() + @on_stop async def _telemetry_manager_stop(self) -> None: """Stop all telemetry collectors during service shutdown. diff --git a/src/aiperf/records/records_manager.py b/src/aiperf/records/records_manager.py index 384161e8f..f18e201fa 100644 --- a/src/aiperf/records/records_manager.py +++ b/src/aiperf/records/records_manager.py @@ -676,12 +676,14 @@ def _process_telemetry_results(self) -> ProcessTelemetryResult: ) # Get timing from profiling phase stats + # Note: end_ns is not passed to include the final telemetry scrape that + # occurs after PROFILE_COMPLETE but before export_results is called. + # If start_ns is None (no profiling phase), include all data. 
phase_stats = self._records_tracker.create_stats_for_phase( CreditPhase.PROFILING ) telemetry_export_data = self._gpu_telemetry_accumulator.export_results( - start_ns=phase_stats.start_ns or time.time_ns(), - end_ns=phase_stats.requests_end_ns or time.time_ns(), + start_ns=phase_stats.start_ns, error_summary=error_summary, ) diff --git a/tests/component_integration/gpu_telemetry/test_gpu_telemetry.py b/tests/component_integration/gpu_telemetry/test_gpu_telemetry.py index 61470cfe6..2ed142c22 100644 --- a/tests/component_integration/gpu_telemetry/test_gpu_telemetry.py +++ b/tests/component_integration/gpu_telemetry/test_gpu_telemetry.py @@ -31,7 +31,7 @@ def test_dcgm_endpoints(self, cli, mock_dcgm_endpoints): --model {Defaults.model} \ --url http://localhost:8000 \ --gpu-telemetry http://localhost:9402/metrics \ - --request-count 10 \ + --benchmark-duration 2 \ --concurrency 2 \ --workers-max {Defaults.workers_max} \ --ui {Defaults.ui} @@ -39,7 +39,6 @@ def test_dcgm_endpoints(self, cli, mock_dcgm_endpoints): ) assert result.exit_code == 0 - assert result.request_count == 10 assert result.has_gpu_telemetry # Verify we collected telemetry from endpoints: @@ -59,18 +58,22 @@ def test_dcgm_endpoints(self, cli, mock_dcgm_endpoints): ) # Verify each GPU has valid metrics with all required fields + # Counter metrics only have avg (delta), not min/max + counter_metrics = {"energy_consumption", "xid_errors", "power_violation"} for gpu_id, gpu_data in endpoint_data.gpus.items(): assert gpu_data.metrics, f"GPU {gpu_id}: no metrics collected" for metric_name, metric_value in gpu_data.metrics.items(): assert metric_value.avg is not None, ( f"GPU {gpu_id} metric {metric_name}: missing avg" ) - assert metric_value.min is not None, ( - f"GPU {gpu_id} metric {metric_name}: missing min" - ) - assert metric_value.max is not None, ( - f"GPU {gpu_id} metric {metric_name}: missing max" - ) assert metric_value.unit is not None, ( f"GPU {gpu_id} metric {metric_name}: missing unit" ) + # Gauge metrics should have min/max; counter metrics only have avg + if metric_name not in counter_metrics: + assert metric_value.min is not None, ( + f"GPU {gpu_id} metric {metric_name}: missing min" + ) + assert metric_value.max is not None, ( + f"GPU {gpu_id} metric {metric_name}: missing max" + ) diff --git a/tests/harness/fake_dcgm.py b/tests/harness/fake_dcgm.py index bd17cf826..15fd54f30 100644 --- a/tests/harness/fake_dcgm.py +++ b/tests/harness/fake_dcgm.py @@ -71,20 +71,30 @@ def __init__(self, endpoints: list[DCGMEndpoint] | None = None): @staticmethod def _default_endpoints() -> list[DCGMEndpoint]: - """Get default DCGM endpoints for testing.""" + """Get default DCGM endpoints for testing. + + Includes both application defaults (9400, 9401) plus additional test endpoint (9402). 
+ """ return [ DCGMEndpoint( - url="http://localhost:9401/metrics", + url="http://localhost:9400/metrics", gpu_name="h100", num_gpus=2, seed=42, initial_load=0.3, ), + DCGMEndpoint( + url="http://localhost:9401/metrics", + gpu_name="h100", + num_gpus=2, + seed=43, + initial_load=0.3, + ), DCGMEndpoint( url="http://localhost:9402/metrics", gpu_name="h200", num_gpus=2, - seed=43, + seed=44, initial_load=0.3, ), ] diff --git a/tests/integration/test_custom_gpu_metrics.py b/tests/integration/test_custom_gpu_metrics.py index b8dc2aaf8..5aa427b67 100644 --- a/tests/integration/test_custom_gpu_metrics.py +++ b/tests/integration/test_custom_gpu_metrics.py @@ -99,13 +99,13 @@ async def test_custom_metrics_csv_loading_basic( --tokenizer gpt2 \ --endpoint-type chat \ --gpu-telemetry {custom_gpu_metrics_csv} {" ".join(aiperf_mock_server.dcgm_urls)} \ - --request-count 50 \ + --benchmark-duration 2 \ --concurrency 2 \ --workers-max 2 """ ) - assert result.request_count == 50 + assert result.request_count > 0 assert result.has_gpu_telemetry assert result.json.telemetry_data.endpoints is not None assert len(result.json.telemetry_data.endpoints) > 0 @@ -184,7 +184,7 @@ async def test_custom_metrics_deduplication( --tokenizer gpt2 \ --endpoint-type chat \ --gpu-telemetry {custom_gpu_metrics_csv_with_defaults} {" ".join(aiperf_mock_server.dcgm_urls)} \ - --request-count 50 \ + --benchmark-duration 2 \ --concurrency 2 \ --workers-max 2 """ @@ -231,13 +231,13 @@ async def test_invalid_csv_fallback_to_defaults( --tokenizer gpt2 \ --endpoint-type chat \ --gpu-telemetry {custom_gpu_metrics_csv_invalid} {" ".join(aiperf_mock_server.dcgm_urls)} \ - --request-count 50 \ + --benchmark-duration 2 \ --concurrency 2 \ --workers-max 2 """ ) - assert result.request_count == 50 + assert result.request_count > 0 assert result.has_gpu_telemetry for dcgm_url in result.json.telemetry_data.endpoints: diff --git a/tests/integration/test_gpu_telemetry.py b/tests/integration/test_gpu_telemetry.py index f0fb37184..d8d0d5eb8 100644 --- a/tests/integration/test_gpu_telemetry.py +++ b/tests/integration/test_gpu_telemetry.py @@ -54,12 +54,20 @@ async def test_gpu_telemetry( assert gpu_data.metrics is not None assert len(gpu_data.metrics) > 0 - for metric_value in gpu_data.metrics.values(): + # Counter metrics only have avg (delta), not min/max + counter_metrics = { + "energy_consumption", + "xid_errors", + "power_violation", + } + for metric_name, metric_value in gpu_data.metrics.items(): assert metric_value is not None assert metric_value.avg is not None - assert metric_value.min is not None - assert metric_value.max is not None assert metric_value.unit is not None + # Gauge metrics should have min/max; counter metrics only have avg + if metric_name not in counter_metrics: + assert metric_value.min is not None + assert metric_value.max is not None async def test_gpu_telemetry_export( self, cli: AIPerfCLI, aiperf_mock_server: AIPerfMockServer diff --git a/tests/unit/gpu_telemetry/test_telemetry_manager.py b/tests/unit/gpu_telemetry/test_telemetry_manager.py index df22fb0a0..0a677a696 100644 --- a/tests/unit/gpu_telemetry/test_telemetry_manager.py +++ b/tests/unit/gpu_telemetry/test_telemetry_manager.py @@ -1,7 +1,7 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
# SPDX-License-Identifier: Apache-2.0 -from unittest.mock import AsyncMock, MagicMock, patch +from unittest.mock import AsyncMock, MagicMock, Mock, patch import pytest @@ -667,10 +667,22 @@ async def test_configure_sends_enabled_status_when_endpoints_reachable(self): """Test that configure phase sends enabled status with reachable endpoints.""" manager = self._create_test_manager() manager.publish = AsyncMock() - - # Mock GPUTelemetryDataCollector to return reachable - with patch.object( - GPUTelemetryDataCollector, "is_url_reachable", return_value=True + manager.info = Mock() # Mock logging method + manager.debug = Mock() + + # Mock GPUTelemetryDataCollector methods for reachability and baseline capture + with ( + patch.object( + GPUTelemetryDataCollector, "is_url_reachable", return_value=True + ), + patch.object( + GPUTelemetryDataCollector, "initialize", new_callable=AsyncMock + ), + patch.object( + GPUTelemetryDataCollector, + "collect_and_process_metrics", + new_callable=AsyncMock, + ), ): configure_msg = ProfileConfigureCommand( command_id="test", service_id="system_controller", config={} diff --git a/tests/unit/gpu_telemetry/test_telemetry_models.py b/tests/unit/gpu_telemetry/test_telemetry_models.py index 8cf5384ae..975690b60 100644 --- a/tests/unit/gpu_telemetry/test_telemetry_models.py +++ b/tests/unit/gpu_telemetry/test_telemetry_models.py @@ -1,10 +1,12 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 +import numpy as np import pytest from pydantic import ValidationError from aiperf.common.exceptions import NoMetricValue +from aiperf.common.models.server_metrics_models import TimeRangeFilter from aiperf.common.models.telemetry_models import ( GpuMetadata, GpuMetricTimeSeries, @@ -14,6 +16,59 @@ TelemetryRecord, ) +# ============================================================================= +# Helpers +# ============================================================================= + + +def _make_record(timestamp_ns: int, **metrics: float) -> TelemetryRecord: + """Create TelemetryRecord with minimal boilerplate for testing.""" + return TelemetryRecord( + timestamp_ns=timestamp_ns, + dcgm_url="http://localhost:9401/metrics", + gpu_index=0, + gpu_model_name="Test GPU", + gpu_uuid="GPU-test-uuid", + telemetry_data=TelemetryMetrics(**metrics), + ) + + +# ============================================================================= +# Fixtures +# ============================================================================= + + +@pytest.fixture +def gpu_telemetry_data() -> GpuTelemetryData: + """GpuTelemetryData with standard test metadata.""" + return GpuTelemetryData( + metadata=GpuMetadata( + gpu_index=0, gpu_uuid="GPU-test-uuid", gpu_model_name="Test GPU" + ) + ) + + +@pytest.fixture +def time_series_4pt() -> GpuMetricTimeSeries: + """Time series with 4 data points at 1s intervals for filtering tests.""" + ts = GpuMetricTimeSeries() + ts.append_snapshot({"power": 100.0}, 1_000_000_000) + ts.append_snapshot({"power": 110.0}, 2_000_000_000) + ts.append_snapshot({"power": 120.0}, 3_000_000_000) + ts.append_snapshot({"power": 130.0}, 4_000_000_000) + return ts + + +@pytest.fixture +def counter_time_series() -> GpuMetricTimeSeries: + """Time series with counter data: baseline at 1s, profiling from 2s-4s.""" + ts = GpuMetricTimeSeries() + ts.append_snapshot({"energy": 1000.0}, 1_000_000_000) 
# baseline + ts.append_snapshot({"energy": 1200.0}, 2_000_000_000) # profiling start + ts.append_snapshot({"energy": 1500.0}, 3_000_000_000) # profiling + ts.append_snapshot({"energy": 1800.0}, 4_000_000_000) # profiling end + return ts + class TestTelemetryRecord: """Test TelemetryRecord model validation and data structure integrity. @@ -281,27 +336,6 @@ def test_get_metric_array(self): assert list(util) == [80.0, 85.0] assert unknown is None - def test_schema_determined_on_first_snapshot(self): - """Test that metric schema is determined on the first snapshot. - - DCGM metrics are static per run. The first snapshot determines which - metrics are tracked, and all subsequent snapshots should provide the same set. - """ - time_series = GpuMetricTimeSeries() - - # First snapshot determines schema - time_series.append_snapshot({"power": 100.0, "util": 80.0}, 1_000_000_000) - - # Subsequent snapshots provide same metrics - time_series.append_snapshot({"power": 110.0, "util": 85.0}, 2_000_000_000) - time_series.append_snapshot({"power": 120.0, "util": 90.0}, 3_000_000_000) - - power = time_series.get_metric_array("power") - util = time_series.get_metric_array("util") - - assert list(power) == [100.0, 110.0, 120.0] - assert list(util) == [80.0, 85.0, 90.0] - def test_stats_computation(self): """Test stats computation on consistent metric data.""" time_series = GpuMetricTimeSeries() @@ -317,6 +351,28 @@ def test_stats_computation(self): assert result.max == 200.0 assert result.current == 200.0 # Last value + @pytest.mark.parametrize( + ("values", "expected_std"), + [ + pytest.param([100.0], 0.0, id="single_sample"), + pytest.param( + [100.0, 150.0, 200.0], + np.std([100.0, 150.0, 200.0], ddof=1), + id="multiple_samples", + ), + ], + ) + def test_stats_uses_sample_std(self, values: list[float], expected_std: float): + """Test std uses sample std (ddof=1) with edge case handling.""" + time_series = GpuMetricTimeSeries() + for i, val in enumerate(values): + time_series.append_snapshot({"power": val}, (i + 1) * 1_000_000_000) + + result = time_series.to_metric_result("power", "tag", "header", "W") + + assert result.std == expected_std + assert result.count == len(values) + def test_grow_preserves_data(self): """Test array growth preserves existing data.""" time_series = GpuMetricTimeSeries() @@ -411,92 +467,181 @@ def test_unknown_metric_raises_no_metric_value(self): with pytest.raises(NoMetricValue): time_series.to_metric_result("unknown", "tag", "header", "unit") + # Time filtering tests + + @pytest.mark.parametrize( + ("time_filter", "expected_mask"), + [ + pytest.param(None, [True, True, True, True], id="no_filter"), + pytest.param(TimeRangeFilter(start_ns=2_000_000_000), [False, True, True, True], id="start_only"), + pytest.param(TimeRangeFilter(end_ns=2_500_000_000), [True, True, False, False], id="end_only"), + pytest.param(TimeRangeFilter(start_ns=1_500_000_000, end_ns=3_500_000_000), [False, True, True, False], id="range"), + ], + ) # fmt: skip + def test_get_time_mask( + self, + time_series_4pt: GpuMetricTimeSeries, + time_filter: TimeRangeFilter | None, + expected_mask: list[bool], + ): + """Test get_time_mask with various filter configurations.""" + mask = time_series_4pt.get_time_mask(time_filter) + assert list(mask) == expected_mask + + @pytest.mark.parametrize( + ("time_filter", "expected_idx"), + [ + pytest.param(None, None, id="no_filter"), + pytest.param(TimeRangeFilter(end_ns=3_000_000_000), None, id="no_start"), + pytest.param(TimeRangeFilter(start_ns=2_000_000_000, 
end_ns=5_000_000_000), 0, id="baseline_at_0"), + pytest.param(TimeRangeFilter(start_ns=3_000_000_000, end_ns=5_000_000_000), 1, id="baseline_at_1"), + ], + ) # fmt: skip + def test_get_reference_idx( + self, + time_series_4pt: GpuMetricTimeSeries, + time_filter: TimeRangeFilter | None, + expected_idx: int | None, + ): + """Test get_reference_idx with various filter configurations.""" + assert time_series_4pt.get_reference_idx(time_filter) == expected_idx + + def test_get_reference_idx_no_baseline_before_start(self): + """Test get_reference_idx returns None when no data before start_ns.""" + time_series = GpuMetricTimeSeries() + time_series.append_snapshot({"power": 100.0}, 2_000_000_000) + time_series.append_snapshot({"power": 110.0}, 3_000_000_000) -class TestGpuTelemetryData: - """Test GpuTelemetryData model with grouped approach.""" + time_filter = TimeRangeFilter(start_ns=1_000_000_000, end_ns=4_000_000_000) + assert time_series.get_reference_idx(time_filter) is None - def test_add_record_grouped(self): - """Test adding TelemetryRecord creates grouped snapshots.""" - metadata = GpuMetadata( - gpu_index=0, gpu_uuid="GPU-test-uuid", gpu_model_name="Test GPU" + def test_to_metric_result_filtered_gauge(self): + """Test gauge stats computed only on filtered data.""" + time_series = GpuMetricTimeSeries() + time_series.append_snapshot({"power": 50.0}, 1_000_000_000) # warmup - excluded + time_series.append_snapshot({"power": 100.0}, 2_000_000_000) # profiling + time_series.append_snapshot({"power": 120.0}, 3_000_000_000) # profiling + time_series.append_snapshot({"power": 80.0}, 4_000_000_000) # profiling + + time_filter = TimeRangeFilter(start_ns=2_000_000_000, end_ns=5_000_000_000) + result = time_series.to_metric_result_filtered( + "power", "tag", "header", "W", time_filter, is_counter=False ) - telemetry_data = GpuTelemetryData(metadata=metadata) + # Stats should only include values 100, 120, 80 (not 50) + assert result.count == 3 + assert result.min == 80.0 + assert result.max == 120.0 + assert result.avg == 100.0 # (100 + 120 + 80) / 3 - record = TelemetryRecord( - timestamp_ns=1000000000, - dcgm_url="http://localhost:9401/metrics", - gpu_index=0, - gpu_model_name="Test GPU", - gpu_uuid="GPU-test-uuid", - telemetry_data=TelemetryMetrics( - gpu_power_usage=100.0, - gpu_utilization=80.0, - gpu_memory_used=15.0, - ), + def test_to_metric_result_filtered_counter_with_baseline( + self, counter_time_series: GpuMetricTimeSeries + ): + """Test counter delta computed from baseline before start_ns.""" + time_filter = TimeRangeFilter(start_ns=2_000_000_000, end_ns=5_000_000_000) + result = counter_time_series.to_metric_result_filtered( + "energy", "tag", "header", "MJ", time_filter, is_counter=True + ) + + # Delta: final (1800) - baseline (1000) = 800 + # Counters only set avg, other fields are None + assert result.avg == 800.0 + assert result.min is None + assert result.max is None + assert result.current is None + + def test_to_metric_result_filtered_counter_no_baseline(self): + """Test counter delta uses first filtered value when no baseline exists.""" + time_series = GpuMetricTimeSeries() + time_series.append_snapshot({"energy": 1000.0}, 2_000_000_000) + time_series.append_snapshot({"energy": 1300.0}, 3_000_000_000) + time_series.append_snapshot({"energy": 1600.0}, 4_000_000_000) + + # Filter starts before any data + time_filter = TimeRangeFilter(start_ns=1_000_000_000, end_ns=5_000_000_000) + result = time_series.to_metric_result_filtered( + "energy", "tag", "header", "MJ", time_filter, 
is_counter=True ) - telemetry_data.add_record(record) + # No baseline: delta = final (1600) - first filtered (1000) = 600 + assert result.avg == 600.0 + + def test_to_metric_result_filtered_counter_reset_clamped_to_zero(self): + """Test counter reset (e.g., DCGM restart) clamps delta to 0.""" + time_series = GpuMetricTimeSeries() + time_series.append_snapshot({"energy": 5000.0}, 1_000_000_000) # baseline + time_series.append_snapshot({"energy": 5500.0}, 2_000_000_000) + time_series.append_snapshot({"energy": 100.0}, 3_000_000_000) # after reset + time_series.append_snapshot({"energy": 300.0}, 4_000_000_000) + + time_filter = TimeRangeFilter(start_ns=2_000_000_000, end_ns=5_000_000_000) + result = time_series.to_metric_result_filtered( + "energy", "tag", "header", "MJ", time_filter, is_counter=True + ) + + # Raw delta: 300 - 5000 = -4700, clamped to 0 + assert result.avg == 0.0 + + def test_to_metric_result_filtered_empty_range(self): + """Test NoMetricValue raised when filtered range has no data.""" + time_series = GpuMetricTimeSeries() + time_series.append_snapshot({"power": 100.0}, 1_000_000_000) + time_series.append_snapshot({"power": 110.0}, 2_000_000_000) + + # Filter for range with no data + time_filter = TimeRangeFilter(start_ns=5_000_000_000, end_ns=6_000_000_000) + + with pytest.raises(NoMetricValue) as exc_info: + time_series.to_metric_result_filtered( + "power", "tag", "header", "W", time_filter, is_counter=False + ) + assert "No data in time range" in str(exc_info.value) - ts = telemetry_data.time_series + +class TestGpuTelemetryData: + """Test GpuTelemetryData model with grouped approach.""" + + def test_add_record_grouped(self, gpu_telemetry_data: GpuTelemetryData): + """Test adding TelemetryRecord creates grouped snapshots.""" + record = _make_record( + 1_000_000_000, + gpu_power_usage=100.0, + gpu_utilization=80.0, + gpu_memory_used=15.0, + ) + gpu_telemetry_data.add_record(record) + + ts = gpu_telemetry_data.time_series assert len(ts) == 1 - assert ts.timestamps[0] == 1000000000 + assert ts.timestamps[0] == 1_000_000_000 assert ts.get_metric_array("gpu_power_usage")[0] == 100.0 assert ts.get_metric_array("gpu_utilization")[0] == 80.0 assert ts.get_metric_array("gpu_memory_used")[0] == 15.0 - def test_add_record_filters_none_values(self): + def test_add_record_filters_none_values(self, gpu_telemetry_data: GpuTelemetryData): """Test that None metric values are filtered out.""" - metadata = GpuMetadata( - gpu_index=0, gpu_uuid="GPU-test-uuid", gpu_model_name="Test GPU" - ) - - telemetry_data = GpuTelemetryData(metadata=metadata) - - record = TelemetryRecord( - timestamp_ns=1000000000, - dcgm_url="http://localhost:9401/metrics", - gpu_index=0, - gpu_model_name="Test GPU", - gpu_uuid="GPU-test-uuid", - telemetry_data=TelemetryMetrics( - gpu_power_usage=100.0, - gpu_utilization=None, # Should be filtered out - gpu_memory_used=15.0, - ), + record = _make_record( + 1_000_000_000, + gpu_power_usage=100.0, + gpu_memory_used=15.0, + # gpu_utilization intentionally omitted (will be None) ) + gpu_telemetry_data.add_record(record) - telemetry_data.add_record(record) - - ts = telemetry_data.time_series + ts = gpu_telemetry_data.time_series assert len(ts) == 1 assert ts.get_metric_array("gpu_power_usage") is not None assert ts.get_metric_array("gpu_memory_used") is not None assert ts.get_metric_array("gpu_utilization") is None - def test_get_metric_result(self): + def test_get_metric_result(self, gpu_telemetry_data: GpuTelemetryData): """Test getting MetricResult for a specific metric.""" 
- metadata = GpuMetadata( - gpu_index=0, gpu_uuid="GPU-test-uuid", gpu_model_name="Test GPU" - ) - - telemetry_data = GpuTelemetryData(metadata=metadata) - - # Add multiple records for i, power in enumerate([100.0, 120.0, 80.0]): - record = TelemetryRecord( - timestamp_ns=1000000000 + i * 1000000, - dcgm_url="http://localhost:9401/metrics", - gpu_index=0, - gpu_model_name="Test GPU", - gpu_uuid="GPU-test-uuid", - telemetry_data=TelemetryMetrics( - gpu_power_usage=power, - ), + gpu_telemetry_data.add_record( + _make_record(1_000_000_000 + i * 1_000_000, gpu_power_usage=power) ) - telemetry_data.add_record(record) - result = telemetry_data.get_metric_result( + result = gpu_telemetry_data.get_metric_result( "gpu_power_usage", "power_tag", "GPU Power", "W" ) @@ -506,3 +651,48 @@ def test_get_metric_result(self): assert result.min == 80.0 assert result.max == 120.0 assert result.avg == 100.0 + + def test_get_metric_result_with_time_filter( + self, gpu_telemetry_data: GpuTelemetryData + ): + """Test getting MetricResult with time filtering.""" + # Add records: warmup + profiling + for ts, power in [(1, 50.0), (2, 100.0), (3, 120.0), (4, 80.0)]: + gpu_telemetry_data.add_record( + _make_record(ts * 1_000_000_000, gpu_power_usage=power) + ) + + # Exclude warmup at 1s + time_filter = TimeRangeFilter(start_ns=2_000_000_000, end_ns=5_000_000_000) + result = gpu_telemetry_data.get_metric_result( + "gpu_power_usage", "power_tag", "GPU Power", "W", time_filter=time_filter + ) + + # Stats should exclude warmup value of 50.0 + assert result.count == 3 + assert result.min == 80.0 + assert result.max == 120.0 + assert result.avg == 100.0 # (100 + 120 + 80) / 3 + + def test_get_metric_result_counter_with_time_filter( + self, gpu_telemetry_data: GpuTelemetryData + ): + """Test getting MetricResult for counter metric with delta calculation.""" + # Add records: baseline + profiling + for ts, energy in [(1, 1000.0), (2, 1200.0), (3, 1500.0), (4, 1800.0)]: + gpu_telemetry_data.add_record( + _make_record(ts * 1_000_000_000, energy_consumption=energy) + ) + + time_filter = TimeRangeFilter(start_ns=2_000_000_000, end_ns=5_000_000_000) + result = gpu_telemetry_data.get_metric_result( + "energy_consumption", + "energy_tag", + "Energy", + "MJ", + time_filter=time_filter, + is_counter=True, + ) + + # Delta: final (1800) - baseline (1000) = 800 + assert result.avg == 800.0
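
Not part of the patch above: a minimal, standalone sketch of the windowed-stats and counter-delta approach that the new telemetry_models methods describe (searchsorted window bounds, sample std with ddof=1, baseline sample taken before start_ns, counter resets clamped to 0). Function names (window_slice, gauge_stats, counter_delta) and the example data are illustrative only; they are not the aiperf APIs added by this diff.

# Illustrative sketch only -- mirrors the idea in the patch, not its actual classes.
import numpy as np


def window_slice(timestamps: np.ndarray, start_ns: int | None, end_ns: int | None) -> slice:
    """Return the [first, last) slice of samples whose timestamps fall in the window."""
    first = 0 if start_ns is None else int(np.searchsorted(timestamps, start_ns, side="left"))
    last = len(timestamps) if end_ns is None else int(np.searchsorted(timestamps, end_ns, side="right"))
    return slice(first, last)


def gauge_stats(values: np.ndarray) -> dict[str, float]:
    """Mean/min/max plus sample std (ddof=1), falling back to 0.0 for a single sample."""
    std = float(np.std(values, ddof=1)) if len(values) > 1 else 0.0
    return {
        "avg": float(np.mean(values)),
        "min": float(np.min(values)),
        "max": float(np.max(values)),
        "std": std,
    }


def counter_delta(timestamps: np.ndarray, values: np.ndarray,
                  start_ns: int | None, end_ns: int | None) -> float:
    """Delta of a cumulative counter over the window.

    Uses the last sample before start_ns as the baseline when one exists,
    otherwise the first in-window sample; a negative delta (counter reset)
    is clamped to 0. Assumes the window contains at least one sample.
    """
    window = window_slice(timestamps, start_ns, end_ns)
    windowed = values[window]
    if start_ns is not None and window.start > 0:
        baseline = values[window.start - 1]
    else:
        baseline = windowed[0]
    return max(float(windowed[-1] - baseline), 0.0)


if __name__ == "__main__":
    ts = np.array([1, 2, 3, 4]) * 1_000_000_000
    power = np.array([50.0, 100.0, 120.0, 80.0])          # gauge; first point is warmup
    energy = np.array([1000.0, 1200.0, 1500.0, 1800.0])   # cumulative counter

    window = window_slice(ts, start_ns=2_000_000_000, end_ns=None)
    print(gauge_stats(power[window]))                       # avg 100.0 over the 3 profiled points
    print(counter_delta(ts, energy, 2_000_000_000, None))   # 800.0 = 1800 - baseline 1000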