Skip to content

Commit 42175af

Browse files
committed
Fixes.
1 parent dcd5b6a commit 42175af

3 files changed

Lines changed: 30 additions & 6 deletions

File tree

lib/iris/pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ packages = ["src/iris"]
8181

8282
[tool.pytest.ini_options]
8383
timeout = 10
84-
addopts = "-n auto --durations=25 -m 'not slow and not docker' -v"
84+
timeout_method = "thread"
85+
addopts = "-n auto --durations=25 -m 'not slow and not docker and not e2e' -v"
8586
markers = [
8687
"slow: marks tests as slow (deselect with '-m \"not slow\"')",
8788
"docker: marks tests requiring Docker runtime (slow, needs daemon)",

lib/iris/src/iris/cluster/controller/controller.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2236,9 +2236,13 @@ def _run_ping_loop(self, stop_event: threading.Event) -> None:
22362236
worker. This is safe: fail_workers_batch, on_worker_failed, and
22372237
terminate_slices_for_workers are all idempotent.
22382238
"""
2239-
limiter = RateLimiter(interval_seconds=self._config.heartbeat_interval.to_seconds())
2239+
ping_interval_s = self._config.heartbeat_interval.to_seconds()
2240+
limiter = RateLimiter(interval_seconds=ping_interval_s)
22402241
ping_failures: dict[str, int] = {}
22412242
threshold = self._config.heartbeat_failure_threshold
2243+
# Refresh resource snapshots every ~60s; other cycles just note liveness.
2244+
resource_update_every = max(1, round(60.0 / ping_interval_s))
2245+
cycle = 0
22422246

22432247
while not stop_event.is_set():
22442248
if not limiter.wait(cancel=stop_event):
@@ -2249,8 +2253,11 @@ def _run_ping_loop(self, stop_event: threading.Event) -> None:
22492253
self._reap_stale_workers()
22502254
workers = self._get_active_worker_addresses()
22512255
results = self._provider.ping_workers(workers)
2256+
update_resources = cycle % resource_update_every == 0
2257+
cycle += 1
22522258

22532259
dead_workers: list[str] = []
2260+
liveness_ids: list[WorkerId] = []
22542261
for result in results:
22552262
wid_str = str(result.worker_id)
22562263
if result.error is not None:
@@ -2264,8 +2271,13 @@ def _run_ping_loop(self, stop_event: threading.Event) -> None:
22642271
)
22652272
else:
22662273
ping_failures.pop(wid_str, None)
2267-
if result.resource_snapshot:
2274+
if update_resources and result.resource_snapshot:
22682275
self._transitions.update_worker_ping_success(result.worker_id, result.resource_snapshot)
2276+
else:
2277+
liveness_ids.append(result.worker_id)
2278+
2279+
if liveness_ids:
2280+
self._transitions.touch_worker_liveness(liveness_ids)
22692281

22702282
if dead_workers:
22712283
failure_result = self._transitions.fail_workers_batch(
@@ -2302,10 +2314,10 @@ def _run_ping_loop(self, stop_event: threading.Event) -> None:
23022314
def _run_poll_loop(self, stop_event: threading.Event) -> None:
23032315
"""Periodic full-state reconciliation for split heartbeat mode.
23042316
2305-
Polls all workers via PollTasks every 30s and feeds results into the
2317+
Polls all workers via PollTasks every 60s and feeds results into the
23062318
task-updater queue for batched application.
23072319
"""
2308-
limiter = RateLimiter(interval_seconds=30.0)
2320+
limiter = RateLimiter(interval_seconds=60.0)
23092321
while not stop_event.is_set():
23102322
if not limiter.wait(cancel=stop_event):
23112323
break

lib/iris/src/iris/cluster/controller/transitions.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import json
1414
import logging
1515
from dataclasses import dataclass, field
16-
from collections.abc import Callable, Iterable
16+
from collections.abc import Callable, Iterable, Sequence
1717
from typing import Any, NamedTuple
1818

1919
from iris.cluster.constraints import AttributeValue, Constraint, constraints_from_resources, merge_constraints
@@ -3042,6 +3042,17 @@ def _stopped() -> bool:
30423042
# Split Heartbeat Helpers
30433043
# =========================================================================
30443044

3045+
def touch_worker_liveness(self, worker_ids: Sequence[WorkerId]) -> None:
3046+
"""Cheap liveness bump: update last_heartbeat_ms without rewriting resources."""
3047+
if not worker_ids:
3048+
return
3049+
now_ms = Timestamp.now().epoch_ms()
3050+
with self._db.transaction() as cur:
3051+
cur.executemany(
3052+
"UPDATE workers SET last_heartbeat_ms = ? WHERE worker_id = ?",
3053+
[(now_ms, str(wid)) for wid in worker_ids],
3054+
)
3055+
30453056
def update_worker_ping_success(self, worker_id: WorkerId, resource_snapshot: job_pb2.WorkerResourceSnapshot) -> None:
30463057
"""Update worker timestamp and resource snapshot from a successful ping.
30473058

0 commit comments

Comments
 (0)