diff --git a/aiperf/metrics/types/inter_chunk_latency_metric.py b/aiperf/metrics/types/inter_chunk_latency_metric.py
new file mode 100644
index 000000000..79572bb31
--- /dev/null
+++ b/aiperf/metrics/types/inter_chunk_latency_metric.py
@@ -0,0 +1,55 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+from aiperf.common.enums import MetricFlags, MetricTimeUnit
+from aiperf.common.exceptions import NoMetricValue
+from aiperf.common.models import ParsedResponseRecord
+from aiperf.metrics import BaseRecordMetric
+from aiperf.metrics.metric_dicts import MetricRecordDict
+
+
+class InterChunkLatencyMetric(BaseRecordMetric[float]):
+    """
+    Post-processor for calculating the Inter Chunk Latency (ICL) metric from records. This is the
+    average of the differences between consecutive response timestamps, and is only applicable to
+    streaming responses.
+
+    Formula:
+        Inter Chunk Latency = Average time between consecutive responses
+    """
+
+    tag = "inter_chunk_latency"
+    header = "Inter Chunk Latency"
+    short_header = "ICL"
+    unit = MetricTimeUnit.NANOSECONDS
+    display_unit = MetricTimeUnit.MILLISECONDS
+    flags = MetricFlags.STREAMING_TOKENS_ONLY | MetricFlags.HIDDEN
+    required_metrics = None
+
+    def _parse_record(
+        self,
+        record: ParsedResponseRecord,
+        record_metrics: MetricRecordDict,
+    ) -> float:
+        """
+        Extracts the timestamps from the responses in the given record, computes the
+        average of the differences between consecutive responses (ICL), and returns the result.
+
+        Raises:
+            NoMetricValue: If the record does not have at least two responses.
+            ValueError: If any of the inter chunk latencies are not positive.
+        """
+        responses = record.responses
+
+        if len(responses) < 2:
+            raise NoMetricValue(
+                "Record must have at least two responses to calculate Inter Chunk Latency."
+            )
+
+        inter_chunk_latencies = []
+        for i in range(1, len(responses)):
+            chunk_latency_ns = responses[i].perf_ns - responses[i - 1].perf_ns
+            if chunk_latency_ns <= 0:
+                raise ValueError("Inter chunk latencies must be positive.")
+            inter_chunk_latencies.append(chunk_latency_ns)
+
+        return sum(inter_chunk_latencies) / len(inter_chunk_latencies)
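
For context, a minimal standalone sketch of the calculation the class above performs, using hypothetical response timestamps in nanoseconds (the SimpleNamespace objects stand in for the real response model and are purely illustrative):

from types import SimpleNamespace

# Hypothetical streaming responses observed at 110 ns, 120 ns, and 140 ns.
responses = [SimpleNamespace(perf_ns=t) for t in (110, 120, 140)]

# ICL is the average of the consecutive differences: (10 + 20) / 2 = 15.0 ns.
deltas = [b.perf_ns - a.perf_ns for a, b in zip(responses, responses[1:])]
icl_ns = sum(deltas) / len(deltas)
assert icl_ns == 15.0

This mirrors the "multiple chunks (avg of 10,20)" case in the tests below.
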
diff --git a/aiperf/metrics/types/tokens_per_chunk.py b/aiperf/metrics/types/tokens_per_chunk.py
new file mode 100644
index 000000000..15f3a7d42
--- /dev/null
+++ b/aiperf/metrics/types/tokens_per_chunk.py
@@ -0,0 +1,51 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+from aiperf.common.enums import MetricFlags
+from aiperf.common.enums.metric_enums import GenericMetricUnit
+from aiperf.common.exceptions import NoMetricValue
+from aiperf.common.models import ParsedResponseRecord
+from aiperf.metrics import BaseRecordMetric
+from aiperf.metrics.metric_dicts import MetricRecordDict
+from aiperf.metrics.types.output_sequence_length_metric import (
+    OutputSequenceLengthMetric,
+)
+
+
+class TokensPerChunkMetric(BaseRecordMetric[float]):
+    """
+    Post-processor for calculating the Tokens Per Chunk (TPC) metric.
+    This is the average number of tokens per chunk for a streaming response.
+
+    Formula:
+        Tokens Per Chunk = Output Sequence Length / Count(Responses)
+    """
+
+    tag = "tokens_per_chunk"
+    header = "Tokens Per Chunk"
+    short_header = "TPC"
+    unit = GenericMetricUnit.TOKENS
+    flags = MetricFlags.STREAMING_TOKENS_ONLY | MetricFlags.HIDDEN
+    required_metrics = {
+        OutputSequenceLengthMetric.tag,
+    }
+
+    def _parse_record(
+        self,
+        record: ParsedResponseRecord,
+        record_metrics: MetricRecordDict,
+    ) -> float:
+        """
+        Calculates the Tokens Per Chunk (TPC) metric.
+        """
+        osl = record_metrics.get_or_raise(OutputSequenceLengthMetric)
+        if osl < 1:  # type: ignore
+            raise NoMetricValue(
+                f"Output sequence length must be at least 1 for Tokens Per Chunk Metric, got {osl}"
+            )
+        if len(record.responses) < 2:
+            raise NoMetricValue(
+                f"Record must have at least 2 responses for Tokens Per Chunk Metric, got {len(record.responses)}"
+            )
+
+        return osl / len(record.responses)  # type: ignore
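
Likewise, a minimal sketch of the TPC formula above, assuming an output sequence length of 10 tokens delivered across 3 streamed chunks (the same numbers as the fractional-result test further down):

# Tokens Per Chunk = Output Sequence Length / Count(Responses)
output_sequence_length = 10  # total output tokens for the request
num_responses = 3  # number of streamed chunks received
tokens_per_chunk = output_sequence_length / num_responses
assert abs(tokens_per_chunk - 10 / 3) < 1e-9
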
diff --git a/tests/metrics/test_inter_chunk_latency_metric.py b/tests/metrics/test_inter_chunk_latency_metric.py
new file mode 100644
index 000000000..6a7b600b2
--- /dev/null
+++ b/tests/metrics/test_inter_chunk_latency_metric.py
@@ -0,0 +1,80 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+import pytest
+from pytest import approx
+
+from aiperf.common.exceptions import NoMetricValue
+from aiperf.metrics.metric_dicts import MetricRecordDict
+from aiperf.metrics.types.inter_chunk_latency_metric import InterChunkLatencyMetric
+from tests.metrics.conftest import create_record, run_simple_metrics_pipeline
+
+
+class TestInterChunkLatencyMetric:
+    @pytest.mark.parametrize(
+        "responses,expected_icl,description",
+        [
+            ([110, 130], 20.0, "basic two responses"),
+            ([110, 120, 140], 15.0, "multiple chunks (avg of 10,20)"),
+            ([1010, 1025, 1040, 1060, 1075], 16.25, "streaming scenario"),
+        ],
+    )
+    def test_inter_chunk_latency_calculations(
+        self, responses: list[int], expected_icl: float, description: str
+    ):
+        """Test ICL calculations with various response patterns"""
+        record = create_record(start_ns=100, responses=responses)
+
+        metric_results = run_simple_metrics_pipeline(
+            [record], InterChunkLatencyMetric.tag
+        )
+        assert metric_results[InterChunkLatencyMetric.tag] == approx([expected_icl])
+
+    def test_inter_chunk_latency_multiple_records(self):
+        """Test processing multiple records with different ICL values"""
+        records = [
+            create_record(start_ns=100, responses=[110, 130]),  # ICL = 20ns
+            create_record(start_ns=200, responses=[210, 220, 240]),  # ICL = 15ns
+            create_record(start_ns=300, responses=[310, 320]),  # ICL = 10ns
+        ]  # fmt: skip
+
+        metric_results = run_simple_metrics_pipeline(
+            records, InterChunkLatencyMetric.tag
+        )
+        assert metric_results[InterChunkLatencyMetric.tag] == approx([20.0, 15.0, 10.0])
+
+    @pytest.mark.parametrize(
+        "responses,error_type,error_match,description",
+        [
+            (
+                [110],
+                NoMetricValue,
+                "Record must have at least two responses",
+                "single response",
+            ),
+            ([], NoMetricValue, "Invalid Record", "no responses"),
+            (
+                [130, 110],
+                ValueError,
+                "Inter chunk latencies must be positive",
+                "descending timestamps",
+            ),
+            (
+                [110, 110],
+                ValueError,
+                "Inter chunk latencies must be positive",
+                "equal timestamps",
+            ),
+        ],
+    )
+    def test_inter_chunk_latency_errors(
+        self, responses: list[int], error_type: type, error_match: str, description: str
+    ):
+        """Test error conditions for ICL metric"""
+        record = create_record(start_ns=100, responses=responses)
+
+        if not responses:
+            record.responses = []
+
+        metric = InterChunkLatencyMetric()
+        with pytest.raises(error_type, match=error_match):
+            metric.parse_record(record, MetricRecordDict())
diff --git a/tests/metrics/test_tokens_per_chunk.py b/tests/metrics/test_tokens_per_chunk.py
new file mode 100644
index 000000000..c37614ab9
--- /dev/null
+++ b/tests/metrics/test_tokens_per_chunk.py
@@ -0,0 +1,115 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+import pytest
+from pytest import approx
+
+from aiperf.common.exceptions import NoMetricValue
+from aiperf.metrics.metric_dicts import MetricRecordDict
+from aiperf.metrics.types.output_sequence_length_metric import (
+    OutputSequenceLengthMetric,
+)
+from aiperf.metrics.types.tokens_per_chunk import TokensPerChunkMetric
+from tests.metrics.conftest import create_record, run_simple_metrics_pipeline
+
+
+class TestTokensPerChunkMetric:
+    @pytest.mark.parametrize(
+        "responses,tokens_per_response,expected_tpc,description",
+        [
+            ([100, 110], 3, 3.0, "basic calculation"),
+            ([100, 110, 120, 130], 5, 5.0, "multiple chunks"),
+            ([1000, 1010, 1020, 1030, 1040], 5, 5.0, "streaming scenario"),
+        ],  # fmt: skip
+    )
+    def test_tokens_per_chunk_calculations(
+        self,
+        responses: list[int],
+        tokens_per_response: int,
+        expected_tpc: float,
+        description: str,
+    ):
+        """Test TPC calculations with various response patterns"""
+        record = create_record(
+            responses=responses, output_tokens_per_response=tokens_per_response
+        )
+
+        metric_results = run_simple_metrics_pipeline(
+            [record], OutputSequenceLengthMetric.tag, TokensPerChunkMetric.tag
+        )
+        assert metric_results[TokensPerChunkMetric.tag] == approx([expected_tpc])
+
+    def test_tokens_per_chunk_fractional_result(self):
+        """Test TPC with fractional result"""
+        record = create_record(
+            responses=[100, 110, 120], output_tokens_per_response=int(10 / 3)
+        )
+        record.output_token_count = 10  # Override for exact calculation
+
+        metric_results = run_simple_metrics_pipeline(
+            [record], OutputSequenceLengthMetric.tag, TokensPerChunkMetric.tag
+        )
+        assert metric_results[TokensPerChunkMetric.tag] == approx([10.0 / 3.0])
+
+    def test_tokens_per_chunk_multiple_records(self):
+        """Test processing multiple records with different TPC values"""
+        records = [
+            create_record(responses=[100, 110], output_tokens_per_response=4),  # 4.0
+            create_record(responses=[200, 210, 220], output_tokens_per_response=3),  # 3.0
+            create_record(responses=[300, 310, 320, 330], output_tokens_per_response=2),  # 2.0
+        ]  # fmt: skip
+
+        metric_results = run_simple_metrics_pipeline(
+            records, OutputSequenceLengthMetric.tag, TokensPerChunkMetric.tag
+        )
+        assert metric_results[TokensPerChunkMetric.tag] == approx([4.0, 3.0, 2.0])
+
+    def test_tokens_per_chunk_insufficient_responses(self):
+        """Test error when record has only 1 response (needs valid record for custom validation)"""
+        record = create_record(
+            start_ns=100, responses=[110], output_tokens_per_response=5
+        )
+
+        metric = TokensPerChunkMetric()
+        metric_dict = MetricRecordDict()
+        metric_dict[OutputSequenceLengthMetric.tag] = 5
+
+        with pytest.raises(
+            NoMetricValue, match="Record must have at least 2 responses"
+        ):
+            metric.parse_record(record, metric_dict)
+
+    def test_tokens_per_chunk_no_responses_invalid(self):
+        """Test error when record has no responses (Invalid Record)"""
+        record = create_record(responses=[], output_tokens_per_response=5)
+        record.responses = []
+
+        metric = TokensPerChunkMetric()
+        metric_dict = MetricRecordDict()
+        metric_dict[OutputSequenceLengthMetric.tag] = 5
+
+        with pytest.raises(NoMetricValue, match="Invalid Record"):
+            metric.parse_record(record, metric_dict)
+
+    def test_tokens_per_chunk_missing_dependency(self):
+        """Test error when required OutputSequenceLengthMetric is missing"""
+        record = create_record(
+            start_ns=100, responses=[110, 120], output_tokens_per_response=5
+        )
+
+        metric = TokensPerChunkMetric()
+        empty_metrics = MetricRecordDict()
+
+        with pytest.raises(NoMetricValue, match="Missing required metric"):
+            metric.parse_record(record, empty_metrics)
+
+    def test_tokens_per_chunk_zero_tokens_pipeline(self):
+        """Test zero tokens error through pipeline (get_or_raise treats 0 as falsy)"""
+        record = create_record(responses=[100, 110], output_tokens_per_response=0)
+
+        with pytest.raises(
+            NoMetricValue, match="Metric output_sequence_length is not available"
+        ):
+            run_simple_metrics_pipeline(
+                [record], OutputSequenceLengthMetric.tag, TokensPerChunkMetric.tag
+            )