Skip to content

Commit 7b81d67

Browse files
committed
test(nemesis-refresh): add integration test for refresh monkey flow
Add test_refresh_monkey_flow that exercises the full nodetool refresh path using a Docker-based Scylla node: writes data with cassandra-stress, uploads sstables, runs nodetool refresh, and verifies the data is loaded via CQL query. Includes related changes to nemesis, sstable load utilities, and test fixtures needed to support the refresh test flow.
1 parent 20a3a25 commit 7b81d67

6 files changed

Lines changed: 138 additions & 51 deletions

File tree

sdcm/nemesis/__init__.py

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1674,37 +1674,43 @@ def disrupt_major_compaction(self):
16741674
self._major_compaction()
16751675

16761676
def disrupt_load_and_stream(self):
1677-
# Checking the columns number of keyspace1.standard1
1678-
self.log.debug("Prepare keyspace1.standard1 if it does not exist")
1679-
self._prepare_test_table(ks="keyspace1")
1680-
column_num = SstableLoadUtils.calculate_columns_count_in_table(self.target_node)
1677+
# Checking the columns number of keyspace_refresh.standard1
1678+
self.log.debug("Prepare keyspace_refresh.standard1 if it does not exist")
1679+
self._prepare_test_table(ks="keyspace_refresh")
1680+
column_num = SstableLoadUtils.calculate_columns_count_in_table(
1681+
self.target_node, keyspace_name="keyspace_refresh"
1682+
)
16811683

16821684
# Run load-and-stream test on regular standard1 table of cassandra-stress.
16831685
if column_num < 5:
16841686
raise UnsupportedNemesis("Schema doesn't match the snapshot, not uploading")
16851687

16861688
test_data = SstableLoadUtils.get_load_test_data_inventory(column_num, big_sstable=False, load_and_stream=True)
16871689

1688-
result = self.target_node.run_nodetool(sub_cmd="cfstats", args="keyspace1.standard1")
1690+
result = self.target_node.run_nodetool(sub_cmd="cfstats", args="keyspace_refresh.standard1")
16891691

16901692
if result is not None and result.exit_status == 0:
16911693
map_files_to_node = SstableLoadUtils.distribute_test_files_to_cluster_nodes(
16921694
nodes=self.cluster.data_nodes, test_data=test_data
16931695
)
16941696
for sstables_info, load_on_node in map_files_to_node:
16951697
self.actions_log.info(f"Uploading sstables to {load_on_node.name}")
1696-
SstableLoadUtils.upload_sstables(load_on_node, test_data=sstables_info, table_name="standard1")
1698+
SstableLoadUtils.upload_sstables(
1699+
load_on_node, test_data=sstables_info, keyspace_name="keyspace_refresh", table_name="standard1"
1700+
)
16971701
# NOTE: on K8S logs may appear with a delay, so add a bigger timeout for it.
16981702
# See https://github.com/scylladb/scylla-cluster-tests/issues/6314
16991703
kwargs = {"start_timeout": 1800, "end_timeout": 1800} if self._is_it_on_kubernetes() else {}
17001704
with self.action_log_scope(f"Loading and streaming sstables on {load_on_node.name} node"):
17011705
SstableLoadUtils.run_load_and_stream(load_on_node, **kwargs)
17021706

17031707
def disrupt_nodetool_refresh(self, big_sstable: bool = False):
1704-
# Checking the columns number of keyspace1.standard1
1705-
self.log.debug("Prepare keyspace1.standard1 if it does not exist")
1706-
self._prepare_test_table(ks="keyspace1")
1707-
column_num = SstableLoadUtils.calculate_columns_count_in_table(self.target_node)
1708+
# Checking the columns number of keyspace_refresh.standard1
1709+
self.log.debug("Prepare keyspace_refresh.standard1 if it does not exist")
1710+
self._prepare_test_table(ks="keyspace_refresh")
1711+
column_num = SstableLoadUtils.calculate_columns_count_in_table(
1712+
self.target_node, keyspace_name="keyspace_refresh", table_name="standard1"
1713+
)
17081714

17091715
# Note: when issue #6617 is fixed, we can try to load snapshot (cols=5) to a table (1 < cols < 5),
17101716
# expect that refresh will fail (no serious db error).
@@ -1715,14 +1721,14 @@ def disrupt_nodetool_refresh(self, big_sstable: bool = False):
17151721
column_num, big_sstable=big_sstable, load_and_stream=False
17161722
)
17171723

1718-
result = self.target_node.run_nodetool(sub_cmd="cfstats", args="keyspace1.standard1")
1724+
result = self.target_node.run_nodetool(sub_cmd="cfstats", args="keyspace_refresh.standard1")
17191725

