fix: Fix JobSet Immutability and Add Termination Message-Based Metrics Capture #30
Conversation
Force-pushed from 6b098e3 to f9e2f89, then from f9e2f89 to 408f150.
Signed-off-by: abhijeet-dhumal <abhijeetdhumal652@gmail.com>
robert-bell left a comment:

lgtm, though I'd be keen to split this into smaller PRs, to make it easier to revert things separately if necessary.
```go
		ginkgo.GinkgoWriter.Printf("Warning: Failed to delete namespace %s: %v\n", testNs.Name, err)
	}
} else {
	ginkgo.GinkgoWriter.Printf("✗ Tests failed - keeping namespace for debugging: %s\n", testNs.Name)
```
Is this going to leave a bunch of namespaces in the cluster? How do we clean them up?

Can we also move these changes to a separate PR so we can easily revert them if necessary?

No, it will run all tests serially in a single test namespace.
```go
// Look for the trainer container (typically named "node")
for _, containerStatus := range pod.Status.ContainerStatuses {
	// Check if this is the trainer container
	if containerStatus.Name != "node" && containerStatus.Name != "trainer" {
```
question: isn't the container name set by the TrainingRuntime? Can the user not set the name to something completely arbitrary? Should we just select the first container that has a termination message?

We don't need to worry too much about the approach if the feature ends up upstream, as I've proposed a slightly different approach there that doesn't have this ambiguity.
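The name-agnostic alternative suggested here — picking the first container that reports a termination message — could be sketched roughly as below. The struct types are minimal hypothetical stand-ins for the `corev1.ContainerStatus` fields the real code would range over:

```go
package main

import "fmt"

// containerState is a stand-in for the relevant part of
// corev1.ContainerState (State.Terminated.Message).
type containerState struct {
	TerminationMessage string
}

// containerStatus is a stand-in for corev1.ContainerStatus.
type containerStatus struct {
	Name  string
	State containerState
}

// firstTerminationMessage returns the message of the first container that
// reported one, avoiding any assumption about the trainer container's name.
func firstTerminationMessage(statuses []containerStatus) (string, bool) {
	for _, cs := range statuses {
		if msg := cs.State.TerminationMessage; msg != "" {
			return msg, true
		}
	}
	return "", false
}

func main() {
	statuses := []containerStatus{
		{Name: "sidecar"},
		{Name: "node", State: containerState{TerminationMessage: `{"loss":0.12}`}},
	}
	if msg, ok := firstTerminationMessage(statuses); ok {
		fmt.Println(msg) // {"loss":0.12}
	}
}
```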
Good catch! You're right that the TrainingRuntime defines the container name, but no, users can't set it arbitrarily.
The TrainingRuntime webhook enforces specific container names at admission time: trainer containers must be named "node" (defined in constants.Node).
If a TrainingRuntime tries to use a different container name for the trainer, the webhook rejects it.
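A minimal sketch of that admission-time check — the constant and function names here are hypothetical stand-ins for the real webhook code, which validates the whole TrainingRuntime object:

```go
package main

import "fmt"

// nodeContainerName is a stand-in for constants.Node in the trainer codebase.
const nodeContainerName = "node"

// validateTrainerContainer mimics the webhook behavior described above:
// a TrainingRuntime whose trainer container is not named "node" is rejected.
func validateTrainerContainer(trainerContainerName string) error {
	if trainerContainerName != nodeContainerName {
		return fmt.Errorf("trainer container must be named %q, got %q",
			nodeContainerName, trainerContainerName)
	}
	return nil
}

func main() {
	fmt.Println(validateTrainerContainer("node"))   // <nil>
	fmt.Println(validateTrainerContainer("custom")) // rejection error
}
```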
I'd be happy to discuss if I've misunderstood you!

No that makes sense thanks!
```go
}

// Do not update the JobSet if it already exists and is not suspended
// Check if JobSet already exists
```
nit: can you pull this into a separate PR please, just in case we need to revert the other changes?
```go
oldTrainJob := trainJob.DeepCopy()
if err := UpdateTrainerStatusAnnotation(trainJob, annotationStatus); err != nil {
	return false, fmt.Errorf("failed to update trainer status annotation: %w", err)
}
patch := client.MergeFrom(oldTrainJob)
if err := c.Patch(ctx, trainJob, patch); err != nil {
	return false, fmt.Errorf("failed to patch TrainJob annotations: %w", err)
}
```
nit: maybe move the status patching logic outside the if statements, to avoid duplicating it between the termination case and the polling one.
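One way to de-duplicate that, sketched with hypothetical stand-ins (the real code would take the controller-runtime client and build a `client.MergeFrom` patch; here the patch call is abstracted behind a function so the shape is testable without a cluster):

```go
package main

import "fmt"

// TrainJob is a minimal stand-in for the real TrainJob object.
type TrainJob struct {
	Annotations map[string]string
}

// patchFn abstracts c.Patch(ctx, job, patch) for illustration.
type patchFn func(job *TrainJob) error

// patchTrainerStatus is the single place that updates the status annotation
// and applies the patch, so the termination and polling paths can share it.
// The annotation key here is hypothetical.
func patchTrainerStatus(job *TrainJob, status string, patch patchFn) error {
	if job.Annotations == nil {
		job.Annotations = map[string]string{}
	}
	job.Annotations["trainer-status"] = status
	if err := patch(job); err != nil {
		return fmt.Errorf("failed to patch TrainJob annotations: %w", err)
	}
	return nil
}

func main() {
	job := &TrainJob{}
	err := patchTrainerStatus(job, "Complete", func(*TrainJob) error { return nil })
	fmt.Println(err, job.Annotations["trainer-status"]) // <nil> Complete
}
```

Both call sites would then reduce to a one-line call with their own status value.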
```go
if oldJobSet != nil {
	oldSuspend := ptr.Deref(oldJobSet.Spec.Suspend, false)
	newSuspend := ptr.Deref(trainJob.Spec.Suspend, false)

	// Use strategic merge patch for suspend changes to avoid immutable field validation
	if oldSuspend != newSuspend {
		patch := client.MergeFrom(oldJobSet.DeepCopy())
		oldJobSet.Spec.Suspend = ptr.To(newSuspend)
		if err := j.client.Patch(ctx, oldJobSet, patch); err != nil {
			return nil, fmt.Errorf("failed to patch JobSet suspend field: %w", err)
		}
		return nil, nil
	}

	// Skip update if both TrainJob and JobSet are already running
	if !newSuspend && !oldSuspend {
		return nil, nil
	}
}
```
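For reference, the branch logic in that hunk reduces to a small decision function — a sketch with hypothetical names, useful for reasoning about which cases fall through to a full update:

```go
package main

import "fmt"

// reconcileAction summarizes the suspend handling above: a suspend toggle is
// patched in place, an already-running pair is skipped, and only the
// remaining case (both still suspended) falls through to a full update.
func reconcileAction(oldSuspend, newSuspend bool) string {
	switch {
	case oldSuspend != newSuspend:
		return "patch-suspend"
	case !oldSuspend && !newSuspend:
		return "skip"
	default:
		return "full-update"
	}
}

func main() {
	fmt.Println(reconcileAction(true, false))  // patch-suspend
	fmt.Println(reconcileAction(false, false)) // skip
	fmt.Println(reconcileAction(true, true))   // full-update
}
```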
Is this still needed now that we've moved away from trying to inject a preStop hook?
Hi @astefanutti

This PR enhances the progression-tracking feature by adding support for capturing final metrics from pod termination messages (written by the SDK), removes the pre-stop hook dependency, and significantly improves test coverage with comprehensive unit and e2e tests replicating TransformersTrainer-based progression callback wrappers.

Related Kubeflow SDK wrapper instrumentation in the on_train_end callback: opendatahub-io/kubeflow-sdk#35

I'm splitting this fix into smaller atomic PRs: #32, #33, #34
Testing

Unit Tests

```shell
go test ./pkg/rhai/progression -v
```

E2E Tests

```shell
go test ./pkg/rhai/e2e/... -v -timeout 30m
```

All tests passed ✅


Checklist: