-
Notifications
You must be signed in to change notification settings - Fork 110
Expand file tree
/
Copy pathtablet_split_merge_test.py
More file actions
170 lines (144 loc) · 7.07 KB
/
Copy pathtablet_split_merge_test.py
File metadata and controls
170 lines (144 loc) · 7.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright (c) 2025 ScyllaDB
import time
import os
from enum import StrEnum
from functools import cache, cached_property
from textwrap import dedent
from longevity_test import LongevityTest
from sdcm import cluster
from sdcm.sct_events import Severity
from sdcm.sct_events.health import DataValidatorEvent
from sdcm.sct_events.group_common_events import ignore_mutation_write_errors
# Download location of the scylla-cdc-replicator "jar-with-dependencies" build
# (a 1.3.8-SNAPSHOT artifact served from an S3 bucket).
REPLICATOR_URL = (
    "https://mlitvk.s3.eu-north-1.amazonaws.com/"
    "scylla-cdc-replicator-1.3.8-SNAPSHOT-jar-with-dependencies.jar"
)
# File name the downloaded jar is saved under on the loader node.
REPLICATOR_JAR = "replicator.jar"
# File on the loader node that collects the replicator's terminal output.
REPLICATOR_LOG = "cdc-replicator.log"
class Mode(StrEnum):
DELTA = "delta"
PREIMAGE = "preimage"
POSTIMAGE = "postimage"
class TabletSplitMergeTest(LongevityTest):
    """Longevity test with an optional CDC replication check.

    When the ``cdc`` latte schema parameter equals ``"true"``, the test:

    * installs tmux, Java and the scylla-cdc-replicator jar on the first loader,
    * creates the keyspace and the table on a second cluster (``self.cs_db_cluster``),
    * starts the replicator from ``self.db_cluster`` to ``self.cs_db_cluster``,
    * after the load, compares the row counts of the table on both clusters.

    With CDC disabled every CDC-specific step is skipped.
    """

    # Seconds to wait after the load so the replicator can apply the latest changes.
    REPLICATOR_CATCH_UP_SECONDS = 1800

    @cached_property
    def enable_cdc(self) -> bool:
        """Return True when the ``cdc`` latte schema parameter is the string ``"true"``.

        A missing ``latte_schema_parameters`` section or a missing ``cdc`` key
        means CDC is disabled rather than an error.
        """
        schema_params = self.params.get("latte_schema_parameters") or {}
        return schema_params.get("cdc") == "true"

    @cached_property
    def ks_name(self) -> str:
        """Keyspace name taken from the ``ks_name`` latte schema parameter."""
        return self.params["latte_schema_parameters"]["ks_name"]

    @cached_property
    def table_name(self) -> str:
        """Table name taken from the ``table_name`` latte schema parameter."""
        return self.params["latte_schema_parameters"]["table_name"]

    @cached_property
    def mv_name(self) -> str:
        """Materialized view name taken from the ``mv_name`` latte schema parameter."""
        return self.params["latte_schema_parameters"]["mv_name"]

    def setUp(self) -> None:
        """Run the parent setup and, with CDC enabled, prepare the first loader node."""
        super().setUp()
        if self.enable_cdc:
            self.setup_replicator_tools(loader_node=self.loaders.nodes[0])

    def run_pre_create_keyspace(self) -> None:
        """Run the parent keyspace hook, then create the keyspace on the replica cluster.

        The replica keyspace is created only when CDC is enabled.
        """
        # Delegate to the parent's hook of the same name; this override only
        # adds the replica-side keyspace on top of it.
        super().run_pre_create_keyspace()
        if not self.enable_cdc:
            return

        create_keyspace_query = dedent(f"""\
            CREATE KEYSPACE IF NOT EXISTS {self.ks_name}
            WITH replication = {{
                'class' : 'NetworkTopologyStrategy',
                'replication_factor' : 3
            }}
            AND tablets = {{ 'enabled' : true }}
            """)
        with self.cs_db_cluster.cql_connection_patient(node=self.cs_db_cluster.nodes[0]) as sess:
            self.log.info("Create the keyspace on the replica cluster.")
            sess.execute(create_keyspace_query)

    # functools.cache memoizes the call per test instance, so calling this hook
    # again does not recreate the table or start a second replicator.
    @cache
    def run_post_latte_schema_cmd(self) -> None:
        """Copy the table definition to the replica cluster and start the replicator.

        Does nothing when CDC is disabled.
        """
        if not self.enable_cdc:
            return

        with self.db_cluster.cql_connection_patient(node=self.db_cluster.nodes[0]) as sess:
            table = sess.cluster.metadata.keyspaces[self.ks_name].tables[self.table_name]
            # Drop the "cdc" extension from the driver-side metadata before the
            # CREATE statement is generated - presumably so that the replica
            # table is created without CDC (confirm).
            del table.extensions["cdc"]

        self.log.info("Create the table on the replica cluster.")
        with self.cs_db_cluster.cql_connection_patient(node=self.cs_db_cluster.nodes[0]) as sess:
            sess.execute(table.as_cql_query())

        self.start_replicator(mode=Mode.DELTA)

    def test_custom_time(self) -> None:
        """Run the parent ``test_custom_time`` scenario, stop nemesis and validate data."""
        with ignore_mutation_write_errors():
            super().test_custom_time()

        # Stop nemesis first so that, preferably, all nodes are running before
        # data is collected for validation. The increased timeout gives a
        # running nemesis time to finish.
        if self.db_cluster.nemesis_threads:
            self.db_cluster.stop_nemesis(timeout=900)

        self.validate_data()

    def validate_data(self) -> None:
        """Compare the table's row count on the master and the replica cluster.

        Only runs when CDC is enabled. Waits ``REPLICATOR_CATCH_UP_SECONDS``
        first, publishes a ``DataValidatorEvent`` with the outcome and finally
        copies the replicator log from the first loader node into ``self.logdir``.
        """
        if not self.enable_cdc:
            return

        self.log.info("Waiting for replicator to catch up latest changes")
        time.sleep(self.REPLICATOR_CATCH_UP_SECONDS)

        try:
            self.log.info("Validate number of rows in master and replica.")
            count_query = f"SELECT COUNT(*) FROM {self.ks_name}.{self.table_name};"
            with self.db_cluster.cql_connection_patient(node=self.db_cluster.nodes[0]) as sess:
                master_row_count = sess.execute(count_query).current_rows[0].count
            with self.cs_db_cluster.cql_connection_patient(node=self.cs_db_cluster.nodes[0]) as sess:
                replica_row_count = sess.execute(count_query).current_rows[0].count

            if master_row_count != replica_row_count:
                # NOTE(review): a mismatch is reported through `error=` but only
                # with WARNING severity - confirm this should not be ERROR.
                DataValidatorEvent.DataValidator(
                    severity=Severity.WARNING,
                    error=f"Number of rows in master and replica do not match: {master_row_count=}, {replica_row_count=}",
                ).publish()
            else:
                DataValidatorEvent.DataValidator(
                    severity=Severity.NORMAL,
                    message=f"Number of rows in master and replica: {master_row_count}",
                ).publish()
        finally:
            # Fetch the replicator log even when a count query fails: that is
            # when the log is needed most.
            replicator_log_path = os.path.join(self.logdir, REPLICATOR_LOG)
            self.loaders.nodes[0].remoter.receive_files(src=REPLICATOR_LOG, dst=replicator_log_path)

    def setup_replicator_tools(self, loader_node: cluster.BaseNode) -> None:
        """Install tmux and Java on ``loader_node`` and download the replicator jar.

        Raises:
            Exception: when installing tmux or Java fails; the original error
                is attached as the cause.

        A failed download fails the test through ``self.fail``.
        """
        self.log.info("Installing tmux on loader node.")
        try:
            loader_node.install_package("tmux")
        except Exception as e:  # noqa: BLE001
            raise Exception(f"Could not install tmux: {e}") from e

        self.log.info("Installing java on loader node.")
        try:
            loader_node.install_package("openjdk-17-jre")
        except Exception as e:  # noqa: BLE001
            raise Exception(f"Could not install java: {e}") from e

        self.log.info("Getting replicator on loader node.")
        res = loader_node.remoter.run(cmd=f"wget {REPLICATOR_URL} -O {REPLICATOR_JAR}")
        if res.failed:
            self.fail("Could not obtain CDC replicator.")

    def start_replicator(self, mode: Mode) -> None:
        """Start scylla-cdc-replicator in a tmux session on the first loader node.

        The session is named ``replicator`` and its pane output is appended to
        the ``REPLICATOR_LOG`` file on the loader. The replicator reads from the
        first node of ``self.db_cluster`` and writes to the first node of
        ``self.cs_db_cluster``. A failure to run the script fails the test.
        """
        source_address = self.db_cluster.nodes[0].external_address
        destination_address = self.cs_db_cluster.nodes[0].external_address
        replicator_cmd = (
            f"java -cp {REPLICATOR_JAR} com.scylladb.cdc.replicator.Main"
            f" -k {self.ks_name} -t {self.table_name}"
            f" -s {source_address} -d {destination_address}"
            f" -cl one -m {mode}"
        )
        replicator_script = dedent(f"""\
            tmux new-session -d -s replicator
            tmux pipe-pane -t replicator -o 'cat >> {REPLICATOR_LOG}'
            tmux send-keys -t replicator '{replicator_cmd}' ENTER
            """)

        self.log.debug("Replicator script:\n%s", replicator_script)
        self.log.info("Starting replicator.")
        res = self.loaders.nodes[0].remoter.run(cmd=replicator_script)
        if res.failed:
            self.fail("Could not start CDC replicator.")

    def get_email_data(self) -> dict:
        """Return the parent's email data, extended with replica cluster details when CDC is enabled."""
        email_data = super().get_email_data()
        if self.enable_cdc:
            email_data.update(
                {
                    "number_of_oracle_nodes": self.params.get("n_test_oracle_db_nodes"),
                    "oracle_ami_id": self.params.get("ami_id_db_oracle"),
                    "oracle_db_version": self.cs_db_cluster.nodes[0].scylla_version if self.cs_db_cluster else "N/A",
                    "oracle_instance_type": self.params.get("instance_type_db_oracle"),
                }
            )
        return email_data