55 changes: 55 additions & 0 deletions aiperf/metrics/types/inter_chunk_latency_metric.py
@@ -0,0 +1,55 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
from aiperf.common.enums import MetricFlags, MetricTimeUnit
from aiperf.common.exceptions import NoMetricValue
from aiperf.common.models import ParsedResponseRecord
from aiperf.metrics import BaseRecordMetric
from aiperf.metrics.metric_dicts import MetricRecordDict


class InterChunkLatencyMetric(BaseRecordMetric[float]):
    """
    Post-processor for calculating the Inter Chunk Latency (ICL) metric from records.
    ICL is the average of the time differences between consecutive responses and is
    only applicable to streaming responses.

    Formula:
        Inter Chunk Latency = Average time between consecutive responses
    """

    tag = "inter_chunk_latency"
    header = "Inter Chunk Latency"
    short_header = "ICL"
    unit = MetricTimeUnit.NANOSECONDS
    display_unit = MetricTimeUnit.MILLISECONDS
    flags = MetricFlags.STREAMING_TOKENS_ONLY | MetricFlags.HIDDEN
    required_metrics = None

    def _parse_record(
        self,
        record: ParsedResponseRecord,
        record_metrics: MetricRecordDict,
    ) -> float:
        """
        Extracts the timestamps from the responses in the given ParsedResponseRecord,
        computes the average of the differences between consecutive responses (ICL),
        and returns the result.

        Raises:
            NoMetricValue: If the record does not have at least two responses.
            ValueError: If any of the inter chunk latencies are not positive.
        """
        responses = record.responses

        if len(responses) < 2:
            raise NoMetricValue(
                "Record must have at least two responses to calculate Inter Chunk Latency."
            )

        inter_chunk_latencies = []
        for i in range(1, len(responses)):
            chunk_latency_ns = responses[i].perf_ns - responses[i - 1].perf_ns
            if chunk_latency_ns <= 0:
                raise ValueError("Inter chunk latencies must be positive.")
            inter_chunk_latencies.append(chunk_latency_ns)

        return sum(inter_chunk_latencies) / len(inter_chunk_latencies)
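For reference, a minimal standalone sketch of the ICL computation performed by the class above, written against plain nanosecond timestamps rather than ParsedResponseRecord objects. The function name and sample values are illustrative only and are not part of this PR.

# Illustrative sketch only: average gap between consecutive response timestamps.
def inter_chunk_latency_ns(perf_ns_timestamps: list[int]) -> float:
    if len(perf_ns_timestamps) < 2:
        raise ValueError("Need at least two responses to compute ICL.")
    # Pairwise differences between consecutive timestamps.
    gaps = [b - a for a, b in zip(perf_ns_timestamps, perf_ns_timestamps[1:])]
    if any(gap <= 0 for gap in gaps):
        raise ValueError("Inter chunk latencies must be positive.")
    return sum(gaps) / len(gaps)

# Matches the "streaming scenario" test case below: (15 + 15 + 20 + 15) / 4 = 16.25
print(inter_chunk_latency_ns([1010, 1025, 1040, 1060, 1075]))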
51 changes: 51 additions & 0 deletions aiperf/metrics/types/tokens_per_chunk.py
@@ -0,0 +1,51 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

from aiperf.common.enums import MetricFlags
from aiperf.common.enums.metric_enums import GenericMetricUnit
from aiperf.common.exceptions import NoMetricValue
from aiperf.common.models import ParsedResponseRecord
from aiperf.metrics import BaseRecordMetric
from aiperf.metrics.metric_dicts import MetricRecordDict
from aiperf.metrics.types.output_sequence_length_metric import (
    OutputSequenceLengthMetric,
)


class TokensPerChunkMetric(BaseRecordMetric[float]):
    """
    Post-processor for calculating the Tokens Per Chunk (TPC) metric.
    This is the average number of tokens per chunk for a streaming response.

    Formula:
        Tokens Per Chunk = Output Sequence Length / Count(Responses)
    """

    tag = "tokens_per_chunk"
    header = "Tokens Per Chunk"
    short_header = "TPC"
    unit = GenericMetricUnit.TOKENS
    flags = MetricFlags.STREAMING_TOKENS_ONLY | MetricFlags.HIDDEN
    required_metrics = {
        OutputSequenceLengthMetric.tag,
    }

    def _parse_record(
        self,
        record: ParsedResponseRecord,
        record_metrics: MetricRecordDict,
    ) -> float:
        """
        Calculates the Tokens Per Chunk (TPC) metric.

        Raises:
            NoMetricValue: If the output sequence length is less than 1 or the
                record has fewer than two responses.
        """
        osl = record_metrics.get_or_raise(OutputSequenceLengthMetric)
        if osl < 1:  # type: ignore
            raise NoMetricValue(
                f"Output sequence length must be at least 1 for Tokens Per Chunk Metric, got {osl}"
            )
        if len(record.responses) < 2:
            raise NoMetricValue(
                f"Record must have at least 2 responses for Tokens Per Chunk Metric, got {len(record.responses)}"
            )

        return osl / len(record.responses)  # type: ignore
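As a quick illustration of the TPC formula, here is a standalone sketch with made-up values; the function is hypothetical and only mirrors the division performed in _parse_record above.

# Illustrative sketch only: output tokens divided by number of streamed chunks.
def tokens_per_chunk(output_sequence_length: int, num_responses: int) -> float:
    if output_sequence_length < 1:
        raise ValueError("Output sequence length must be at least 1.")
    if num_responses < 2:
        raise ValueError("A streaming record needs at least two responses.")
    return output_sequence_length / num_responses

# e.g. 10 output tokens spread across 3 chunks -> about 3.33 tokens per chunk
print(tokens_per_chunk(10, 3))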
80 changes: 80 additions & 0 deletions tests/metrics/test_inter_chunk_latency_metric.py
@@ -0,0 +1,80 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import pytest
from pytest import approx

from aiperf.common.exceptions import NoMetricValue
from aiperf.metrics.metric_dicts import MetricRecordDict
from aiperf.metrics.types.inter_chunk_latency_metric import InterChunkLatencyMetric
from tests.metrics.conftest import create_record, run_simple_metrics_pipeline


class TestInterChunkLatencyMetric:
    @pytest.mark.parametrize(
        "responses,expected_icl,description",
        [
            ([110, 130], 20.0, "basic two responses"),
            ([110, 120, 140], 15.0, "multiple chunks (avg of 10,20)"),
            ([1010, 1025, 1040, 1060, 1075], 16.25, "streaming scenario"),
        ],
    )
    def test_inter_chunk_latency_calculations(
        self, responses: list[int], expected_icl: float, description: str
    ):
        """Test ICL calculations with various response patterns"""
        record = create_record(start_ns=100, responses=responses)

        metric_results = run_simple_metrics_pipeline(
            [record], InterChunkLatencyMetric.tag
        )
        assert metric_results[InterChunkLatencyMetric.tag] == approx([expected_icl])

    def test_inter_chunk_latency_multiple_records(self):
        """Test processing multiple records with different ICL values"""
        records = [
            create_record(start_ns=100, responses=[110, 130]),  # ICL = 20ns
            create_record(start_ns=200, responses=[210, 220, 240]),  # ICL = 15ns
            create_record(start_ns=300, responses=[310, 320]),  # ICL = 10ns
        ]  # fmt: skip

        metric_results = run_simple_metrics_pipeline(
            records, InterChunkLatencyMetric.tag
        )
        assert metric_results[InterChunkLatencyMetric.tag] == approx([20.0, 15.0, 10.0])

    @pytest.mark.parametrize(
        "responses,error_type,error_match,description",
        [
            (
                [110],
                NoMetricValue,
                "Record must have at least two responses",
                "single response",
            ),
            ([], NoMetricValue, "Invalid Record", "no responses"),
            (
                [130, 110],
                ValueError,
                "Inter chunk latencies must be positive",
                "descending timestamps",
            ),
            (
                [110, 110],
                ValueError,
                "Inter chunk latencies must be positive",
                "equal timestamps",
            ),
        ],
    )
    def test_inter_chunk_latency_errors(
        self, responses: list[int], error_type: type, error_match: str, description: str
    ):
        """Test error conditions for ICL metric"""
        record = create_record(start_ns=100, responses=responses)
        if not responses:
            record.responses = []

        metric = InterChunkLatencyMetric()
        with pytest.raises(error_type, match=error_match):
            metric.parse_record(record, MetricRecordDict())
115 changes: 115 additions & 0 deletions tests/metrics/test_tokens_per_chunk.py
@@ -0,0 +1,115 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import pytest
from pytest import approx

from aiperf.common.exceptions import NoMetricValue
from aiperf.metrics.metric_dicts import MetricRecordDict
from aiperf.metrics.types.output_sequence_length_metric import (
    OutputSequenceLengthMetric,
)
from aiperf.metrics.types.tokens_per_chunk import TokensPerChunkMetric
from tests.metrics.conftest import create_record, run_simple_metrics_pipeline


class TestTokensPerChunkMetric:
    @pytest.mark.parametrize(
        "responses,tokens_per_response,expected_tpc,description",
        [
            ([100, 110], 3, 3.0, "basic calculation"),
            ([100, 110, 120, 130], 5, 5.0, "multiple chunks"),
            ([1000, 1010, 1020, 1030, 1040], 5, 5.0, "streaming scenario"),
        ],  # fmt: skip
    )
    def test_tokens_per_chunk_calculations(
        self,
        responses: list[int],
        tokens_per_response: int,
        expected_tpc: float,
        description: str,
    ):
        """Test TPC calculations with various response patterns"""
        record = create_record(
            responses=responses, output_tokens_per_response=tokens_per_response
        )

        metric_results = run_simple_metrics_pipeline(
            [record], OutputSequenceLengthMetric.tag, TokensPerChunkMetric.tag
        )
        assert metric_results[TokensPerChunkMetric.tag] == approx([expected_tpc])

    def test_tokens_per_chunk_fractional_result(self):
        """Test TPC with fractional result"""
        record = create_record(
            responses=[100, 110, 120], output_tokens_per_response=int(10 / 3)
        )
        record.output_token_count = 10  # Override for exact calculation

        metric_results = run_simple_metrics_pipeline(
            [record], OutputSequenceLengthMetric.tag, TokensPerChunkMetric.tag
        )
        assert metric_results[TokensPerChunkMetric.tag] == approx([10.0 / 3.0])

    def test_tokens_per_chunk_multiple_records(self):
        """Test processing multiple records with different TPC values"""
        records = [
            create_record(responses=[100, 110], output_tokens_per_response=4),  # 4.0
            create_record(responses=[200, 210, 220], output_tokens_per_response=3),  # 3.0
            create_record(responses=[300, 310, 320, 330], output_tokens_per_response=2),  # 2.0
        ]  # fmt: skip

        metric_results = run_simple_metrics_pipeline(
            records, OutputSequenceLengthMetric.tag, TokensPerChunkMetric.tag
        )
        assert metric_results[TokensPerChunkMetric.tag] == approx([4.0, 3.0, 2.0])

    def test_tokens_per_chunk_insufficient_responses(self):
        """Test error when record has only 1 response (needs valid record for custom validation)"""
        record = create_record(
            start_ns=100, responses=[110], output_tokens_per_response=5
        )

        metric = TokensPerChunkMetric()
        metric_dict = MetricRecordDict()
        metric_dict[OutputSequenceLengthMetric.tag] = 5

        with pytest.raises(
            NoMetricValue, match="Record must have at least 2 responses"
        ):
            metric.parse_record(record, metric_dict)

    def test_tokens_per_chunk_no_responses_invalid(self):
        """Test error when record has no responses (Invalid Record)"""
        record = create_record(responses=[], output_tokens_per_response=5)
        record.responses = []

        metric = TokensPerChunkMetric()
        metric_dict = MetricRecordDict()
        metric_dict[OutputSequenceLengthMetric.tag] = 5

        with pytest.raises(NoMetricValue, match="Invalid Record"):
            metric.parse_record(record, metric_dict)

    def test_tokens_per_chunk_missing_dependency(self):
        """Test error when required OutputSequenceLengthMetric is missing"""
        record = create_record(
            start_ns=100, responses=[110, 120], output_tokens_per_response=5
        )

        metric = TokensPerChunkMetric()
        empty_metrics = MetricRecordDict()

        with pytest.raises(NoMetricValue, match="Missing required metric"):
            metric.parse_record(record, empty_metrics)

    def test_tokens_per_chunk_zero_tokens_pipeline(self):
        """Test zero tokens error through pipeline (get_or_raise treats 0 as falsy)"""
        record = create_record(responses=[100, 110], output_tokens_per_response=0)

        with pytest.raises(
            NoMetricValue, match="Metric output_sequence_length is not available"
        ):
            run_simple_metrics_pipeline(
                [record], OutputSequenceLengthMetric.tag, TokensPerChunkMetric.tag
            )
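Assuming the repository's standard pytest layout, both new test modules can presumably be run directly from the project root with: pytest tests/metrics/test_inter_chunk_latency_metric.py tests/metrics/test_tokens_per_chunk.py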