Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions src/chronicler/processors/streams_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,31 @@ def _validate_streams_timestamp(value: str, context: str) -> str:
class StreamsProcessor(BaseProcessor):
"""Processor for STREAMS memory bandwidth benchmark results."""

def __init__(self, result_dir) -> None:
    """Initialise the processor for *result_dir* with no benchmark version known yet."""
    super().__init__(result_dir)
    # Filled in from the "streams_version_#" CSV comment while the CSV is
    # parsed; stays None when the CSV carries no such comment.
    self._benchmark_version: Optional[str] = None

def get_test_name(self) -> str:
    """Return the fixed test name reported by this processor."""
    return "streams"

def build_test_info(self):
    """Build the TestInfo, preferring the benchmark version captured from the CSV.

    The base implementation is called first. If parsing stored a STREAMS
    version from the CSV metadata comment (e.g., "# streams_version_# 5.10"),
    that value is reported as ``version``; otherwise the base implementation's
    ``version`` is used. ``wrapper_version`` is always taken from the base
    implementation.
    """
    from ..schema import TestInfo

    inherited = super().build_test_info()

    # A missing (None) or empty benchmark version falls back to the base version.
    if self._benchmark_version:
        reported_version = self._benchmark_version
    else:
        reported_version = inherited.version

    return TestInfo(
        name=self.get_test_name(),
        version=reported_version,
        wrapper_version=inherited.wrapper_version,
    )

def _extract_primary_metrics(
self, runs: Dict[str, Any],
overall_stats: Optional[StatisticalSummary]
Expand Down Expand Up @@ -121,6 +143,8 @@ def parse_runs(self, extracted_result: Dict[str, Any]) -> Dict[str, Any]:
}
}
"""
# Reset benchmark version to prevent state leakage across multiple parses
self._benchmark_version = None
# Resolve CSV path: direct file (demo/style like tmp/coremark) or from extracted_path
csv_file: Optional[Path] = None
streams_dir: Optional[Path] = None
Expand Down Expand Up @@ -196,6 +220,14 @@ def _parse_streams_csv(self, csv_file: Path) -> Dict[str, Any]:
run_number += 1
continue

# Extract benchmark version from CSV comment
if line_stripped.startswith('#') and 'streams_version_#' in line_stripped:
if self._benchmark_version is None: # Use first occurrence only
match = re.search(r'streams_version_#\s+(\S+)', line_stripped)
if match:
self._benchmark_version = match.group(1)
continue
Comment thread
coderabbitai[bot] marked this conversation as resolved.

# Extract optimization level from comments
if line_stripped.startswith('#') and "Optimization level:" in line_stripped:
opt_match = re.search(r'O(\d+)', line_stripped)
Expand Down
140 changes: 140 additions & 0 deletions tests/test_streams_version_extraction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
"""
STREAMS processor: benchmark version extraction from CSV metadata comments.
"""

import pytest
from pathlib import Path

from chronicler.processors.streams_processor import StreamsProcessor

pytestmark = pytest.mark.unit
from conftest import run_processor_parse

FILE_KEY = "results_streams_csv"
FILENAME = "results_streams.csv"


def _write_csv(result_dir: Path, content: str) -> Path:
    """Write *content* as the STREAMS results CSV inside *result_dir*.

    Leading and trailing whitespace is stripped from *content* before it is
    written, so triple-quoted literals can be passed as-is. An existing file
    with the same name is overwritten.

    Returns:
        The path of the written file (``result_dir / FILENAME``).
    """
    path = result_dir / FILENAME
    # Pin the encoding so the fixture does not depend on the platform locale.
    path.write_text(content.strip(), encoding="utf-8")
    return path


def test_streams_extracts_version_from_csv_comment(result_dir):
    """A '# streams_version_# 5.10' comment supplies the reported test version."""
    # Named csv_text rather than csv to avoid shadowing the stdlib module name.
    csv_text = """# streams_version_# 5.10
# Optimization level: O2
Array sizes,16384k,32768k,Start_Date,End_Date
Copy,1.0,2.0,2026-02-04T00:19:56Z,2026-02-04T00:20:00Z"""
    csv_path = _write_csv(result_dir, csv_text)

    streams = StreamsProcessor(str(result_dir))
    streams.parse_runs({"files": {FILE_KEY: str(csv_path)}})
    info = streams.build_test_info()

    assert info.version == "5.10", "Should extract benchmark version from CSV comment"
    assert info.wrapper_version is not None, "Should preserve wrapper version"


def test_streams_version_with_whitespace_variations(result_dir):
    """The version is still extracted when no space follows the leading '#'."""
    csv_text = """#streams_version_# 5.10
