
Commit b913994 (parent 7cc4789)

Authored by ravwojdyla-agent, ravwojdyla, and claude
iris: remove unused _heartbeat_lock (#4835)
* drop `_heartbeat_lock` from `Controller` and the two `with`-blocks that use it (around `_sync_all_execution_units` and `_sync_direct_provider`)
* only one thread ever acquired it: `start()` spawns either `_run_provider_loop` or `_run_direct_provider_loop`, never both, and each loop round runs sequentially
* checkpointing no longer contends for the lock either[^1], so every acquisition was uncontended

[^1]: backup reads via a dedicated RO connection and each heartbeat round commits as an atomic batch

Co-authored-by: Rafal Wojdyla <ravwojdyla@gmail.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
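The "only one thread ever acquired it" argument can be illustrated with a minimal sketch. The class and helper names below (`MiniController`, `_sync_round`) are hypothetical stand-ins, not the Iris `Controller`; only `start()`, `_run_provider_loop`, and `_run_direct_provider_loop` come from the commit message.

```python
import threading


class MiniController:
    """Hypothetical stand-in for the start()-spawns-one-loop pattern."""

    def __init__(self, direct_provider: bool) -> None:
        self._direct_provider = direct_provider
        self._stop = threading.Event()

    def start(self) -> threading.Thread:
        # Exactly one of the two loops is spawned, never both, so no other
        # thread can ever contend for a lock acquired inside the loop body.
        target = (
            self._run_direct_provider_loop
            if self._direct_provider
            else self._run_provider_loop
        )
        thread = threading.Thread(target=target, daemon=True)
        thread.start()
        return thread

    def _run_provider_loop(self) -> None:
        while not self._stop.wait(1.0):
            self._sync_round()  # one round at a time, strictly sequential

    def _run_direct_provider_loop(self) -> None:
        while not self._stop.wait(1.0):
            self._sync_round()

    def _sync_round(self) -> None:
        pass  # placeholder for the per-round sync work
```

With a single acquirer and sequential rounds, a `with lock:` around the round body adds no mutual exclusion, which is the rationale for dropping `_heartbeat_lock`.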

File tree

1 file changed: +13 −21 lines


lib/iris/src/iris/cluster/controller/controller.py

Lines changed: 13 additions & 21 deletions
```diff
@@ -1126,12 +1126,6 @@ def __init__(
 
         self._atexit_registered = False
 
-        # Serializes provider sync rounds so only one heartbeat round mutates
-        # DB state at a time. Checkpointing no longer takes this lock: the
-        # backup reads via a dedicated RO connection and each heartbeat round
-        # commits as an atomic batch, so snapshots are always consistent.
-        self._heartbeat_lock = threading.Lock()
-
         # Rate-limits periodic (best-effort) checkpoint writes.
         # None when checkpoint_interval is not configured.
         self._periodic_checkpoint_limiter: RateLimiter | None = (
@@ -1408,8 +1402,7 @@ def _run_provider_loop(self, stop_event: threading.Event) -> None:
             if stop_event.is_set():
                 break
             try:
-                with self._heartbeat_lock:
-                    self._sync_all_execution_units()
+                self._sync_all_execution_units()
             except Exception:
                 logger.exception("Provider sync round failed, will retry next interval")
 
@@ -1432,19 +1425,18 @@ def _sync_direct_provider(self) -> None:
             return
         assert isinstance(self._provider, K8sTaskProvider)
         provider = self._provider
-        with self._heartbeat_lock:
-            max_promotions = self._promotion_bucket.available
-            batch = self._transitions.drain_for_direct_provider(
-                max_promotions=max_promotions,
-            )
-            if batch.tasks_to_run:
-                self._promotion_bucket.try_acquire(len(batch.tasks_to_run))
-            result = provider.sync(batch)
-            tx_result = self._transitions.apply_direct_provider_updates(result.updates)
-            self._provider_scheduling_events = list(result.scheduling_events) if result.scheduling_events else []
-            self._provider_capacity = result.capacity
-            if tx_result.tasks_to_kill:
-                self.kill_tasks_on_workers(tx_result.tasks_to_kill, tx_result.task_kill_workers)
+        max_promotions = self._promotion_bucket.available
+        batch = self._transitions.drain_for_direct_provider(
+            max_promotions=max_promotions,
+        )
+        if batch.tasks_to_run:
+            self._promotion_bucket.try_acquire(len(batch.tasks_to_run))
+        result = provider.sync(batch)
+        tx_result = self._transitions.apply_direct_provider_updates(result.updates)
+        self._provider_scheduling_events = list(result.scheduling_events) if result.scheduling_events else []
+        self._provider_capacity = result.capacity
+        if tx_result.tasks_to_kill:
+            self.kill_tasks_on_workers(tx_result.tasks_to_kill, tx_result.task_kill_workers)
 
     def _run_profile_loop(self, stop_event: threading.Event) -> None:
         """Periodically capture CPU and memory profiles for all running tasks.
```
