4 changes: 2 additions & 2 deletions .gitignore
@@ -9,9 +9,9 @@ ccm.py
ccm.egg-info/
.project
.coverage
tests/test_results
tests/test_results/
tests/tests/test_results/
.direnv/
.vscode/

.envrc
ccmlib/tests/
43 changes: 24 additions & 19 deletions ccmlib/cluster.py
@@ -10,11 +10,11 @@
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple

from ruamel.yaml import YAML
from ruamel.yaml import YAML, CommentedMap

from ccmlib import common, repository
from ccmlib.node import Node, NodeError
from ccmlib.common import logger
from ccmlib.common import logger, DEFAULT_DATACENTER, DEFAULT_RACK
from ccmlib.scylla_node import ScyllaNode
from ccmlib.utils.version import parse_version

@@ -82,6 +82,10 @@ def __init__(self, path, name, partitioner=None, install_dir=None, create_direct
raise
self.debug(f"Started cluster '{self.name}' version {self.__version} installed in {self.__install_dir}")

@property
def parallel_start_supported(self):
return self.__version and not self.__version.startswith('3.')

def load_from_repository(self, version, verbose):
return repository.setup(version, verbose)
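
The new parallel_start_supported property gates whether nodes may be started without per-node waits. A minimal sketch of how Cluster.start() consults it (mirroring the start() hunk further down in this diff); `cluster` stands for any Cluster instance:

    wait_args = {} if cluster.parallel_start_supported else {'wait_other_notice': True,
                                                              'wait_for_binary_proto': True}
    for node in cluster.nodelist():
        if not node.is_running():
            # With parallel start, each node is launched immediately and the cluster
            # waits for all of them afterwards; otherwise every start() call blocks.
            node.start(update_pid=False, **wait_args)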

@@ -225,7 +229,7 @@ def version(self):
def cassandra_version(self):
return self.version()

def add(self, node: ScyllaNode, is_seed, data_center=None, rack=None):
def add(self, node: ScyllaNode, is_seed, data_center=DEFAULT_DATACENTER, rack=DEFAULT_RACK):
if node.name in self.nodes:
raise common.ArgumentError(f'Cannot create existing node {node.name}')
self.nodes[node.name] = node
@@ -241,9 +245,9 @@ def add(self, node: ScyllaNode, is_seed, data_center=None, rack=None):
for trace_class in self._trace:
node.set_log_level("TRACE", trace_class)

if data_center is not None:
self.debug(f"{node.name}: data_center={node.data_center} rack={node.rack} snitch={self.snitch}")
self.__update_topology_files()
self.set_configuration_options(values={'endpoint_snitch': self.snitch})
self._update_config()
self.debug(f"{node.name}: data_center={node.data_center} rack={node.rack} snitch={self.snitch}")
node._save()
return self

@@ -275,18 +279,18 @@ def populate(self, nodes, debug=False, tokens=None, use_vnodes=False, ipprefix=N
self.use_vnodes = use_vnodes
topology = OrderedDict()
if isinstance(nodes, int):
topology[None] = OrderedDict([(None, nodes)])
topology[DEFAULT_DATACENTER] = OrderedDict([(DEFAULT_RACK, nodes)])
elif isinstance(nodes, list):
for i in range(0, len(nodes)):
dc = f"dc{i + 1}"
n = nodes[i]
topology[dc] = OrderedDict([(None, n)])
topology[dc] = OrderedDict([(DEFAULT_RACK, n)])
elif isinstance(nodes, dict):
for dc, x in nodes.items():
if isinstance(x, int):
topology[dc] = OrderedDict([(None, x)])
topology[dc] = OrderedDict([(DEFAULT_RACK, x)])
elif isinstance(x, list):
topology[dc] = OrderedDict([(f"RAC{i}", n) for i, n in enumerate(x, start=1)])
topology[dc] = OrderedDict([(f"rac{i}", n) for i, n in enumerate(x, start=1)])
elif isinstance(x, dict):
topology[dc] = OrderedDict([(rack, n) for rack, n in x.items()])
else:
@@ -304,13 +308,13 @@ def populate(self, nodes, debug=False, tokens=None, use_vnodes=False, ipprefix=N
node_count += n
for _ in range(n):
node_locations.append((dc, rack))
if dcs != [None]:
self.set_configuration_options(values={'endpoint_snitch': self.snitch})
self.use_vnodes = use_vnodes

if node_count < 1:
raise common.ArgumentError(f'invalid topology {topology}')

self.set_configuration_options(values={'endpoint_snitch': self.snitch})

for i in range(1, node_count + 1):
if f'node{i}' in list(self.nodes.values()):
raise common.ArgumentError(f'Cannot create existing node node{i}')
@@ -331,7 +335,7 @@ def populate(self, nodes, debug=False, tokens=None, use_vnodes=False, ipprefix=N
self._update_config()
return self

def new_node(self, i, auto_bootstrap=False, debug=False, initial_token=None, add_node=True, is_seed=True, data_center=None, rack=None) -> ScyllaNode:
def new_node(self, i, auto_bootstrap=False, debug=False, initial_token=None, add_node=True, is_seed=True, data_center=DEFAULT_DATACENTER, rack=DEFAULT_RACK) -> ScyllaNode:
ipformat = self.get_ipformat() # noqa: F841
binary = self.get_binary_interface(i)
node = self.create_node(name=f'node{i}',
@@ -480,13 +484,14 @@ def start(self, no_wait=False, verbose=False, wait_for_binary_proto=False, wait_
marks = [(node, node.mark_log()) for node in list(self.nodes.values())]

started: List[Tuple[ScyllaNode, subprocess.Popen, int]] = []
wait_args = {} if self.parallel_start_supported else {'wait_other_notice': True, 'wait_for_binary_proto': True}
for node in list(self.nodes.values()):
if not node.is_running():
mark = 0
if os.path.exists(node.logfilename()):
mark = node.mark_log()

p = node.start(update_pid=False, jvm_args=jvm_args, profile_options=profile_options, verbose=verbose, quiet_start=quiet_start)
p = node.start(update_pid=False, jvm_args=jvm_args, profile_options=profile_options, verbose=verbose, quiet_start=quiet_start, **wait_args)
started.append((node, p, mark))

if no_wait and not verbose:
@@ -670,7 +675,7 @@ def _update_config(self, install_dir=None):
'log_level': self.__log_level,
'use_vnodes': self.use_vnodes,
'id': self.id,
'ipprefix': self.ipprefix
'ipprefix': self.ipprefix,
}
if getattr(self, 'sni_proxy_docker_ids', None):
cluster_config['sni_proxy_docker_ids'] = self.sni_proxy_docker_ids
@@ -691,10 +696,10 @@ def __update_topology_files(self):
self.__update_topology_using_rackdc_properties()

def __update_topology_using_toplogy_properties(self):
dcs = [('default', 'dc1', 'r1')]
dcs = [('default', DEFAULT_DATACENTER, DEFAULT_RACK)]
for node in self.nodelist():
if node.data_center is not None:
dcs.append((node.address(), node.data_center, node.rack or 'r1'))
dcs.append((node.address(), node.data_center, node.rack))

content = ""
for k, v, r in dcs:
@@ -707,10 +712,10 @@

def __update_topology_using_rackdc_properties(self):
for node in self.nodelist():
dc = 'dc1'
dc = DEFAULT_DATACENTER
if node.data_center is not None:
dc = node.data_center
rack = 'RAC1'
rack = DEFAULT_RACK
if node.rack is not None:
rack = node.rack
rackdc_file = os.path.join(node.get_conf_dir(), 'cassandra-rackdc.properties')
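
For reference, a sketch derived from the populate() code above of how each accepted `nodes` shape maps onto the internal topology structure (datacenter -> rack -> node count; OrderedDicts shown as plain dicts), using the new lower-case rack naming:

    cluster.populate(3)                   # {'dc1': {'rac1': 3}}
    cluster.populate([1, 2])              # {'dc1': {'rac1': 1}, 'dc2': {'rac1': 2}}
    cluster.populate({'dc1': [2, 1]})     # {'dc1': {'rac1': 2, 'rac2': 1}}
    cluster.populate({'dc1': {'r9': 2}})  # {'dc1': {'r9': 2}} (explicit racks kept as-is)
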
6 changes: 3 additions & 3 deletions ccmlib/cmds/cluster_cmds.py
@@ -7,7 +7,7 @@
from ccmlib.cluster import Cluster
from ccmlib.cluster_factory import ClusterFactory
from ccmlib.cmds.command import Cmd, PlainHelpFormatter
from ccmlib.common import ArgumentError
from ccmlib.common import ArgumentError, DEFAULT_DATACENTER, DEFAULT_RACK
from ccmlib.dse_cluster import DseCluster
from ccmlib.scylla_cluster import ScyllaCluster
from ccmlib.scylla_docker_cluster import ScyllaDockerCluster, ScyllaDockerNode
@@ -310,9 +310,9 @@ def get_parser(self):
parser.add_option('-n', '--token', type="string", dest="initial_token",
help="Initial token for the node", default=None)
parser.add_option('-d', '--data-center', type="string", dest="data_center",
help="Datacenter name this node is part of", default=None)
help="Datacenter name this node is part of", default=DEFAULT_DATACENTER)
parser.add_option('--rack', type="string", dest="rack",
help="Rack name this node is part of", default=None)
help="Rack name this node is part of", default=DEFAULT_RACK)
parser.add_option('--dse', action="store_true", dest="dse_node",
help="Add node to DSE Cluster", default=False)
parser.add_option('--scylla', action="store_true", dest="scylla_node",
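
A small sketch of what the new defaults mean for `ccm add` when -d/--rack are omitted; the standalone optparse parser below is only an illustration, not the actual Cmd wiring:

    from optparse import OptionParser

    from ccmlib.common import DEFAULT_DATACENTER, DEFAULT_RACK

    parser = OptionParser()
    parser.add_option('-d', '--data-center', type="string", dest="data_center",
                      default=DEFAULT_DATACENTER)
    parser.add_option('--rack', type="string", dest="rack", default=DEFAULT_RACK)

    options, args = parser.parse_args([])  # no placement flags given
    assert (options.data_center, options.rack) == ('dc1', 'rac1')
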
3 changes: 3 additions & 0 deletions ccmlib/common.py
@@ -52,6 +52,9 @@

DOWNLOAD_IN_PROGRESS_FILE = "download_in_progress"

DEFAULT_DATACENTER = 'dc1'
DEFAULT_RACK = 'rac1'

logger = logging.getLogger('ccm')

class CCMError(Exception):
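
Hypothetical illustration of the new module-level defaults: a node created without explicit placement is expected to land in dc1/rac1, and its cassandra-rackdc.properties should carry the same values (the key=value layout below is assumed from the standard file format):

    from ccmlib.common import DEFAULT_DATACENTER, DEFAULT_RACK

    rackdc = f"dc={DEFAULT_DATACENTER}\nrack={DEFAULT_RACK}\n"
    print(rackdc)
    # dc=dc1
    # rack=rac1
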
9 changes: 6 additions & 3 deletions ccmlib/node.py
@@ -21,6 +21,7 @@
from ruamel.yaml import YAML

from ccmlib import common
from ccmlib.common import DEFAULT_DATACENTER, DEFAULT_RACK
from ccmlib.repository import setup
from ccmlib.utils.version import parse_version

@@ -116,8 +117,8 @@ def __init__(self, name, cluster, auto_bootstrap, thrift_interface, storage_inte
self.initial_token = initial_token
self.pid = None
self.all_pids = []
self.data_center = None
self.rack = None
self.data_center = DEFAULT_DATACENTER
self.rack = DEFAULT_RACK
self.workload = None
self.__config_options = {}
self.__install_dir = None
@@ -531,7 +532,7 @@ def watch_log_for_alive(self, nodes, from_mark=None, timeout=120, filename='syst
nodes are marked UP. This method works similarly to watch_log_for_death.
"""
tofind = nodes if isinstance(nodes, list) else [nodes]
tofind = [f"({node.address()}|{node.hostid()}).* now UP" for node in tofind]
tofind = [f"({node.address()}|[A-Za-z0-9-]+).* now UP" for node in tofind]
self.watch_log_for(tofind, from_mark=from_mark, timeout=timeout, filename=filename)

def wait_for_binary_interface(self, **kwargs):
@@ -1585,6 +1586,8 @@ def _update_config(self):
values['remote_debug_port'] = self.remote_debug_port
if self.data_center:
values['data_center'] = self.data_center
if self.rack:
values['rack'] = self.rack
if self.workload is not None:
values['workload'] = self.workload
with open(filename, 'w') as f:
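
The relaxed watch_log_for_alive() pattern above accepts either the node's address or any host-id-like token before " now UP", instead of requiring node.hostid(). A small sketch against hypothetical log lines (illustrative only, not verbatim Scylla/Cassandra output); the address is assumed to be 127.0.0.2:

    import re

    pattern = r"(127\.0\.0\.2|[A-Za-z0-9-]+).* now UP"
    assert re.search(pattern, "InetAddress 127.0.0.2 is now UP")
    assert re.search(pattern, "Node 8d5ed9f4-7764-4dbd-bad8-43fddce94b7c is now UP")
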
11 changes: 11 additions & 0 deletions ccmlib/scylla_cluster.py
@@ -82,6 +82,17 @@ def __set_default_timeouts(self):
self.default_wait_other_notice_timeout = 120 if self.scylla_mode != 'debug' else 600
self.default_wait_for_binary_proto = 420 if self.scylla_mode != 'debug' else 900

@property
def parallel_start_supported(self):
if not self.__version:
return False
chunks = self.__version.split('.')
if len(chunks) < 1:
return False
if int(chunks[0]) >= 2000:
return int(chunks[0]) >= 2025
return int(chunks[0]) >= 6

# override get_node_jmx_port for scylla-jmx
# scylla-jmx listens on the unique node address (127.0.<cluster.id><node.id>)
# so there's no need to listen on a different port for every jmx instance
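
The ScyllaCluster override restated as a standalone function with sample version strings (samples assumed; the check apparently separates year-based releases such as 2025.x from the 6.x-and-later series):

    def parallel_start_supported(version: str) -> bool:
        if not version:
            return False
        major = int(version.split('.')[0])
        if major >= 2000:   # year-based versioning
            return major >= 2025
        return major >= 6

    assert parallel_start_supported('2025.1.0') is True
    assert parallel_start_supported('2024.2') is False
    assert parallel_start_supported('6.0.3') is True
    assert parallel_start_supported('5.4') is False
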
46 changes: 36 additions & 10 deletions tests/ccmcluster.py
@@ -2,8 +2,9 @@
import os
import subprocess

from ruamel.yaml import YAML
from ccmlib import common

from ccmlib.cluster_factory import ClusterFactory

LOGGER = logging.getLogger(__name__)

@@ -44,6 +45,12 @@ def get_create_cmd(self, args=None):

return cmd_args

def get_ccm_cluster(self):
return ClusterFactory.load(common.get_default_path(), self.name)

def get_populate_cmd(self, *args):
return [self.ccm_bin, "populate", self.name, *args]

def get_remove_cmd(self):
return [self.ccm_bin, "remove", self.name]

@@ -56,20 +63,39 @@ def get_list_cmd(self):
def get_status_cmd(self):
return [self.ccm_bin, "status"]

def get_start_cmd(self):
return [self.ccm_bin, "start", "--wait-for-binary-proto"]
def get_start_cmd(self, *args):
return [self.ccm_bin, "start", "--wait-for-binary-proto", *args]

def get_start_sni_proxy_cmd(self):
return [self.ccm_bin, "start", "--sni-proxy", "--sni-port", "8443", "--wait-for-binary-proto"]

def get_add_cmd(self, node_name):
cmd_args = []
def get_add_cmd(self, node_name, *args):
cmd_args = args
if self.use_scylla:
cmd_args += ["--scylla"]
return [self.ccm_bin, "add", "-b", *cmd_args, node_name]

def get_node_start_cmd(self, node_name):
return [self.ccm_bin, node_name, "start"]
cmd_args += ("--scylla", )
return [self.ccm_bin, "add", "-b", node_name, *cmd_args]

def get_node_start_cmd(self, node_name, *args):
return [self.ccm_bin, node_name, "start", *args]

def get_cluster_config(self):
with open(os.path.join(self.cluster_dir, "cluster.conf")) as f:
return YAML(typ='rt').load(f)

def get_nodes_cassandra_rackdc_properties(self, node_name):
rackdc_file_name = os.path.join(self.cluster_dir, node_name, "conf", "cassandra-rackdc.properties")
if not os.path.exists(rackdc_file_name):
return []
with open(rackdc_file_name) as f:
res = {}
for line in f.readlines():
if line.startswith("#"):
continue
chunks = line.strip().split("=", 2)
if len(chunks) == 2:
key, value = chunks
res[key.strip()] = value.strip()
return res
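
A sketch of what the helper above is expected to return for an illustrative properties file (comment lines are skipped, remaining lines are split on "="):

    sample = "# generated by ccm\ndc=dc1\nrack=rac1\n"
    res = {}
    for line in sample.splitlines():
        if line.startswith("#"):
            continue
        chunks = line.strip().split("=", 2)
        if len(chunks) == 2:
            key, value = chunks
            res[key.strip()] = value.strip()
    assert res == {"dc": "dc1", "rack": "rac1"}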

def get_updateconf_cmd(self):
return [self.ccm_bin, "updateconf",