
Commit 1b4c62e

refactor(elastic): remove all elasticsearch related code
Since we are now sending performance stats to Argus, nothing needs to send data to Elasticsearch anymore.
1 parent: 755c512
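
As a quick orientation before the file-by-file diffs: the only reporting path kept for perf-simple-query results is the Argus call shown in microbenchmarking_test.py below. A minimal sketch of that path follows; `send_perf_simple_query_result_to_argus` is the real sdcm helper kept by this commit, while the wrapper function and variable names are illustrative only.

```python
# Sketch of the Argus-only reporting path this commit standardizes on.
# The sdcm import and the call signature mirror microbenchmarking_test.py below;
# the wrapper function itself is illustrative and not part of the commit.
import json

from sdcm.argus_results import send_perf_simple_query_result_to_argus


def report_perf_simple_query(argus_client, raw_output: str) -> None:
    """Parse perf-simple-query output and push it to Argus; nothing goes to Elasticsearch."""
    results = json.loads(raw_output)  # content of perf-simple-query-result.txt
    send_perf_simple_query_result_to_argus(argus_client, results)
```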

41 files changed (+61, -3804 lines)

Diff for: add_new_dc_test.py

+4-4
@@ -77,15 +77,15 @@ def reconfigure_keyspaces_to_use_network_topology_strategy(self, keyspaces: List
    def prewrite_db_with_data(self) -> None:
        self.log.info("Prewriting database...")
        stress_cmd = self.params.get('prepare_write_cmd')
-       pre_thread = self.run_stress_thread(stress_cmd=stress_cmd, stats_aggregate_cmds=False, round_robin=False)
+       pre_thread = self.run_stress_thread(stress_cmd=stress_cmd, round_robin=False)
        self.verify_stress_thread(cs_thread_pool=pre_thread)
        self.log.info("Database pre write completed")

    def start_stress_during_adding_new_dc(self) -> Tuple[CassandraStressThread, CassandraStressThread]:
        self.log.info("Running stress during adding new DC")
        stress_cmds = self.params.get('stress_cmd')
-       read_thread = self.run_stress_thread(stress_cmd=stress_cmds[0], stats_aggregate_cmds=False, round_robin=False)
-       write_thread = self.run_stress_thread(stress_cmd=stress_cmds[1], stats_aggregate_cmds=False, round_robin=False)
+       read_thread = self.run_stress_thread(stress_cmd=stress_cmds[0], round_robin=False)
+       write_thread = self.run_stress_thread(stress_cmd=stress_cmds[1], round_robin=False)
        self.log.info("Stress during adding DC started")
        return read_thread, write_thread

@@ -106,7 +106,7 @@ def add_node_in_new_dc(self) -> BaseNode:
    def verify_data_can_be_read_from_new_dc(self, new_node: BaseNode) -> None:
        self.log.info("Veryfing if data has been transferred successfully to the new DC")
        stress_cmd = self.params.get('verify_data_after_entire_test') + f" -node {new_node.ip_address}"
-       end_stress = self.run_stress_thread(stress_cmd=stress_cmd, stats_aggregate_cmds=False, round_robin=False)
+       end_stress = self.run_stress_thread(stress_cmd=stress_cmd, round_robin=False)
        self.verify_stress_thread(cs_thread_pool=end_stress)

    def querying_new_node_should_return_no_data(self, new_node: BaseNode) -> None:

Diff for: admission_control_overload_test.py

+2-4
@@ -27,14 +27,12 @@ def check_prometheus_metrics(self, start_time, now):
    def run_load(self, job_num, job_cmd, is_prepare=False):
        is_ever_triggered = False
        if is_prepare and not skip_optional_stage('prepare_write'):
-           prepare_stress_queue = self.run_stress_thread(stress_cmd=job_cmd, stress_num=job_num, prefix='preload-',
-                                                         stats_aggregate_cmds=False)
+           prepare_stress_queue = self.run_stress_thread(stress_cmd=job_cmd, stress_num=job_num, prefix='preload-')
            self.get_stress_results(prepare_stress_queue)
        elif not is_prepare and not skip_optional_stage('main_load'):
            stress_queue = []
            stress_res = []
-           stress_queue.append(self.run_stress_thread(stress_cmd=job_cmd, stress_num=job_num,
-                                                      stats_aggregate_cmds=False))
+           stress_queue.append(self.run_stress_thread(stress_cmd=job_cmd, stress_num=job_num))

            start_time = time.time()
            while stress_queue:

Diff for: docs/bisecting-with-sct.md

+1-3
@@ -32,11 +32,10 @@ def test_write(self):
        if stress_multiplier_w := self.params.get("stress_multiplier_w"):
            stress_multiplier = stress_multiplier_w

-       self.create_test_stats(doc_id_with_timestamp=True)
        self.run_fstrim_on_all_db_nodes()

        stress_queue = self.run_stress_thread(
-           stress_cmd=base_cmd_w, stress_num=stress_multiplier, stats_aggregate_cmds=False)
+           stress_cmd=base_cmd_w, stress_num=stress_multiplier)
        results = self.get_stress_results(queue=stress_queue)

        # set bisect_ref_value only for the first test run
@@ -46,5 +45,4 @@ def test_write(self):
        self.build_histogram(PerformanceTestWorkload.WRITE, PerformanceTestType.THROUGHPUT)
        self.update_test_details(scylla_conf=True)
        self.display_results(results, test_name='test_write')
-       self.check_regression()
 ```

Diff for: jupyter/provision-test.ipynb

-2
@@ -93,8 +93,6 @@
 " self.stats[disrupt][key[status]].append(data)\n",
 " self.stats[disrupt]['cnt'] += 1\n",
 " self.log.debug('Update nemesis info with: %s', data)\n",
-" if self.tester.create_stats:\n",
-" self.tester.update({'nemesis': self.stats})\n",
 " if self.es_publisher:\n",
 " self.es_publisher.publish(disrupt_name=disrupt, status=status, data=data)\n",
 "\n",

Diff for: microbenchmarking_test.py

+2-10
@@ -14,7 +14,6 @@

 from sdcm.argus_results import send_perf_simple_query_result_to_argus
 from sdcm.tester import ClusterTester, teardown_on_exception, log_run_info
-from sdcm.utils.microbenchmarking.perf_simple_query_reporter import PerfSimpleQueryAnalyzer


 class PerfSimpleQueryTest(ClusterTester):
@@ -32,12 +31,5 @@ def test_perf_simple_query(self):
        if result.ok:
            output = self.db_cluster.nodes[0].remoter.run("cat perf-simple-query-result.txt").stdout
            results = json.loads(output)
-           self.create_test_stats(
-               specific_tested_stats={"perf_simple_query_result": results},
-               doc_id_with_timestamp=True)
-           if self.create_stats:
-               is_gce = self.params.get('cluster_backend') == 'gce'
-               PerfSimpleQueryAnalyzer(self._test_index, self._es_doc_type).check_regression(
-                   self._test_id, is_gce=is_gce,
-                   extra_jobs_to_compare=self.params.get('perf_extra_jobs_to_compare'))
-           send_perf_simple_query_result_to_argus(self.test_config.argus_client(), results)
+
+           send_perf_simple_query_result_to_argus(self.test_config.argus_client(), results)

Diff for: performance_regression_alternator_test.py

+2-21
@@ -36,14 +36,9 @@ def _workload(self, stress_cmd, stress_num, test_name=None, sub_type=None, keysp
        if debug_message:
            self.log.debug(debug_message)

-       if save_stats:
-           self.create_test_stats(test_name=test_name, sub_type=sub_type,
-                                  doc_id_with_timestamp=True, append_sub_test_to_name=False)
        stress_queue = self.run_stress_thread(stress_cmd=stress_cmd, stress_num=stress_num, keyspace_num=keyspace_num,
-                                             prefix=prefix, stats_aggregate_cmds=False)
+                                             prefix=prefix)
        self.get_stress_results(queue=stress_queue, store_results=True)
-       if save_stats:
-           self.update_test_details(scylla_conf=True, alternator=is_alternator)

    def create_alternator_table(self, schema, alternator_write_isolation):
        node = self.db_cluster.nodes[0]
@@ -74,9 +69,6 @@ def preload_data(self, compaction_strategy=None):
        # if test require a pre-population of data
        prepare_write_cmd = self.params.get('prepare_write_cmd')
        if prepare_write_cmd:
-           # create new document in ES with doc_id = test_id + timestamp
-           # allow to correctly save results for future compare
-           self.create_test_stats(sub_type='write-prepare', doc_id_with_timestamp=True)
            stress_queue = []
            params = {'prefix': 'preload-'}
            for stress_type in ['dynamodb', 'cassandra-cql']:
@@ -91,14 +83,13 @@ def preload_data(self, compaction_strategy=None):
                        params.update({'stress_cmd': stress_cmd.replace('dynamodb', stress_type)})

                        # Run all stress commands
-                       params.update(dict(stats_aggregate_cmds=False))
                        self.log.debug('RUNNING stress cmd: {}'.format(stress_cmd.replace('dynamodb', stress_type)))
                        stress_queue.append(self.run_stress_thread(**params))

                # One stress cmd command
                else:
                    stress_queue.append(self.run_stress_thread(stress_cmd=prepare_write_cmd.replace('dynamodb', stress_type), stress_num=1,
-                                                              prefix='preload-', stats_aggregate_cmds=False))
+                                                              prefix='preload-'))

            for stress in stress_queue:
                self.get_stress_results(queue=stress, store_results=False)
@@ -136,8 +127,6 @@ def test_write(self):
            schema=schema, alternator_write_isolation=alternator.enums.WriteIsolation.ALWAYS_USE_LWT)
        self._workload(sub_type='with-lwt', stress_cmd=base_cmd_w, stress_num=stress_multiplier, keyspace_num=1)

-       self.check_regression_with_baseline('cql')
-
    def test_read(self):
        """
        Test steps:
@@ -171,8 +160,6 @@ def test_read(self):
        self.alternator.set_write_isolation(node=node, isolation=alternator.enums.WriteIsolation.ALWAYS_USE_LWT)
        self._workload(sub_type='with-lwt', stress_cmd=base_cmd_r, stress_num=stress_multiplier, keyspace_num=1)

-       self.check_regression_with_baseline('cql')
-
    def test_mixed(self):
        """
        Test steps:
@@ -207,8 +194,6 @@ def test_mixed(self):
        self.alternator.set_write_isolation(node=node, isolation=alternator.enums.WriteIsolation.ALWAYS_USE_LWT)
        self._workload(sub_type='with-lwt', stress_cmd=base_cmd_m, stress_num=stress_multiplier, keyspace_num=1)

-       self.check_regression_with_baseline('cql')
-
    def test_latency(self):
        """
        Test steps:
@@ -259,7 +244,6 @@ def test_latency(self):
        self._workload(
            test_name=self.id() + '_read', sub_type='with-lwt', stress_cmd=base_cmd_r, stress_num=stress_multiplier,
            keyspace_num=1)
-       self.check_regression_with_baseline('cql')

        stress_multiplier = 1
        self.run_fstrim_on_all_db_nodes()
@@ -282,7 +266,6 @@ def test_latency(self):
        self._workload(
            test_name=self.id() + '_write', sub_type='with-lwt', stress_cmd=base_cmd_w + " -target 3000",
            stress_num=stress_multiplier, keyspace_num=1)
-       self.check_regression_with_baseline('cql')

        stress_multiplier = 1
        self.wait_no_compactions_running(n=120)
@@ -303,5 +286,3 @@ def test_latency(self):
        self.alternator.set_write_isolation(node=node, isolation=alternator.enums.WriteIsolation.ALWAYS_USE_LWT)
        self._workload(test_name=self.id() + '_mixed', sub_type='with-lwt',
                       stress_cmd=base_cmd_m + " -target 5000", stress_num=stress_multiplier, keyspace_num=1)
-
-       self.check_regression_with_baseline('cql')

Diff for: performance_regression_cdc_test.py

+1-14
@@ -37,7 +37,6 @@ def test_write_with_cdc(self):
                           sub_type="cdc_enabled")

        self.wait_no_compactions_running()
-       self.check_regression_with_baseline(subtest_baseline="cdc_disabled")

    def test_write_with_cdc_preimage(self):
        write_cmd = self.params.get("stress_cmd_w")
@@ -56,8 +55,6 @@ def test_write_with_cdc_preimage(self):
                           test_name="test_write",
                           sub_type="cdc_preimage_enabled")

-       self.check_regression_with_baseline(subtest_baseline="cdc_disabled")
-
    def test_write_with_cdc_postimage(self):
        write_cmd = self.params.get("stress_cmd_w")

@@ -75,8 +72,6 @@ def test_write_with_cdc_postimage(self):
                           test_name="test_write",
                           sub_type="cdc_preimage_enabled")

-       self.check_regression_with_baseline(subtest_baseline="cdc_disabled")
-
    def test_write_throughput(self):
        self.cdc_workflow()

@@ -150,19 +145,11 @@ def cdc_workflow(self, use_cdclog_reader=False): # pylint: disable=unused-varia

        self.wait_no_compactions_running()

-       self.check_regression_with_baseline(subtest_baseline="cdc_disabled")
-
    def _workload_cdc(self, stress_cmd, stress_num, test_name, sub_type=None,  # pylint: disable=too-many-arguments
                      save_stats=True, read_cdclog_cmd=None, update_cdclog_stats=False, enable_batching=True):
        cdc_stress_queue = None

-       if save_stats:
-           self.create_test_stats(sub_type=sub_type,
-                                  doc_id_with_timestamp=True,
-                                  append_sub_test_to_name=False)
-
-       stress_queue = self.run_stress_thread(stress_cmd=stress_cmd, stress_num=stress_num,
-                                             stats_aggregate_cmds=False)
+       stress_queue = self.run_stress_thread(stress_cmd=stress_cmd, stress_num=stress_num)

        if read_cdclog_cmd:
            cdc_stress_queue = self.run_cdclog_reader_thread(stress_cmd=read_cdclog_cmd,

Diff for: performance_regression_gradual_grow_throughput.py

+4-68
@@ -1,7 +1,5 @@
-import pathlib
 import time
 from enum import Enum
-from collections import defaultdict

 import json
 from typing import NamedTuple
@@ -10,9 +8,7 @@
 from sdcm.utils.common import skip_optional_stage
 from sdcm.sct_events import Severity
 from sdcm.sct_events.system import TestFrameworkEvent
-from sdcm.results_analyze import PredefinedStepsTestPerformanceAnalyzer
 from sdcm.utils.decorators import latency_calculator_decorator
-from sdcm.utils.latency import calculate_latency, analyze_hdr_percentiles


 class CSPopulateDistribution(Enum):
@@ -160,7 +156,6 @@ def preload_data(self, compaction_strategy=None):
        for stress_cmd in population_commands:
            params.update({'stress_cmd': stress_cmd})
            # Run all stress commands
-           params.update(dict(stats_aggregate_cmds=False))
            self.log.debug('RUNNING stress cmd: {}'.format(stress_cmd))
            stress_queue.append(self.run_stress_thread(**params))

@@ -174,20 +169,12 @@ def check_latency_during_steps(self, step):
            latency_results = json.load(file)
        self.log.debug('Step %s: latency_results were loaded from file %s and its result is %s',
                       step, self.latency_results_file, latency_results)
-       if latency_results and self.create_stats:
-           latency_results[step]["step"] = step
-           latency_results[step] = calculate_latency(latency_results[step])
-           latency_results = analyze_hdr_percentiles(latency_results)
-           pathlib.Path(self.latency_results_file).unlink()
-           self.log.debug('collected latency values are: %s', latency_results)
-           self.update({"latency_during_ops": latency_results})
-           return latency_results

    def run_step(self, stress_cmds, current_throttle, num_threads, step_duration):
        results = []
        stress_queue = []
        for stress_cmd in stress_cmds:
-           params = {"round_robin": True, "stats_aggregate_cmds": False}
+           params = {"round_robin": True}
            stress_cmd_to_run = stress_cmd.replace(
                "$threads", f"{num_threads}").replace("$throttle", f"{current_throttle}")
            if step_duration is not None:
@@ -215,23 +202,17 @@ def run_gradual_increase_load(self, workload: Workload, stress_num, num_loaders,
        # Wait for 4 minutes after warmup to let for all background processes to finish
        time.sleep(240)

-       if not self.exists():
-           self.log.debug("Create test statistics in ES")
-           self.create_test_stats(sub_type=workload.workload_type, doc_id_with_timestamp=True)
        total_summary = {}

        for throttle_step in workload.throttle_steps:
            self.log.info("Run cs command with rate: %s Kops", throttle_step)
            current_throttle = f"fixed={int(int(throttle_step) // (num_loaders * stress_num))}/s" if throttle_step != "unthrottled" else ""
            run_step = ((latency_calculator_decorator(legend=f"Gradual test step {throttle_step} op/s",
                                                      cycle_name=throttle_step))(self.run_step))
-           results, _ = run_step(stress_cmds=workload.cs_cmd_tmpl, current_throttle=current_throttle,
-                                 num_threads=workload.num_threads, step_duration=workload.step_duration)
+           run_step(stress_cmds=workload.cs_cmd_tmpl, current_throttle=current_throttle,
+                    num_threads=workload.num_threads, step_duration=workload.step_duration)

-           calculate_result = self._calculate_average_max_latency(results)
-           self.update_test_details(scylla_conf=True)
            summary_result = self.check_latency_during_steps(step=throttle_step)
-           summary_result[throttle_step].update({"ops_rate": calculate_result["op rate"] * num_loaders})
            total_summary.update(summary_result)
            if workload.drop_keyspace:
                self.drop_keyspace()
@@ -242,7 +223,6 @@ def run_gradual_increase_load(self, workload: Workload, stress_num, num_loaders,
                time.sleep(180 - wait_time)

        self.save_total_summary_in_file(total_summary)
-       self.run_performance_analyzer(total_summary=total_summary)

    def save_total_summary_in_file(self, total_summary):
        total_summary_json = json.dumps(total_summary, indent=4, separators=(", ", ": "))
@@ -254,54 +234,10 @@ def save_total_summary_in_file(self, total_summary):
        with open(filename, "w", encoding="utf-8") as res_file:
            res_file.write(total_summary_json)

-   def run_performance_analyzer(self, total_summary):
-       perf_analyzer = PredefinedStepsTestPerformanceAnalyzer(
-           es_index=self._test_index,
-           es_doc_type=self._es_doc_type,
-           email_recipients=self.params.get('email_recipients'))
-       # Keep next 2 lines for debug purpose
-       self.log.debug("es_index: %s", self._test_index)
-       self.log.debug("total_summary: %s", total_summary)
-       is_gce = bool(self.params.get('cluster_backend') == 'gce')
-       try:
-           perf_analyzer.check_regression(test_id=self.test_id,
-                                          data=total_summary,
-                                          is_gce=is_gce,
-                                          email_subject_postfix=self.params.get('email_subject_postfix'))
-       except Exception as exc:  # noqa: BLE001
-           TestFrameworkEvent(
-               message='Failed to check regression',
-               source=self.__class__.__name__,
-               source_method='check_regression',
-               exception=exc
-           ).publish_or_dump()
-
-   @staticmethod
-   def _calculate_average_max_latency(results):
-       status = defaultdict(float).fromkeys(results[0].keys(), 0.0)
-       max_latency = defaultdict(list)
-
-       for result in results:
-           for key in status:
-               try:
-                   status[key] += float(result.get(key, 0.0))
-                   if key in ["latency 95th percentile", "latency 99th percentile"]:
-                       max_latency[f"{key} max"].append(float(result.get(key, 0.0)))
-               except ValueError:
-                   continue
-
-       for key in status:
-           status[key] = round(status[key] / len(results), 2)
-
-       for key, latency in max_latency.items():
-           status[key] = max(latency)
-
-       return status
-
    def warmup_cache(self, stress_cmd_templ, num_threads):
        stress_queue = []
        for stress_cmd in stress_cmd_templ:
-           params = {"round_robin": True, "stats_aggregate_cmds": False}
+           params = {"round_robin": True}
            stress_cmd_to_run = stress_cmd.replace("$threads", str(num_threads))
            params.update({'stress_cmd': stress_cmd_to_run})
            # Run all stress commands
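
With the Elasticsearch analyzer (`PredefinedStepsTestPerformanceAnalyzer`) and the in-test averaging helper removed, the per-step latency summary in this test now only ends up in a local JSON file via `save_total_summary_in_file`. A minimal sketch of that remaining path, mirroring the kept `json.dumps` call above; the summary contents and the filename are illustrative, not taken from the commit:

```python
# Sketch of what remains of the summary flow after this commit: serialize the
# per-step summary to a local JSON file. The json.dumps arguments mirror
# save_total_summary_in_file above; the data and filename below are made up.
import json

total_summary = {"400000": {"latency 95th percentile": 6.1, "latency 99th percentile": 8.3}}
total_summary_json = json.dumps(total_summary, indent=4, separators=(", ", ": "))

with open("gradual_test_summary.json", "w", encoding="utf-8") as res_file:
    res_file.write(total_summary_json)
```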
