Skip to content
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ccmlib/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ def replaces_in_files(src, dst, replacement_list):
match = r.search(line)
if match:
line = replace + "\n"
break
f_tmp.write(line)


Expand Down
48 changes: 27 additions & 21 deletions ccmlib/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ def watch_log_for(self, exprs, from_mark=None, timeout=600, process=None, verbos
tofind = [exprs] if isinstance(exprs, str) else exprs
tofind = [re.compile(e) for e in tofind]
matchings = []
reads = ""
reads = []
if len(tofind) == 0:
return None

Expand Down Expand Up @@ -488,7 +488,7 @@ def watch_log_for(self, exprs, from_mark=None, timeout=600, process=None, verbos

line = f.readline()
if line:
reads = reads + line
reads.append(line)
for e in tofind:
m = e.search(line)
if m:
Expand All @@ -507,8 +507,9 @@ def watch_log_for(self, exprs, from_mark=None, timeout=600, process=None, verbos
# FIXME: consider using inotify with IN_MODIFY to monitor the file
time.sleep(polling_interval)
if time.time() > deadline:
reads_str = "".join(reads)
raise TimeoutError(time.strftime("%d %b %Y %H:%M:%S", time.gmtime()) + " [" + self.name + "] Missing: " + str(
[e.pattern for e in tofind]) + ":\n" + reads[:50] + f".....\nSee {filename} for remainder")
[e.pattern for e in tofind]) + ":\n" + reads_str[:50] + f".....\nSee {filename} for remainder")

if process:
if common.is_win():
Expand Down Expand Up @@ -650,26 +651,26 @@ def start(self,
self._delete_old_pid()

process = None
FNULL = open(os.devnull, 'w')
stdout_sink = subprocess.PIPE if verbose else FNULL
if common.is_win():
# clean up any old dirty_pid files from prior runs
if (os.path.isfile(self.get_path() + "/dirty_pid.tmp")):
os.remove(self.get_path() + "/dirty_pid.tmp")
with open(os.devnull, 'w') as FNULL:
stdout_sink = subprocess.PIPE if verbose else FNULL
if common.is_win():
# clean up any old dirty_pid files from prior runs
if (os.path.isfile(self.get_path() + "/dirty_pid.tmp")):
os.remove(self.get_path() + "/dirty_pid.tmp")

if quiet_start and self.cluster.version() >= '2.2.4':
args.append('-q')
if quiet_start and self.cluster.version() >= '2.2.4':
args.append('-q')

process = subprocess.Popen(args, cwd=self.get_bin_dir(), env=env, stdout=stdout_sink, stderr=subprocess.PIPE, universal_newlines=True)
else:
process = subprocess.Popen(args, env=env, stdout=stdout_sink, stderr=subprocess.PIPE, universal_newlines=True)
# Our modified batch file writes a dirty output with more than just the pid - clean it to get in parity
# with *nix operation here.
process = subprocess.Popen(args, cwd=self.get_bin_dir(), env=env, stdout=stdout_sink, stderr=subprocess.PIPE, universal_newlines=True)
else:
process = subprocess.Popen(args, env=env, stdout=stdout_sink, stderr=subprocess.PIPE, universal_newlines=True)
# Our modified batch file writes a dirty output with more than just the pid - clean it to get in parity
# with *nix operation here.

if verbose:
stdout, stderr = process.communicate()
print(stdout)
print(stderr)
if verbose:
stdout, stderr = process.communicate()
print(stdout)
print(stderr)

if common.is_win():
self.__clean_win_pid()
Expand Down Expand Up @@ -809,6 +810,7 @@ def wait_for_compactions(self, keyspace: str=None, column_family: str=None, idle
raise ValueError("Cannot search only by column family, need also keyspace")
pending_tasks = -1
last_change = None
sleep_time = 0.1 # Start with 100ms
while not last_change or time.time() - last_change < idle_timeout:
output, err = self.nodetool("compactionstats", capture_output=True)
n = self._parse_tasks(output, keyspace, column_family)
Expand All @@ -817,11 +819,15 @@ def wait_for_compactions(self, keyspace: str=None, column_family: str=None, idle
return
if n != pending_tasks:
last_change = time.time()
sleep_time = 0.1 # Reset to 100ms on progress
if 0 < pending_tasks < n:
# background progress
self.warning(f"Pending compaction tasks increased from {pending_tasks} to {n} while waiting for compactions.")
pending_tasks = n
time.sleep(1)
else:
# No progress, use exponential backoff up to 1 second
sleep_time = min(sleep_time * 1.5, 1.0)
time.sleep(sleep_time)
raise TimeoutError(f"Waiting for compactions timed out after {idle_timeout} seconds with pending tasks remaining: {output}.")

def _do_run_nodetool(self, nodetool, capture_output=True, wait=True, timeout=None, verbose=True):
Expand Down
87 changes: 44 additions & 43 deletions ccmlib/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,11 @@ def download_dse_version(version, username, password, verbose=False):
__download(url, target, username=username, password=password, show_progress=verbose)
if verbose:
print(f"Extracting {target} as version {version} ...")
tar = tarfile.open(target)
dir = tar.next().name.split("/")[0]
tar.extractall(path=__get_dir())
tar.close()
with tarfile.open(target) as tar:
# Get directory name from first member
first_member = tar.getmembers()[0]
dir = first_member.name.split("/")[0]
tar.extractall(path=__get_dir())
target_dir = os.path.join(__get_dir(), version)
if os.path.exists(target_dir):
rmdirs(target_dir)
Expand All @@ -223,10 +224,11 @@ def download_opscenter_version(version, target_version, verbose=False):
__download(url, target, show_progress=verbose)
if verbose:
print(f"Extracting {target} as version {target_version} ...")
tar = tarfile.open(target)
dir = tar.next().name.split("/")[0]
tar.extractall(path=__get_dir())
tar.close()
with tarfile.open(target) as tar:
# Get directory name from first member
first_member = tar.getmembers()[0]
dir = first_member.name.split("/")[0]
tar.extractall(path=__get_dir())
target_dir = os.path.join(__get_dir(), target_version)
if os.path.exists(target_dir):
rmdirs(target_dir)
Expand Down Expand Up @@ -255,10 +257,11 @@ def download_version(version, url=None, verbose=False, binary=False):
__download(u, target, show_progress=verbose)
if verbose:
print(f"Extracting {target} as version {version} ...")
tar = tarfile.open(target)
dir = tar.next().name.split("/")[0]
tar.extractall(path=__get_dir())
tar.close()
with tarfile.open(target) as tar:
# Get directory name from first member
first_member = tar.getmembers()[0]
dir = first_member.name.split("/")[0]
tar.extractall(path=__get_dir())
target_dir = os.path.join(__get_dir(), version)
if os.path.exists(target_dir):
rmdirs(target_dir)
Expand Down Expand Up @@ -415,39 +418,37 @@ def __download(url, target, username=None, password=None, show_progress=False):
opener = urllib.request.build_opener(handler)
urllib.request.install_opener(opener)

u = urllib.request.urlopen(url)
f = open(target, 'wb')
meta = u.info()
file_size = int(meta.get("Content-Length"))
if show_progress:
print(f"Downloading {url} to {target} ({float(file_size) / (1024 * 1024):.3f}MB)")

file_size_dl = 0
block_sz = 8192
status = None
attempts = 0
while file_size_dl < file_size:
buffer = u.read(block_sz)
if not buffer:
attempts = attempts + 1
if attempts >= 5:
raise CCMError("Error downloading file (nothing read after %i attempts, downloded only %i of %i bytes)" % (attempts, file_size_dl, file_size))
time.sleep(0.5 * attempts)
continue
else:
attempts = 0
with urllib.request.urlopen(url) as u:
meta = u.info()
file_size = int(meta.get("Content-Length"))
if show_progress:
print(f"Downloading {url} to {target} ({float(file_size) / (1024 * 1024):.3f}MB)")

file_size_dl = 0
block_sz = 8192
status = None
attempts = 0
with open(target, 'wb') as f:
while file_size_dl < file_size:
buffer = u.read(block_sz)
if not buffer:
attempts = attempts + 1
if attempts >= 5:
raise CCMError("Error downloading file (nothing read after %i attempts, downloded only %i of %i bytes)" % (attempts, file_size_dl, file_size))
time.sleep(0.5 * attempts)
continue
else:
attempts = 0

file_size_dl += len(buffer)
f.write(buffer)
if show_progress:
status = r"%10d [%3.2f%%]" % (file_size_dl, file_size_dl * 100. / file_size)
status = chr(8) * (len(status) + 1) + status
print(status, end='')

file_size_dl += len(buffer)
f.write(buffer)
if show_progress:
status = r"%10d [%3.2f%%]" % (file_size_dl, file_size_dl * 100. / file_size)
status = chr(8) * (len(status) + 1) + status
print(status, end='')

if show_progress:
print("")
f.close()
u.close()
print("")


def __get_dir():
Expand Down
173 changes: 173 additions & 0 deletions tests/test_performance_optimizations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
"""
Performance tests to validate optimization improvements in ccm.
These tests verify that the performance optimizations don't break functionality
and demonstrate improved efficiency.
"""
import pytest
import tempfile
import os
import time
from ccmlib import common


class TestStringConcatenationOptimization:
"""Test that list append + join is more efficient than string concatenation."""

def test_list_append_vs_string_concat(self):
"""Verify list.append() + join() pattern works correctly."""
# This mimics the pattern used in watch_log_for()
test_lines = [f"line {i}\n" for i in range(100)]

# New optimized approach: list append
result_list = []
for line in test_lines:
result_list.append(line)
result = "".join(result_list)

# Verify it produces the same output
expected = "".join(test_lines)
assert result == expected

def test_list_slicing_for_error_messages(self):
"""Test that we can still slice joined strings for error messages."""
test_lines = [f"line {i}\n" for i in range(100)]
result_list = []
for line in test_lines:
result_list.append(line)

# Should be able to join and slice for error messages
reads_str = "".join(result_list)
truncated = reads_str[:50]

assert len(truncated) == 50
assert truncated.startswith("line 0")


class TestRegexReplacementOptimization:
"""Test that early break in regex replacements works correctly."""

def test_replaces_in_files_with_multiple_patterns(self):
"""Test that file replacement works with multiple patterns."""
with tempfile.NamedTemporaryFile(mode='w', delete=False) as src:
src.write("line1: old_value1\n")
src.write("line2: old_value2\n")
src.write("line3: unchanged\n")
src_path = src.name

with tempfile.NamedTemporaryFile(delete=False) as dst:
dst_path = dst.name

try:
# Test with multiple patterns - should only apply first match per line
replacements = [
(r'old_value1', 'new_value1'),
(r'old_value2', 'new_value2'),
]
common.replaces_in_files(src_path, dst_path, replacements)

with open(dst_path, 'r') as f:
content = f.read()

assert 'new_value1' in content
assert 'new_value2' in content
assert 'unchanged' in content
assert 'old_value1' not in content
assert 'old_value2' not in content
finally:
os.unlink(src_path)
os.unlink(dst_path)

def test_replaces_in_files_early_break(self):
"""Test that only first matching pattern is applied per line."""
with tempfile.NamedTemporaryFile(mode='w', delete=False) as src:
src.write("test_pattern\n")
src_path = src.name

with tempfile.NamedTemporaryFile(delete=False) as dst:
dst_path = dst.name

try:
# Multiple patterns that could match - only first should apply
replacements = [
(r'test', 'FIRST'),
(r'pattern', 'SECOND'), # Should not be checked if first matches
]
common.replaces_in_files(src_path, dst_path, replacements)

with open(dst_path, 'r') as f:
content = f.read()

# With early break, only first replacement should happen
assert content == "FIRST\n"
# Without early break, it would be "FIRST\n" then "SECOND\n"
finally:
os.unlink(src_path)
os.unlink(dst_path)


class TestExponentialBackoffBehavior:
"""Test exponential backoff logic for polling operations."""

def test_exponential_backoff_calculation(self):
"""Test that exponential backoff increases correctly."""
sleep_time = 0.1
max_sleep = 1.0

# Simulate the backoff pattern
sleep_times = [sleep_time]
for _ in range(10):
sleep_time = min(sleep_time * 1.5, max_sleep)
sleep_times.append(sleep_time)

# Verify it grows exponentially up to max (use approximate equality for floats)
assert abs(sleep_times[0] - 0.1) < 0.001
assert abs(sleep_times[1] - 0.15) < 0.001
assert abs(sleep_times[2] - 0.225) < 0.001
assert abs(sleep_times[3] - 0.3375) < 0.001
# Should eventually cap at 1.0
assert abs(sleep_times[-1] - 1.0) < 0.001
assert all(t <= max_sleep for t in sleep_times)

def test_backoff_resets_on_progress(self):
"""Test that backoff resets to minimum on progress."""
sleep_time = 0.5 # Some accumulated backoff

# Simulate progress detection
sleep_time = 0.1 # Reset to minimum

assert sleep_time == 0.1


class TestFileHandleManagement:
"""Test that file handles are properly managed with context managers."""

def test_context_manager_closes_file(self):
"""Verify context managers properly close file handles."""
test_file = tempfile.NamedTemporaryFile(delete=False)
test_path = test_file.name
test_file.close()

try:
# Write using context manager
with open(test_path, 'w') as f:
f.write("test")
# File should auto-close on exit

# File should be closed and readable
with open(test_path, 'r') as f:
content = f.read()

assert content == "test"
finally:
os.unlink(test_path)

def test_devnull_context_manager(self):
"""Test that /dev/null can be used with context manager."""
# This pattern is now used in node.py for subprocess output
with open(os.devnull, 'w') as FNULL:
FNULL.write("test")
# Should not raise any errors


if __name__ == '__main__':
pytest.main([__file__, '-v'])