
Conversation

@amaslenn (Contributor)

Summary

Continuously fetch logs for NCCL jobs on Kubernetes to avoid the case where the launcher has already been removed before its logs are collected.

Addresses internal issue.

Test Plan

  1. CI
  2. Manual runs.

coderabbitai bot commented Jan 27, 2026

📝 Walkthrough

The refactor passes the full KubernetesJob object into MPIJob handling: _is_mpijob_running now accepts a KubernetesJob and uses job.name for checks and logging, and MPIJob pod logs are captured and stored during status checks and before deletion.

Changes

Cohort: MPIJob status & lifecycle
File(s): src/cloudai/systems/kubernetes/kubernetes_system.py
Change summary: _is_mpijob_running signature changed from job_name: str to job: KubernetesJob; callers updated (e.g., _is_job_running); internal references use job.name; MPIJob pod logs are stored via store_logs_for_job during status checks and before deletion; 404/exception messages and logging updated to reference the job object.
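
To make that shape concrete, here is a rough, hypothetical sketch of the refactored check; the real method in kubernetes_system.py will differ in detail (the namespace attribute, condition handling, and error paths below are assumptions):

```python
from kubernetes import client


def is_mpijob_running(system, job) -> bool:
    """Sketch of _is_mpijob_running after the refactor: takes the job
    object (not just a name) and stores pod logs on every poll."""
    api = client.CustomObjectsApi()
    # Group/version/plural follow the Kubeflow MPI operator v2 CRD.
    mpijob = api.get_namespaced_custom_object(
        group="kubeflow.org",
        version="v2beta1",
        namespace=system.default_namespace,  # assumed attribute name
        plural="mpijobs",
        name=job.name,
    )
    conditions = mpijob.get("status", {}).get("conditions", [])

    # Fetch logs before any early return so nothing is lost if the
    # launcher pod disappears between polls.
    system.store_logs_for_job(job.name, job.test_run.output_path)

    # An empty conditions list is treated as "still running".
    if not conditions:
        return True
    return conditions[-1].get("type") not in ("Succeeded", "Failed")
```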

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~12 minutes

Poem

🐰 A job object hops where a name used to be,
Logs gathered gently, a tidy jubilee,
Conditions checked by a rabbit's delight,
Names and pods snug in the moonlit night,
Hooray for small changes that make things right.

🚥 Pre-merge checks | ✅ 1 | ❌ 1

❌ Failed checks (1 warning)
  • Title check ⚠️ Warning: The title describes a specific technical change (continuous log fetching for NCCL over Kubernetes), but the changeset summary shows the actual changes refactor _is_mpijob_running to accept a KubernetesJob object instead of a string. Resolution: update the title to describe the primary change, namely refactoring MPIJob handling to pass full job objects and add log storage.

✅ Passed checks (1 passed)
  • Description check ✅ Passed: The description discusses continuous log fetching to avoid missing logs when the launcher is removed, which aligns with the changeset's addition of log storage via store_logs_for_job in the kill method.


greptile-apps bot commented Jan 27, 2026

Greptile Overview

Greptile Summary

Implements continuous log fetching for MPIJobs on Kubernetes to prevent log loss when the launcher is removed before logs are collected. The _is_mpijob_running method now accepts the full job object and calls store_logs_for_job during each polling interval.

Key Changes:

  • Modified _is_mpijob_running signature to accept KubernetesJob instead of just job name
  • Added log fetching during polling in _is_mpijob_running (line 171)
  • Added log fetching in kill method before job deletion (line 397)
  • Addressed previous review concern about empty conditions by placing log fetch before the empty check

Issues Found:

  • I/O overhead: Logs are fetched every second (default monitor_interval=1) and files are overwritten, resulting in ~600 write operations for a 10-minute job
  • Redundant log fetching: Logs are fetched in the final _is_mpijob_running call that detects completion, then fetched again in on_job_completion (kubernetes_runner.py:55); a possible dedup guard is sketched after this list
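
A minimal sketch of such a guard, using a hypothetical module-level set (CloudAI might instead track this as a flag on the job object; only store_logs_for_job and the job attributes mirror names from this PR):

```python
# Hypothetical dedup guard: remember which jobs already had their logs
# stored during the final status poll, so the completion hook can skip
# the second fetch.
_logs_already_stored: set[str] = set()


def store_logs_once(system, job) -> None:
    if job.name in _logs_already_stored:
        return  # already captured by the last _is_mpijob_running poll
    system.store_logs_for_job(job.name, job.test_run.output_path)
    _logs_already_stored.add(job.name)
```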

Confidence Score: 3/5

  • This PR is safe to merge with moderate risk: the core functionality is sound but has efficiency concerns.
  • The implementation successfully addresses the primary goal of preventing log loss when launchers are removed early. However, the continuous log fetching every second introduces significant I/O overhead (file overwrites ~600 times for a 10-minute job) and there's redundant log fetching at job completion. These are performance/efficiency concerns rather than correctness issues. The changes are isolated to MPIJob handling and won't affect other job types.
  • Pay close attention to src/cloudai/systems/kubernetes/kubernetes_system.py for potential I/O overhead in production environments with long-running jobs.

Important Files Changed

src/cloudai/systems/kubernetes/kubernetes_system.py: Added continuous log fetching for MPIJobs during polling and before job deletion; changed _is_mpijob_running to accept the full job object. Addresses log loss when the launcher is removed early, but introduces I/O overhead and redundant log fetching.

Sequence Diagram

```mermaid
sequenceDiagram
    participant Runner as BaseRunner
    participant System as KubernetesSystem
    participant K8sAPI as Kubernetes API
    participant FS as File System

    Note over Runner,FS: Job Monitoring Loop (every 1 second)

    Runner->>System: is_job_completed(job)
    System->>System: _is_job_running(job)
    System->>System: _is_mpijob_running(job)
    System->>K8sAPI: get_namespaced_custom_object(mpijob)
    K8sAPI-->>System: status, conditions

    System->>System: store_logs_for_job(job_name, output_path)
    System->>K8sAPI: list_namespaced_pod() for job
    K8sAPI-->>System: pod list
    loop For each pod
        System->>K8sAPI: read_namespaced_pod_log(pod)
        K8sAPI-->>System: logs
        System->>FS: write logs (overwrite mode)
    end

    System-->>Runner: is_running=False (when complete)

    Note over Runner,FS: Job Completion Flow

    Runner->>Runner: on_job_completion(job)
    Runner->>System: store_logs_for_job(job_name, output_path)
    Note right of System: Redundant log fetch
    System->>K8sAPI: list_namespaced_pod() for job
    loop For each pod
        System->>K8sAPI: read_namespaced_pod_log(pod)
        System->>FS: write logs (overwrite mode)
    end

    Runner->>System: delete_job(job_name, kind)
    System->>K8sAPI: delete_namespaced_custom_object(mpijob)
```


greptile-apps bot left a comment


2 files reviewed, 2 comments



coderabbitai bot left a comment


Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/cloudai/systems/kubernetes/kubernetes_system.py (1)

156-193: Move log collection before the empty-conditions early return.
Right now, jobs with empty conditions return True before store_logs_for_job runs, so no logs are collected during the early running phase—this undermines the “continuous fetch” goal. Collect logs first, then return.

🐛 Suggested fix

```diff
-            # Consider an empty conditions list as running
-            if not conditions:
-                return True
-
-            self.store_logs_for_job(job.name, job.test_run.output_path)
+            self.store_logs_for_job(job.name, job.test_run.output_path)
+
+            # Consider an empty conditions list as running
+            if not conditions:
+                return True
```

@greptile-apps
greptile-apps bot commented Jan 27, 2026

Additional Comments (1)

src/cloudai/systems/kubernetes/kubernetes_system.py
Files are opened in write mode, causing complete file overwrites every second during polling. Consider using append mode or implementing a mechanism to check if logs have changed before writing.
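
One possible shape for that suggestion, assuming one log file per pod (the file layout and naming here are hypothetical, not CloudAI's actual scheme):

```python
from pathlib import Path


def write_pod_log_if_changed(output_path: Path, pod_name: str, logs: str) -> None:
    """Skip the write when the freshly fetched logs match what is
    already on disk, so unchanged logs are not rewritten every poll."""
    log_file = output_path / f"{pod_name}.txt"
    if log_file.exists() and log_file.read_text() == logs:
        return  # nothing new since the last poll
    log_file.write_text(logs)  # overwrite only when content changed
```

The kubernetes client's read_namespaced_pod_log also accepts a since_seconds argument, which would allow truly incremental fetches combined with append-mode writes.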


coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@src/cloudai/systems/kubernetes/kubernetes_system.py`:
- Around lines 396-398: store_logs_for_job is called for every KubernetesJob, but get_pod_names_for_job only matches MPI/Kubeflow pods. Either limit log collection to MPI jobs: in the post-run sequence that handles a KubernetesJob (k_job), call store_logs_for_job(k_job.name, k_job.test_run.output_path) only when k_job.kind indicates an MPIJob (or another kind that get_pod_names_for_job supports), leaving delete_job(k_job.name, k_job.kind) unchanged. Or, for broader support, enhance get_pod_names_for_job to detect batch or DynamoGraphDeployment pod labels. Choose one approach and implement either the conditional check or the label handling.
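
A sketch of the first (conditional) option; the set of supported kinds and the wrapper function name below are assumptions for illustration:

```python
# Kinds whose pods get_pod_names_for_job can currently match; the exact
# contents of this set are an assumption, extend it as support grows.
LOG_COLLECTING_KINDS = {"MPIJob"}


def finalize_job(system, k_job) -> None:
    # Collect logs only for kinds we know how to find pods for, then
    # delete the job exactly as before.
    if k_job.kind in LOG_COLLECTING_KINDS:
        system.store_logs_for_job(k_job.name, k_job.test_run.output_path)
    system.delete_job(k_job.name, k_job.kind)
```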

@amaslenn added the "bug" label (Something isn't working) on Jan 27, 2026

@jeffnvidia left a comment


LGTM

@amaslenn merged commit e74d423 into main on Jan 27, 2026
5 checks passed
@amaslenn deleted the am/logs-fetch-nccl branch on January 27, 2026 at 14:28