Queue sandbox jobs with client polling to avoid overload timeouts #939
base: main
Conversation
Signed-off-by: i-vainn <[email protected]>
Walkthrough

Adds asynchronous job orchestration to the local sandbox server: introduces a Job dataclass and a serialized single-worker JobManager, routes /execute through job submission returning job_id (HTTP 202), adds job retrieval, cancellation, session list/delete, and admin reset endpoints, plus client-side polling with session-affinity handling.
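For orientation, here is a minimal sketch of the shapes the walkthrough describes. The `Job` and `JobManager` names come from the walkthrough; the field set beyond `job_id`/`status` and the exact `submit` signature are assumptions for illustration, not the PR's code:

```python
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from threading import Lock
from typing import Optional


@dataclass
class Job:
    job_id: str
    status: str = "queued"  # queued -> running -> completed / error / canceled
    result: Optional[dict] = None
    session_id: Optional[str] = None
    created_at: float = field(default_factory=time.time)


class JobManager:
    def __init__(self):
        self.jobs = {}
        self.lock = Lock()
        # a single worker thread means jobs execute strictly one at a time
        self.executor = ThreadPoolExecutor(max_workers=1)

    def submit(self, fn, *args, session_id=None) -> str:
        job = Job(job_id=uuid.uuid4().hex, session_id=session_id)
        with self.lock:
            self.jobs[job.job_id] = job
        self.executor.submit(self._run, job, fn, *args)
        return job.job_id

    def _run(self, job, fn, *args):
        with self.lock:
            job.status = "running"
        try:
            result = fn(*args)
            with self.lock:
                job.status, job.result = "completed", result
        except Exception as e:  # report failures through the job record
            with self.lock:
                job.status, job.result = "error", {"stderr": str(e)}
```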
Sequence Diagram(s)

```mermaid
sequenceDiagram
autonumber
participant Client
participant Server as LocalSandboxServer
participant JM as JobManager
participant Worker as SingleWorker
rect rgba(230,240,255,0.6)
Note over Client,Server: Submit code (may include X-Session-ID)
Client->>Server: POST /execute
Server->>JM: submit(job)
JM-->>Server: {job_id, queued_ahead}
Server-->>Client: 202 Accepted (job_id, queued_ahead)
end
rect rgba(240,255,240,0.6)
Note over Client,Server: Poll job status until terminal state
loop poll
Client->>Server: GET /jobs/{job_id} (X-Session-ID)
Server->>JM: get(job_id)
JM-->>Server: job record (queued/running/completed/error/canceled)
Server-->>Client: 200 OK job record
end
end
rect rgba(255,245,230,0.6)
Note over JM,Worker: Serialized execution by single worker
JM->>Worker: run(next_job)
Worker-->>JM: update job status (running -> completed/error)
JM-->>Server: job result available
end
```
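The client half of this diagram, as a rough sketch. The request payload key, base URL, and helper name are assumptions; the terminal-status set and the 0.2 s poll interval mirror the diff excerpts quoted later in this thread:

```python
import asyncio

import httpx

TERMINAL = {"completed", "timeout", "error", "failed", "canceled"}


async def execute_and_poll(base_url, code, session_id=None):
    headers = {"X-Session-ID": session_id} if session_id else {}
    async with httpx.AsyncClient(base_url=base_url) as client:
        # 1. submit: the server answers 202 with job_id and queued_ahead
        resp = await client.post("/execute", json={"generated_code": code}, headers=headers)
        job_id = resp.json()["job_id"]
        # 2. poll the job record until it reaches a terminal state
        while True:
            resp = await client.get(f"/jobs/{job_id}", headers=headers)
            payload = resp.json()
            if payload.get("status") in TERMINAL:
                return payload.get("result") or {}
            await asyncio.sleep(0.2)
```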
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
Various comments and questions -- some to verify intended behavior; in other places I think some additional logging will be helpful for future debugging.
```python
class JobManager:
    def __init__(self, shell_manager):
        self.jobs = {}
```
A potential issue with keeping this all in memory, is if the worker stops responding or crashes for some reason, all of your state disappears with the worker. Might not be something we need to fix right now, but if we start having issues of workers stopping/crashing, this will be a crux.
It's not clear to me if we need something more robust yet, though.
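If durability ever becomes a problem, one low-effort mitigation is to snapshot job metadata (not results) to disk so a restarted worker can report previously known jobs as lost instead of answering 404. A hypothetical sketch; the path, schema, and "lost" status are invented for illustration:

```python
import json
import os
from threading import Lock

SNAPSHOT_PATH = "/tmp/sandbox_jobs.json"  # hypothetical location


def snapshot_jobs(jobs: dict, lock: Lock) -> None:
    """Dump lightweight job metadata to disk, atomically."""
    with lock:
        state = {jid: {"status": j.status, "created_at": j.created_at} for jid, j in jobs.items()}
    tmp = SNAPSHOT_PATH + ".tmp"
    with open(tmp, "w") as f:
        json.dump(state, f)
    os.replace(tmp, SNAPSHOT_PATH)  # atomic on POSIX


def load_snapshot() -> dict:
    """On startup, mark jobs from a previous run as lost rather than forgetting them."""
    try:
        with open(SNAPSHOT_PATH) as f:
            return {jid: {**meta, "status": "lost"} for jid, meta in json.load(f).items()}
    except FileNotFoundError:
        return {}
```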
```python
if p.get("status") in {"completed", "timeout", "error", "failed", "canceled"}:
    r = p.get("result") or {}
    return r if "process_status" in r else {**r, "process_status": p.get("status", "error")}
await asyncio.sleep(0.2)
```
log session id? maybe 0.2 is too often to log that though, if you want to log it every so often? Will be helpful for tracing timeouts.
```python
    continue
p = resp.json()
if p.get("status") in {"completed", "timeout", "error", "failed", "canceled"}:
    r = p.get("result") or {}
```
log session id? maybe 0.2 is too often to log that though, if you want to log it every so often? Will be helpful for tracing timeouts.
also, if you follow suggestion to return pid, log and pop it here alongside the session id
Good point. I’d prefer to handle that on the server side, though, so it goes into sandbox.log instead of main.log. That way, we keep the main logs clean and avoid excessive noise from frequent polling.
yeah I mean either way I would probably increase the time between the status poll logging, and probably also log when you update the state
Added logging on state update. Not sure if we want to log each polling request though
Signed-off-by: i-vainn <[email protected]>
Actionable comments posted: 0
♻️ Duplicate comments (4)
nemo_skills/code_execution/sandbox.py (3)
130-139: Document the timeout condition and consider canceling the job.

The timeout condition `time.monotonic() > deadline` should be rare under normal operation (only if `timeout * queued_ahead > 1200` or if the server is unresponsive). However, when it occurs, the job may still be running on the server. As noted in past reviews:
- Consider canceling the job via POST to `/jobs/{job_id}/cancel` before resetting the worker, since the client will never retrieve the result.
- The condition "should never happen under normal circumstances" warrants clarification: it can happen if the queue is very long or if the server becomes unresponsive. If timeouts become frequent, this may indicate a need to scale workers or investigate server health rather than continuing to send requests.
Based on past review comments.
156-156: Consider reducing polling frequency or throttling logs.

Polling every 0.2 seconds can generate excessive logs if session_id logging is added. Past reviews suggested that 0.2 seconds might be too frequent for logging each poll attempt.
Consider one of the following:
- Increase the polling interval (e.g., 0.5 or 1.0 seconds) if responsiveness allows.
- Log session_id only every N attempts (e.g., every 5 seconds) rather than on every poll (a sketch follows after this list).
Based on past review comments.
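A sketch of the second option: keep the 0.2 s poll but throttle the log line. The interval values are illustrative, and `get_status` stands in for the real HTTP request:

```python
import asyncio
import logging
import time

LOG = logging.getLogger(__name__)
TERMINAL = {"completed", "timeout", "error", "failed", "canceled"}


async def poll_with_throttled_logs(get_status, job_id, session_id, poll_interval=0.2, log_every=5.0):
    """Poll every poll_interval seconds, but log at most once per log_every seconds."""
    last_logged = time.monotonic()
    while True:
        status = await get_status(job_id)
        if status in TERMINAL:
            return status
        now = time.monotonic()
        if now - last_logged >= log_every:
            LOG.info("still polling job %s (session_id=%s, status=%s)", job_id, session_id, status)
            last_logged = now
        await asyncio.sleep(poll_interval)
```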
145-146: Log the actual status code received.

When polling fails with a non-200 status, log the status code to aid debugging. Past reviews noted this "shouldn't really happen" but if it does, detailed logging is essential.
Apply this diff:
```diff
  if getattr(resp, "status_code", 200) != 200:
-     raise RuntimeError(f"Error during polling job {job_id}: {resp.text}")
+     LOG.error("Polling job %s failed with status %d: %s", job_id, resp.status_code, resp.text)
+     raise RuntimeError(f"Error during polling job {job_id} (status {resp.status_code}): {resp.text}")
```

Based on past review comments.
nemo_skills/code_execution/local_sandbox/local_sandbox_server.py (1)
77-85: Job eviction after first retrieval may cause data loss.

Evicting finished jobs immediately after returning them (lines 83-84) means a client can only retrieve a job result once. If the client's GET request times out or fails, the result is permanently lost.
Consider one of the following:
- Keep finished jobs in memory for a grace period (e.g., 60 seconds) before eviction.
- Add a flag to track whether a job has been successfully retrieved (e.g., a `retrieved_at` timestamp; sketched below).
- Document this behavior clearly so clients know to implement retries with idempotency.
Past reviews noted: "you probably want to distinguish whether it is a finished job or a queued/incomplete job. If finished, it should probably be evicted after it is returned in order to save memory. This will also mean finished jobs can only be 'gotten' once."
Based on past review comments.
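A sketch combining the first two options above: a `retrieved_at` mark plus a grace window before eviction. The 60-second window and dict-based job records are illustrative, not the PR's code:

```python
import time
from threading import Lock

GRACE_SECONDS = 60.0  # assumed grace window


def get_job(jobs: dict, lock: Lock, job_id: str):
    """Return a job record; evict finished jobs only after a grace period."""
    with lock:
        now = time.time()
        # evict records first retrieved more than GRACE_SECONDS ago
        expired = [jid for jid, j in jobs.items()
                   if j.get("retrieved_at") and now - j["retrieved_at"] > GRACE_SECONDS]
        for jid in expired:
            jobs.pop(jid)
        job = jobs.get(job_id)
        if job is not None and job.get("status") not in ("queued", "running"):
            job.setdefault("retrieved_at", now)  # mark first retrieval, keep until grace lapses
        return job
```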
🧹 Nitpick comments (5)
nemo_skills/code_execution/sandbox.py (2)
102-107: Consider explicit None-check for session_id.

The `or` operator will cause `_affinity_key` to be used if `session_id` is any falsy value (empty string, 0, False, etc.), not just None. If this is intentional, consider adding a comment to clarify the behavior. Apply this diff for more explicit behavior:

```diff
- affinity_header = request.pop("session_id", None) or request.pop("_affinity_key", None)
+ affinity_header = request.pop("session_id", None)
+ if affinity_header is None:
+     affinity_header = request.pop("_affinity_key", None)
```
120-120: Consider using `raise ... from err` for exception chaining.

Static analysis suggests using `raise ... from err` to preserve the exception chain, which helps with debugging. Apply this diff:

```diff
- except json.JSONDecodeError:
-     raise RuntimeError(f"Error during parsing output: {output.text}")
+ except json.JSONDecodeError as err:
+     raise RuntimeError(f"Error during parsing output: {output.text}") from err
```

nemo_skills/code_execution/local_sandbox/local_sandbox_server.py (3)
192-194: Use `logging.exception` for better traceback capture.

When catching a broad Exception in job execution, use `logging.exception` instead of `logging.error` to automatically include the full traceback. Apply this diff:

```diff
  except Exception as e:
-     logging.error(f"Error during job execution: {e}\n{traceback.format_exc()}")
+     logging.exception("Error during job execution: %s", e)
      return {"process_status": "error", "stderr": f"An unexpected error occurred: {e}"}
```

This eliminates the need for manual `traceback.format_exc()` calls.
284-310: Simplify nested try-except blocks in stop_shell.

The multiple nested try-except blocks (lines 292-306) that silently ignore all exceptions make debugging difficult. While defensive coding is good, consider consolidating or logging suppressed exceptions.
Apply this diff to consolidate the cleanup logic:
```diff
-        try:
-            try:
-                conn.send({"cmd": "shutdown"})
-            except Exception:
-                pass
-            try:
-                conn.close()
-            except Exception:
-                pass
-            try:
-                proc.terminate()
-            except Exception:
-                try:
-                    os.kill(proc.pid, signal.SIGKILL)
-                except Exception:
-                    pass
-            proc.join(timeout=2.0)
-            return True, f"IPython session {shell_id} deleted successfully"
-        except Exception as e:
-            return False, f"Error stopping IPython session {shell_id}: {e}"
+        try:
+            # Best-effort shutdown sequence
+            for action, fn in [
+                ("send shutdown", lambda: conn.send({"cmd": "shutdown"})),
+                ("close conn", lambda: conn.close()),
+                ("terminate", lambda: proc.terminate()),
+            ]:
+                try:
+                    fn()
+                except Exception as exc:
+                    logging.debug("stop_shell %s: %s failed: %s", shell_id, action, exc)
+
+            # Last resort: SIGKILL
+            try:
+                if proc.is_alive():
+                    os.kill(proc.pid, signal.SIGKILL)
+            except Exception as exc:
+                logging.debug("stop_shell %s: SIGKILL failed: %s", shell_id, exc)
+
+            proc.join(timeout=2.0)
+            return True, f"IPython session {shell_id} deleted successfully"
+        except Exception as e:
+            return False, f"Error stopping IPython session {shell_id}: {e}"
```

This makes the cleanup sequence more readable and logs suppressed exceptions at DEBUG level for troubleshooting.
877-878: Log cancellation results for related jobs.

The code cancels related jobs (lines 877-878) but doesn't check or log the results. This could make debugging difficult if related job cancellation fails.
Apply this diff:
```diff
  for related_job_id in job_manager.get_related_jobs(job_id):
      ok, msg = job_manager.cancel_job(related_job_id)
+     if ok:
+         logging.info("Canceled related job %s: %s", related_job_id, msg)
+     else:
+         logging.warning("Failed to cancel related job %s: %s", related_job_id, msg)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- nemo_skills/code_execution/local_sandbox/local_sandbox_server.py (6 hunks)
- nemo_skills/code_execution/sandbox.py (3 hunks)
🧰 Additional context used
🪛 Ruff (0.14.0)
nemo_skills/code_execution/local_sandbox/local_sandbox_server.py
191-191: Consider moving this statement to an else block
(TRY300)
192-192: Do not catch blind exception: Exception
(BLE001)
193-193: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
294-295: try-except-pass detected, consider logging the exception
(S110)
294-294: Do not catch blind exception: Exception
(BLE001)
298-299: try-except-pass detected, consider logging the exception
(S110)
298-298: Do not catch blind exception: Exception
(BLE001)
302-302: Do not catch blind exception: Exception
(BLE001)
305-306: try-except-pass detected, consider logging the exception
(S110)
305-305: Do not catch blind exception: Exception
(BLE001)
308-308: Consider moving this statement to an else block
(TRY300)
309-309: Do not catch blind exception: Exception
(BLE001)
864-864: Consider moving this statement to an else block
(TRY300)
865-865: Do not catch blind exception: Exception
(BLE001)
866-866: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
nemo_skills/code_execution/sandbox.py
120-120: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
120-120: Avoid specifying long messages outside the exception class
(TRY003)
146-146: Avoid specifying long messages outside the exception class
(TRY003)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: pre-commit
- GitHub Check: unit-tests
🔇 Additional comments (3)
nemo_skills/code_execution/sandbox.py (1)
226-280: LGTM on session restoration logic!

The restoration flow properly:
- Propagates session affinity via `_affinity_key` (line 245)
- Handles restoration failures by cleaning up the remote session
- Returns a clear error message to the user
- Avoids re-triggering cells that previously errored or timed out
nemo_skills/code_execution/local_sandbox/local_sandbox_server.py (2)
786-795: LGTM on job submission endpoint!

The execute endpoint properly:
- Submits jobs to JobManager
- Returns 202 Accepted with job_id for polling
- Includes queued_ahead count for client deadline estimation (see the sketch below)
- Logs pid and worker_id for tracing timeouts (addressing past review comments)
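For reference, one way a client can turn `queued_ahead` into a poll deadline. The per-job budget formula and the 1200 s cap (the figure quoted earlier in this review) are assumptions about the intended arithmetic:

```python
import time


def poll_deadline(timeout_per_job: float, queued_ahead: int, cap: float = 1200.0) -> float:
    """Monotonic deadline: each job ahead of ours gets a full timeout budget, capped."""
    budget = timeout_per_job * (queued_ahead + 1)
    return time.monotonic() + min(budget, cap)
```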
844-867: LGTM on reset_worker endpoint!

The delayed exit pattern (lines 856-862) ensures the HTTP response is delivered before the worker terminates. This is a good approach for administrative reset.
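The delayed-exit pattern reduced to its essentials, as a hedged sketch (Flask-style handler; the 0.5 s delay, exit code, and response body are assumptions, not the PR's exact code):

```python
import os
import threading
import time

from flask import Flask, jsonify

app = Flask(__name__)


@app.route("/admin/reset_worker", methods=["POST"])
def reset_worker():
    def _exit_soon():
        time.sleep(0.5)  # give the server time to flush the HTTP response
        os._exit(1)      # the process supervisor is expected to restart the worker

    threading.Thread(target=_exit_soon, daemon=True).start()
    return jsonify({"status": "resetting"}), 200
```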
Signed-off-by: i-vainn <[email protected]>
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
nemo_skills/code_execution/sandbox.py (1)
441-446: delete_session bypasses SSH path; fails for SSH-tunneled sandboxes

This calls AsyncClient directly; it won't work when using ssh_server/ssh_key_path.

```diff
- response = await self.http_session.delete(
-     url=f"http://{self.host}:{self.port}/sessions/{session_id}",
-     timeout=10.0,
-     headers={"X-Session-ID": session_id},
- )
+ response = await self._request(
+     "delete",
+     f"http://{self.host}:{self.port}/sessions/{session_id}",
+     timeout=3.0,
+     headers={"X-Session-ID": session_id},
+ )
```

Also reduced timeout for this fast control call, per prior feedback.
🧹 Nitpick comments (7)
nemo_skills/code_execution/sandbox.py (7)
109-116: Initial POST timeout is high; make this a short, strict call

The execute POST should return fast (202 or result). Use a smaller timeout to surface server issues quickly and avoid tying up client connections.

```diff
  output = await self._request(
      "post",
      self._get_execute_url(),
      content=payload,
-     timeout=10.0,
+     timeout=2.0,
      headers={"Content-Type": "application/json", **extra_headers},
  )
```

Past review asked for shorter timeouts for these fast paths.
125-142: Cancel job before hard worker reset; avoid immediate destructive action

On poll deadline, first try canceling the job; only reset the worker if cancel fails. This uses the new cancel endpoint and reduces recovery blast-radius.

```diff
  if time.monotonic() > deadline:
      # TODO: if the worker is healthy, instead of resetting it, we should try to restart the job
      # maybe reset the deadline and log retry attempt
      LOG.error(
          "Client timed out polling job %s; issuing hard worker reset (session_id=%s)",
          job_id,
          affinity_header,
      )
-     _ = await self._request("post", self._get_reset_worker_url(), timeout=5.0, headers=extra_headers)
+     try:
+         # Best-effort cancel
+         await self._request("post", self._get_cancel_job_url(job_id), timeout=2.0, headers=extra_headers)
+     except Exception as e:
+         LOG.warning("Cancel job %s failed before reset: %s", job_id, e)
+     # Fall back to hard reset
+     _ = await self._request("post", self._get_reset_worker_url(), timeout=3.0, headers=extra_headers)
      raise httpx.TimeoutException("Client poll deadline exceeded")
```

Also consider exponential backoff and telemetry on repeated deadline hits.
117-121: Exception chaining for JSON decode; satisfy Ruff B904

Preserve the original decode error context.

```diff
- try:
-     response_json = output.json()
- except json.JSONDecodeError:
-     raise RuntimeError(f"Error during parsing output: {output.text}")
+ try:
+     response_json = output.json()
+ except json.JSONDecodeError as exc:
+     raise RuntimeError(f"Error during parsing output: {output.text}") from exc
```

Based on static analysis hints.
149-157: Guard poll JSON parsing; return actionable error on malformed response

If the job status endpoint returns non-JSON, the client will crash. Handle decode errors and include job/session info for triage.

```diff
- job_data = resp.json()
+ try:
+     job_data = resp.json()
+ except json.JSONDecodeError as exc:
+     LOG.error(
+         "Polling job %s returned non-JSON (session_id=%s): %s",
+         job_id,
+         affinity_header,
+         getattr(resp, "text", "<no text>"),
+     )
+     raise RuntimeError("Invalid JSON from job status endpoint") from exc
```
143-147: Include session affinity in poll error logs

Add session identifier for traceability across workers.

```diff
- if getattr(resp, "status_code", 200) != 200:
-     LOG.error("Polling job %s failed with status %d: %s", job_id, resp.status_code, resp.text)
+ if getattr(resp, "status_code", 200) != 200:
+     LOG.error(
+         "Polling job %s failed with status %d (session_id=%s): %s",
+         job_id,
+         resp.status_code,
+         affinity_header,
+         resp.text,
+     )
```

This aligns with previous review feedback about logging session IDs for timeouts/errors.
130-157: Handle client cancellation by canceling remote job

If the coroutine is cancelled (e.g., shutdown), propagate cancellation to the server to avoid orphaned jobs.

```diff
  while True:
      # This should never happen under normal circumstances
      if time.monotonic() > deadline:
          ...
-     resp = await self._request("get", self._get_jobs_url(job_id), timeout=2.0, headers=extra_headers)
+     try:
+         resp = await self._request("get", self._get_jobs_url(job_id), timeout=1.0, headers=extra_headers)
+     except asyncio.CancelledError:
+         try:
+             await self._request("post", self._get_cancel_job_url(job_id), timeout=2.0, headers=extra_headers)
+         except Exception as e:
+             LOG.warning("Cancel job %s failed after client cancellation: %s", job_id, e)
+         raise
```

Also slightly reduce the poll timeout to keep the call short-lived.
82-99: Minor: request shim OK; consider reusing SSH session or lazy import guard

The current approach creates a new SSH session per call. If performance becomes an issue, reuse a session or pool. Also guard ImportError with a clearer message.

Would you like a follow-up patch to pool SSH sessions?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- nemo_skills/code_execution/sandbox.py (5 hunks)
🧰 Additional context used
🪛 Ruff (0.14.0)
nemo_skills/code_execution/sandbox.py
120-120: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
120-120: Avoid specifying long messages outside the exception class
(TRY003)
141-141: Avoid specifying long messages outside the exception class
(TRY003)
147-147: Avoid specifying long messages outside the exception class
(TRY003)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: unit-tests
Signed-off-by: i-vainn <[email protected]>
Actionable comments posted: 0
🧹 Nitpick comments (1)
nemo_skills/code_execution/local_sandbox/local_sandbox_server.py (1)
166-197: Prefer `logging.exception` for automatic traceback inclusion.

The execution logic is correct. However, consider using `logging.exception` at Line 196 instead of manually formatting the traceback, which automatically includes the traceback and is more concise. Apply this diff:

```diff
- except Exception as e:
-     logging.error(f"Error during job execution: {e}\n{traceback.format_exc()}")
+ except Exception as e:
+     logging.exception("Error during job execution: %s", e)
      return {"process_status": "error", "stderr": f"An unexpected error occurred: {e}"}
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- nemo_skills/code_execution/local_sandbox/local_sandbox_server.py (6 hunks)
🧰 Additional context used
🪛 Ruff (0.14.0)
nemo_skills/code_execution/local_sandbox/local_sandbox_server.py
194-194: Consider moving this statement to an else block
(TRY300)
195-195: Do not catch blind exception: Exception
(BLE001)
196-196: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
297-298: try-except-pass detected, consider logging the exception
(S110)
297-297: Do not catch blind exception: Exception
(BLE001)
301-302: try-except-pass detected, consider logging the exception
(S110)
301-301: Do not catch blind exception: Exception
(BLE001)
305-305: Do not catch blind exception: Exception
(BLE001)
308-309: try-except-pass detected, consider logging the exception
(S110)
308-308: Do not catch blind exception: Exception
(BLE001)
311-311: Consider moving this statement to an else block
(TRY300)
312-312: Do not catch blind exception: Exception
(BLE001)
867-867: Consider moving this statement to an else block
(TRY300)
868-868: Do not catch blind exception: Exception
(BLE001)
869-869: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: pre-commit
- GitHub Check: unit-tests
🔇 Additional comments (15)
nemo_skills/code_execution/local_sandbox/local_sandbox_server.py (15)
28-32: LGTM!

The new imports and Job dataclass are clean and appropriate for the async job orchestration. The use of `field(default_factory=time.time)` for `created_at` is the correct pattern.

Also applies to: 48-57
59-75: LGTM!

The JobManager design is solid:
- Single-worker ThreadPoolExecutor ensures serial execution
- Lock correctly protects shared state within class methods
- Job submission with UUID generation is clean
77-85: LGTM!

The job eviction logic correctly implements the previous review feedback: finished jobs (not "queued" or "running") are popped from memory after being retrieved once, which helps manage memory usage effectively.
87-89: LGTM!

The `queued_ahead_count` method cleanly implements the previous review suggestion using a sum with boolean expressions. The lock is used correctly.
91-97: LGTM!

The `get_related_jobs` method correctly identifies jobs sharing the same session_id for coordinated cancellation. Lock usage is appropriate.
99-128: LGTM!

The `_run_job` method correctly manages job lifecycle:
- Properly transitions from queued → running → finished states
- Handles mid-execution cancellation by checking status after execution
- Comprehensive logging with worker_id and job_id
129-164: LGTM!

The `cancel_job` method implements robust cancellation logic:
- Properly handles queued vs. running jobs
- For IPython sessions, attempts to stop the shell for graceful interruption
- Best-effort approach is well-documented
- Returns informative success/failure messages
292-313: LGTM!

The defensive cleanup operations in `stop_shell` are appropriate for shutdown scenarios where failures are expected. The updated return pattern using `(ok, msg)` tuples improves caller ergonomics. Silent failures during cleanup are acceptable here as they don't impact correctness.

Based on learnings
494-498: LGTM!

The updated cleanup logic properly handles both success and failure cases from `stop_shell`, with appropriate logging for each path.
777-777: LGTM!

The global `job_manager` instance is appropriate for the singleton pattern in this server context.
780-785: LGTM!

The `/jobs/<job_id>` endpoint cleanly retrieves job state, returning 404 for non-existent jobs and using `asdict` for consistent JSON serialization.
789-798: LGTM!

The modified `/execute` endpoint correctly implements the async pattern:
- Returns HTTP 202 (Accepted) for queued jobs
- Includes job_id for polling
- Logs pid alongside job_id as requested in previous review
- Provides queued_ahead count for client feedback
844-844: LGTM!

Adding the worker identifier to the health check response improves observability.
847-870: LGTM!

The `/admin/reset_worker` endpoint correctly implements hard reset:
- Returns response before exit using background thread with delay
- Ensures HTTP response is delivered to client
- Appropriate logging
Minor: Consider using `logging.exception` at Line 869 for consistency (same pattern as the execute_job comment).
873-890: LGTM!

The `/jobs/<job_id>/cancel` endpoint correctly implements cancellation:
- Cancels related jobs sharing the same session_id
- Returns appropriate HTTP status codes (200 for success, 404/409 for failures)
- Logs with header session info for debugging
Signed-off-by: i-vainn <[email protected]>
Actionable comments posted: 0
♻️ Duplicate comments (1)
nemo_skills/code_execution/sandbox.py (1)
131-140: Cancel the job before resetting the worker.

When the client timeout is reached, the code resets the entire worker without canceling the specific job that timed out. This can compound issues if multiple requests time out, as noted in past reviews.
Apply this diff to cancel the job before resetting:
```diff
  if time.monotonic() > deadline:
      # TODO: if the worker is healthy, instead of resetting it, we should try to restart the job
      # maybe reset the deadline and log retry attempt
      LOG.error(
          "Client timed out polling job %s; issuing hard worker reset (session_id=%s)",
          job_id,
          affinity_header,
      )
+     # Cancel the job first to prevent it from running after reset
+     try:
+         _ = await self._request("post", self._get_cancel_job_url(job_id), timeout=2.0, headers=extra_headers)
+     except Exception as e:
+         LOG.warning("Failed to cancel job %s before worker reset: %s", job_id, e)
      _ = await self._request("post", self._get_reset_worker_url(), timeout=5.0, headers=extra_headers)
      raise httpx.TimeoutException("Client poll deadline exceeded")
```
🧹 Nitpick comments (1)
nemo_skills/code_execution/sandbox.py (1)
116-119: Use exception chaining for better debugging.

When re-raising exceptions, use `raise ... from err` to preserve the exception chain and make debugging easier. Apply this diff:

```diff
  try:
      response_json = output.json()
  except json.JSONDecodeError:
-     raise RuntimeError(f"Error during parsing output: {output.text}")
+     raise RuntimeError(f"Error during parsing output: {output.text}") from None
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- nemo_skills/code_execution/sandbox.py (5 hunks)
🧰 Additional context used
🪛 Ruff (0.14.0)
nemo_skills/code_execution/sandbox.py
119-119: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
119-119: Avoid specifying long messages outside the exception class
(TRY003)
140-140: Avoid specifying long messages outside the exception class
(TRY003)
146-146: Avoid specifying long messages outside the exception class
(TRY003)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: unit-tests
🔇 Additional comments (5)
nemo_skills/code_execution/sandbox.py (5)
82-98: LGTM! Clean request routing abstraction.

The centralized request handler properly routes through SSH tunneling when configured and correctly adapts the `content` kwarg to `data` for the sshtunnel_requests library.
166-176: LGTM! Well-structured URL builder abstraction.

The abstract methods provide a clean interface for URL construction, making it easy to support different sandbox backends.
398-405: LGTM! Clean URL builder implementations.

The implementations correctly construct RESTful URLs for the local sandbox endpoints.
439-444: LGTM! Consistent with the new request architecture.

The update properly uses the centralized `_request` method and correctly passes the `X-Session-ID` header for session affinity.
102-108: Session-ID header handling confirmed.

The server reads `X-Session-ID` and injects it into the request JSON, so omitting `session_id` from the payload is safe.
@i-vainn Appears to address the major functional concerns from before. I have some lingering concerns about robustness of the overall system (e.g., if workers start failing), but it's probably best to see what happens in practice. Happy to help debug any sessions you find that are having failures with the sandbox interaction.
@i-vainn what's left for us to merge this?
This PR queues sandbox jobs for sequential execution and adds client-side polling to avoid HTTP timeouts, with session affinity tests updated to match the new server API.
Summary by CodeRabbit
New Features
Bug Fixes
Tests