Skip to content

Commit fdefe46

Browse files
scripts: add run_in_parallel helper for prod ops
Add a thread-pool-based run_in_parallel helper to common_lib that buffers each worker's output into a labeled block, prints a 5s progress heartbeat naming still-running items, and aggregates per-item failures into a single non-zero exit. Resolve print_colored's output stream at call time so redirection works. Foundation for parallelizing per-node kubectl operations; nothing calls it yet. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 8d50432 commit fdefe46

2 files changed

Lines changed: 207 additions & 4 deletions

File tree

scripts/prod/common_lib.py

Lines changed: 127 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
#!/usr/bin/env python3
22

33
import argparse
4+
import concurrent.futures
45
import subprocess
56
import sys
7+
import threading
8+
import time
69
from enum import Enum
7-
from typing import Optional
10+
from typing import Callable, Optional, TypeVar
811

912

1013
class Colors(Enum):
@@ -17,15 +20,135 @@ class Colors(Enum):
1720
RESET = "\033[0m"
1821

1922

20-
def print_colored(message: str, color: Colors = Colors.RESET, file=sys.stdout) -> None:
21-
"""Print message with color"""
22-
print(f"{color.value}{message}{Colors.RESET.value}", file=file)
23+
# Per-thread capture target for log output. Inside each `run_in_parallel` worker thread a list is
# assigned to `buffer` on this object, which makes `print_colored` collect that thread's lines
# instead of writing them straight to the shared streams. Threads that never set `buffer` (the
# ordinary single-threaded case) print immediately.
_output_sink = threading.local()

# Held while writing to the real stdout/stderr so that flushed worker blocks and heartbeat lines
# are emitted whole rather than interleaved.
_print_lock = threading.Lock()


def print_colored(message: str, color: Colors = Colors.RESET, file=None) -> None:
    """Emit `message` wrapped in the ANSI codes for `color`.

    When `file` is None the destination is looked up as `sys.stdout` at call time (not frozen as
    a default at import time), so a redirected stdout is respected.

    If the calling thread has a buffer installed on `_output_sink`, the `(text, destination)` pair
    is appended to that buffer and nothing is printed here; the owner of the buffer is responsible
    for flushing it later. Otherwise the line is printed right away.
    """
    target = sys.stdout if file is None else file
    line = f"{color.value}{message}{Colors.RESET.value}"
    captured = getattr(_output_sink, "buffer", None)
    if captured is None:
        print(line, file=target)
        return
    captured.append((line, target))
2350

2451

2552
def print_error(message: str) -> None:
    """Send `message` through `print_colored` in red, targeting the current `sys.stderr`."""
    print_colored(message, Colors.RED, sys.stderr)
2754

2855

56+
T = TypeVar("T")
R = TypeVar("R")


def run_in_parallel(
    items: list[T],
    worker: Callable[[T], R],
    max_parallelism: int,
    label: Callable[[T], str],
    heartbeat_interval_seconds: int = 5,
) -> list[R]:
    """Run `worker(item)` for each item concurrently, capped at `max_parallelism` threads.

    Threads (not processes) are used because the work is I/O-bound (kubectl/urllib calls that
    release the GIL).

    Output: each worker's log lines (emitted via `print_colored`/`print_error`) are buffered and
    flushed as one block, prefixed with `label(item)`, when that item finishes, so concurrent
    output stays readable. While items are still outstanding, a heartbeat naming the not-yet-done
    items (in input order) is printed every `heartbeat_interval_seconds`.

    Errors: a worker that raises (or calls `sys.exit()`, which raises `SystemExit`) is recorded as
    a failure for its item; remaining items still run, and once all have settled a summary is
    printed and the process exits with code 1.

    Interrupts: `KeyboardInterrupt` is not treated as an item failure. On Ctrl-C every item that
    has not started yet is cancelled and the interrupt propagates. Workers that are already
    running cannot be stopped from here; the executor still waits for those to return.

    Args:
        items: Inputs to process; an empty list returns `[]` without starting any thread.
        worker: Called once per item, on a pool thread.
        max_parallelism: Upper bound on concurrent workers; must be at least 1.
        label: Maps an item to the name used in block headers, heartbeats and the error summary.
        heartbeat_interval_seconds: Minimum spacing between heartbeat lines.

    Returns:
        The per-item results in the same order as `items`.

    Raises:
        ValueError: If `items` is non-empty and `max_parallelism` is below 1.
        SystemExit: With code 1 if any worker failed.
    """
    if not items:
        return []
    if max_parallelism < 1:
        # Fail with a message that names the caller's argument instead of the executor's
        # internal `max_workers` complaint (same exception type as before).
        raise ValueError(f"max_parallelism must be at least 1, got {max_parallelism}")

    num_items = len(items)
    results: list[Optional[R]] = [None] * num_items
    errors: dict[int, BaseException] = {}

    def run_one(item: T) -> R:
        buffer: list[tuple[str, object]] = []
        _output_sink.buffer = buffer
        try:
            return worker(item)
        finally:
            # Stop capturing before flushing so the header itself prints to the real stdout.
            _output_sink.buffer = None
            with _print_lock:
                print_colored(f"===== {label(item)} =====", Colors.BLUE)
                for text, file in buffer:
                    print(text, file=file)

    with concurrent.futures.ThreadPoolExecutor(
        max_workers=min(max_parallelism, num_items)
    ) as executor:
        try:
            future_to_index = {
                executor.submit(run_one, item): index for index, item in enumerate(items)
            }
            pending_futures = set(future_to_index)
            last_heartbeat = time.monotonic()

            while pending_futures:
                done_futures, pending_futures = concurrent.futures.wait(
                    pending_futures,
                    timeout=heartbeat_interval_seconds,
                    return_when=concurrent.futures.FIRST_COMPLETED,
                )
                for future in done_futures:
                    index = future_to_index[future]
                    try:
                        results[index] = future.result()
                    except KeyboardInterrupt:
                        # Ctrl-C is not an item failure; let it abort the whole run.
                        raise
                    except BaseException as error:
                        errors[index] = error

                now = time.monotonic()
                if pending_futures and now - last_heartbeat >= heartbeat_interval_seconds:
                    # Sets iterate in arbitrary order; sort by input position so successive
                    # heartbeats list the outstanding items consistently.
                    pending_indices = sorted(
                        future_to_index[future] for future in pending_futures
                    )
                    running_labels = ", ".join(label(items[index]) for index in pending_indices)
                    num_done = num_items - len(pending_futures)
                    with _print_lock:
                        print_colored(
                            f"[{num_done}/{num_items} done] still waiting on: {running_labels}",
                            Colors.YELLOW,
                        )
                    last_heartbeat = now
        except KeyboardInterrupt:
            # Leaving the `with` block calls shutdown(wait=True), which would otherwise let every
            # queued item start and run to completion after Ctrl-C. Cancel the queued ones first.
            executor.shutdown(wait=False, cancel_futures=True)
            raise

    if errors:
        with _print_lock:
            print_error(f"{len(errors)} of {num_items} parallel operation(s) failed:")
            for index in sorted(errors):
                error = errors[index]
                # Exceptions raised without a message stringify to ""; show their repr instead
                # so the summary line still says what failed.
                detail = str(error) or repr(error)
                print_error(f"  - {label(items[index])}: {detail}")
        sys.exit(1)

    return results
