44import socket
55import subprocess
66import sys
7+ import threading
78import urllib .error
89import urllib .request
910from time import sleep
1213from common_lib import Colors , get_namespace_args , print_colored , print_error
1314from prometheus_client .parser import text_string_to_metric_families
1415
# Every `kubectl port-forward` child process that is still alive is tracked here, so it can be
# cleaned up even when `gate()` runs on a worker thread (where it cannot install its own signal
# handlers). Parallel waits register and unregister processes concurrently, so read or mutate
# the set only while holding the lock.
_active_port_forwards: set[subprocess.Popen] = set()
_active_port_forwards_lock = threading.Lock()
21+
22+
def terminate_all_port_forwards() -> None:
    """Shut down all port-forward processes that are currently registered.

    Meant to be invoked from a SIGINT/SIGTERM handler that the orchestrator installs on the main
    thread when waits run in parallel, because worker threads cannot install signal handlers
    themselves. Cleanup is best-effort.
    """
    # Copy the registry while holding the lock, then terminate outside of it:
    # _terminate_port_forward_process takes the same lock to unregister each process.
    with _active_port_forwards_lock:
        snapshot = tuple(_active_port_forwards)
    for pf_process in snapshot:
        MetricConditionGater._terminate_port_forward_process(pf_process)
33+
1534
1635class MetricConditionGater :
1736 """Gates progress on a metric satisfying a condition.
@@ -123,6 +142,8 @@ def _terminate_port_forward_process(pf_process: subprocess.Popen):
123142 print_colored ("Force killing kubectl port-forward process" )
124143 pf_process .kill ()
125144 pf_process .wait ()
145+ with _active_port_forwards_lock :
146+ _active_port_forwards .discard (pf_process )
126147
127148 def gate (self ):
128149 """Wait until the nodes metrics satisfy the condition."""
@@ -142,6 +163,8 @@ def gate(self):
142163
143164 try :
144165 pf_process = subprocess .Popen (cmd , stdout = subprocess .DEVNULL , stderr = subprocess .DEVNULL )
166+ with _active_port_forwards_lock :
167+ _active_port_forwards .add (pf_process )
145168 print ("Waiting for forwarding to start" )
146169 # Give the forwarding time to start.
147170 # TODO(guy.f): Consider poll until the forwarding is ready if we see any issues.
@@ -154,13 +177,19 @@ def gate(self):
154177 f"Forwarding started (from local port { self .local_port } to { self .pod } :{ self .metrics_port } )"
155178 )
156179
157- # Set up signal handler to ensure forwarding subprocess is terminated on interruption
158- def signal_handler (signum , frame ):
159- self ._terminate_port_forward_process (pf_process )
160- sys .exit (0 )
180+ # Set up a signal handler to terminate the forwarding subprocess on interruption.
181+ # signal.signal only works on the main thread, so skip it when gating runs on a worker
182+ # thread (parallel waits) — the orchestrator installs a main-thread handler that calls
183+ # terminate_all_port_forwards instead, and the finally block below still cleans up on
184+ # normal completion.
185+ if threading .current_thread () is threading .main_thread ():
186+
187+ def signal_handler (signum , frame ):
188+ self ._terminate_port_forward_process (pf_process )
189+ sys .exit (0 )
161190
162- signal .signal (signal .SIGINT , signal_handler )
163- signal .signal (signal .SIGTERM , signal_handler )
191+ signal .signal (signal .SIGINT , signal_handler )
192+ signal .signal (signal .SIGTERM , signal_handler )
164193
165194 self ._poll_until_condition_met ()
166195
0 commit comments