# Optimization level: O2
Array sizes,16384k,Start_Date,End_Date
Copy,1.0,2026-02-04T00:19:56Z,2026-02-04T00:20:00Z"""
    csv_path = _write_csv(result_dir, csv_text)

    streams = StreamsProcessor(str(result_dir))
    streams.parse_runs({"files": {FILE_KEY: str(csv_path)}})
    info = streams.build_test_info()

    assert info.version == "5.10"


def test_streams_version_missing_fallback_to_wrapper(result_dir):
    """Without a benchmark version comment, the wrapper version is reported."""
    csv_text = """# Optimization level: O2
Array sizes,16384k,Start_Date,End_Date
Copy,1.0,2026-02-04T00:19:56Z,2026-02-04T00:20:00Z"""
    csv_path = _write_csv(result_dir, csv_text)

    # test_info file in the result directory carrying the wrapper archive name.
    (result_dir / "test_info").write_text(
        '{"streams": {"test_name": "streams", "repo_file": "v2.8.tar.gz"}}'
    )

    streams = StreamsProcessor(str(result_dir))
    streams.parse_runs({"files": {FILE_KEY: str(csv_path)}})
    info = streams.build_test_info()

    assert info.version == "v2.8", "Should fall back to wrapper version when no benchmark version"
    assert info.wrapper_version == "v2.8"


def test_streams_version_only_uses_first_occurrence(result_dir):
    """With several version comments in one CSV, the first one wins."""
    csv_text = """# streams_version_# 5.10
# Optimization level: O2
# streams_version_# 6.0
Array sizes,16384k,Start_Date,End_Date
Copy,1.0,2026-02-04T00:19:56Z,2026-02-04T00:20:00Z"""
    csv_path = _write_csv(result_dir, csv_text)

    streams = StreamsProcessor(str(result_dir))
    streams.parse_runs({"files": {FILE_KEY: str(csv_path)}})
    info = streams.build_test_info()

    assert info.version == "5.10", "Should use first version comment"


@pytest.mark.parametrize(
    ("version_str", "expected"),
    [
        ("5.10", "5.10"),
        ("5.10.1", "5.10.1"),
        ("v5.10", "v5.10"),
        ("2024.1", "2024.1"),
    ],
)
def test_streams_version_different_formats(result_dir, version_str, expected):
    """Handle different version number formats (x.y, x.y.z, vX.Y, etc).

    Parametrized so each format runs and is reported as its own test case;
    a failure for one format no longer hides the results of the others.
    """
    csv_text = f"""# streams_version_# {version_str}
Array sizes,16384k,Start_Date,End_Date
Copy,1.0,2026-02-04T00:19:56Z,2026-02-04T00:20:00Z"""
    csv_path = _write_csv(result_dir, csv_text)

    streams = StreamsProcessor(str(result_dir))
    streams.parse_runs({"files": {FILE_KEY: str(csv_path)}})
    info = streams.build_test_info()

    assert info.version == expected, f"Should handle version format: {version_str}"


def test_streams_version_resets_between_parses(result_dir):
    """Reusing one processor must not carry a version over from an earlier parse."""
    # Round one: the CSV declares a benchmark version.
    first_csv = """# streams_version_# 5.10
# Optimization level: O2
Array sizes,16384k,Start_Date,End_Date
Copy,1.0,2026-02-04T00:19:56Z,2026-02-04T00:20:00Z"""
    first_path = _write_csv(result_dir, first_csv)

    # test_info file providing the wrapper version used as the fallback.
    (result_dir / "test_info").write_text(
        '{"streams": {"test_name": "streams", "repo_file": "v2.8.tar.gz"}}'
    )

    streams = StreamsProcessor(str(result_dir))
    streams.parse_runs({"files": {FILE_KEY: str(first_path)}})
    first_info = streams.build_test_info()

    assert first_info.version == "5.10", "First parse should extract benchmark version"

    # Round two: same processor instance, CSV rewritten without a version comment.
    second_csv = """# Optimization level: O3
Array sizes,16384k,Start_Date,End_Date
Copy,2.0,2026-02-05T00:19:56Z,2026-02-05T00:20:00Z"""
    second_path = _write_csv(result_dir, second_csv)

    streams.parse_runs({"files": {FILE_KEY: str(second_path)}})
    second_info = streams.build_test_info()

    assert second_info.version == "v2.8", "Second parse should fall back to wrapper version, not retain stale '5.10'"
    assert second_info.wrapper_version == "v2.8"
Loading