11#!/usr/bin/env python3
22
33import argparse
4+ import concurrent .futures
45import subprocess
56import sys
7+ import threading
8+ import time
69from enum import Enum
7- from typing import Optional
10+ from typing import Callable , Optional , TypeVar
811
912
1013class Colors (Enum ):
@@ -17,15 +20,132 @@ class Colors(Enum):
1720 RESET = "\033 [0m"
1821
1922
20- def print_colored (message : str , color : Colors = Colors .RESET , file = sys .stdout ) -> None :
21- """Print message with color"""
22- print (f"{ color .value } { message } { Colors .RESET .value } " , file = file )
23+ # Thread-local output sink. When `run_in_parallel` runs a worker, it sets `buffer` on this so that
24+ # the worker's log lines are captured per-thread instead of interleaving on the shared stdout/stderr.
25+ # When no buffer is set (the common, single-threaded case), logging prints immediately as before.
26+ _output_sink = threading .local ()
27+
28+ # Serializes writes to the real stdout/stderr so buffered blocks and heartbeats don't interleave.
29+ _print_lock = threading .Lock ()
30+
31+
32+ def print_colored (message : str , color : Colors = Colors .RESET , file = None ) -> None :
33+ """Print message with color.
34+
35+ `file` is resolved to the current `sys.stdout` when None (resolving at call time rather than
36+ binding a default at definition time, so output redirection is honored).
37+
38+ If the current thread has a buffer set on `_output_sink` (i.e. it is a `run_in_parallel`
39+ worker), the formatted line is appended to that buffer instead of being printed, so it can be
40+ flushed as one grouped block when the worker finishes.
41+ """
42+ if file is None :
43+ file = sys .stdout
44+ formatted = f"{ color .value } { message } { Colors .RESET .value } "
45+ buffer = getattr (_output_sink , "buffer" , None )
46+ if buffer is not None :
47+ buffer .append ((formatted , file ))
48+ else :
49+ print (formatted , file = file )
2350
2451
2552def print_error (message : str ) -> None :
2653 print_colored (message , color = Colors .RED , file = sys .stderr )
2754
2855
56+ T = TypeVar ("T" )
57+ R = TypeVar ("R" )
58+
59+
60+ def run_in_parallel (
61+ items : list [T ],
62+ worker : Callable [[T ], R ],
63+ max_parallelism : int ,
64+ label : Callable [[T ], str ],
65+ heartbeat_interval_seconds : int = 5 ,
66+ ) -> list [R ]:
67+ """Run `worker(item)` for each item concurrently, capped at `max_parallelism` threads.
68+
69+ Threads (not processes) are used because the work is I/O-bound (kubectl/urllib calls that
70+ release the GIL).
71+
72+ Output: each worker's log lines (emitted via `print_colored`/`print_error`) are buffered and
73+ flushed as one block, prefixed with `label(item)`, when that item finishes — so concurrent
74+ output stays readable. While items are still running, a heartbeat naming the not-yet-done items
75+ is printed every `heartbeat_interval_seconds`.
76+
77+ Errors: workers must raise rather than call `sys.exit()` (a `SystemExit` raised in a worker
78+ thread is swallowed by the executor and would not stop the process). Exceptions are collected
79+ per item; if any item failed, a summary is printed and the process exits with code 1 once all
80+ items have settled.
81+
82+ Returns the per-item results in the same order as `items`.
83+ """
84+ if not items :
85+ return []
86+
87+ num_items = len (items )
88+ results : list [Optional [R ]] = [None ] * num_items
89+ errors : dict [int , BaseException ] = {}
90+
91+ def run_one (index : int , item : T ) -> R :
92+ buffer : list [tuple [str , object ]] = []
93+ _output_sink .buffer = buffer
94+ try :
95+ return worker (item )
96+ finally :
97+ # Stop capturing before flushing so the header itself prints to the real stdout.
98+ _output_sink .buffer = None
99+ with _print_lock :
100+ print_colored (f"===== { label (item )} =====" , Colors .BLUE )
101+ for text , file in buffer :
102+ print (text , file = file )
103+
104+ with concurrent .futures .ThreadPoolExecutor (
105+ max_workers = min (max_parallelism , num_items )
106+ ) as executor :
107+ future_to_index = {
108+ executor .submit (run_one , index , item ): index for index , item in enumerate (items )
109+ }
110+ pending_futures = set (future_to_index .keys ())
111+ last_heartbeat = time .monotonic ()
112+
113+ while pending_futures :
114+ done_futures , pending_futures = concurrent .futures .wait (
115+ pending_futures ,
116+ timeout = heartbeat_interval_seconds ,
117+ return_when = concurrent .futures .FIRST_COMPLETED ,
118+ )
119+ for future in done_futures :
120+ index = future_to_index [future ]
121+ try :
122+ results [index ] = future .result ()
123+ except BaseException as error :
124+ errors [index ] = error
125+
126+ now = time .monotonic ()
127+ if pending_futures and now - last_heartbeat >= heartbeat_interval_seconds :
128+ running_labels = ", " .join (
129+ label (items [future_to_index [future ]]) for future in pending_futures
130+ )
131+ num_done = num_items - len (pending_futures )
132+ with _print_lock :
133+ print_colored (
134+ f"[{ num_done } /{ num_items } done] still waiting on: { running_labels } " ,
135+ Colors .YELLOW ,
136+ )
137+ last_heartbeat = now
138+
139+ if errors :
140+ with _print_lock :
141+ print_error (f"{ len (errors )} of { num_items } parallel operation(s) failed:" )
142+ for index in sorted (errors ):
143+ print_error (f" - { label (items [index ])} : { errors [index ]} " )
144+ sys .exit (1 )
145+
146+ return results
147+
148+
29149class RestartStrategy (Enum ):
30150 """Strategy for restarting nodes."""
31151
0 commit comments