Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions examples/trt-external-payload-cluster-density-percentiles.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
tests:
- name: payload-cluster-density-v2
metadata:
platform: AWS
clusterType: self-managed
masterNodesType: m6a.xlarge
masterNodesCount: 3
workerNodesType: m6a.xlarge
workerNodesCount: 6
benchmark.keyword: cluster-density-v2
ocpVersion: "{{ version }}"
networkType: OVNKubernetes
jobType: {{ jobtype | default('periodic') }}
pullNumber: {{ pull_number | default(0) }}
organization: {{ organization | default('') }}
repository: {{ repository | default('') }}
not:
stream: okd

metrics:
- name: podReadyLatency
metricName.keyword: podLatencyMeasurement
metric_of_interest: containersReadyLatency
agg:
value: cpu
agg_type: percentiles
percents: [10,25,50,90,95]
not:
jobConfig.name: "garbage-collection"
labels:
- "[Jira: PerfScale]"
direction: 1
threshold: 10

- name: kube-apiserverCPU
metricName.keyword: containerCPU
labels.namespace.keyword: openshift-kube-apiserver
labels.container.keyword: kube-apiserver
metric_of_interest: value
agg:
value: cpu
agg_type: percentiles
percents: [10,25,50,90,95]
labels:
- "[Jira: kube-apiserver]"
direction: 1
threshold: 10
21 changes: 15 additions & 6 deletions orion/matcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,20 +401,29 @@ def parse_agg_results(
return res

uuids = data.aggregations.uuid.buckets

for uuid in uuids:
data = {
self.uuid_field: uuid.key,
timestamp_field: uuid.time.value_as_string,
}
value_key = agg_value + "_" + agg_type
if agg_type == "percentiles":
# For percentiles, extract the target percentile value
# Default to 95th percentile if not specified
percentile_values = uuid.get(agg_value).values
# OpenSearch returns percentile keys as strings (e.g., "95.0")
percentile_key = str(float(metrics["agg"].get("target_percentile", "95.0")))
data[value_key] = percentile_values.get(percentile_key)
self.logger.info("AC agg_type == percentiles")
percentile_dict = uuid.get(agg_value).to_dict().get("values", {})
if metrics and "agg" in metrics and "target_percentile" in metrics["agg"]:
target_percentile = float(metrics["agg"]["target_percentile"])
percentile_key = str(target_percentile)
self.logger.info("found target_percentile %s", target_percentile)
value_key = agg_value + "_" + agg_type + "_" + percentile_key
data[value_key] = percentile_dict.get(percentile_key)
else:
self.logger.info("no target_percentile found, using all percentiles")
for key, val in percentile_dict.items():
self.logger.info("percentile_values value %s", key)
data[agg_value + "_" + agg_type + "_" + str(key)] = val
else:
# Standard single-value aggregations
data[value_key] = uuid.get(agg_value).value
res.append(data)
return res
Expand Down
15 changes: 8 additions & 7 deletions orion/tests/test_matcher_aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ def mock_execute(self):
@pytest.mark.parametrize(
"fixture_name,test_uuids,test_metrics,data_dict,expected",
[
# Test percentile aggregation with default target (95th percentile)
# Test percentile aggregation with no target_percentile
# Should return all percents
(
"matcher_instance",
["uuid1", "uuid2"],
Expand Down Expand Up @@ -200,8 +201,8 @@ def mock_execute(self):
}
},
[
{"uuid": "uuid1", "timestamp": "2024-02-09T12:00:00", "response_time_ms_percentiles": 250.3},
{"uuid": "uuid2", "timestamp": "2024-02-09T13:00:00", "response_time_ms_percentiles": 260.8},
{"uuid": "uuid1", "timestamp": "2024-02-09T12:00:00", "response_time_ms_percentiles_50.0": 100.5, "response_time_ms_percentiles_95.0": 250.3, "response_time_ms_percentiles_99.0": 350.7 },
{"uuid": "uuid2", "timestamp": "2024-02-09T13:00:00", "response_time_ms_percentiles_50.0": 105.2, "response_time_ms_percentiles_95.0": 260.8, "response_time_ms_percentiles_99.0": 360.1 },
],
),
# Test percentile aggregation with custom target (99th percentile)
Expand Down Expand Up @@ -238,8 +239,8 @@ def mock_execute(self):
}
},
[
{"uuid": "uuid1", "timestamp": "2024-02-09T12:00:00", "response_time_ms_percentiles": 350.7},
{"uuid": "uuid2", "timestamp": "2024-02-09T13:00:00", "response_time_ms_percentiles": 360.1},
{"uuid": "uuid1", "timestamp": "2024-02-09T12:00:00", "response_time_ms_percentiles_99.0": 350.7},
{"uuid": "uuid2", "timestamp": "2024-02-09T13:00:00", "response_time_ms_percentiles_99.0": 360.1},
],
),
# Test percentile aggregation with uuid_matcher_instance
Expand Down Expand Up @@ -276,8 +277,8 @@ def mock_execute(self):
}
},
[
{"run_uuid": "uuid1", "timestamp": "2024-02-09T12:00:00", "value_ms_percentiles": 150.2},
{"run_uuid": "uuid2", "timestamp": "2024-02-09T13:00:00", "value_ms_percentiles": 155.8},
{"run_uuid": "uuid1", "timestamp": "2024-02-09T12:00:00", "value_ms_percentiles_95.0": 150.2},
{"run_uuid": "uuid2", "timestamp": "2024-02-09T13:00:00", "value_ms_percentiles_95.0": 155.8},
],
),
],
Expand Down
65 changes: 46 additions & 19 deletions orion/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,21 @@ def get_metric_data(
self.logger.info("Collecting %s", metric_name)
try:
if "agg" in metric:
metric_df, metric_dataframe_name = self.process_aggregation_metric(
metric_df, metric_dataframe_names = self.process_aggregation_metric(
uuids, metric, match, timestamp_field
)
else:
metric_df, metric_dataframe_name = self.process_standard_metric(
metric_df, single_name = self.process_standard_metric(
uuids, metric, match, metric_value_field, timestamp_field
)
metric_dataframe_names = [single_name]
metric["labels"] = labels
metric["direction"] = direction
metric["threshold"] = threshold
metric["correlation"] = correlation
metric["context"] = context
metrics_config[metric_dataframe_name] = metric
for metric_dataframe_name in metric_dataframe_names:
metrics_config[metric_dataframe_name] = metric
dataframe_list.append(metric_df)
self.logger.debug(metric_df)
except Exception as e:
Expand All @@ -96,39 +98,64 @@ def get_metric_data(
def process_aggregation_metric(
self, uuids: List[str], metric: Dict[str, Any], match: Matcher, timestamp_field: str="timestamp"
) -> pd.DataFrame:
"""Method to get aggregated dataframe
"""
Method to get an aggregated dataframe for a given metric.

Args:
uuids (List[str]): _description_
metric (Dict[str, Any]): _description_
match (Matcher): _description_
uuids (List[str]): List of UUIDs to include in the aggregation.
metric (Dict[str, Any]): Metric configuration dictionary.
match (Matcher): Matcher instance for query operations.
timestamp_field (str, optional): Timestamp field to use. Defaults to "timestamp".

Returns:
pd.DataFrame: _description_
pd.DataFrame: Aggregated metric dataframe and list of metric column names.
"""
self.logger.info("process_aggregation_metric")
aggregated_metric_data = match.get_agg_metric_query(uuids, metric, timestamp_field)
self.logger.info("aggregated_metric_data %s", aggregated_metric_data)
aggregation_value = metric["agg"]["value"]
aggregation_type = metric["agg"]["agg_type"]
aggregation_name = f"{aggregation_value}_{aggregation_type}"

if aggregation_type == "percentiles":
percentile_prefix = f"{aggregation_value}_{aggregation_type}_"
if aggregated_metric_data:
agg_columns = [k for k in aggregated_metric_data[0].keys()
if k.startswith(percentile_prefix)]
else:
agg_columns = []
self.logger.info("percentile columns found: %s", agg_columns)
else:
agg_columns = [f"{aggregation_value}_{aggregation_type}"]

all_columns = [self.uuid_field, timestamp_field] + agg_columns

if len(aggregated_metric_data) == 0:
aggregated_df = pd.DataFrame(columns=[self.uuid_field, timestamp_field, aggregation_name])
aggregated_df = pd.DataFrame(columns=all_columns)
else:
aggregated_df = match.convert_to_df(
aggregated_metric_data, columns=[self.uuid_field, timestamp_field, aggregation_name],
aggregated_metric_data, columns=all_columns,
timestamp_field=timestamp_field
)
aggregated_df.loc[:, timestamp_field] = aggregated_df[timestamp_field].apply(self.standardize_timestamp)

aggregated_df = aggregated_df.drop_duplicates(subset=[self.uuid_field], keep="first")
aggregated_metric_name = f"{metric['name']}_{aggregation_type}"
aggregated_df = aggregated_df.rename(
columns={aggregation_name: aggregated_metric_name}
)

rename_map = {}
aggregated_metric_names = []
for col in agg_columns:
if aggregation_type == "percentiles":
suffix = col[len(f"{aggregation_value}_{aggregation_type}_"):]
new_name = f"{metric['name']}_{aggregation_type}_{suffix}"
else:
new_name = f"{metric['name']}_{aggregation_type}"
rename_map[col] = new_name
aggregated_metric_names.append(new_name)

aggregated_df = aggregated_df.rename(columns=rename_map)
if timestamp_field != "timestamp":
aggregated_df = aggregated_df.rename(
columns={timestamp_field: "timestamp"}
)
return aggregated_df, aggregated_metric_name
aggregated_df = aggregated_df.rename(columns={timestamp_field: "timestamp"})

return aggregated_df, aggregated_metric_names

def standardize_timestamp(self, timestamp: Any) -> str:
"""Method to standardize timestamp formats
Expand Down
Loading