Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 21 additions & 17 deletions sdcm/nemesis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1674,37 +1674,43 @@ def disrupt_major_compaction(self):
self._major_compaction()

def disrupt_load_and_stream(self):
# Checking the columns number of keyspace1.standard1
self.log.debug("Prepare keyspace1.standard1 if it does not exist")
self._prepare_test_table(ks="keyspace1")
column_num = SstableLoadUtils.calculate_columns_count_in_table(self.target_node)
# Checking the columns number of keyspace_refresh.standard1
self.log.debug("Prepare keyspace_refresh.standard1 if it does not exist")
self._prepare_test_table(ks="keyspace_refresh")
column_num = SstableLoadUtils.calculate_columns_count_in_table(
self.target_node, keyspace_name="keyspace_refresh"
)

# Run load-and-stream test on regular standard1 table of cassandra-stress.
if column_num < 5:
raise UnsupportedNemesis("Schema doesn't match the snapshot, not uploading")

test_data = SstableLoadUtils.get_load_test_data_inventory(column_num, big_sstable=False, load_and_stream=True)

result = self.target_node.run_nodetool(sub_cmd="cfstats", args="keyspace1.standard1")
result = self.target_node.run_nodetool(sub_cmd="cfstats", args="keyspace_refresh.standard1")

if result is not None and result.exit_status == 0:
map_files_to_node = SstableLoadUtils.distribute_test_files_to_cluster_nodes(
nodes=self.cluster.data_nodes, test_data=test_data
)
for sstables_info, load_on_node in map_files_to_node:
self.actions_log.info(f"Uploading sstables to {load_on_node.name}")
SstableLoadUtils.upload_sstables(load_on_node, test_data=sstables_info, table_name="standard1")
SstableLoadUtils.upload_sstables(
load_on_node, test_data=sstables_info, keyspace_name="keyspace_refresh", table_name="standard1"
)
# NOTE: on K8S logs may appear with a delay, so add a bigger timeout for it.
# See https://github.com/scylladb/scylla-cluster-tests/issues/6314
kwargs = {"start_timeout": 1800, "end_timeout": 1800} if self._is_it_on_kubernetes() else {}
with self.action_log_scope(f"Loading and streaming sstables on {load_on_node.name} node"):
SstableLoadUtils.run_load_and_stream(load_on_node, **kwargs)
Comment on lines +1690 to 1705

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎯 Functional Correctness | 🟠 Major | ⚡ Quick win

Fail the nemesis when the cfstats precheck fails.

Both disruptors silently return when run_nodetool("cfstats") fails (returns None or a non-zero exit status), so the run can be recorded as a success without loading or refreshing anything. Treat this as UnsupportedNemesis or an assertion instead of a no-op.

