
Commit c6ec0a1

Add aggregate command (#638)
Signed-off-by: Michael Oviedo <[email protected]>
1 parent 542be24 commit c6ec0a1

File tree: 3 files changed, +404 −1 lines changed


osbenchmark/aggregator.py (new file, +250 lines)

@@ -0,0 +1,250 @@
import os
from typing import Any, Dict, List, Union
import uuid

from osbenchmark.metrics import FileTestExecutionStore
from osbenchmark import metrics, workload, config
from osbenchmark.utils import io as rio

class Aggregator:
    def __init__(self, cfg, test_executions_dict, args):
        self.config = cfg
        self.args = args
        self.test_executions = test_executions_dict
        self.accumulated_results: Dict[str, Dict[str, List[Any]]] = {}
        self.accumulated_iterations: Dict[str, int] = {}
        self.metrics = ["throughput", "latency", "service_time", "client_processing_time", "processing_time", "error_rate", "duration"]
        self.test_store = metrics.test_execution_store(self.config)
        self.cwd = cfg.opts("node", "benchmark.cwd")

    def count_iterations_for_each_op(self) -> None:
        loaded_workload = workload.load_workload(self.config)
        test_procedure_name = self.config.opts("workload", "test_procedure.name")
        test_procedure_found = False

        for test_procedure in loaded_workload.test_procedures:
            if test_procedure.name == test_procedure_name:
                test_procedure_found = True
                for task in test_procedure.schedule:
                    task_name = task.name
                    iterations = task.iterations or 1
                    self.accumulated_iterations[task_name] = self.accumulated_iterations.get(task_name, 0) + iterations
            else:
                continue  # skip to the next test procedure if the name doesn't match

        if not test_procedure_found:
            raise ValueError(f"Test procedure '{test_procedure_name}' not found in the loaded workload.")

    def accumulate_results(self, test_execution: Any) -> None:
        for item in test_execution.results.get("op_metrics", []):
            task = item.get("task", "")
            self.accumulated_results.setdefault(task, {})
            for metric in self.metrics:
                self.accumulated_results[task].setdefault(metric, [])
                self.accumulated_results[task][metric].append(item.get(metric))
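
For illustration only (hypothetical task name and values, not output from a real run): after accumulating two test executions, accumulated_results holds one list entry per execution for each metric, keyed by task. A sketch of the shape:

    accumulated_results = {
        "index-append": {
            "throughput": [{"min": 900, "mean": 1000, "max": 1100, "unit": "docs/s"},
                           {"min": 950, "mean": 1050, "max": 1150, "unit": "docs/s"}],
            "error_rate": [0.0, 0.0],  # one list entry per aggregated execution
        },
    }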

    def aggregate_json_by_key(self, key_path: Union[str, List[str]]) -> Any:
        all_jsons = [self.test_store.find_by_test_execution_id(id).results for id in self.test_executions.keys()]

        # retrieve nested value from a dictionary given a key path
        def get_nested_value(obj: Dict[str, Any], path: List[str]) -> Any:
            for key in path:
                if isinstance(obj, dict):
                    obj = obj.get(key, {})
                elif isinstance(obj, list) and key.isdigit():
                    obj = obj[int(key)] if int(key) < len(obj) else {}
                else:
                    return None
            return obj

        def aggregate_helper(objects: List[Any]) -> Any:
            if not objects:
                return None
            if all(isinstance(obj, (int, float)) for obj in objects):
                avg = sum(objects) / len(objects)
                return avg
            if all(isinstance(obj, dict) for obj in objects):
                keys = set().union(*objects)
                return {key: aggregate_helper([obj.get(key) for obj in objects]) for key in keys}
            if all(isinstance(obj, list) for obj in objects):
                max_length = max(len(obj) for obj in objects)
                return [aggregate_helper([obj[i] if i < len(obj) else None for obj in objects]) for i in range(max_length)]
            return next((obj for obj in objects if obj is not None), None)

        if isinstance(key_path, str):
            key_path = key_path.split('.')

        values = [get_nested_value(json, key_path) for json in all_jsons]
        return aggregate_helper(values)
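
The nested aggregation above averages numbers, merges dicts key-wise, merges lists index-wise, and falls back to the first non-None value for anything else, such as unit strings. A minimal standalone sketch of the same recursion, runnable outside OSB:

    from typing import Any, List

    def aggregate_helper(objects: List[Any]) -> Any:
        # Average numbers; recurse into dicts key-wise and lists index-wise;
        # otherwise keep the first non-None value (e.g. a unit string).
        if not objects:
            return None
        if all(isinstance(obj, (int, float)) for obj in objects):
            return sum(objects) / len(objects)
        if all(isinstance(obj, dict) for obj in objects):
            return {key: aggregate_helper([obj.get(key) for obj in objects])
                    for key in set().union(*objects)}
        if all(isinstance(obj, list) for obj in objects):
            longest = max(len(obj) for obj in objects)
            return [aggregate_helper([obj[i] if i < len(obj) else None for obj in objects])
                    for i in range(longest)]
        return next((obj for obj in objects if obj is not None), None)

    print(aggregate_helper([{"total_time": 10, "unit": "ms"},
                            {"total_time": 20, "unit": "ms"}]))
    # {'total_time': 15.0, 'unit': 'ms'} (key order may vary)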

    def build_aggregated_results(self):
        test_exe = self.test_store.find_by_test_execution_id(list(self.test_executions.keys())[0])
        aggregated_results = {
            "op-metrics": [],
            "correctness_metrics": self.aggregate_json_by_key("correctness_metrics"),
            "total_time": self.aggregate_json_by_key("total_time"),
            "total_time_per_shard": self.aggregate_json_by_key("total_time_per_shard"),
            "indexing_throttle_time": self.aggregate_json_by_key("indexing_throttle_time"),
            "indexing_throttle_time_per_shard": self.aggregate_json_by_key("indexing_throttle_time_per_shard"),
            "merge_time": self.aggregate_json_by_key("merge_time"),
            "merge_time_per_shard": self.aggregate_json_by_key("merge_time_per_shard"),
            "merge_count": self.aggregate_json_by_key("merge_count"),
            "refresh_time": self.aggregate_json_by_key("refresh_time"),
            "refresh_time_per_shard": self.aggregate_json_by_key("refresh_time_per_shard"),
            "refresh_count": self.aggregate_json_by_key("refresh_count"),
            "flush_time": self.aggregate_json_by_key("flush_time"),
            "flush_time_per_shard": self.aggregate_json_by_key("flush_time_per_shard"),
            "flush_count": self.aggregate_json_by_key("flush_count"),
            "merge_throttle_time": self.aggregate_json_by_key("merge_throttle_time"),
            "merge_throttle_time_per_shard": self.aggregate_json_by_key("merge_throttle_time_per_shard"),
            "ml_processing_time": self.aggregate_json_by_key("ml_processing_time"),
            "young_gc_time": self.aggregate_json_by_key("young_gc_time"),
            "young_gc_count": self.aggregate_json_by_key("young_gc_count"),
            "old_gc_time": self.aggregate_json_by_key("old_gc_time"),
            "old_gc_count": self.aggregate_json_by_key("old_gc_count"),
            "memory_segments": self.aggregate_json_by_key("memory_segments"),
            "memory_doc_values": self.aggregate_json_by_key("memory_doc_values"),
            "memory_terms": self.aggregate_json_by_key("memory_terms"),
            "memory_norms": self.aggregate_json_by_key("memory_norms"),
            "memory_points": self.aggregate_json_by_key("memory_points"),
            "memory_stored_fields": self.aggregate_json_by_key("memory_stored_fields"),
            "store_size": self.aggregate_json_by_key("store_size"),
            "translog_size": self.aggregate_json_by_key("translog_size"),
            "segment_count": self.aggregate_json_by_key("segment_count"),
            "total_transform_search_times": self.aggregate_json_by_key("total_transform_search_times"),
            "total_transform_index_times": self.aggregate_json_by_key("total_transform_index_times"),
            "total_transform_processing_times": self.aggregate_json_by_key("total_transform_processing_times"),
            "total_transform_throughput": self.aggregate_json_by_key("total_transform_throughput")
        }

        for task, task_metrics in self.accumulated_results.items():
            iterations = self.accumulated_iterations.get(task, 1)
            aggregated_task_metrics = self.calculate_weighted_average(task_metrics, iterations)
            op_metric = {
                "task": task,
                "operation": task,
                "throughput": aggregated_task_metrics["throughput"],
                "latency": aggregated_task_metrics["latency"],
                "service_time": aggregated_task_metrics["service_time"],
                "client_processing_time": aggregated_task_metrics["client_processing_time"],
                "processing_time": aggregated_task_metrics["processing_time"],
                "error_rate": aggregated_task_metrics["error_rate"],
                "duration": aggregated_task_metrics["duration"]
            }
            aggregated_results["op-metrics"].append(op_metric)

        # extract the necessary data from the first test execution, since the configurations should be identical for all test executions
        current_timestamp = self.config.opts("system", "time.start")

        if hasattr(self.args, 'results_file') and self.args.results_file != "":
            normalized_results_file = rio.normalize_path(self.args.results_file, self.cwd)
            # ensure that the parent folder already exists when we try to write the file...
            rio.ensure_dir(rio.dirname(normalized_results_file))
            test_execution_id = os.path.basename(normalized_results_file)
            self.config.add(config.Scope.applicationOverride, "system", "test_execution.id", normalized_results_file)
        elif hasattr(self.args, 'test_execution_id') and self.args.test_execution_id:
            test_execution_id = f"aggregate_results_{test_exe.workload}_{self.args.test_execution_id}"
            self.config.add(config.Scope.applicationOverride, "system", "test_execution.id", test_execution_id)
        else:
            test_execution_id = f"aggregate_results_{test_exe.workload}_{str(uuid.uuid4())}"
            self.config.add(config.Scope.applicationOverride, "system", "test_execution.id", test_execution_id)

        print("Aggregate test execution ID: ", test_execution_id)

        # add values to the configuration object
        self.config.add(config.Scope.applicationOverride, "builder",
                        "provision_config_instance.names", test_exe.provision_config_instance)
        self.config.add(config.Scope.applicationOverride, "system",
                        "env.name", test_exe.environment_name)
        self.config.add(config.Scope.applicationOverride, "system", "time.start", current_timestamp)
        self.config.add(config.Scope.applicationOverride, "test_execution", "pipeline", test_exe.pipeline)
        self.config.add(config.Scope.applicationOverride, "workload", "params", test_exe.workload_params)
        self.config.add(config.Scope.applicationOverride, "builder",
                        "provision_config_instance.params", test_exe.provision_config_instance_params)
        self.config.add(config.Scope.applicationOverride, "builder", "plugin.params", test_exe.plugin_params)
        self.config.add(config.Scope.applicationOverride, "workload", "latency.percentiles", test_exe.latency_percentiles)
        self.config.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", test_exe.throughput_percentiles)

        loaded_workload = workload.load_workload(self.config)
        test_procedure = loaded_workload.find_test_procedure_or_default(test_exe.test_procedure)

        test_execution = metrics.create_test_execution(self.config, loaded_workload, test_procedure, test_exe.workload_revision)
        test_execution.add_results(AggregatedResults(aggregated_results))
        test_execution.distribution_version = test_exe.distribution_version
        test_execution.revision = test_exe.revision
        test_execution.distribution_flavor = test_exe.distribution_flavor
        test_execution.provision_config_revision = test_exe.provision_config_revision

        return test_execution

    def calculate_weighted_average(self, task_metrics: Dict[str, List[Any]], iterations: int) -> Dict[str, Any]:
        weighted_metrics = {}

        for metric, values in task_metrics.items():
            weighted_metrics[metric] = {}
            if isinstance(values[0], dict):
                for item_key in values[0].keys():
                    if item_key == 'unit':
                        weighted_metrics[metric][item_key] = values[0][item_key]
                    else:
                        item_values = [value.get(item_key, 0) for value in values]
                        if iterations > 1:
                            weighted_sum = sum(value * iterations for value in item_values)
                            total_iterations = iterations * len(values)
                            weighted_metrics[metric][item_key] = weighted_sum / total_iterations
                        else:
                            weighted_metrics[metric][item_key] = sum(item_values) / len(item_values)
            else:
                if iterations > 1:
                    weighted_sum = sum(value * iterations for value in values)
                    total_iterations = iterations * len(values)
                    weighted_metrics[metric] = weighted_sum / total_iterations
                else:
                    weighted_metrics[metric] = sum(values) / len(values)
        return weighted_metrics
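
One property of this weighting is worth spelling out: iterations is the same weight for every value in the list, so sum(value * iterations) / (iterations * len(values)) reduces algebraically to the plain mean sum(values) / len(values); the weighting would only change the result if per-execution iteration counts differed. A quick check:

    values = [10.0, 20.0, 40.0]
    iterations = 5
    weighted = sum(v * iterations for v in values) / (iterations * len(values))
    assert weighted == sum(values) / len(values)  # both evaluate to 23.333...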

    def test_execution_compatibility_check(self) -> bool:
        first_test_execution = self.test_store.find_by_test_execution_id(list(self.test_executions.keys())[0])
        workload = first_test_execution.workload
        test_procedure = first_test_execution.test_procedure
        for id in self.test_executions.keys():
            test_execution = self.test_store.find_by_test_execution_id(id)
            if test_execution:
                if test_execution.workload != workload:
                    raise ValueError(
                        f"Incompatible workload: test {id} has workload '{test_execution.workload}' instead of '{workload}'. "
                        f"Ensure that all test IDs have the same workload."
                    )
                if test_execution.test_procedure != test_procedure:
                    raise ValueError(
                        f"Incompatible test procedure: test {id} has test procedure '{test_execution.test_procedure}' "
                        f"instead of '{test_procedure}'. Ensure that all test IDs have the same test procedure from the same workload."
                    )
            else:
                raise ValueError(f"Test execution not found: {id}. Ensure that all provided test IDs are valid.")

        self.config.add(config.Scope.applicationOverride, "workload", "test_procedure.name", first_test_execution.test_procedure)
        return True

    def aggregate(self) -> None:
        if self.test_execution_compatibility_check():
            for id in self.test_executions.keys():
                test_execution = self.test_store.find_by_test_execution_id(id)
                if test_execution:
                    self.config.add(config.Scope.applicationOverride, "workload", "repository.name", self.args.workload_repository)
                    self.config.add(config.Scope.applicationOverride, "workload", "workload.name", test_execution.workload)
                    self.count_iterations_for_each_op()
                    self.accumulate_results(test_execution)

            aggregated_results = self.build_aggregated_results()
            file_test_exe_store = FileTestExecutionStore(self.config)
            file_test_exe_store.store_test_execution(aggregated_results)
        else:
            raise ValueError("Incompatible test execution results")

class AggregatedResults:
    def __init__(self, results):
        self.results = results

    def as_dict(self):
        return self.results

osbenchmark/benchmark.py (+35 −1)

@@ -40,6 +40,7 @@
 from osbenchmark.builder import provision_config, builder
 from osbenchmark.workload_generator import workload_generator
 from osbenchmark.utils import io, convert, process, console, net, opts, versions
+from osbenchmark import aggregator


 def create_arg_parser():

@@ -221,6 +222,26 @@ def add_workload_source(subparser):
         help="Whether to include the comparison in the results file.",
         default=True)

+    aggregate_parser = subparsers.add_parser("aggregate", help="Aggregate multiple test_executions")
+    aggregate_parser.add_argument(
+        "--test-executions",
+        "--t",
+        type=non_empty_list,
+        required=True,
+        help="Comma-separated list of TestExecution IDs to aggregate")
+    aggregate_parser.add_argument(
+        "--test-execution-id",
+        help="Define a unique id for this aggregated test_execution.",
+        default="")
+    aggregate_parser.add_argument(
+        "--results-file",
+        help="Write the aggregated results to the provided file.",
+        default="")
+    aggregate_parser.add_argument(
+        "--workload-repository",
+        help="Define the repository from where OSB will load workloads (default: default).",
+        default="default")
+
     download_parser = subparsers.add_parser("download", help="Downloads an artifact")
     download_parser.add_argument(
         "--provision-config-repository",

@@ -613,7 +634,7 @@ def add_workload_source(subparser):
         action="store_true",
         default=False)

-    for p in [list_parser, test_execution_parser, compare_parser, download_parser, install_parser,
+    for p in [list_parser, test_execution_parser, compare_parser, aggregate_parser, download_parser, install_parser,
               start_parser, stop_parser, info_parser, create_workload_parser]:
         # This option is needed to support a separate configuration for the integration tests on the same machine
         p.add_argument(

@@ -832,6 +853,15 @@ def configure_results_publishing_params(args, cfg):
     cfg.add(config.Scope.applicationOverride, "results_publishing", "output.path", args.results_file)
     cfg.add(config.Scope.applicationOverride, "results_publishing", "numbers.align", args.results_numbers_align)

+def prepare_test_executions_dict(args, cfg):
+    cfg.add(config.Scope.applicationOverride, "results_publishing", "output.path", args.results_file)
+    test_executions_dict = {}
+    if args.test_executions:
+        for execution in args.test_executions:
+            execution = execution.strip()
+            if execution:
+                test_executions_dict[execution] = None
+    return test_executions_dict

 def print_test_execution_id(args):
     console.info(f"[Test Execution ID]: {args.test_execution_id}")
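
For example, assuming non_empty_list has split --test-executions on commas, the helper normalizes the IDs into a dict of placeholders (a runnable sketch of that step alone, without the cfg.add call):

    raw_ids = ["exec-id-1", " exec-id-2 ", ""]   # hypothetical parsed value
    test_executions_dict = {e.strip(): None for e in raw_ids if e.strip()}
    print(test_executions_dict)  # {'exec-id-1': None, 'exec-id-2': None}

The None values are placeholders; the Aggregator later resolves each ID through the test execution store.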

@@ -847,6 +877,10 @@ def dispatch_sub_command(arg_parser, args, cfg):
         configure_results_publishing_params(args, cfg)
         cfg.add(config.Scope.applicationOverride, "results_publishing", "percentiles", args.percentiles)
         results_publisher.compare(cfg, args.baseline, args.contender)
+    elif sub_command == "aggregate":
+        test_executions_dict = prepare_test_executions_dict(args, cfg)
+        aggregator_instance = aggregator.Aggregator(cfg, test_executions_dict, args)
+        aggregator_instance.aggregate()
     elif sub_command == "list":
         cfg.add(config.Scope.applicationOverride, "system", "list.config.option", args.configuration)
         cfg.add(config.Scope.applicationOverride, "system", "list.test_executions.max_results", args.limit)
