From 4c364661cbfa22ed546b8a5f07703c17687963c7 Mon Sep 17 00:00:00 2001 From: sravan27 Date: Fri, 5 Jun 2026 21:01:50 +0530 Subject: [PATCH] Fix FIO field limit document explosion --- src/chronicler/processors/fio_processor.py | 134 +++++++++--- src/chronicler/schema.py | 26 ++- tests/test_processor_parsing.py | 242 ++++++++++++++++++++- 3 files changed, 373 insertions(+), 29 deletions(-) diff --git a/src/chronicler/processors/fio_processor.py b/src/chronicler/processors/fio_processor.py index 440d481..9f0434f 100644 --- a/src/chronicler/processors/fio_processor.py +++ b/src/chronicler/processors/fio_processor.py @@ -1,6 +1,7 @@ import json import re import statistics +from dataclasses import replace from typing import Dict, Any, List, Optional, Tuple from pathlib import Path from datetime import datetime, timedelta @@ -8,7 +9,8 @@ from .base_processor import BaseProcessor, ProcessorError from ..schema import ( - Run, TimeSeriesPoint, TimeSeriesSummary, PrimaryMetric, + Run, TimeSeriesPoint, TimeSeriesSummary, PrimaryMetric, Results, + ZathrasDocument, create_run_key, create_sequence_key ) from ..utils.parser_utils import ( @@ -77,6 +79,78 @@ class FioProcessor(BaseProcessor): def get_test_name(self) -> str: return "fio" + def process_multiple(self) -> List[ZathrasDocument]: + """ + Process FIO as one summary document per workload. + + A single FIO archive can contain dozens of workload combinations. Keeping + those as dynamic `runs.run_N` objects in one OpenSearch summary document + multiplies every workload-specific field and can exceed the 5,000-field + limit. Splitting each workload into its own document follows the PyPerf + pattern and keeps detailed time series in `zathras-timeseries`. + """ + logger.info(f"Processing {self.get_test_name()} results (multi-document mode)...") + + try: + base_metadata = self.build_metadata() + test_info = self.build_test_info() + sut = self.build_system_under_test() + test_config = self.build_test_configuration() + runtime_info = self.build_runtime_info() + results_obj = self.build_results() + + if not results_obj.runs: + logger.warning("No FIO workloads found") + return [] + + documents = [] + + for run_key, run in results_obj.runs.items(): + workload_key = self._build_workload_document_key(run_key, run) + metadata = replace(base_metadata) + if metadata.scenario_name: + metadata.scenario_name = f"{metadata.scenario_name}_{workload_key}" + else: + metadata.scenario_name = f"fio_{workload_key}" + + single_run = replace(run, run_number=0) + single_results = Results( + status=run.status, + execution_time_seconds=run.duration_seconds, + total_runs=1, + runs={"run_0": single_run}, + ) + + bandwidth = (run.metrics or {}).get("total_bandwidth_kbps") + if isinstance(bandwidth, (int, float)): + single_results.primary_metric = PrimaryMetric( + name="total_bandwidth_kbps", + value=bandwidth, + unit="KiB/s", + ) + + document = ZathrasDocument( + metadata=metadata, + test=test_info, + system_under_test=sut, + test_configuration=test_config, + results=single_results, + runtime_info=runtime_info, + ) + + content_hash = document.calculate_content_hash() + document.metadata.document_id = f"fio_{workload_key}_{content_hash[:16]}" + documents.append(document) + + logger.info(f"Successfully processed {len(documents)} FIO workload documents") + return documents + + except Exception as e: + logger.error(f"Failed to process FIO results: {e}") + raise + finally: + self.archive_handler.cleanup() + def build_results(self) -> Any: """ Build Results object with overall primary metric. @@ -112,6 +186,20 @@ def build_results(self) -> Any: return results + def _build_workload_document_key(self, run_key: str, run: Run) -> str: + """Build a stable, readable key for a per-workload FIO document.""" + configuration = run.configuration or {} + parts = [ + configuration.get("operation"), + configuration.get("block_size"), + f"iodepth_{configuration.get('iodepth')}", + f"run_{run.run_number}", + ] + raw_key = "_".join(str(part) for part in parts if part not in (None, "unknown")) + if not raw_key: + raw_key = run_key + return re.sub(r"[^a-zA-Z0-9]+", "_", raw_key).strip("_").lower() + def parse_runs(self, extracted_result: Dict[str, Any]) -> Dict[str, Any]: """ Parse FIO runs into object-based structure. @@ -337,12 +425,6 @@ def _build_run_object( # Build aggregated metrics aggregated_metrics = self._aggregate_metrics(jobs, operation) - # Add per-job breakdown - aggregated_metrics['jobs'] = [ - self._build_job_metrics(job, i, operation) - for i, job in enumerate(jobs) - ] - # Add disk utilization if 'disk_util' in fio_data: aggregated_metrics['disk_utilization'] = fio_data['disk_util'] @@ -735,29 +817,27 @@ def _parse_timeseries( total_bw_values.append(total_bw) - # Build per-job breakdown (all jobs have valid data at index i after validation) - jobs_data = [ - { - "job_number": j, - "bandwidth_kbps": all_bw[j][i][1], - "iops": all_iops[j][i][1], - "latency_ns": all_lat[j][i][1], - "clat_ns": all_clat[j][i][1], - "slat_ns": all_slat[j][i][1], - } - for j in range(num_jobs) - ] + point_metrics = { + 'total_bandwidth_kbps': total_bw, + 'total_iops': total_iops, + 'avg_latency_ns': avg_lat, + 'avg_clat_ns': avg_clat, + 'avg_slat_ns': avg_slat, + } + + # Preserve per-job granularity as scalar point_metrics fields in + # zathras-timeseries, without nested arrays in the summary document. + for j in range(num_jobs): + job_prefix = f"job_{j}" + point_metrics[f'{job_prefix}_bandwidth_kbps'] = all_bw[j][i][1] + point_metrics[f'{job_prefix}_iops'] = all_iops[j][i][1] + point_metrics[f'{job_prefix}_latency_ns'] = all_lat[j][i][1] + point_metrics[f'{job_prefix}_clat_ns'] = all_clat[j][i][1] + point_metrics[f'{job_prefix}_slat_ns'] = all_slat[j][i][1] timeseries[create_sequence_key(i)] = TimeSeriesPoint( timestamp=sample_time.strftime("%Y-%m-%dT%H:%M:%SZ"), - metrics={ - 'total_bandwidth_kbps': total_bw, - 'total_iops': total_iops, - 'avg_latency_ns': avg_lat, - 'avg_clat_ns': avg_clat, - 'avg_slat_ns': avg_slat, - 'jobs': jobs_data - } + metrics=point_metrics ) # Calculate summary diff --git a/src/chronicler/schema.py b/src/chronicler/schema.py index b51bb0a..943f61f 100644 --- a/src/chronicler/schema.py +++ b/src/chronicler/schema.py @@ -554,13 +554,24 @@ def validate(self) -> tuple[bool, List[str]]: # Validate timeseries has sequence keys if run.timeseries: - for ts_key in run.timeseries.keys(): + for ts_key, ts_point in run.timeseries.items(): if not ts_key.startswith("sequence_"): errors.append( f"Invalid sequence key in {run_key}.timeseries: {ts_key}. " f"Must start with 'sequence_'" ) + if isinstance(ts_point, TimeSeriesPoint): + metrics = ts_point.metrics + else: + metrics = ts_point.get("metrics", {}) + for metric_name, metric_value in metrics.items(): + if isinstance(metric_value, (dict, list, tuple)): + errors.append( + f"{run_key}.timeseries.{ts_key}.metrics.{metric_name} " + "must be a scalar value, not a nested object or array" + ) + return (len(errors) == 0, errors) @@ -690,5 +701,18 @@ def validate_json_schema(document: Dict[str, Any]) -> tuple[bool, List[str]]: if 'timeseries' in run_data: if not isinstance(run_data['timeseries'], dict): errors.append(f"{run_key}.timeseries must be an object with timestamp keys") + else: + for ts_key, ts_point in run_data['timeseries'].items(): + metrics = ( + ts_point.get('metrics', {}) + if isinstance(ts_point, dict) + else {} + ) + for metric_name, metric_value in metrics.items(): + if isinstance(metric_value, (dict, list, tuple)): + errors.append( + f"{run_key}.timeseries.{ts_key}.metrics.{metric_name} " + "must be a scalar value, not a nested object or array" + ) return (len(errors) == 0, errors) diff --git a/tests/test_processor_parsing.py b/tests/test_processor_parsing.py index d56b490..4fa7754 100644 --- a/tests/test_processor_parsing.py +++ b/tests/test_processor_parsing.py @@ -5,6 +5,7 @@ and configuration handling for processors. """ +import json from pathlib import Path import pytest @@ -12,8 +13,18 @@ pytestmark = pytest.mark.unit from chronicler.processors.coremark_processor import CoreMarkProcessor +from chronicler.processors.fio_processor import FioProcessor from chronicler.processors.base_processor import ProcessorError -from chronicler.schema import Run, TimeSeriesPoint +from chronicler.schema import ( + Metadata, + Results, + Run, + SystemUnderTest, + TestConfiguration, + TestInfo, + TimeSeriesPoint, + ZathrasDocument, +) class TestCoreMarkRunGrouping: @@ -69,6 +80,235 @@ def test_groups_multiple_runs(self, tmp_path): assert "run_2" in runs +class TestFioFieldLimitProtection: + """Tests for FIO OpenSearch field-limit protections.""" + + def test_parse_runs_uses_scalar_timeseries_metrics(self, tmp_path): + workload_dir = tmp_path / "1-read-4KiB" + workload_dir.mkdir() + + fio_json = workload_dir / "fio-results.json" + fio_json.write_text(json.dumps(self._fio_results(num_jobs=2))) + + self._write_fio_logs(workload_dir, job_number=1, bandwidth=(100, 120)) + self._write_fio_logs(workload_dir, job_number=2, bandwidth=(200, 220)) + + processor = FioProcessor(str(tmp_path)) + runs = processor.parse_runs({ + "files": { + "fio_results_json": str(fio_json), + } + }) + + run = runs["run_0"] + assert "jobs" not in run.metrics + + point = run.timeseries["sequence_0"] + assert "jobs" not in point.metrics + assert point.metrics["total_bandwidth_kbps"] == 300 + assert point.metrics["job_0_bandwidth_kbps"] == 100 + assert point.metrics["job_1_bandwidth_kbps"] == 200 + assert all( + not isinstance(value, (dict, list, tuple)) + for value in point.metrics.values() + ) + + def test_process_multiple_splits_workloads_into_separate_documents( + self, tmp_path, monkeypatch + ): + processor = FioProcessor(str(tmp_path)) + runs = { + "run_0": Run( + run_number=0, + status="PASS", + metrics={"total_bandwidth_kbps": 300}, + configuration={ + "operation": "read", + "block_size": "4KiB", + "iodepth": 16, + }, + timeseries={ + "sequence_0": TimeSeriesPoint( + timestamp="2026-05-10T12:00:00Z", + metrics={"total_bandwidth_kbps": 300}, + ) + }, + ), + "run_1": Run( + run_number=1, + status="PASS", + metrics={"total_bandwidth_kbps": 900}, + configuration={ + "operation": "write", + "block_size": "1MiB", + "iodepth": 32, + }, + ), + } + + monkeypatch.setattr( + processor, + "build_metadata", + lambda: Metadata( + document_id="fio_base", + scenario_name="scenario", + processing_timestamp="2026-05-10T12:00:00Z", + ), + ) + monkeypatch.setattr( + processor, + "build_test_info", + lambda: TestInfo(name="fio", version="unknown"), + ) + monkeypatch.setattr(processor, "build_system_under_test", SystemUnderTest) + monkeypatch.setattr(processor, "build_test_configuration", TestConfiguration) + monkeypatch.setattr( + processor, + "build_results", + lambda: Results(status="PASS", total_runs=2, runs=runs), + ) + monkeypatch.setattr(processor, "build_runtime_info", lambda: None) + + documents = processor.process_multiple() + + assert len(documents) == 2 + assert [list(doc.results.runs.keys()) for doc in documents] == [ + ["run_0"], + ["run_0"], + ] + assert documents[0].metadata.document_id.startswith( + "fio_read_4kib_iodepth_16_run_0_" + ) + assert documents[1].metadata.document_id.startswith( + "fio_write_1mib_iodepth_32_run_1_" + ) + assert documents[0].metadata.scenario_name == ( + "scenario_read_4kib_iodepth_16_run_0" + ) + + def test_schema_validation_rejects_nested_timeseries_metrics(self): + document = ZathrasDocument( + metadata=Metadata(document_id="fio_doc"), + test=TestInfo(name="fio", version="unknown"), + system_under_test=SystemUnderTest(), + test_configuration=TestConfiguration(), + results=Results( + status="PASS", + runs={ + "run_0": Run( + run_number=0, + status="PASS", + timeseries={ + "sequence_0": TimeSeriesPoint( + timestamp="2026-05-10T12:00:00Z", + metrics={ + "total_bandwidth_kbps": 300, + "jobs": [{"job_number": 0}], + }, + ) + }, + ) + }, + ), + ) + + is_valid, errors = document.validate() + + assert not is_valid + assert any("must be a scalar value" in error for error in errors) + + @staticmethod + def _fio_results(num_jobs): + jobs = [] + for job_number in range(num_jobs): + jobs.append({ + "jobname": f"job-{job_number}", + "elapsed": 2, + "job options": { + "filename": f"/dev/nvme{job_number}n1", + }, + "usr_cpu": 1.0, + "sys_cpu": 2.0, + "read": { + "io_bytes": 4096, + "total_ios": 4, + "bw": 100 * (job_number + 1), + "bw_min": 90, + "bw_max": 120, + "bw_mean": 100, + "iops": 10 * (job_number + 1), + "iops_min": 8, + "iops_max": 12, + "iops_mean": 10, + "runtime": 2000, + "lat_ns": { + "mean": 1000, + "min": 900, + "max": 1100, + "stddev": 10, + }, + "clat_ns": { + "mean": 800, + "min": 700, + "max": 900, + "stddev": 8, + "percentile": { + "1.000000": 700, + "5.000000": 720, + "10.000000": 740, + "50.000000": 800, + "90.000000": 860, + "95.000000": 880, + "99.000000": 900, + "99.500000": 920, + "99.900000": 940, + }, + }, + "slat_ns": { + "mean": 200, + "min": 100, + "max": 300, + "stddev": 2, + }, + }, + "write": { + "io_bytes": 0, + }, + }) + + return { + "timestamp": 1780000000, + "global options": { + "iodepth": "16", + "runtime": "2", + "time_based": "1", + }, + "jobs": jobs, + } + + @staticmethod + def _write_fio_logs(workload_dir, job_number, bandwidth): + suffix = f".{job_number}.log" + samples = "\n".join( + f"{index * 1000},{value},0,0,0" + for index, value in enumerate(bandwidth) + ) + iops = "\n".join( + f"{index * 1000},{value / 10},0,0,0" + for index, value in enumerate(bandwidth) + ) + latency = "\n".join( + f"{index * 1000},{1000 + index},0,0,0" + for index, _ in enumerate(bandwidth) + ) + + (workload_dir / f"fio_bw{suffix}").write_text(samples) + (workload_dir / f"fio_iops{suffix}").write_text(iops) + (workload_dir / f"fio_lat{suffix}").write_text(latency) + (workload_dir / f"fio_clat{suffix}").write_text(latency) + (workload_dir / f"fio_slat{suffix}").write_text(latency) + + class TestCoreMarkMetricExtraction: """Tests for CoreMark metric extraction from CSV."""