Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 107 additions & 27 deletions src/chronicler/processors/fio_processor.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import json
import re
import statistics
from dataclasses import replace
from typing import Dict, Any, List, Optional, Tuple
from pathlib import Path
from datetime import datetime, timedelta
import logging

from .base_processor import BaseProcessor, ProcessorError
from ..schema import (
Run, TimeSeriesPoint, TimeSeriesSummary, PrimaryMetric,
Run, TimeSeriesPoint, TimeSeriesSummary, PrimaryMetric, Results,
ZathrasDocument,
create_run_key, create_sequence_key
)
from ..utils.parser_utils import (
Expand Down Expand Up @@ -77,6 +79,78 @@ class FioProcessor(BaseProcessor):
def get_test_name(self) -> str:
return "fio"

def process_multiple(self) -> List[ZathrasDocument]:
"""
Process FIO as one summary document per workload.

A single FIO archive can contain dozens of workload combinations. Keeping
those as dynamic `runs.run_N` objects in one OpenSearch summary document
multiplies every workload-specific field and can exceed the 5,000-field
limit. Splitting each workload into its own document follows the PyPerf
pattern and keeps detailed time series in `zathras-timeseries`.
"""
logger.info(f"Processing {self.get_test_name()} results (multi-document mode)...")

try:
base_metadata = self.build_metadata()
test_info = self.build_test_info()
sut = self.build_system_under_test()
test_config = self.build_test_configuration()
runtime_info = self.build_runtime_info()
results_obj = self.build_results()

if not results_obj.runs:
logger.warning("No FIO workloads found")
return []

documents = []

for run_key, run in results_obj.runs.items():
workload_key = self._build_workload_document_key(run_key, run)
metadata = replace(base_metadata)
if metadata.scenario_name:
metadata.scenario_name = f"{metadata.scenario_name}_{workload_key}"
else:
metadata.scenario_name = f"fio_{workload_key}"

single_run = replace(run, run_number=0)
single_results = Results(
status=run.status,
execution_time_seconds=run.duration_seconds,
total_runs=1,
runs={"run_0": single_run},
)

bandwidth = (run.metrics or {}).get("total_bandwidth_kbps")
if isinstance(bandwidth, (int, float)):
single_results.primary_metric = PrimaryMetric(
name="total_bandwidth_kbps",
value=bandwidth,
unit="KiB/s",
)

document = ZathrasDocument(
metadata=metadata,
test=test_info,
system_under_test=sut,
test_configuration=test_config,
results=single_results,
runtime_info=runtime_info,
)

content_hash = document.calculate_content_hash()
document.metadata.document_id = f"fio_{workload_key}_{content_hash[:16]}"
documents.append(document)

logger.info(f"Successfully processed {len(documents)} FIO workload documents")
return documents

except Exception as e:
logger.error(f"Failed to process FIO results: {e}")
raise
finally:
self.archive_handler.cleanup()

def build_results(self) -> Any:
"""
Build Results object with overall primary metric.
Expand Down Expand Up @@ -112,6 +186,20 @@ def build_results(self) -> Any:

return results

def _build_workload_document_key(self, run_key: str, run: Run) -> str:
"""Build a stable, readable key for a per-workload FIO document."""
configuration = run.configuration or {}
parts = [
configuration.get("operation"),
configuration.get("block_size"),
f"iodepth_{configuration.get('iodepth')}",
f"run_{run.run_number}",
]
raw_key = "_".join(str(part) for part in parts if part not in (None, "unknown"))
if not raw_key:
raw_key = run_key
return re.sub(r"[^a-zA-Z0-9]+", "_", raw_key).strip("_").lower()

def parse_runs(self, extracted_result: Dict[str, Any]) -> Dict[str, Any]:
"""
Parse FIO runs into object-based structure.
Expand Down Expand Up @@ -337,12 +425,6 @@ def _build_run_object(
# Build aggregated metrics
aggregated_metrics = self._aggregate_metrics(jobs, operation)

# Add per-job breakdown
aggregated_metrics['jobs'] = [
self._build_job_metrics(job, i, operation)
for i, job in enumerate(jobs)
]

# Add disk utilization
if 'disk_util' in fio_data:
aggregated_metrics['disk_utilization'] = fio_data['disk_util']
Expand Down Expand Up @@ -735,29 +817,27 @@ def _parse_timeseries(

total_bw_values.append(total_bw)

# Build per-job breakdown (all jobs have valid data at index i after validation)
jobs_data = [
{
"job_number": j,
"bandwidth_kbps": all_bw[j][i][1],
"iops": all_iops[j][i][1],
"latency_ns": all_lat[j][i][1],
"clat_ns": all_clat[j][i][1],
"slat_ns": all_slat[j][i][1],
}
for j in range(num_jobs)
]
point_metrics = {
'total_bandwidth_kbps': total_bw,
'total_iops': total_iops,
'avg_latency_ns': avg_lat,
'avg_clat_ns': avg_clat,
'avg_slat_ns': avg_slat,
}

# Preserve per-job granularity as scalar point_metrics fields in
# zathras-timeseries, without nested arrays in the summary document.
for j in range(num_jobs):
job_prefix = f"job_{j}"
point_metrics[f'{job_prefix}_bandwidth_kbps'] = all_bw[j][i][1]
point_metrics[f'{job_prefix}_iops'] = all_iops[j][i][1]
point_metrics[f'{job_prefix}_latency_ns'] = all_lat[j][i][1]
point_metrics[f'{job_prefix}_clat_ns'] = all_clat[j][i][1]
point_metrics[f'{job_prefix}_slat_ns'] = all_slat[j][i][1]

timeseries[create_sequence_key(i)] = TimeSeriesPoint(
timestamp=sample_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
metrics={
'total_bandwidth_kbps': total_bw,
'total_iops': total_iops,
'avg_latency_ns': avg_lat,
'avg_clat_ns': avg_clat,
'avg_slat_ns': avg_slat,
'jobs': jobs_data
}
metrics=point_metrics
)

# Calculate summary
Expand Down
26 changes: 25 additions & 1 deletion src/chronicler/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,13 +554,24 @@ def validate(self) -> tuple[bool, List[str]]:

# Validate timeseries has sequence keys
if run.timeseries:
for ts_key in run.timeseries.keys():
for ts_key, ts_point in run.timeseries.items():
if not ts_key.startswith("sequence_"):
errors.append(
f"Invalid sequence key in {run_key}.timeseries: {ts_key}. "
f"Must start with 'sequence_'"
)

if isinstance(ts_point, TimeSeriesPoint):
metrics = ts_point.metrics
else:
metrics = ts_point.get("metrics", {})
for metric_name, metric_value in metrics.items():
if isinstance(metric_value, (dict, list, tuple)):
errors.append(
f"{run_key}.timeseries.{ts_key}.metrics.{metric_name} "
"must be a scalar value, not a nested object or array"
)

return (len(errors) == 0, errors)


Expand Down Expand Up @@ -690,5 +701,18 @@ def validate_json_schema(document: Dict[str, Any]) -> tuple[bool, List[str]]:
if 'timeseries' in run_data:
if not isinstance(run_data['timeseries'], dict):
errors.append(f"{run_key}.timeseries must be an object with timestamp keys")
else:
for ts_key, ts_point in run_data['timeseries'].items():
metrics = (
ts_point.get('metrics', {})
if isinstance(ts_point, dict)
else {}
)
for metric_name, metric_value in metrics.items():
if isinstance(metric_value, (dict, list, tuple)):
errors.append(
f"{run_key}.timeseries.{ts_key}.metrics.{metric_name} "
"must be a scalar value, not a nested object or array"
)

return (len(errors) == 0, errors)
Loading