Skip to content

Commit 651a67b

Browse files
committed
rptest: limit node operation parallelism in cloud tests
It has been observed in tests that creating too many parallel ssh sessions to the agent node can leave the teleport service on it in an unrecoverable state. Even in cases where the agent instance type is changed to something larger(i.e, m5.large) the k8s API seems to timeout requests if too many are outstanding in parallel.
1 parent 4074a5a commit 651a67b

File tree

2 files changed

+12
-4
lines changed

2 files changed

+12
-4
lines changed

tests/rptest/services/redpanda.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1275,6 +1275,7 @@ class has both implementation and abstract methods, only for methods which
12751275
def __init__(self, *args: Any, **kwargs: Any) -> None:
12761276
super().__init__(*args, **kwargs)
12771277
self._usage_stats = UsageStats()
1278+
self._max_workers: int | None = None
12781279

12791280
@property
12801281
@abstractmethod
@@ -1405,10 +1406,9 @@ def wrapped():
14051406
)
14061407

14071408
def for_nodes(self, nodes: Collection[U], cb: Callable[[U], T]) -> list[T]:
1408-
n_workers = len(nodes)
1409-
if n_workers > 0:
1409+
if len(nodes) > 0:
14101410
with concurrent.futures.ThreadPoolExecutor(
1411-
max_workers=n_workers
1411+
max_workers=self._max_workers
14121412
) as executor:
14131413
# The list() wrapper is to cause futures to be evaluated here+now
14141414
# (including throwing any exceptions) and not just spawned in background.
@@ -1797,6 +1797,10 @@ def __init__(
17971797

17981798
super().__init__()
17991799

1800+
# Cloudv2 agents run on very small instances that can easily be
1801+
# overwhelmed by too many concurrent ssh sessions.
1802+
self._max_workers = 10
1803+
18001804
self.config_profile_name = config_profile_name
18011805
self._min_brokers = min_brokers
18021806
self._superuser = RedpandaService.SUPERUSER_CREDENTIALS

tests/rptest/services/utils.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ def __init__(
151151
self._raise_on_errors: bool = self._context.globals.get(
152152
self.RAISE_ON_ERRORS_KEY, True
153153
)
154+
self._max_workers: int | None = None
154155

155156
# Prepare matching terms
156157
self.match_terms: list[str] = list(self.DEFAULT_MATCH_TERMS)
@@ -242,7 +243,7 @@ def scan_one(
242243
bad_lines: NodeToLines = {}
243244

244245
# Run scans in parallel
245-
with ThreadPoolExecutor() as executor:
246+
with ThreadPoolExecutor(max_workers=self._max_workers) as executor:
246247
futures = [executor.submit(scan_one, v, n) for v, n in versioned_nodes]
247248
for fut in as_completed(futures):
248249
node, vl = fut.result()
@@ -358,6 +359,9 @@ def __init__(
358359
# Prepare capture functions
359360
self.kubectl = kubectl
360361
self.test_start_time = test_start_time
362+
# Cloudv2 agents run on very small instances that can easily be
363+
# overwhelmed by too many concurrent ssh sessions.
364+
self._max_workers = 10
361365

362366
def _capture_log(self, node: CloudBroker, expr: str) -> Generator[str, None, None]:
363367
"""Capture log and check test timing.

0 commit comments

Comments
 (0)