Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions lib/zephyr/src/zephyr/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,13 +622,18 @@ def _coordinator_loop(self) -> None:
last_log_time = 0.0

while not self._shutdown_event.is_set():
self.check_heartbeats()
self._check_worker_group()

now = time.monotonic()
if self._has_active_execution() and now - last_log_time > 5.0:
self._log_status()
last_log_time = now
try:
self.check_heartbeats()
self._check_worker_group()

now = time.monotonic()
if self._has_active_execution() and now - last_log_time > 5.0:
self._log_status()
last_log_time = now
except Exception:
logger.exception("Coordinator loop crashed, aborting pipeline")
self.abort("Coordinator loop crashed unexpectedly")
return

self._shutdown_event.wait(timeout=0.5)

Expand All @@ -649,8 +654,10 @@ def _has_active_execution(self) -> bool:
return self._execution_id != "" and self._total_shards > 0 and self._completed_shards < self._total_shards

def _log_status(self) -> None:
alive = sum(1 for s in self._worker_states.values() if s in {WorkerState.READY, WorkerState.BUSY})
dead = sum(1 for s in self._worker_states.values() if s in {WorkerState.FAILED, WorkerState.DEAD})
with self._lock:
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI @rjpower : Claude told me this ^ race condition killed my tokenize job, and after I told it that it's very unlikely to trigger, it accepted that the cause of the tokenization job failing was likely something else (hence additional logging in #4006 ). It still insisted this race condition can occur (see test) and bullied me into opening this PR.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems legit. if we have 2000 workers, and log_status happens to trigger around when a worker pings?

states = list(self._worker_states.values())
alive = sum(1 for s in states if s in {WorkerState.READY, WorkerState.BUSY})
dead = sum(1 for s in states if s in {WorkerState.FAILED, WorkerState.DEAD})
logger.info(
"[%s] [%s] %d/%d complete, %d in-flight, %d queued, %d/%d workers alive, %d dead",
self._execution_id,
Expand Down
25 changes: 25 additions & 0 deletions lib/zephyr/tests/test_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,31 @@ def test_pull_task_returns_shutdown_on_last_stage_empty_queue(actor_context, tmp
assert result == "SHUTDOWN"


def test_coordinator_loop_crash_aborts_pipeline(actor_context, tmp_path):
    """An unexpected exception inside the coordinator loop must abort the
    pipeline (leaving ``_fatal_error`` set) instead of silently killing the
    loop thread. Regression test for #3996."""
    from zephyr.execution import ZephyrCoordinator

    coordinator = ZephyrCoordinator()
    coordinator.set_chunk_config(str(tmp_path / "chunks"), "test-exec")

    fired = threading.Event()
    real_check = coordinator.check_heartbeats

    def check_heartbeats_once_crashing(*args, **kwargs):
        # First invocation simulates the concurrent-mutation race; every
        # subsequent call passes through to the real implementation.
        if fired.is_set():
            return real_check(*args, **kwargs)
        fired.set()
        raise RuntimeError("dictionary changed size during iteration")

    coordinator.check_heartbeats = check_heartbeats_once_crashing

    loop_thread = threading.Thread(
        target=coordinator._coordinator_loop,
        daemon=True,
        name="zephyr-coordinator-loop",
    )
    loop_thread.start()
    assert fired.wait(timeout=5.0)
    loop_thread.join(timeout=2.0)
    assert coordinator._fatal_error is not None


def test_run_pipeline_rejects_concurrent_calls(actor_context, tmp_path):
"""Calling run_pipeline while another is already running raises RuntimeError."""
from unittest.mock import MagicMock
Expand Down
Loading