Skip to content

Commit e245e31

Browse files
committed
fix: remove deadlocks by making the scheduler signal handler signal-safe
See relevant comments - we can occasionally get deadlocks with logging and threading primitives in the signal handler, which causes the process to hang around 5% of the time on a SIGINT/SIGTERM. We also want the signal to be able to interrupt our poll wait/sleep without busy waiting (for performance), which means we cannot use time.sleep (an early signal will not interrupt it, and a pre-check leads to TOCTOU races), nor signal.sigtimedwait (registers its own handlers to handle signals inside the wait, but misses signals outside). This leaves us with one clear solution - use an OS pipe and define a selector on the read file descriptor, and have the signal handler set a flag with the signal number and write to the write file descriptor. By querying the flag we always know whether a signal has arrived that our main loop still needs to handle, and by using an fd we reliably skip the wait on a signal, while the wait itself is blocking (i.e. not a busy wait). The signal handler is then minimal and async-signal-safe, just setting a flag and writing to the pipe. The relevant logging logic is moved to be dispatched by the main loop instead. Signed-off-by: Alex Jones <[email protected]>
1 parent 6c32f12 commit e245e31

File tree

1 file changed

+48
-33
lines changed

1 file changed

+48
-33
lines changed

src/dvsim/scheduler.py

Lines changed: 48 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55
"""Job scheduler."""
66

77
import contextlib
8-
import threading
8+
import os
9+
import selectors
910
from collections.abc import (
11+
Callable,
1012
Mapping,
1113
MutableMapping,
1214
MutableSequence,
@@ -164,63 +166,76 @@ def __init__(
164166
# variant-specific settings such as max parallel jobs & poll rate.
165167
self._launcher_cls: type[Launcher] = launcher_cls
166168

167-
def run(self) -> Sequence[CompletedJobStatus]:
168-
"""Run all scheduled jobs and return the results.
169+
def _handle_exit_signal(self, last_received_signal: int, handler: Callable) -> None:
170+
"""Handle a received exit (SIGINT/SIGTERM signal) in the main scheduler loop.
169171
170-
Returns the results (status) of all items dispatched for all
171-
targets and cfgs.
172+
On either signal, this will tell runners to quit and cancel future jobs.
173+
On receiving a SIGINT specifically, this re-installs the old signal handler
174+
such that subsequent SIGINT signals will kill the process (non-gracefully).
172175
"""
173-
timer = Timer()
174-
175-
# Catch one SIGINT and tell the runner to quit. On a second, die.
176-
stop_now = threading.Event()
177-
old_handler = None
176+
log.info(
177+
"Received signal %s. Exiting gracefully.",
178+
last_received_signal,
179+
)
178180

179-
def on_signal(signal_received: int, _: FrameType | None) -> None:
181+
if last_received_signal == SIGINT:
180182
log.info(
181-
"Received signal %s. Exiting gracefully.",
182-
signal_received,
183+
"Send another to force immediate quit (but you may "
184+
"need to manually kill child processes)",
183185
)
184186

185-
if signal_received == SIGINT:
186-
log.info(
187-
"Send another to force immediate quit (but you may "
188-
"need to manually kill child processes)",
189-
)
187+
# Restore old handler to catch a second SIGINT
188+
signal(SIGINT, handler)
190189

191-
# Restore old handler to catch a second SIGINT
192-
if old_handler is None:
193-
raise RuntimeError("Old SIGINT handler not found")
190+
self._kill()
194191

195-
signal(signal_received, old_handler)
192+
def run(self) -> Sequence[CompletedJobStatus]:
193+
"""Run all scheduled jobs and return the results.
196194
197-
stop_now.set()
195+
Returns the results (status) of all items dispatched for all
196+
targets and cfgs.
197+
"""
198+
timer = Timer()
198199

199-
old_handler = signal(SIGINT, on_signal)
200+
# On SIGTERM or SIGINT, tell the runner to quit.
201+
# On a second SIGINT specifically, die.
202+
sel = selectors.DefaultSelector()
203+
signal_rfd, signal_wfd = os.pipe()
204+
sel.register(signal_rfd, selectors.EVENT_READ)
205+
last_received_signal: int | None = None
200206

201-
# Install the SIGTERM handler before scheduling jobs.
207+
def on_signal(signal_received: int, _: FrameType | None) -> None:
208+
# To allow async-signal-safe logic where signals can be handled
209+
# while sleeping, we use a selector to perform a blocking wait,
210+
# and signal the event through a pipe. We then set a flag with
211+
# the received signal. Like this, we can receive a signal
212+
# at any point, and it can also interrupt the poll wait to take
213+
# immediate effect.
214+
nonlocal last_received_signal
215+
last_received_signal = signal_received
216+
os.write(signal_wfd, b"\x00")
217+
218+
# Install the SIGINT and SIGTERM handlers before scheduling jobs.
219+
old_handler = signal(SIGINT, on_signal)
202220
signal(SIGTERM, on_signal)
203221

204222
# Enqueue all items of the first target.
205223
self._enqueue_successors(None)
206224

207225
try:
208226
while True:
209-
if stop_now.is_set():
210-
# We've had an interrupt. Kill any jobs that are running.
211-
self._kill()
227+
if last_received_signal is not None:
228+
self._handle_exit_signal(last_received_signal, old_handler)
229+
last_received_signal = None
212230

213231
hms = timer.hms()
214232
changed = self._poll(hms) or timer.check_time()
215233
self._dispatch(hms)
216234
if changed and self._check_if_done(hms):
217235
break
218236

219-
# This is essentially sleep(1) to wait a second between each
220-
# polling loop. But we do it with a bounded wait on stop_now so
221-
# that we jump back to the polling loop immediately on a
222-
# signal.
223-
stop_now.wait(timeout=self._launcher_cls.poll_freq)
237+
# Wait between each poll, except we may be woken by a signal.
238+
sel.select(timeout=self._launcher_cls.poll_freq)
224239

225240
finally:
226241
signal(SIGINT, old_handler)

0 commit comments

Comments
 (0)