iris: async heartbeat dispatch, concurrency 128 (#4842)
Conversation
Wraps the WorkerProvider.sync() call in slow_log so heartbeat rounds that exceed a 1-second budget — e.g. rounds where many failing workers saturate the thread pool with 30s timeouts — surface a WARNING. A healthy round with fast RPCs completes well under this budget; exceeding it indicates either timeouts stacking or a misconfigured pool. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replace the 32-slot ThreadPoolExecutor dispatch path with an asyncio event loop on a dedicated thread, fanning out per-worker heartbeat RPCs via asyncio.gather with a Semaphore(128) cap. The default WorkerProvider parallelism is raised from 32 to 128. This bounds the worst-case sync round from N_failing * timeout / 32 to ceil(N_failing / 128) * timeout. On a 1339-worker cluster with 308 failing workers (observed 313s previously), the new worst case is ~24s at a 10s timeout. Also lower DEFAULT_WORKER_RPC_TIMEOUT from 30s to 10s and _SLOW_HEARTBEAT_RPC_LOG_THRESHOLD_MS from 10s to 5s to match. RpcWorkerStubFactory now caches async WorkerServiceClient instances instead of WorkerServiceClientSync so that the single pyqwest HTTP client per address persists across rounds. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
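The fan-out pattern the commit describes — `asyncio.gather` over per-worker RPCs with a `Semaphore(128)` cap — can be sketched as follows. This is a minimal illustration with placeholder names (`heartbeat_rpc`, `sync_all`, the worker list); the real code lives in `WorkerProvider`:

```python
import asyncio

# Sketch of the semaphore-bounded fan-out described above. `heartbeat_rpc`
# is a stand-in for the real per-worker heartbeat RPC; at most `parallelism`
# calls are in flight at any moment.

async def heartbeat_rpc(worker: str) -> str:
    await asyncio.sleep(0)  # placeholder for the network call
    return f"ok:{worker}"

async def sync_all(workers: list[str], parallelism: int = 128) -> list[str]:
    sem = asyncio.Semaphore(parallelism)

    async def bounded(worker: str) -> str:
        async with sem:  # cap concurrent RPCs at `parallelism`
            return await heartbeat_rpc(worker)

    # gather preserves input order, so results line up with `workers`
    return list(await asyncio.gather(*(bounded(w) for w in workers)))

results = asyncio.run(sync_all([f"w{i}" for i in range(5)]))
```

Because the semaphore only gates entry, slow or timing-out workers occupy slots for their full timeout, which is exactly why the worst case becomes `ceil(N_failing / 128) * timeout`.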
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 8e2fdda8a8
```python
def close(self) -> None:
    with self._lock:
        stubs = list(self._stubs.values())
        self._stubs.clear()
```
Close cached worker clients before dropping stub map
RpcWorkerStubFactory.close() now only clears the _stubs dict, so every cached WorkerServiceClient is discarded without calling close(). In a long-running controller, worker churn/failover creates many distinct addresses, and those async clients can retain open HTTP connections/file descriptors until process exit, causing resource leaks and eventually destabilizing heartbeat/control-plane RPCs. The previous implementation explicitly closed each stub, so this is a regression introduced in this commit.
The bot comment is not relevant. Evidence:
ConnectClient.close at connectrpc/_client_async.py:178-181:
```python
async def close(self) -> None:
    """Close the HTTP client. After closing, the client cannot be used to make requests."""
    if not self._closed:
        self._closed = True
```
In the sync version, ConnectClientSync.close() did close the httpx sync client. But we've switched to the async variant, whose close() is a no-op.
So calling it would (a) require spinning up an event loop from sync close(), and (b) do nothing.
A 1s threshold fires on every healthy round for large clusters: with 1339 workers and 128 concurrency, ~11 RPC waves at 200ms each already take ~2s. 5s matches the heartbeat interval — exceeding it means the round can't keep pace with the schedule. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
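The wave arithmetic in that comment checks out; a quick back-of-envelope recomputation (numbers taken from the comment itself, the ~200ms per wave is its stated assumption):

```python
import math

# Sanity-check of the threshold argument above: a healthy round on a large
# cluster needs ceil(workers / parallelism) sequential RPC waves.
workers, parallelism = 1339, 128
waves = math.ceil(workers / parallelism)   # 11 waves of up to 128 RPCs
healthy_round_s = waves * 0.2              # assuming ~200ms per wave
```

With 11 waves at ~200ms each, a perfectly healthy round already takes ~2.2s, so a 1s threshold would warn on every round; 5s only fires when the round outruns the heartbeat interval.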
Replace the persistent asyncio event-loop thread with asyncio.run() per sync/exec/profile call. Simpler: no custom thread lifecycle, no dataclass fields to manage, no close() plumbing. Semaphore now lives as a local inside _sync_all — semantically correct (per-round cap, not cross-round). pyqwest's HTTP connection pool is owned by the Rust tokio runtime, not the Python loop, so cached stubs keep their connections across loop creation/teardown. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
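The `asyncio.run`-per-call shape described above can be sketched like this. All names here are placeholders (`FakeAsyncClient`, `Provider`); the point is the pattern, not the real API:

```python
import asyncio

# Minimal sketch of the pattern: each sync-callable method drives its cached
# async client with its own asyncio.run, so there is no persistent event-loop
# thread to manage and no close() plumbing.

class FakeAsyncClient:
    async def get_process_status(self, worker: str) -> str:
        await asyncio.sleep(0)  # stands in for the async RPC
        return f"{worker}: running"

class Provider:
    def __init__(self) -> None:
        self._client = FakeAsyncClient()  # cached stub, reused across rounds

    def get_process_status(self, worker: str) -> str:
        # Fresh event loop per call (~1ms overhead); the HTTP connection
        # pool lives outside the loop, so caching the stub still pays off.
        return asyncio.run(self._client.get_process_status(worker))
```

This only works cleanly because, as the commit notes, pyqwest's pool is owned by the Rust tokio runtime rather than the Python loop being created and torn down.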
```python
# Sync with the execution backend (ThreadPoolExecutor inside provider).
with slow_log(logger, "provider sync (RPC dispatch)", threshold_ms=5_000):
    results = self._provider.sync(batches)
```
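`slow_log` itself is not shown in the hunk; a minimal sketch of what such a context manager could look like — hypothetical implementation, only the call-site signature comes from the diff:

```python
import logging
import time
from contextlib import contextmanager

# Hypothetical slow_log matching the call site above: emit a WARNING only
# when the wrapped block exceeds threshold_ms. The repo's real version
# may differ in details.

@contextmanager
def slow_log(logger: logging.Logger, label: str, threshold_ms: int):
    start = time.monotonic()
    try:
        yield
    finally:
        elapsed_ms = (time.monotonic() - start) * 1000
        if elapsed_ms > threshold_ms:
            logger.warning("%s took %.0fms (threshold %dms)",
                           label, elapsed_ms, threshold_ms)
```

The `finally` matters: a round that raises after stalling still gets logged.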
5s is somewhat hand-wavy
@rjpower I would like to have this tested, but this feels like a larger testing-harness adventure. It could be an interesting project to build a good multithreaded testing env to ensure bugs like these do not resurface 🤔
rjpower left a comment
Do you know why we removed the close()? Maybe worth kicking out onto a helper thread if it needs to be async?
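For what it's worth, the helper-thread idea could look roughly like this — purely illustrative, since per the rebuttal above the current async `close()` is a no-op and nothing like this ships in the PR:

```python
import asyncio
import threading

# Illustrative only: driving async stub close() calls from a sync close()
# by running them on a short-lived helper thread with its own event loop.

def close_stubs_sync(stubs: list) -> None:
    async def close_all() -> None:
        await asyncio.gather(*(s.close() for s in stubs))

    t = threading.Thread(target=lambda: asyncio.run(close_all()))
    t.start()
    t.join()
```

If the underlying client ever grows a real async teardown, this keeps `RpcWorkerStubFactory.close()` sync-callable without a persistent loop thread.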
- Swaps WorkerProvider's 32-slot `ThreadPoolExecutor` for `asyncio.gather` with an `asyncio.Semaphore(128)` cap, driven by `asyncio.run` per round [^1]
- `RpcWorkerStubFactory` now caches async `WorkerServiceClient` instances; the pyqwest HTTP client is Rust-owned, so connection pools survive across per-round event loops
- `WorkerProvider.parallelism` 32 → 128
- `DEFAULT_WORKER_RPC_TIMEOUT` 30s → 10s, `_SLOW_HEARTBEAT_RPC_LOG_THRESHOLD_MS` 10s → 5s
- `slow_log` around `self._provider.sync(batches)` in `_sync_all_execution_units` with a 5s threshold [^2]
- `WorkerProvider.{get_process_status,profile_task,exec_in_container}` keep sync-callable signatures by wrapping each async call in its own `asyncio.run`
- Worst-case sync round is now `ceil(N_failing / 128) × timeout` ≈ 24s [^3]

Footnotes

1. `asyncio.run` creates and tears down an event loop per call (~1ms); the semaphore lives as a local inside `_sync_all`, so the cap is per-round rather than cross-round ↩
2. Matches the `heartbeat_interval` default (5s); exceeding the interval means the round can no longer keep pace with the schedule ↩
3. Previous behavior: 308 × 30s timeout / 32 threads ≈ 289s round; healthy workers' inter-heartbeat gap stretched to match ↩
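The round-time figures quoted in the thread can be recomputed directly (numbers taken from the PR description; purely a sanity check):

```python
import math

# Numbers from the PR: 308 failing workers, old 30s timeout / 32 threads,
# new 10s timeout / 128-way semaphore.
failing, old_timeout, old_threads = 308, 30, 32
old_worst = failing * old_timeout / old_threads              # ~289s serialized round

new_timeout, parallelism = 10, 128
new_estimate = failing / parallelism * new_timeout           # ~24s, the quoted figure
new_bound = math.ceil(failing / parallelism) * new_timeout   # 30s if waves are whole
```

Note the quoted ~24s is the un-ceiled estimate (308/128 × 10s); the strict `ceil` bound rounds up to 3 full waves, i.e. 30s. Either way it is an order of magnitude under the observed 313s.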