Also applies to: 1724-1760

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@sdcm/nemesis/__init__.py` around lines 1690 - 1705, The cfstats precheck in
the nemesis flow is silently skipping the rest of the action when
`self.target_node.run_nodetool(...)` returns a non-zero exit status, so the
disruptor can appear successful without doing any work. Update the relevant
nemesis methods in `sdcm/nemesis/__init__.py` that use this precheck to fail
fast instead of returning normally, either by raising `UnsupportedNemesis` or an
assertion, and keep the existing load/refresh logic in the successful path
(`run_nodetool`, `SstableLoadUtils.run_load_and_stream`, and the surrounding
action scope). Also apply the same behavior to the other matching disruptor
block referenced by the review.


def disrupt_nodetool_refresh(self, big_sstable: bool = False):
# Checking the columns number of keyspace1.standard1
self.log.debug("Prepare keyspace1.standard1 if it does not exist")
self._prepare_test_table(ks="keyspace1")
column_num = SstableLoadUtils.calculate_columns_count_in_table(self.target_node)
# Checking the columns number of keyspace_refresh.standard1
self.log.debug("Prepare keyspace_refresh.standard1 if it does not exist")
self._prepare_test_table(ks="keyspace_refresh")
column_num = SstableLoadUtils.calculate_columns_count_in_table(
self.target_node, keyspace_name="keyspace_refresh", table_name="standard1"
)

# Note: when issue #6617 is fixed, we can try to load snapshot (cols=5) to a table (1 < cols < 5),
# expect that refresh will fail (no serious db error).
Expand All @@ -1715,14 +1721,14 @@ def disrupt_nodetool_refresh(self, big_sstable: bool = False):
column_num, big_sstable=big_sstable, load_and_stream=False
)

result = self.target_node.run_nodetool(sub_cmd="cfstats", args="keyspace1.standard1")
result = self.target_node.run_nodetool(sub_cmd="cfstats", args="keyspace_refresh.standard1")

if result is not None and result.exit_status == 0:
key = "0x32373131364f334f3830"
# Check one special key before refresh, we will verify refresh by query in the end
# Note: we can't DELETE the key before refresh, otherwise the old sstable won't be loaded
# TRUNCATE can be used the clean the table, but we can't do it for keyspace1.standard1
query_verify = f"SELECT * FROM keyspace1.standard1 WHERE key={key}"
# TRUNCATE can be used to clean the table, but we can't do it for keyspace_refresh.standard1
query_verify = f"SELECT * FROM keyspace_refresh.standard1 WHERE key={key}"
result = self.target_node.run_cqlsh(query_verify)
if "(0 rows)" in result.stdout:
self.log.debug("Key %s does not exist before refresh", key)
Expand Down Expand Up @@ -2169,12 +2175,10 @@ def _prepare_test_table(self, ks="keyspace1"):
"""Populate ``ks.standard1`` with 400 K rows via cassandra-stress."""
stress_cmd = (
"cassandra-stress write n=400000 cl=QUORUM -mode native cql3 "
f"-schema 'replication(strategy=NetworkTopologyStrategy,"
f"-schema 'keyspace={ks} replication(strategy=NetworkTopologyStrategy,"
f"replication_factor={self.tester.reliable_replication_factor})' -log interval=5"
)
cs_thread = self.tester.run_stress_thread(
stress_cmd=stress_cmd, keyspace_name=ks, stop_test_on_failure=False, round_robin=True
)
cs_thread = self.tester.run_stress_thread(stress_cmd=stress_cmd, stop_test_on_failure=False, round_robin=True)
self.tester.verify_stress_thread(cs_thread, error_handler=self._nemesis_stress_failure_handler)

def _nemesis_stress_failure_handler(self, stress_pool, errors):
Expand Down
1 change: 1 addition & 0 deletions sdcm/remote/local_cmd_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def run(
watchers: Optional[List[StreamWatcher]] = None,
change_context: bool = False,
timestamp_logs: bool = False,
user: Optional[str] = None,
) -> Result:
watchers = self._setup_watchers(
verbose=verbose, log_file=log_file, additional_watchers=watchers, timestamp_logs=timestamp_logs
Expand Down
47 changes: 27 additions & 20 deletions sdcm/utils/sstable/load_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class SstableLoadUtils:

@staticmethod
def calculate_columns_count_in_table(
target_node, keyspace_name: str = "keyspace1", table_name: str = "standard1"
target_node, keyspace_name: str = "keyspace_refresh", table_name: str = "standard1"
) -> int:
query_cmd = f"SELECT * FROM {keyspace_name}.{table_name} LIMIT 1"
result = target_node.run_cqlsh(query_cmd)
Expand Down Expand Up @@ -67,7 +67,7 @@ def distribute_test_files_to_cluster_nodes(cls, nodes, test_data: List[TestDataI
def upload_sstables(
node,
test_data: TestDataInventory,
keyspace_name: str = "keyspace1",
keyspace_name: str = "keyspace_refresh",
table_name=None,
create_schema: bool = False,
is_cloud_cluster=False,
Expand Down Expand Up @@ -129,7 +129,12 @@ def upload_sstables(

@classmethod
def run_load_and_stream(
cls, node, keyspace_name: str = "keyspace1", table_name: str = "standard1", start_timeout=60, end_timeout=600
cls,
node,
keyspace_name: str = "keyspace_refresh",
table_name: str = "standard1",
start_timeout=60,
end_timeout=600,
):
"""runs load and stream using API request and waits for it to finish"""
with wait_for_log_lines(
Expand All @@ -155,7 +160,7 @@ def run_refresh(node, test_data: namedtuple) -> Iterable[str]:
# Find the compaction output that reported about the resharding

system_log_follower = node.follow_system_log(patterns=[r"Resharded.*"])
node.run_nodetool(sub_cmd="refresh", args="-- keyspace1 standard1")
node.run_nodetool(sub_cmd="refresh", args="-- keyspace_refresh standard1")
return system_log_follower

@staticmethod
Expand All @@ -169,24 +174,24 @@ def validate_resharding_after_refresh(node, system_log_follower):
# Validate that files after resharding were saved in the "upload" folder.
# Example of compaction output:

# scylla[6653]: [shard 0] compaction - [Reshard keyspace1.standard1 3cad4140-f8c3-11ea-acb1-000000000002]
# scylla[6653]: [shard 0] compaction - [Reshard keyspace_refresh.standard1 3cad4140-f8c3-11ea-acb1-000000000002]
# Resharded 1 sstables to [
# /var/lib/scylla/data/keyspace1/standard1-9fbed8d0f8c211ea9bb1000000000000/upload/md-9-big-Data.db:level=0,
# /var/lib/scylla/data/keyspace1/standard1-9fbed8d0f8c211ea9bb1000000000000/upload/md-10-big-Data.db:level=0,
# /var/lib/scylla/data/keyspace1/standard1-9fbed8d0f8c211ea9bb1000000000000/upload/md-11-big-Data.db:level=0,
# /var/lib/scylla/data/keyspace1/standard1-9fbed8d0f8c211ea9bb1000000000000/upload/md-12-big-Data.db:level=0,
# /var/lib/scylla/data/keyspace1/standard1-9fbed8d0f8c211ea9bb1000000000000/upload/md-13-big-Data.db:level=0,
# /var/lib/scylla/data/keyspace1/standard1-9fbed8d0f8c211ea9bb1000000000000/upload/md-22-big-Data.db:level=0,
# /var/lib/scylla/data/keyspace1/standard1-9fbed8d0f8c211ea9bb1000000000000/upload/md-15-big-Data.db:level=0,
# /var/lib/scylla/data/keyspace1/standard1-9fbed8d0f8c211ea9bb1000000000000/upload/md-16-big-Data.db:level=0,
# /var/lib/scylla/data/keyspace_refresh/standard1-9fbed8d0f8c211ea9bb1000000000000/upload/md-9-big-Data.db:level=0,
# /var/lib/scylla/data/keyspace_refresh/standard1-9fbed8d0f8c211ea9bb1000000000000/upload/md-10-big-Data.db:level=0,
# /var/lib/scylla/data/keyspace_refresh/standard1-9fbed8d0f8c211ea9bb1000000000000/upload/md-11-big-Data.db:level=0,
# /var/lib/scylla/data/keyspace_refresh/standard1-9fbed8d0f8c211ea9bb1000000000000/upload/md-12-big-Data.db:level=0,
# /var/lib/scylla/data/keyspace_refresh/standard1-9fbed8d0f8c211ea9bb1000000000000/upload/md-13-big-Data.db:level=0,
# /var/lib/scylla/data/keyspace_refresh/standard1-9fbed8d0f8c211ea9bb1000000000000/upload/md-22-big-Data.db:level=0,
# /var/lib/scylla/data/keyspace_refresh/standard1-9fbed8d0f8c211ea9bb1000000000000/upload/md-15-big-Data.db:level=0,
# /var/lib/scylla/data/keyspace_refresh/standard1-9fbed8d0f8c211ea9bb1000000000000/upload/md-16-big-Data.db:level=0,
# ]. 91MB to 92MB (~100% of original) in 5009ms = 18MB/s. ~370176 total partitions merged to 370150

Starting with Scylla 4.7 messages have changed to the following:
[shard 1] sstables_loader - Loading new SSTables for keyspace=keyspace1, table=standard1, ...
[shard 1] database - Resharding 223kB for keyspace1.standard1
[shard 1] database - Resharded 223kB for keyspace1.standard1 in 0.14 seconds, 1MB/s
[shard 1] database - Loaded 16 SSTables into /var/lib/scylla/data/keyspace1/standard1-eb0401905d8311ecb391aa52ebf0b3e1
[shard 1] sstables_loader - Done loading new SSTables for keyspace=keyspace1, table=standard1, ...
[shard 1] sstables_loader - Loading new SSTables for keyspace=keyspace_refresh, table=standard1, ...
[shard 1] database - Resharding 223kB for keyspace_refresh.standard1
[shard 1] database - Resharded 223kB for keyspace_refresh.standard1 in 0.14 seconds, 1MB/s
[shard 1] database - Loaded 16 SSTables into /var/lib/scylla/data/keyspace_refresh/standard1-eb0401905d8311ecb391aa52ebf0b3e1
[shard 1] sstables_loader - Done loading new SSTables for keyspace=keyspace_refresh, table=standard1, ...

So, there are no per-file paths anymore in the resharding log messages, only the root dir path.
"""
Expand Down Expand Up @@ -230,7 +235,7 @@ def get_load_test_data_inventory(
def create_keyspace(
cls,
node,
keyspace_name: str = "keyspace1",
keyspace_name: str = "keyspace_refresh",
strategy: str = "NetworkTopologyStrategy",
replication_factor: int = 1,
):
Expand All @@ -245,7 +250,9 @@ def create_table_for_load(cls, node, schema_file_and_path: str, session):
session.execute(schema.replace("\n", ""))

@classmethod
def validate_data_count_after_upload(cls, node, keyspace_name: str = "keyspace1", table_name: str = "standard1"):
def validate_data_count_after_upload(
cls, node, keyspace_name: str = "keyspace_refresh", table_name: str = "standard2"
):
Comment on lines +253 to +255

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎯 Functional Correctness | 🟠 Major | ⚡ Quick win

validate_data_count_after_upload() now defaults to the wrong table.

Everything else in this flow was moved to keyspace_refresh.standard1, but this helper now defaults to keyspace_refresh.standard2. Any caller that relies on defaults will validate a different table than the one loaded/refreshed and can report a false result.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@sdcm/utils/sstable/load_utils.py` around lines 253 - 255, The
validate_data_count_after_upload helper is defaulting to the wrong table for the
refresh flow. Update the default table_name in validate_data_count_after_upload
so callers that omit it validate the same table used by the rest of the
keyspace_refresh flow, and check any nearby callers/tests that rely on the
default to ensure they still point to the intended table.