17201726
if result is not None and result.exit_status == 0:
17211727
key = "0x32373131364f334f3830"
17221728
# Check one special key before refresh, we will verify refresh by query in the end
17231729
# Note: we can't DELETE the key before refresh, otherwise the old sstable won't be loaded
1724-
# TRUNCATE can be used the clean the table, but we can't do it for keyspace1.standard1
1725-
query_verify = f"SELECT * FROM keyspace1.standard1 WHERE key={key}"
1730+
# TRUNCATE can be used the clean the table, but we can't do it for keyspace_refresh.standard1
1731+
query_verify = f"SELECT * FROM keyspace_refresh.standard1 WHERE key={key}"
17261732
result = self.target_node.run_cqlsh(query_verify)
17271733
if "(0 rows)" in result.stdout:
17281734
self.log.debug("Key %s does not exist before refresh", key)
@@ -2169,7 +2175,7 @@ def _prepare_test_table(self, ks="keyspace1"):
21692175
"""Populate ``ks.standard1`` with 400 K rows via cassandra-stress."""
21702176
stress_cmd = (
21712177
"cassandra-stress write n=400000 cl=QUORUM -mode native cql3 "
2172-
f"-schema 'replication(strategy=NetworkTopologyStrategy,"
2178+
f"-schema 'keyspace={ks} replication(strategy=NetworkTopologyStrategy,"
21732179
f"replication_factor={self.tester.reliable_replication_factor})' -log interval=5"
21742180
)
21752181
cs_thread = self.tester.run_stress_thread(

sdcm/remote/local_cmd_runner.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ def run(
5656
watchers: Optional[List[StreamWatcher]] = None,
5757
change_context: bool = False,
5858
timestamp_logs: bool = False,
59+
user: Optional[str] = None,
5960
) -> Result:
6061
watchers = self._setup_watchers(
6162
verbose=verbose, log_file=log_file, additional_watchers=watchers, timestamp_logs=timestamp_logs

sdcm/utils/sstable/load_utils.py

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ class SstableLoadUtils:
3232

3333
@staticmethod
3434
def calculate_columns_count_in_table(
35-
target_node, keyspace_name: str = "keyspace1", table_name: str = "standard1"
35+
target_node, keyspace_name: str = "keyspace_refresh", table_name: str = "standard1"
3636
) -> int:
3737
query_cmd = f"SELECT * FROM {keyspace_name}.{table_name} LIMIT 1"
3838
result = target_node.run_cqlsh(query_cmd)
@@ -67,7 +67,7 @@ def distribute_test_files_to_cluster_nodes(cls, nodes, test_data: List[TestDataI
6767
def upload_sstables(
6868
node,
6969
test_data: TestDataInventory,
70-
keyspace_name: str = "keyspace1",
70+
keyspace_name: str = "keyspace_refresh",
7171
table_name=None,
7272
create_schema: bool = False,
7373
is_cloud_cluster=False,
@@ -129,7 +129,12 @@ def upload_sstables(
129129

130130
@classmethod
131131
def run_load_and_stream(
132-
cls, node, keyspace_name: str = "keyspace1", table_name: str = "standard1", start_timeout=60, end_timeout=600
132+
cls,
133+
node,
134+
keyspace_name: str = "keyspace_refresh",
135+
table_name: str = "standard1",
136+
start_timeout=60,
137+
end_timeout=600,
133138
):
134139
"""runs load and stream using API request and waits for it to finish"""
135140
with wait_for_log_lines(
@@ -155,7 +160,7 @@ def run_refresh(node, test_data: namedtuple) -> Iterable[str]:
155160
# Find the compaction output that reported about the resharding
156161

157162
system_log_follower = node.follow_system_log(patterns=[r"Resharded.*"])
158-
node.run_nodetool(sub_cmd="refresh", args="-- keyspace1 standard1")
163+
node.run_nodetool(sub_cmd="refresh", args="-- keyspace_refresh standard1")
159164
return system_log_follower
160165

161166
@staticmethod
@@ -169,24 +174,24 @@ def validate_resharding_after_refresh(node, system_log_follower):
169174
# Validate that files after resharding were saved in the "upload" folder.
170175
# Example of compaction output:
171176
172-
# scylla[6653]: [shard 0] compaction - [Reshard keyspace1.standard1 3cad4140-f8c3-11ea-acb1-000000000002]
177+
# scylla[6653]: [shard 0] compaction - [Reshard keyspace_refresh.standard1 3cad4140-f8c3-11ea-acb1-000000000002]
173178
# Resharded 1 sstables to [
174-
# /var/lib/scylla/data/keyspace1/standard1-9fbed8d0f8c211ea9bb1000000000000/upload/md-9-big-Data.db:level=0,
175-
# /var/lib/scylla/data/keyspace1/standard1-9fbed8d0f8c211ea9bb1000000000000/upload/md-10-big-Data.db:level=0,
176-
# /var/lib/scylla/data/keyspace1/standard1-9fbed8d0f8c211ea9bb1000000000000/upload/md-11-big-Data.db:level=0,
177-
# /var/lib/scylla/data/keyspace1/standard1-9fbed8d0f8c211ea9bb1000000000000/upload/md-12-big-Data.db:level=0,
178-
# /var/lib/scylla/data/keyspace1/standard1-9fbed8d0f8c211ea9bb1000000000000/upload/md-13-big-Data.db:level=0,
179-
# /var/lib/scylla/data/keyspace1/standard1-9fbed8d0f8c211ea9bb1000000000000/upload/md-22-big-Data.db:level=0,
180-
# /var/lib/scylla/data/keyspace1/standard1-9fbed8d0f8c211ea9bb1000000000000/upload/md-15-big-Data.db:level=0,
181-
# /var/lib/scylla/data/keyspace1/standard1-9fbed8d0f8c211ea9bb1000000000000/upload/md-16-big-Data.db:level=0,
179+
# /var/lib/scylla/data/keyspace_refresh/standard1-9fbed8d0f8c211ea9bb1000000000000/upload/md-9-big-Data.db:level=0,
180+
# /var/lib/scylla/data/keyspace_refresh/standard1-9fbed8d0f8c211ea9bb1000000000000/upload/md-10-big-Data.db:level=0,
181+
# /var/lib/scylla/data/keyspace_refresh/standard1-9fbed8d0f8c211ea9bb1000000000000/upload/md-11-big-Data.db:level=0,
182+
# /var/lib/scylla/data/keyspace_refresh/standard1-9fbed8d0f8c211ea9bb1000000000000/upload/md-12-big-Data.db:level=0,
183+
# /var/lib/scylla/data/keyspace_refresh/standard1-9fbed8d0f8c211ea9bb1000000000000/upload/md-13-big-Data.db:level=0,
184+
# /var/lib/scylla/data/keyspace_refresh/standard1-9fbed8d0f8c211ea9bb1000000000000/upload/md-22-big-Data.db:level=0,
185+
# /var/lib/scylla/data/keyspace_refresh/standard1-9fbed8d0f8c211ea9bb1000000000000/upload/md-15-big-Data.db:level=0,
186+
# /var/lib/scylla/data/keyspace_refresh/standard1-9fbed8d0f8c211ea9bb1000000000000/upload/md-16-big-Data.db:level=0,
182187
# ]. 91MB to 92MB (~100% of original) in 5009ms = 18MB/s. ~370176 total partitions merged to 370150
183188
184189
Starting with Scylla 4.7 messages have changed to the following:
185-
[shard 1] sstables_loader - Loading new SSTables for keyspace=keyspace1, table=standard1, ...
186-
[shard 1] database - Resharding 223kB for keyspace1.standard1
187-
[shard 1] database - Resharded 223kB for keyspace1.standard1 in 0.14 seconds, 1MB/s
188-
[shard 1] database - Loaded 16 SSTables into /var/lib/scylla/data/keyspace1/standard1-eb0401905d8311ecb391aa52ebf0b3e1
189-
[shard 1] sstables_loader - Done loading new SSTables for keyspace=keyspace1, table=standard1, ...
190+
[shard 1] sstables_loader - Loading new SSTables for keyspace=keyspace_refresh, table=standard1, ...
191+
[shard 1] database - Resharding 223kB for keyspace_refresh.standard1
192+
[shard 1] database - Resharded 223kB for keyspace_refresh.standard1 in 0.14 seconds, 1MB/s
193+
[shard 1] database - Loaded 16 SSTables into /var/lib/scylla/data/keyspace_refresh/standard1-eb0401905d8311ecb391aa52ebf0b3e1
194+
[shard 1] sstables_loader - Done loading new SSTables for keyspace=keyspace_refresh, table=standard1, ...
190195
191196
So, there is no per-file paths anymore for resharding log messages, only root dir path.
192197
"""
@@ -230,7 +235,7 @@ def get_load_test_data_inventory(
230235
def create_keyspace(
231236
cls,
232237
node,
233-
keyspace_name: str = "keyspace1",
238+
keyspace_name: str = "keyspace_refresh",
234239
strategy: str = "NetworkTopologyStrategy",
235240
replication_factor: int = 1,
236241
):
@@ -245,7 +250,9 @@ def create_table_for_load(cls, node, schema_file_and_path: str, session):
245250
session.execute(schema.replace("\n", ""))
246251

247252
@classmethod
248-
def validate_data_count_after_upload(cls, node, keyspace_name: str = "keyspace1", table_name: str = "standard1"):
253+
def validate_data_count_after_upload(
254+
cls, node, keyspace_name: str = "keyspace_refresh", table_name: str = "standard2"
255+
):
249256
result = node.run_cqlsh(f"consistency QUORUM;SELECT COUNT(*) FROM {keyspace_name}.{table_name}")
250257

251258
next_line_is_result = False
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import logging
2+
3+
import pytest
4+
5+
from sdcm.utils.sstable.load_utils import SstableLoadUtils
6+
7+
from sdcm.stress_thread import CassandraStressThread
8+
from unit_tests.lib.dummy_remote import LocalLoaderSetDummy
9+
10+
pytestmark = [
11+
pytest.mark.usefixtures("events"),
12+
pytest.mark.integration,
13+
pytest.mark.xdist_group("docker_heavy"),
14+
]
15+
16+
17+
@pytest.mark.integration
18+
def test_refresh_monkey_flow(docker_scylla, params, events, request):
19+
"""test the flow of refrash monkey locall with a docker base scylla"""
20+
21+
loader_set = LocalLoaderSetDummy(params=params)
22+
23+
ks = "keyspace_refresh"
24+
# Checking the columns number of keyspace_refresh.standard1
25+
stress_cmd = (
26+
"cassandra-stress write n=40000 cl=ONE -mode native cql3 "
27+
f"-schema 'keyspace={ks} replication(strategy=NetworkTopologyStrategy,"
28+
f"replication_factor=1)' -log interval=5"
29+
)
30+
cs_thread = CassandraStressThread(loader_set, stress_cmd, node_list=[docker_scylla], timeout=120, params=params)
31+
32+
def cleanup_thread():
33+
cs_thread.kill()
34+
35+
request.addfinalizer(cleanup_thread)
36+
37+
cs_thread.run()
38+
39+
output, _ = cs_thread.parse_results()
40+
print(output)
41+
column_num = SstableLoadUtils.calculate_columns_count_in_table(
42+
docker_scylla, keyspace_name="keyspace_refresh", table_name="standard1"
43+
)
44+
45+
assert column_num
46+
test_data = SstableLoadUtils.get_load_test_data_inventory(column_num, big_sstable=False, load_and_stream=False)
47+
48+
result = docker_scylla.run_nodetool(sub_cmd="cfstats", args="keyspace_refresh.standard1")
49+
50+
if result is not None and result.exit_status == 0:
51+
key = "0x32373131364f334f3830"
52+
# Check one special key before refresh, we will verify refresh by query in the end
53+
# Note: we can't DELETE the key before refresh, otherwise the old sstable won't be loaded
54+
# TRUNCATE can be used the clean the table, but we can't do it for keyspace_refresh.standard1
55+
query_verify = f"SELECT * FROM keyspace_refresh.standard1 WHERE key={key}"
56+
result = docker_scylla.run_cqlsh(query_verify)
57+
if "(0 rows)" in result.stdout:
58+
logging.debug("Key %s does not exist before refresh", key)
59+
else:
60+
logging.debug("Key %s already exists before refresh", key)
61+
62+
# Executing rolling refresh one by one
63+
for node in [docker_scylla]:
64+
SstableLoadUtils.upload_sstables(
65+
node,
66+
test_data=test_data[0],
67+
table_name="standard1",
68+
is_cloud_cluster=False,
69+
)
70+
SstableLoadUtils.run_refresh(node, test_data=test_data[0])
71+
# Verify that the special key is loaded by SELECT query
72+
result = docker_scylla.run_cqlsh(query_verify)
73+
assert "(1 rows)" in result.stdout, f"The key {key} is not loaded by `nodetool refresh`"

unit_tests/lib/fake_cluster.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ def __init__(self, system_log):
3737
def run(self, *args, **kwargs):
3838
lines = [
3939
"[shard 11] sstables_loader - load_and_stream: started ops_uuid=a2661989-6836-418f-aa67-2c5466499848, process [0-1] out",
40-
"[shard 2] sstables_loader - Done loading new SSTables for keyspace=keyspace1, table=standard1, load_and_stream=true, "
40+
"[shard 2] sstables_loader - Done loading new SSTables for keyspace=keyspace_refresh, table=standard1, load_and_stream=true, "
4141
"primary_replica_only=false, status=succeeded",
4242
]
4343
for line in lines:

0 commit comments

Comments
 (0)