150+
151+
29152
class RestartStrategy(Enum):
30153
"""Strategy for restarting nodes."""
31154

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
#!/usr/bin/env python3
2+
"""Unit tests for `run_in_parallel` in common_lib (no kubectl / cluster needed)."""
3+
4+
import time
5+
6+
import pytest
7+
from common_lib import print_colored, run_in_parallel
8+
9+
10+
def _label(item) -> str:
    """Return the display label these tests pass to `run_in_parallel` for `item`."""
    return "node-{}".format(item)
12+
13+
14+
def test_results_match_input_order_regardless_of_completion_order():
    """Results are returned in input order even when later items finish first."""
    inputs = [0, 1, 2, 3, 4]

    def worker(item: int) -> int:
        # Lower-numbered items sleep longer, so they are the last to complete.
        delay = (5 - item) * 0.02
        time.sleep(delay)
        return item * 10

    outcome = run_in_parallel(inputs, worker, max_parallelism=4, label=_label)
    assert outcome == [0, 10, 20, 30, 40]
22+
23+
24+
def test_empty_items_returns_empty_list():
    """An empty input list yields an empty result and never invokes the worker."""
    seen = []

    def worker(item):
        return seen.append(item)

    outcome = run_in_parallel([], worker, max_parallelism=4, label=_label)
    assert outcome == []
    assert seen == []
29+
30+
31+
def test_worker_output_is_buffered_grouped_and_labeled(capsys):
    """Each item's lines appear together, after that item's header, with no interleaving."""

    def worker(item: int) -> None:
        for tag in ("a", "b"):
            print_colored(f"line-{tag} from {item}")

    run_in_parallel([0, 1], worker, max_parallelism=2, label=_label)
    captured = capsys.readouterr().out

    # Both items ran concurrently, yet each one's block must be contiguous: header first, then
    # line-a, then line-b, with nothing from the other item in between the two lines.
    for item, other in ((0, 1), (1, 0)):
        header_at = captured.index(f"node-{item}")
        first_at = captured.index(f"line-a from {item}")
        second_at = captured.index(f"line-b from {item}")
        assert header_at < first_at < second_at
        assert f"from {other}" not in captured[first_at:second_at]
49+
50+
51+
def test_heartbeat_lists_still_running_items(capsys):
    """A slow item is named in at least one heartbeat line while it is still running."""

    def worker(item: int) -> int:
        # Only item 0 is slow: 2.5s of work against a 1s heartbeat interval guarantees that at
        # least one heartbeat fires while it is outstanding.
        if item != 0:
            return item
        time.sleep(2.5)
        return item

    run_in_parallel(
        [0, 1],
        worker,
        max_parallelism=2,
        label=_label,
        heartbeat_interval_seconds=1,
    )
    captured = capsys.readouterr().out
    assert "still waiting on: node-0" in captured
    assert "done]" in captured
62+
63+
64+
def test_failing_worker_is_reported_and_exits_nonzero(capsys):
    """One failing item produces a stderr summary and a SystemExit with code 1."""
    failing_item = 1

    def worker(item: int) -> int:
        if item != failing_item:
            return item
        raise ValueError("boom from 1")

    with pytest.raises(SystemExit) as exit_info:
        run_in_parallel([0, 1, 2], worker, max_parallelism=3, label=_label)

    assert exit_info.value.code == 1
    stderr_text = capsys.readouterr().err
    assert "1 of 3 parallel operation(s) failed" in stderr_text
    assert "node-1: boom from 1" in stderr_text
77+
78+
79+
if __name__ == "__main__":
    # Running this file directly executes its tests verbosely and exits with pytest's status.
    exit_status = pytest.main([__file__, "-v"])
    raise SystemExit(exit_status)

0 commit comments

Comments
 (0)