result = node.run_cqlsh(f"consistency QUORUM;SELECT COUNT(*) FROM {keyspace_name}.{table_name}")

next_line_is_result = False
Expand Down
73 changes: 73 additions & 0 deletions unit_tests/integration/test_nemesis_refresh.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import logging

import pytest

from sdcm.utils.sstable.load_utils import SstableLoadUtils

from sdcm.stress_thread import CassandraStressThread
from unit_tests.lib.dummy_remote import LocalLoaderSetDummy

pytestmark = [
pytest.mark.usefixtures("events"),
pytest.mark.integration,
pytest.mark.xdist_group("docker_heavy"),
]


@pytest.mark.integration
def test_refresh_monkey_flow(docker_scylla, params, events, request):
    """Test the refresh-monkey flow locally against a Docker-based Scylla node.

    Steps:
      1. Populate ``keyspace_refresh.standard1`` with cassandra-stress.
      2. Count the table's columns and pick the matching sstable test data.
      3. Require that ``nodetool cfstats`` succeeds for the table.
      4. Upload the sstables, run ``nodetool refresh``, and verify that a
         known key from the uploaded sstables becomes queryable.

    The test fails (instead of silently passing) when the ``cfstats``
    precheck does not succeed, so a broken setup cannot produce a green run.
    """
    ks = "keyspace_refresh"
    table = "standard1"

    loader_set = LocalLoaderSetDummy(params=params)

    # Create and populate the keyspace/table the refresh flow operates on.
    stress_cmd = (
        "cassandra-stress write n=40000 cl=ONE -mode native cql3 "
        f"-schema 'keyspace={ks} replication(strategy=NetworkTopologyStrategy,"
        f"replication_factor=1)' -log interval=5"
    )
    cs_thread = CassandraStressThread(loader_set, stress_cmd, node_list=[docker_scylla], timeout=120, params=params)

    def cleanup_thread():
        cs_thread.kill()

    # Make sure the stress thread is killed even if the test fails midway.
    request.addfinalizer(cleanup_thread)

    cs_thread.run()

    output, _ = cs_thread.parse_results()
    print(output)

    # Checking the columns number of keyspace_refresh.standard1
    column_num = SstableLoadUtils.calculate_columns_count_in_table(
        docker_scylla, keyspace_name=ks, table_name=table
    )

    assert column_num
    test_data = SstableLoadUtils.get_load_test_data_inventory(column_num, big_sstable=False, load_and_stream=False)

    result = docker_scylla.run_nodetool(sub_cmd="cfstats", args=f"{ks}.{table}")

    # Fail fast: without this the whole refresh path would be skipped and the
    # test would pass without verifying anything.
    assert result is not None and result.exit_status == 0, (
        f"`nodetool cfstats {ks}.{table}` failed, cannot run the refresh flow"
    )

    key = "0x32373131364f334f3830"
    # Check one special key before refresh, we will verify refresh by query in the end
    # Note: we can't DELETE the key before refresh, otherwise the old sstable won't be loaded
    # TRUNCATE can be used to clean the table, but we can't do it for keyspace_refresh.standard1
    query_verify = f"SELECT * FROM {ks}.{table} WHERE key={key}"
    result = docker_scylla.run_cqlsh(query_verify)
    if "(0 rows)" in result.stdout:
        logging.debug("Key %s does not exist before refresh", key)
    else:
        logging.debug("Key %s already exists before refresh", key)

    # Executing rolling refresh one by one
    for node in [docker_scylla]:
        SstableLoadUtils.upload_sstables(
            node,
            test_data=test_data[0],
            keyspace_name=ks,
            table_name=table,
            is_cloud_cluster=False,
        )
        SstableLoadUtils.run_refresh(node, test_data=test_data[0])

    # Verify that the special key is loaded by SELECT query
    result = docker_scylla.run_cqlsh(query_verify)
    assert "(1 rows)" in result.stdout, f"The key {key} is not loaded by `nodetool refresh`"
