1616import os
1717from multiprocessing .connection import Listener
1818import time
19- import signal
2019import threading
2120import sys
2221
3837 format = "%(levelname)s: %(message)s" , stream = sys .stderr )
3938
4039
41- last_time = time .time ()
42- timeout = 600 # 10 minutes for initial connection
43- keep_alive_timeout = (
44- 900 # 30 minutes for keep-alive, if no closed message (allow for reconnects)
45- )
40+ class WaitInfo :
41+ wait_limit_reached = False
42+ timeout = 20
43+ # timeout = 600 # 10 minutes for initial connection
44+ last_time = time .time ()
45+ # 30 minutes for keep-alive, if no closed message (allow for reconnects)
46+ # keep_alive_timeout = 900
47+ keep_alive_timeout = 20
4648
4749
4850def _is_truthy_env_var (var_name : str ) -> bool :
@@ -96,9 +98,10 @@ def should_halt_for_connection() -> bool:
9698
9799def wait_for_notification (address ):
98100 """Waits for connection notification from the listener."""
99- # TODO(belitskiy): Get rid of globals?
100- global last_time , timeout
101101 while True :
102+ time .sleep (0.05 )
103+ if WaitInfo .wait_limit_reached :
104+ logging .info (f"No connection in { WaitInfo .timeout } seconds - exiting " )
102105 with Listener (address ) as listener :
103106 logging .info ("Waiting for connection..." )
104107 with listener .accept () as conn :
@@ -111,15 +114,17 @@ def wait_for_notification(address):
111114 logging .info ("Received message" )
112115 if message == "keep_alive" :
113116 logging .info ("Keep-alive received" )
114- last_time = time .time ()
117+ WaitInfo . last_time = time .time ()
115118 continue # Keep-alive received, continue waiting
116119 elif message == "closed" :
117120 logging .info ("Connection closed by the other process." )
118121 return # Graceful exit
119122 elif message == "connected" :
120- last_time = time .time ()
121- timeout = keep_alive_timeout
123+ WaitInfo . last_time = time .time ()
124+ WaitInfo . timeout = WaitInfo . keep_alive_timeout
122125 logging .info ("Connected" )
126+ elif message == "wait_limit_reached" :
127+ logging .info ("Finished waiting" )
123128 else :
124129 logging .warning ("Unknown message received:" , message )
125130 continue
@@ -128,12 +133,12 @@ def wait_for_notification(address):
128133def timer ():
129134 while True :
130135 logging .info ("Checking status" )
131- time_elapsed = time .time () - last_time
132- if time_elapsed < timeout :
136+ time_elapsed = time .time () - WaitInfo . last_time
137+ if time_elapsed < WaitInfo . timeout :
133138 logging .info (f"Time since last keep-alive: { int (time_elapsed )} s" )
134139 else :
135- logging . info ( "Timeout reached, exiting" )
136- os . kill ( os . getpid (), signal . SIGTERM )
140+ WaitInfo . wait_limit_reached = True
141+ return
137142 time .sleep (60 )
138143
139144
0 commit comments