Skip to content

Commit c9f229d

Browse files
scripts: make metrics gate() thread-safe (#14301)
Guard signal.signal registration to the main thread (it raises ValueError on worker threads), and track live kubectl port-forward processes in a lock-guarded registry exposing terminate_all_port_forwards for best-effort cleanup from a main-thread signal handler. Enables gate() to run under parallel waits. Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 087a271 commit c9f229d

1 file changed

Lines changed: 51 additions & 11 deletions

File tree

scripts/prod/metrics_lib.py

Lines changed: 51 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import socket
55
import subprocess
66
import sys
7+
import threading
78
import urllib.error
89
import urllib.request
910
from time import sleep
@@ -12,6 +13,29 @@
1213
from common_lib import Colors, get_namespace_args, print_colored, print_error
1314
from prometheus_client.parser import text_string_to_metric_families
1415

16+
# Registry of live `kubectl port-forward` processes, so they can be cleaned up even when `gate()`
17+
# runs on a worker thread (where it cannot install its own signal handlers). Guarded by a lock since
18+
# parallel waits register/unregister concurrently.
19+
_active_port_forwards_lock = threading.Lock()
20+
_active_port_forwards: set[subprocess.Popen] = set()
21+
22+
# Set when port-forwards are torn down by an interrupt handler, so a gate() running on a worker
23+
# thread can tell an intentional shutdown apart from a port-forward that failed to start.
24+
_shutdown_requested = threading.Event()
25+
26+
27+
def terminate_all_port_forwards() -> None:
28+
"""Terminate every port-forward process still running.
29+
30+
Best-effort cleanup intended to be called from a SIGINT/SIGTERM handler installed on the main
31+
thread by the orchestrator when waits run in parallel (worker threads cannot install handlers).
32+
"""
33+
_shutdown_requested.set()
34+
with _active_port_forwards_lock:
35+
port_forward_processes = list(_active_port_forwards)
36+
for port_forward_process in port_forward_processes:
37+
MetricConditionGater._terminate_port_forward_process(port_forward_process)
38+
1539

1640
class MetricConditionGater:
1741
"""Gates progress on a metric satisfying a condition.
@@ -123,6 +147,8 @@ def _terminate_port_forward_process(pf_process: subprocess.Popen):
123147
print_colored("Force killing kubectl port-forward process")
124148
pf_process.kill()
125149
pf_process.wait()
150+
with _active_port_forwards_lock:
151+
_active_port_forwards.discard(pf_process)
126152

127153
def gate(self):
128154
"""Wait until the nodes metrics satisfy the condition."""
@@ -142,25 +168,39 @@ def gate(self):
142168

143169
try:
144170
pf_process = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
145-
print("Waiting for forwarding to start")
171+
with _active_port_forwards_lock:
172+
_active_port_forwards.add(pf_process)
173+
print_colored("Waiting for forwarding to start")
146174
# Give the forwarding time to start.
147175
# TODO(guy.f): Consider poll until the forwarding is ready if we see any issues.
148176
sleep(3)
149-
assert (
150-
pf_process.poll() is None
151-
), f"Port forwarding process exited with code {pf_process.returncode}"
177+
if pf_process.poll() is not None:
178+
if _shutdown_requested.is_set():
179+
# An interrupt handler tore down the port-forward during startup; exit cleanly
180+
# rather than treating the intentional shutdown as an unexpected failure.
181+
return
182+
raise RuntimeError(
183+
f"Port forwarding process for pod {self.pod} exited with code "
184+
f"{pf_process.returncode}"
185+
)
152186

153-
print(
187+
print_colored(
154188
f"Forwarding started (from local port {self.local_port} to {self.pod}:{self.metrics_port})"
155189
)
156190

157-
# Set up signal handler to ensure forwarding subprocess is terminated on interruption
158-
def signal_handler(signum, frame):
159-
self._terminate_port_forward_process(pf_process)
160-
sys.exit(0)
191+
# Set up a signal handler to terminate the forwarding subprocess on interruption.
192+
# signal.signal only works on the main thread, so skip it when gating runs on a worker
193+
# thread (parallel waits) — the orchestrator installs a main-thread handler that calls
194+
# terminate_all_port_forwards instead, and the finally block below still cleans up on
195+
# normal completion.
196+
if threading.current_thread() is threading.main_thread():
197+
198+
def signal_handler(signum, frame):
199+
self._terminate_port_forward_process(pf_process)
200+
sys.exit(0)
161201

162-
signal.signal(signal.SIGINT, signal_handler)
163-
signal.signal(signal.SIGTERM, signal_handler)
202+
signal.signal(signal.SIGINT, signal_handler)
203+
signal.signal(signal.SIGTERM, signal_handler)
164204

165205
self._poll_until_condition_met()
166206

0 commit comments

Comments
 (0)