Comment on lines +48 to +73

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎯 Functional Correctness | 🟠 Major | ⚡ Quick win

This test passes without testing anything when cfstats fails.

The refresh path and final assertion are skipped entirely unless cfstats returns zero. A broken setup therefore reports green CI instead of a failed test. Assert the precondition or fail immediately.

🧰 Tools
🪛 Ruff (0.15.18)

[error] 55-55: Possible SQL injection vector through string-based query construction

(S608)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@unit_tests/integration/test_nemesis_refresh.py` around lines 48 - 73, The
integration test in the refresh flow skips all validation when
docker_scylla.run_nodetool("cfstats", ...) fails, so a broken setup can still
pass; update the test around the cfstats precondition to fail immediately
instead of wrapping the rest of the logic in a conditional. Use the existing
result from run_nodetool and the surrounding refresh/assertion block in
test_nemesis_refresh to either assert exit_status == 0 or raise on failure
before running the cqlsh check, upload_sstables, and run_refresh steps.

2 changes: 1 addition & 1 deletion unit_tests/lib/fake_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def __init__(self, system_log):
def run(self, *args, **kwargs):
lines = [
"[shard 11] sstables_loader - load_and_stream: started ops_uuid=a2661989-6836-418f-aa67-2c5466499848, process [0-1] out",
"[shard 2] sstables_loader - Done loading new SSTables for keyspace=keyspace1, table=standard1, load_and_stream=true, "
"[shard 2] sstables_loader - Done loading new SSTables for keyspace=keyspace_refresh, table=standard1, load_and_stream=true, "
"primary_replica_only=false, status=succeeded",
]
for line in lines:
Expand Down
Loading