diff --git a/.gitignore b/.gitignore index c29d413e..730a22fc 100644 --- a/.gitignore +++ b/.gitignore @@ -9,9 +9,9 @@ ccm.py ccm.egg-info/ .project .coverage -tests/test_results +tests/test_results/ +tests/tests/test_results/ .direnv/ .vscode/ - .envrc ccmlib/tests/ \ No newline at end of file diff --git a/ccmlib/cluster.py b/ccmlib/cluster.py index 78c4ee58..b769e5b7 100644 --- a/ccmlib/cluster.py +++ b/ccmlib/cluster.py @@ -10,11 +10,11 @@ from concurrent.futures import ThreadPoolExecutor from typing import List, Tuple -from ruamel.yaml import YAML +from ruamel.yaml import YAML, CommentedMap from ccmlib import common, repository from ccmlib.node import Node, NodeError -from ccmlib.common import logger +from ccmlib.common import logger, DEFAULT_DATACENTER, DEFAULT_RACK from ccmlib.scylla_node import ScyllaNode from ccmlib.utils.version import parse_version @@ -82,6 +82,10 @@ def __init__(self, path, name, partitioner=None, install_dir=None, create_direct raise self.debug(f"Started cluster '{self.name}' version {self.__version} installed in {self.__install_dir}") + @property + def parallel_start_supported(self): + return self.__version and not self.__version.startswith('3.') + def load_from_repository(self, version, verbose): return repository.setup(version, verbose) @@ -225,7 +229,7 @@ def version(self): def cassandra_version(self): return self.version() - def add(self, node: ScyllaNode, is_seed, data_center=None, rack=None): + def add(self, node: ScyllaNode, is_seed, data_center=DEFAULT_DATACENTER, rack=DEFAULT_RACK): if node.name in self.nodes: raise common.ArgumentError(f'Cannot create existing node {node.name}') self.nodes[node.name] = node @@ -241,9 +245,9 @@ def add(self, node: ScyllaNode, is_seed, data_center=None, rack=None): for trace_class in self._trace: node.set_log_level("TRACE", trace_class) - if data_center is not None: - self.debug(f"{node.name}: data_center={node.data_center} rack={node.rack} snitch={self.snitch}") - 
self.__update_topology_files() + self.set_configuration_options(values={'endpoint_snitch': self.snitch}) + self._update_config() + self.debug(f"{node.name}: data_center={node.data_center} rack={node.rack} snitch={self.snitch}") node._save() return self @@ -275,18 +279,18 @@ def populate(self, nodes, debug=False, tokens=None, use_vnodes=False, ipprefix=N self.use_vnodes = use_vnodes topology = OrderedDict() if isinstance(nodes, int): - topology[None] = OrderedDict([(None, nodes)]) + topology[DEFAULT_DATACENTER] = OrderedDict([(DEFAULT_RACK, nodes)]) elif isinstance(nodes, list): for i in range(0, len(nodes)): dc = f"dc{i + 1}" n = nodes[i] - topology[dc] = OrderedDict([(None, n)]) + topology[dc] = OrderedDict([(DEFAULT_RACK, n)]) elif isinstance(nodes, dict): for dc, x in nodes.items(): if isinstance(x, int): - topology[dc] = OrderedDict([(None, x)]) + topology[dc] = OrderedDict([(DEFAULT_RACK, x)]) elif isinstance(x, list): - topology[dc] = OrderedDict([(f"RAC{i}", n) for i, n in enumerate(x, start=1)]) + topology[dc] = OrderedDict([(f"rac{i}", n) for i, n in enumerate(x, start=1)]) elif isinstance(x, dict): topology[dc] = OrderedDict([(rack, n) for rack, n in x.items()]) else: @@ -304,13 +308,13 @@ def populate(self, nodes, debug=False, tokens=None, use_vnodes=False, ipprefix=N node_count += n for _ in range(n): node_locations.append((dc, rack)) - if dcs != [None]: - self.set_configuration_options(values={'endpoint_snitch': self.snitch}) self.use_vnodes = use_vnodes if node_count < 1: raise common.ArgumentError(f'invalid topology {topology}') + self.set_configuration_options(values={'endpoint_snitch': self.snitch}) + for i in range(1, node_count + 1): if f'node{i}' in list(self.nodes.values()): raise common.ArgumentError(f'Cannot create existing node node{i}') @@ -331,7 +335,7 @@ def populate(self, nodes, debug=False, tokens=None, use_vnodes=False, ipprefix=N self._update_config() return self - def new_node(self, i, auto_bootstrap=False, debug=False, 
initial_token=None, add_node=True, is_seed=True, data_center=None, rack=None) -> ScyllaNode: + def new_node(self, i, auto_bootstrap=False, debug=False, initial_token=None, add_node=True, is_seed=True, data_center=DEFAULT_DATACENTER, rack=DEFAULT_RACK) -> ScyllaNode: ipformat = self.get_ipformat() # noqa: F841 binary = self.get_binary_interface(i) node = self.create_node(name=f'node{i}', @@ -480,13 +484,14 @@ def start(self, no_wait=False, verbose=False, wait_for_binary_proto=False, wait_ marks = [(node, node.mark_log()) for node in list(self.nodes.values())] started: List[Tuple[ScyllaNode, subprocess.Popen, int]] = [] + wait_args = {} if self.parallel_start_supported else {'wait_other_notice': True, 'wait_for_binary_proto': True} for node in list(self.nodes.values()): if not node.is_running(): mark = 0 if os.path.exists(node.logfilename()): mark = node.mark_log() - p = node.start(update_pid=False, jvm_args=jvm_args, profile_options=profile_options, verbose=verbose, quiet_start=quiet_start) + p = node.start(update_pid=False, jvm_args=jvm_args, profile_options=profile_options, verbose=verbose, quiet_start=quiet_start, **wait_args) started.append((node, p, mark)) if no_wait and not verbose: @@ -670,7 +675,7 @@ def _update_config(self, install_dir=None): 'log_level': self.__log_level, 'use_vnodes': self.use_vnodes, 'id': self.id, - 'ipprefix': self.ipprefix + 'ipprefix': self.ipprefix, } if getattr(self, 'sni_proxy_docker_ids', None): cluster_config['sni_proxy_docker_ids'] = self.sni_proxy_docker_ids @@ -691,10 +696,10 @@ def __update_topology_files(self): self.__update_topology_using_rackdc_properties() def __update_topology_using_toplogy_properties(self): - dcs = [('default', 'dc1', 'r1')] + dcs = [('default', DEFAULT_DATACENTER, DEFAULT_RACK)] for node in self.nodelist(): if node.data_center is not None: - dcs.append((node.address(), node.data_center, node.rack or 'r1')) + dcs.append((node.address(), node.data_center, node.rack)) content = "" for k, v, r in dcs: @@ 
-707,10 +712,10 @@ def __update_topology_using_toplogy_properties(self): def __update_topology_using_rackdc_properties(self): for node in self.nodelist(): - dc = 'dc1' + dc = DEFAULT_DATACENTER if node.data_center is not None: dc = node.data_center - rack = 'RAC1' + rack = DEFAULT_RACK if node.rack is not None: rack = node.rack rackdc_file = os.path.join(node.get_conf_dir(), 'cassandra-rackdc.properties') diff --git a/ccmlib/cmds/cluster_cmds.py b/ccmlib/cmds/cluster_cmds.py index a19130d3..adceb8c2 100644 --- a/ccmlib/cmds/cluster_cmds.py +++ b/ccmlib/cmds/cluster_cmds.py @@ -7,7 +7,7 @@ from ccmlib.cluster import Cluster from ccmlib.cluster_factory import ClusterFactory from ccmlib.cmds.command import Cmd, PlainHelpFormatter -from ccmlib.common import ArgumentError +from ccmlib.common import ArgumentError, DEFAULT_DATACENTER, DEFAULT_RACK from ccmlib.dse_cluster import DseCluster from ccmlib.scylla_cluster import ScyllaCluster from ccmlib.scylla_docker_cluster import ScyllaDockerCluster, ScyllaDockerNode @@ -310,9 +310,9 @@ def get_parser(self): parser.add_option('-n', '--token', type="string", dest="initial_token", help="Initial token for the node", default=None) parser.add_option('-d', '--data-center', type="string", dest="data_center", - help="Datacenter name this node is part of", default=None) + help="Datacenter name this node is part of", default=DEFAULT_DATACENTER) parser.add_option('--rack', type="string", dest="rack", - help="Rack name this node is part of", default=None) + help="Rack name this node is part of", default=DEFAULT_RACK) parser.add_option('--dse', action="store_true", dest="dse_node", help="Add node to DSE Cluster", default=False) parser.add_option('--scylla', action="store_true", dest="scylla_node", diff --git a/ccmlib/common.py b/ccmlib/common.py index 2d774dcc..8146bccf 100644 --- a/ccmlib/common.py +++ b/ccmlib/common.py @@ -52,6 +52,9 @@ DOWNLOAD_IN_PROGRESS_FILE = "download_in_progress" +DEFAULT_DATACENTER = 'dc1' +DEFAULT_RACK = 
'rac1' + logger = logging.getLogger('ccm') class CCMError(Exception): diff --git a/ccmlib/node.py b/ccmlib/node.py index 1ecd58af..f986ed9b 100644 --- a/ccmlib/node.py +++ b/ccmlib/node.py @@ -21,6 +21,7 @@ from ruamel.yaml import YAML from ccmlib import common +from ccmlib.common import DEFAULT_DATACENTER, DEFAULT_RACK from ccmlib.repository import setup from ccmlib.utils.version import parse_version @@ -116,8 +117,8 @@ def __init__(self, name, cluster, auto_bootstrap, thrift_interface, storage_inte self.initial_token = initial_token self.pid = None self.all_pids = [] - self.data_center = None - self.rack = None + self.data_center = DEFAULT_DATACENTER + self.rack = DEFAULT_RACK self.workload = None self.__config_options = {} self.__install_dir = None @@ -531,7 +532,7 @@ def watch_log_for_alive(self, nodes, from_mark=None, timeout=120, filename='syst nodes are marked UP. This method works similarly to watch_log_for_death. """ tofind = nodes if isinstance(nodes, list) else [nodes] - tofind = [f"({node.address()}|{node.hostid()}).* now UP" for node in tofind] + tofind = [f"({node.address()}|[A-Za-z0-9-]+).* now UP" for node in tofind] self.watch_log_for(tofind, from_mark=from_mark, timeout=timeout, filename=filename) def wait_for_binary_interface(self, **kwargs): @@ -1585,6 +1586,8 @@ def _update_config(self): values['remote_debug_port'] = self.remote_debug_port if self.data_center: values['data_center'] = self.data_center + if self.rack: + values['rack'] = self.rack if self.workload is not None: values['workload'] = self.workload with open(filename, 'w') as f: diff --git a/ccmlib/scylla_cluster.py b/ccmlib/scylla_cluster.py index 0f3b3394..1b223940 100644 --- a/ccmlib/scylla_cluster.py +++ b/ccmlib/scylla_cluster.py @@ -82,6 +82,17 @@ def __set_default_timeouts(self): self.default_wait_other_notice_timeout = 120 if self.scylla_mode != 'debug' else 600 self.default_wait_for_binary_proto = 420 if self.scylla_mode != 'debug' else 900 + @property + def 
parallel_start_supported(self): + if not self.version(): + return False + chunks = self.version().split('.') + if len(chunks) < 1: + return False + if int(chunks[0]) >= 2000: + return int(chunks[0]) >= 2025 + return int(chunks[0]) >= 6 + # override get_node_jmx_port for scylla-jmx # scylla-jmx listens on the unique node address (127.0.) # so there's no need to listen on a different port for every jmx instance diff --git a/tests/ccmcluster.py b/tests/ccmcluster.py index cf3eae62..5577df66 100644 --- a/tests/ccmcluster.py +++ b/tests/ccmcluster.py @@ -2,8 +2,9 @@ import os import subprocess +from ruamel.yaml import YAML from ccmlib import common - +from ccmlib.cluster_factory import ClusterFactory LOGGER = logging.getLogger(__name__) @@ -44,6 +45,12 @@ def get_create_cmd(self, args=None): return cmd_args + def get_ccm_cluster(self): + return ClusterFactory.load(common.get_default_path(), self.name) + + def get_populate_cmd(self, *args): + return [self.ccm_bin, "populate", self.name, *args] + def get_remove_cmd(self): return [self.ccm_bin, "remove", self.name] @@ -56,20 +63,39 @@ def get_list_cmd(self): def get_status_cmd(self): return [self.ccm_bin, "status"] - def get_start_cmd(self): - return [self.ccm_bin, "start", "--wait-for-binary-proto"] + def get_start_cmd(self, *args): + return [self.ccm_bin, "start", "--wait-for-binary-proto", *args] def get_start_sni_proxy_cmd(self): return [self.ccm_bin, "start", "--sni-proxy", "--sni-port", "8443", "--wait-for-binary-proto"] - def get_add_cmd(self, node_name): - cmd_args = [] + def get_add_cmd(self, node_name, *args): + cmd_args = args if self.use_scylla: - cmd_args += ["--scylla"] - return [self.ccm_bin, "add", "-b", *cmd_args, node_name] - - def get_node_start_cmd(self, node_name): - return [self.ccm_bin, node_name, "start"] + cmd_args += ("--scylla", ) + return [self.ccm_bin, "add", "-b", node_name, *cmd_args] + + def get_node_start_cmd(self, node_name, *args): + return [self.ccm_bin, node_name, "start", *args] + + 
def get_cluster_config(self): + with open(os.path.join(self.cluster_dir, "cluster.conf")) as f: + return YAML(typ='rt').load(f) + + def get_nodes_cassandra_rackdc_properties(self, node_name): + rackdc_file_name = os.path.join(self.cluster_dir, node_name, "conf", "cassandra-rackdc.properties") + if not os.path.exists(rackdc_file_name): + return {} + with open(rackdc_file_name) as f: + res = {} + for line in f.readlines(): + if line.startswith("#"): + continue + chunks = line.strip().split("=", 1) + if len(chunks) == 2: + key, value = chunks + res[key.strip()] = value.strip() + return res def get_updateconf_cmd(self): return [self.ccm_bin, "updateconf", diff --git a/tests/test_topology.py b/tests/test_topology.py new file mode 100644 index 00000000..119b95d6 --- /dev/null +++ b/tests/test_topology.py @@ -0,0 +1,288 @@ +import os +import re + +import pytest +from ccmlib import common + +from tests.test_scylla_cmds import cluster_params, copy_cluster_data + + +@cluster_params +class TestCCMTopology: + start_params = ["--wait-other-notice", "--wait-for-binary-proto"] + + @staticmethod + @pytest.fixture(autouse=True) + def base_setup(request, cluster_under_test): + try: + yield + finally: + cluster_under_test.run_command(cluster_under_test.get_stop_cmd()) + cluster_under_test.process.wait() + copy_cluster_data(request) + cluster_under_test.run_command(cluster_under_test.get_remove_cmd()) + cluster_under_test.process.wait() + if os.path.exists(cluster_under_test.cluster_dir): + common.rmdirs(cluster_under_test.cluster_dir) + + def test_topology_single_dc(self, cluster_under_test): + cluster_under_test.run_command(cluster_under_test.get_create_cmd(["-n", "1"])) + cluster_under_test.validate_command_result() + cluster_under_test.run_command(cluster_under_test.get_start_cmd("--wait-other-notice")) + cluster_under_test.validate_command_result() + assert_topology( + cluster_under_test, + { + 'node1': {'dc': 'dc1', 'rack': 'rac1'} + }) + 
cluster_under_test.run_command(cluster_under_test.get_add_cmd("node2", "-j", "7501")) + cluster_under_test.validate_command_result() + cluster_under_test.run_command(cluster_under_test.get_node_start_cmd("node2", *self.start_params)) + cluster_under_test.validate_command_result() + # No DC is provided node is provisioned to the same DC + # Cluster stays as single DC cluster, `conf/cassandra-rackdc.properties` should be empty for both nodes + assert_topology( + cluster_under_test, + { + 'node1': {'dc': 'dc1', 'rack': 'rac1'}, + 'node2': {'dc': 'dc1', 'rack': 'rac1'} + }) + + def test_topology_multi_dc(self, cluster_under_test): + cluster_under_test.run_command(cluster_under_test.get_create_cmd(["-n", "1:0"])) + cluster_under_test.validate_command_result() + cluster_under_test.run_command(cluster_under_test.get_start_cmd("--wait-other-notice")) + cluster_under_test.validate_command_result() + # Since it is single DC cluster `conf/cassandra-rackdc.properties` should be empty + assert_topology( + cluster_under_test, + { + 'node1': {'dc': 'dc1', 'rack': 'rac1'} + }) + cluster_under_test.run_command(cluster_under_test.get_add_cmd("node2", "--rack", "rac2", "-j", "7501")) + cluster_under_test.validate_command_result() + cluster_under_test.run_command(cluster_under_test.get_node_start_cmd("node2", *self.start_params)) + cluster_under_test.validate_command_result() + # No DC is provided node is provisioned to the same DC + # Cluster stays as single DC cluster, `conf/cassandra-rackdc.properties` should be empty for both nodes + assert_topology( + cluster_under_test, + { + 'node1': {'dc': 'dc1', 'rack': 'rac1'}, + 'node2': {'dc': 'dc1', 'rack': 'rac2'} + }) + cluster_under_test.run_command(cluster_under_test.get_add_cmd("node3", "--data-center", "dc2", "-j", "7502")) + cluster_under_test.validate_command_result() + cluster_under_test.run_command(cluster_under_test.get_node_start_cmd("node3", *self.start_params)) + cluster_under_test.validate_command_result() + # DC2 is 
specifically targeted + # Cluster switches to multi DC cluster, `conf/cassandra-rackdc.properties` should be populated with RACK and DC + assert_topology( + cluster_under_test, + { + 'node1': {'dc': 'dc1', 'rack': 'rac1'}, + 'node2': {'dc': 'dc1', 'rack': 'rac2'}, + 'node3': {'dc': 'dc2', 'rack': 'rac1'} + }) + cluster_under_test.run_command(cluster_under_test.get_add_cmd("node4", "--data-center", "dc2", "--rack", "rac2", "-j", "7503")) + cluster_under_test.validate_command_result() + cluster_under_test.run_command(cluster_under_test.get_node_start_cmd("node4", *self.start_params)) + cluster_under_test.validate_command_result() + # DC2 is specifically targeted + # Cluster switches to multi DC cluster, `conf/cassandra-rackdc.properties` should be populated with RACK and DC + assert_topology( + cluster_under_test, + { + 'node1': {'dc': 'dc1', 'rack': 'rac1'}, + 'node2': {'dc': 'dc1', 'rack': 'rac2'}, + 'node3': {'dc': 'dc2', 'rack': 'rac1'}, + 'node4': {'dc': 'dc2', 'rack': 'rac2'} + }) + + def test_topology_no_populate_multi_dc(self, cluster_under_test): + cluster_under_test.run_command(cluster_under_test.get_create_cmd()) + cluster_under_test.validate_command_result() + # Since it is single DC cluster `conf/cassandra-rackdc.properties` should be empty + cluster_under_test.run_command(cluster_under_test.get_add_cmd("node1")) + cluster_under_test.validate_command_result() + cluster_under_test.run_command(cluster_under_test.get_add_cmd("node2", "--rack", "rac2", "-j", "7501")) + cluster_under_test.validate_command_result() + cluster_under_test.run_command(cluster_under_test.get_add_cmd("node3", "--data-center", "dc2", "-j", "7502")) + cluster_under_test.validate_command_result() + cluster_under_test.run_command(cluster_under_test.get_add_cmd("node4", "--data-center", "dc2", "--rack", "rac2", "-j", "7503")) + cluster_under_test.validate_command_result() + cluster_under_test.run_command(cluster_under_test.get_start_cmd("--wait-other-notice")) + 
cluster_under_test.validate_command_result() + # DC2 is specifically targeted + # Cluster switches to multi DC cluster, `conf/cassandra-rackdc.properties` should be populated with RACK and DC + assert_topology( + cluster_under_test, + { + 'node1': {'dc': 'dc1', 'rack': 'rac1'}, + 'node2': {'dc': 'dc1', 'rack': 'rac2'}, + 'node3': {'dc': 'dc2', 'rack': 'rac1'}, + 'node4': {'dc': 'dc2', 'rack': 'rac2'} + }) + + +def get_all_cassandra_rackdc_properties(cluster): + res = {} + for node_name in cluster.get_cluster_config().get("nodes"): + res[node_name] = cluster.get_nodes_cassandra_rackdc_properties(node_name) + return res + +def get_node_cassandra_topology_properties(ccm_cluster, node_name): + address_to_name = {} + for node in ccm_cluster.nodelist(): + address_to_name[node.address()] = node.name + + topology_file_name = os.path.join(ccm_cluster.get_path(), node_name, "conf", "cassandra-topology.properties") + if not os.path.exists(topology_file_name): + return [] + with open(topology_file_name) as f: + res = {} + for line in f.readlines(): + if line.startswith("#"): + continue + chunks = line.strip().split("=", 2) + if len(chunks) == 2: + address, dc_rack = chunks + if address.strip() == "default": + continue + chunks = dc_rack.split(":") + if len(chunks) == 2: + dc, rack = dc_rack.split(":") + host_name = address_to_name.get(address) + if not host_name: + continue + res[host_name] = { + 'dc': dc, + 'rack': rack, + } + return res + + +def assert_cassandra_topology_properties(ccm_cluster, expected_topology): + for node in ccm_cluster.nodelist(): + assert get_node_cassandra_topology_properties(ccm_cluster, node.name) == expected_topology, \ + f"cassandra-topology.properties on {node.name} returned different topology" + + +def run_cqlsh_and_return(node, query): + result = [] + is_result = False + for line in node.run_cqlsh(query, return_output=True)[0].split('\n'): + if not line: + continue + if line.startswith('----'): + is_result = True + continue + if is_result: + if 
line.startswith('(') and line.endswith('rows)'): + break + result.append([chunk.strip() for chunk in line.split(' | ')]) + return result + + +def assert_topology_on_cqlsh(ccm_cluster, expected_topology): + node_id_to_name = {} + for node in ccm_cluster.nodelist(): + node_id_to_name[node.hostid()] = node.name + + for node in ccm_cluster.nodelist(): + if node.name in expected_topology: + expected_local = expected_topology.get(node.name) + row = run_cqlsh_and_return(node, 'select data_center, rack from system.local') + if not row or not row[0]: + raise ValueError("query 'select data_center, rack from system.local' returned no results") + assert { + "dc": row[0][0], + "rack": row[0][1], + } == expected_local, f"node {node.name} `system.local` returned different topology" + row = run_cqlsh_and_return(node, 'select host_id, data_center, rack from system.peers') + for rec in row: + node_name = node_id_to_name.get(rec[0]) + if not node_name: + raise ValueError(f"node id {rec[0]} is unknown") + expected_node_topology = expected_topology.get(node_name) + if expected_node_topology is None: + continue + assert { + "dc": rec[1], + "rack": rec[2], + } == expected_node_topology, f"node {node.name} `system.peers` returned different topology" + + +def get_nodetool_topology(node, node_id_to_name): + topology = {} + out = node.nodetool("status") + is_rows = False + dc = None + rack_header_id = None + host_header_id = None + for line in out[0].split('\n'): + if line.startswith('Datacenter:'): + chunks = line.split() + dc = chunks[1] if len(chunks) > 1 else None + if line.startswith('--'): + # Reverse line, pattern breaks in some nodetool versions, but ends stays similar + line = rev(line.lower()) + is_rows = True + header_id = 0 + rack_header_id = None + host_header_id = None + for header in [x.group() for x in re.finditer(r'(di tsoh|\w+)', line)]: + header = rev(header.strip()) + if header.lower() == 'rack': + rack_header_id = header_id + if header.lower() == 'host id': + 
host_header_id = header_id + header_id += 1 + if rack_header_id is None: + raise ValueError(f"rack header not found in header line: {line}") + if host_header_id is None: + raise ValueError(f"host id not found in header line: {line}") + continue + if not is_rows: + continue + if not line.strip(): + dc = None + is_rows = False + continue + chunks = rev(line).split() + if len(chunks) < 3: + continue + host_id, rack = rev(chunks[host_header_id]), rev(chunks[rack_header_id]) + node_name = node_id_to_name.get(host_id) + if not node_name: + raise ValueError(f"node id {host_id} is unknown") + topology[node_name] = { + "dc": dc, + "rack": rack, + } + return topology + + +def assert_topology_on_nodetool(ccm_cluster, expected_topology): + node_id_to_name = {} + for node in ccm_cluster.nodelist(): + node_id_to_name[node.hostid()] = node.name + + for node in ccm_cluster.nodelist(): + assert get_nodetool_topology(node, node_id_to_name) == expected_topology, f"nodetool status on {node.name} returned different topology" + + +def assert_topology(cluster, expected_topology): + ccm_cluster = cluster.get_ccm_cluster() + if ccm_cluster.snitch == 'org.apache.cassandra.locator.PropertyFileSnitch': + assert_cassandra_topology_properties(ccm_cluster, expected_topology) + elif ccm_cluster.snitch == 'org.apache.cassandra.locator.GossipingPropertyFileSnitch': + assert get_all_cassandra_rackdc_properties( + cluster) == expected_topology, "cassandra-rackdc.properties contains different topology" + if cluster.use_scylla: + # Cassandra 3.x cqlsh is broken on modern python + assert_topology_on_cqlsh(ccm_cluster, expected_topology) + assert_topology_on_nodetool(ccm_cluster, expected_topology) + +def rev(line): + return ''.join(reversed(line)) \ No newline at end of file