diff --git a/docs/api-reference/operator-api.md b/docs/api-reference/operator-api.md index 1c9f6bfa8..7dea473fb 100644 --- a/docs/api-reference/operator-api.md +++ b/docs/api-reference/operator-api.md @@ -253,6 +253,7 @@ _Appears in:_ | `cliqueNames` _string array_ | CliqueNames is the list of names of the PodClique's that are part of the scaling group. | | | | `replicas` _integer_ | Replicas is the desired number of replicas for the scaling group at template level.
This allows one to control the replicas of the scaling group at startup.
If not specified, it defaults to 1. | 1 | | | `minAvailable` _integer_ | MinAvailable serves two purposes:
Gang Scheduling:
It defines the minimum number of replicas that are guaranteed to be gang scheduled.
Gang Termination:
It defines the minimum requirement of available replicas for a PodCliqueScalingGroup.
Violation of this threshold for a duration beyond TerminationDelay will result in termination of the PodCliqueSet replica that it belongs to.
Default: If not specified, it defaults to 1.
Constraints:
MinAvailable cannot be greater than Replicas.
If ScaleConfig is defined then its MinAvailable should not be less than ScaleConfig.MinReplicas. | 1 | | +| `terminationDelay` _[Duration](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.33/#duration-v1-meta)_ | TerminationDelay overrides the PodCliqueSet-level terminationDelay for this scaling group.
Can only be set if PodCliqueSetTemplateSpec.TerminationDelay is set (gang termination is enabled).
When set, this value is used instead of the PodCliqueSet-level terminationDelay for gang termination
decisions affecting this scaling group's replicas. | | | | `scaleConfig` _[AutoScalingConfig](#autoscalingconfig)_ | ScaleConfig is the horizontal pod autoscaler configuration for the pod clique scaling group. | | | | `topologyConstraint` _[TopologyConstraint](#topologyconstraint)_ | TopologyConstraint defines topology placement requirements for PodCliqueScalingGroup.
Must be equal to or stricter than parent PodCliqueSet constraints. | | | @@ -458,7 +459,7 @@ _Appears in:_ | `priorityClassName` _string_ | PriorityClassName is the name of the PriorityClass to be used for the PodCliqueSet.
If specified, indicates the priority of the PodCliqueSet. "system-node-critical" and
"system-cluster-critical" are two special keywords which indicate the
highest priorities with the former being the highest priority. Any other
name must be defined by creating a PriorityClass object with that name.
If not specified, the pod priority will be default or zero if there is no default. | | | | `headlessServiceConfig` _[HeadlessServiceConfig](#headlessserviceconfig)_ | HeadlessServiceConfig defines the config options for the headless service.
If present, create headless service for each PodGang. | | | | `topologyConstraint` _[TopologyConstraint](#topologyconstraint)_ | TopologyConstraint defines topology placement requirements for PodCliqueSet. | | | -| `terminationDelay` _[Duration](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.33/#duration-v1-meta)_ | TerminationDelay is the delay after which the gang termination will be triggered.
A gang is a candidate for termination if number of running pods fall below a threshold for any PodClique.
If a PodGang remains a candidate past TerminationDelay then it will be terminated. This allows additional time
to the kube-scheduler to re-schedule sufficient pods in the PodGang that will result in having the total number of
running pods go above the threshold.
Defaults to 4 hours. | | | +| `terminationDelay` _[Duration](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.33/#duration-v1-meta)_ | TerminationDelay is the delay after which the gang termination will be triggered.
A gang is a candidate for termination if the number of running pods falls below a threshold for any PodClique.<br />If a PodGang remains a candidate past TerminationDelay then it will be terminated. This gives the kube-scheduler<br />additional time to re-schedule enough pods in the PodGang to bring the total number of<br />running pods back above the threshold.<br />
When not set (nil), gang termination is disabled for the entire PodCliqueSet.
When set, gang termination is enabled and PodCliqueScalingGroups may optionally override this value. | | | | `podCliqueScalingGroups` _[PodCliqueScalingGroupConfig](#podcliquescalinggroupconfig) array_ | PodCliqueScalingGroupConfigs is a list of scaling groups for the PodCliqueSet. | | | @@ -504,6 +505,7 @@ _Appears in:_ | `updatedReplicas` _integer_ | UpdatedReplicas is the number of Pods that have been updated and are at the desired revision of the PodClique. | 0 | | | `scheduleGatedReplicas` _integer_ | ScheduleGatedReplicas is the number of Pods that have been created with one or more scheduling gate(s) set.
Sum of ReadyReplicas and ScheduleGatedReplicas will always be <= Replicas. | 0 | | | `scheduledReplicas` _integer_ | ScheduledReplicas is the number of Pods that have been scheduled by the kube-scheduler. | 0 | | +| `wasAvailable` _boolean_ | WasAvailable indicates whether the PodClique has ever reached its MinAvailable threshold.
Once set to true, it remains true for the lifetime of the PodClique.
This field is used for gang termination: a PodClique can only be considered in breach of
MinAvailable if it was previously available (WasAvailable=true). | false | | | `hpaPodSelector` _string_ | Selector is the label selector that determines which pods are part of the PodClique.
PodClique is a unit of scale and this selector is used by HPA to scale the PodClique based on metrics captured for the pods that match this selector. | | | | `conditions` _[Condition](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.33/#condition-v1-meta) array_ | Conditions represents the latest available observations of the clique by its controller. | | | | `currentPodCliqueSetGenerationHash` _string_ | CurrentPodCliqueSetGenerationHash establishes a correlation to PodCliqueSet generation hash indicating
that the spec of the PodCliqueSet at this generation is fully realized in the PodClique. | | | diff --git a/docs/designs/gang-termination.md b/docs/designs/gang-termination.md new file mode 100644 index 000000000..0388e2e08 --- /dev/null +++ b/docs/designs/gang-termination.md @@ -0,0 +1,375 @@ +# Gang Termination Design Doc + +# Overview + +Gang termination is a mechanism in Grove that ensures when a component becomes unhealthy (falls below minimum availability threshold), the entire group (gang) is terminated rather than leaving it in a degraded state. This is particularly important for distributed AI inference workloads where partial availability often means the workload cannot function properly. + +## Abbreviations + +| Abbreviation | Full Name | Description | +|--------------|-----------|-------------| +| PCS | PodCliqueSet | Grove CRD that manages a set of PodCliques and PodCliqueScalingGroups | +| PCLQ | PodClique | Grove CRD representing a group of related pods | +| PCSG | PodCliqueScalingGroup | Grove CRD that manages scaling of PodCliques | + +## Motivation + +Distributed AI workloads often require all components to be available for the workload to function correctly. When some pods in a distributed inference or training job fail, the remaining pods may: + +- Continue consuming expensive GPU resources without producing useful work +- Block other workloads from being scheduled +- Leave the system in an undefined state + +Gang termination addresses this by providing an automated mechanism to clean up degraded workloads, freeing resources for healthy workloads to be scheduled. + +## Background + +### Gang Scheduling in Grove + +Grove implements gang scheduling through the `MinAvailable` field, which ensures that a minimum number of units can be scheduled together before any are created. At the PodClique level, this means a minimum number of pods; at the PodCliqueScalingGroup level, this means a minimum number of PCSG replicas. This prevents partial scheduling where some units run while others are stuck pending. + +### The Problem with Partial Availability + +After a workload is running, individual pods may fail due to: +- Node failures +- OOM kills +- Application crashes +- Network partitions + +Without gang termination, a workload that loses critical pods remains in a degraded state indefinitely. The `MinAvailable` threshold used for gang scheduling is reused to define "acceptable availability" for gang termination. + +# Goals + +- **Automatic Cleanup:** Automatically terminate workloads that fall below minimum availability threshold +- **Configurable Grace Period:** Allow time for Kubernetes scheduler to recover before terminating +- **Startup Protection:** Avoid terminating workloads that haven't yet reached full availability +- **Rolling Update Safety:** Prevent false terminations during expected pod churn +- **Multi-Level Granularity:** Support gang termination at PCLQ, PCSG, and PCS levels +- **Opt-In Behavior:** Gang termination is disabled by default and must be explicitly enabled + +## Scope and Limitations + +**Limitations:** + +- **Ready Pods Only (PCLQ level):** Only ready pods count toward availability—starting pods are not considered +- **Sticky WasAvailable:** Once a PodClique has reached MinAvailable, the WasAvailable flag never reverts, even if the workload is later degraded and recovers +- **No Partial Recovery:** When gang termination triggers, the affected replica (PCS or PCSG) is deleted in its entirety, healthy PodCliques within that replica are not preserved. 
+- **Single TerminationDelay per PCS:** While PCSGs can override the delay, standalone PCLQs within a PCS all use the PCS-level delay + +# Design Details + +## Key Concepts + +### MinAvailable + +`MinAvailable` is a field that exists at multiple levels in the Grove hierarchy, serving two purposes at each level: + +1. **Gang Scheduling**: Defines the minimum number of units that must be schedulable together +2. **Gang Termination**: Defines the threshold below which availability is considered breached + +**At each level:** + +| Level | Resource | MinAvailable Meaning | +|-------|----------|---------------------| +| PCLQ | PodClique | Minimum number of **pods** that must be available | +| PCSG | PodCliqueScalingGroup / PodCliqueScalingGroupConfig | Minimum number of **PCSG replicas** that must be available | + +**PCSG replica availability:** A PCSG replica is considered unavailable (breached) if **any** of its constituent PCLQs has `MinAvailableBreached=True`. The PCSG aggregates the breach status from all PCLQs within each replica to determine overall PCSG replica availability. + +### MinAvailableBreached Condition + +A status condition (`MinAvailableBreached`) set on PodClique and PodCliqueScalingGroup resources when available replicas fall below `MinAvailable`. The condition includes: + +- `Status`: True (breached), False (not breached), or Unknown (update in progress) +- `LastTransitionTime`: When the condition last changed, used for tracking termination delay +- `Reason`: Why the condition is in this state +- `Message`: Human-readable description + +### TerminationDelay + +A configurable duration specified at `PodCliqueSet.Spec.Template.TerminationDelay`. After `MinAvailable` is breached, this delay must pass before gang termination is triggered. This gives the Kubernetes scheduler time to potentially recover by rescheduling pods. + +**Important:** Gang termination is **disabled by default**. When `terminationDelay` is `nil` (not set), no gang termination will occur. To enable gang termination, you must explicitly set `terminationDelay` to a non-zero duration. + +### PCSG TerminationDelay Override + +Individual `PodCliqueScalingGroupConfig` entries can override the PCS-level `terminationDelay`: + +- If PCS-level `terminationDelay` is nil, gang termination is disabled for the entire PCS +- If PCS-level `terminationDelay` is set, each PCSG can optionally override it with its own delay +- PCSG overrides can only be set when PCS-level `terminationDelay` is set + +### WasAvailable Gate + +The `MinAvailableBreached` condition is only set to `True` after a PodClique has **previously reached** its `MinAvailable` threshold. This is tracked via the `WasAvailable` status field: + +- `WasAvailable` starts as `false` for new PodCliques +- It becomes `true` when `readyReplicas >= MinAvailable` (and not during rolling update) +- Once `true`, it never reverts to `false` (sticky bit) +- If `WasAvailable` is `false`, the breach condition will be `False` with reason `"NeverAvailable"` + +**Motivation for WasAvailable:** + +1. **Ignore Initial Startup:** New PodCliques should not be terminated simply because they haven't yet reached their MinAvailable threshold. The WasAvailable gate ensures gang termination only triggers for workloads that were previously healthy and then became degraded—not for workloads that are still starting up. + +2. **Operator Crash Resilience:** The WasAvailable flag is persisted in the PodClique's status (not held in memory). 
This ensures that if the Grove operator crashes and restarts during the transition to availability, the flag is not lost. Because it's a sticky bit that never reverts to `false`, the operator can safely restart at any time without missing the transition or incorrectly treating a previously-available PodClique as never-available. + +## Architecture + +Gang termination operates at three levels, with each level potentially triggering termination or delegating to the level above: + +``` +┌─────────────────────────────────────────────────────────────┐ +│ PodCliqueSet │ +│ - Monitors all PCSGs and standalone PCLQs per replica │ +│ - Terminates entire PCS replica if any constituent │ +│ breaches MinAvailable past TerminationDelay │ +└─────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ PodCliqueScalingGroup │ +│ - Monitors constituent PodCliques │ +│ - Can gang-terminate individual PCSG replicas if: │ +│ * PCSG.Spec.MinAvailable is NOT breached overall │ +│ * Individual replica has PCLQ with breach > delay │ +│ - If PCSG.Spec.MinAvailable IS breached, delegates to PCS │ +└─────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────┐ +│ PodClique │ +│ - Monitors ready pods only │ +│ - Sets MinAvailableBreached condition when: │ +│ readyPods < MinAvailable │ +└─────────────────────────────────────────────────────────────┘ +``` + +## Detailed Flow + +### 1. PodClique Level Detection + +The PodClique controller continuously monitors pod status and updates the `MinAvailableBreached` condition. The evaluation follows this order: + +1. **Check availability:** If `readyReplicas >= minAvailable`, the condition is set to `False` with reason `SufficientReadyPods`. No breach is occurring. + +2. **Check WasAvailable gate:** If `readyReplicas < minAvailable` but `WasAvailable` is `false`, the condition is set to `False` with reason `NeverAvailable`. The PodClique has never reached its minimum threshold, so we don't consider this a breach. + +3. **Check rolling update:** If a rolling update is in progress, the condition is set to `Unknown` with reason `UpdateInProgress`. This prevents false terminations during expected pod churn. + +4. **Breach detected:** If none of the above apply (ready pods are below minimum, WasAvailable is true, and no rolling update), the condition is set to `True` with reason `InsufficientReadyPods`. + +**WasAvailable tracking:** The `WasAvailable` status field is updated separately. When `readyReplicas >= minAvailable` (and no rolling update is in progress), `WasAvailable` is set to `true`. Once set, it never reverts to `false`. + +**Key points:** +- Only **ready** pods count toward availability—starting pods do not count +- Breach is detected immediately when ready pods drop below MinAvailable +- The WasAvailable gate prevents termination of workloads that haven't yet reached full availability +- Rolling updates temporarily suspend breach detection to avoid false positives + +### 2. PodCliqueScalingGroup Level Processing + +The PCSG controller aggregates breach information from its constituent PodCliques and decides whether to handle termination itself or delegate to the PCS level. + +**Identifying breached PCSG replicas:** + +The controller groups PodCliques by their PCSG replica index and checks each group for breaches. 
For each PCSG replica: +- If any PCLQ has `MinAvailableBreached=True` and the termination delay has expired, the replica is marked for termination +- If any PCLQ has `MinAvailableBreached=True` but the delay hasn't expired, the replica is marked for requeue (to check again later) + +**Two-level protection at PCSG:** + +Before terminating individual PCSG replicas, the controller checks whether the overall PCSG MinAvailable would be breached: + +1. **If PCSG-level MinAvailable is breached:** When the number of healthy replicas would fall below the PCSG's MinAvailable threshold, the PCSG controller does not terminate replicas itself. Instead, it signals the breach up to the PCS level, which will handle termination of the entire PCS replica. + +2. **If PCSG-level MinAvailable is NOT breached:** The PCSG controller can safely gang-terminate individual PCSG replicas that have breached PCLQs. It deletes all PodCliques belonging to those PCSG replica indices. + +### 3. PodCliqueSet Level Termination + +The PCS controller scans all constituents within each PCS replica and determines whether the entire replica should be terminated. + +**For each PCS replica, the controller checks:** + +1. **PCSGs:** Are any PodCliqueScalingGroups signaling that their MinAvailable has been breached (delegated from PCSG level)? +2. **Standalone PCLQs:** Are any PodCliques (not part of a PCSG) breached with their termination delay expired? + +**Termination decision:** + +If either condition is true with an expired termination delay, the entire PCS replica is scheduled for deletion. This means all PodCliques belonging to that PCS replica index will be deleted, regardless of their individual health status. + +If breaches exist but delays haven't expired, the controller requeues reconciliation to check again after the shortest remaining delay. + +### 4. Deletion Execution + +When gang termination is triggered for a PCS replica, the controller deletes all PodCliques that match: +- The PodCliqueSet name +- The specific PCS replica index + +This bulk deletion ensures that all components of the PCS replica (both PCSG-managed and standalone PCLQs) are removed together, maintaining the gang semantics. + +## Timing and Wait Calculation + +The wait duration is calculated as: + +```go +waitFor := terminationDelay - time.Since(condition.LastTransitionTime) +``` + +- If `waitFor <= 0`: Delay has expired, termination can proceed +- If `waitFor > 0`: Requeue reconciliation to check again after delay + +When multiple constituents are breached, the minimum wait time across all is used to determine requeue timing. 
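+
+A minimal sketch of the requeue computation described above (the helper name and parameters are illustrative assumptions, not the operator's actual code, and only the standard `time` package is assumed): the controller effectively takes the smallest positive remaining wait across the breached constituents, and treats an already-expired delay as "terminate now".
+
+```go
+// minRequeueAfter returns the shortest remaining wait across all breached
+// constituents. A zero result means at least one constituent's delay has
+// already expired and termination can proceed immediately.
+func minRequeueAfter(effectiveDelay time.Duration, lastTransitions []time.Time) time.Duration {
+	shortest := effectiveDelay
+	for _, transitioned := range lastTransitions {
+		waitFor := effectiveDelay - time.Since(transitioned)
+		if waitFor <= 0 {
+			return 0 // delay expired for this constituent
+		}
+		if waitFor < shortest {
+			shortest = waitFor
+		}
+	}
+	return shortest
+}
+```
+
+Constituents whose delay has already expired are acted on in the same reconciliation; only the ones still inside their delay window contribute to the requeue interval.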
+ +## Special Cases + +### Gang Termination Disabled + +When `PodCliqueSet.Spec.Template.TerminationDelay` is nil: +- Gang termination is completely disabled +- `MinAvailableBreached` conditions are still computed and set on resources +- No automatic deletion of PCS replicas will occur regardless of breach status +- PCSG-level termination delay overrides cannot be set + +### Rolling Updates + +During rolling updates, `MinAvailableBreached` is set to `Unknown`: +- Prevents false positive terminations during expected pod churn +- Both old and new pods may be transitioning, affecting availability temporarily +- Once rolling update completes, normal monitoring resumes +- `WasAvailable` is not updated during rolling updates + +### Never Available (WasAvailable Gate) + +If `WasAvailable` is `false`, the condition is set to `False` with reason `NeverAvailable`: +- This is the initial state for new PodCliques +- Prevents premature termination during startup +- Only after pods have successfully reached MinAvailable threshold and then become unavailable is the breach detected +- This replaces the previous "Insufficient Scheduled Pods" logic + +### Standalone PodCliques vs PCSG-managed PodCliques + +PCS replica termination checks both: +1. **PCSGs**: Any PCSG with `MinAvailableBreached=True` triggers PCS replica check, using the effective termination delay (PCSG override or PCS default) +2. **Standalone PCLQs**: PodCliques not part of any PCSG are checked independently, using PCS-level termination delay + +## Configuration + +### Enabling Gang Termination + +Gang termination is **disabled by default**. To enable it, set `terminationDelay` at the PodCliqueSet template level: + +```yaml +apiVersion: grove.io/v1alpha1 +kind: PodCliqueSet +spec: + template: + terminationDelay: 4h # Enables gang termination with 4 hour delay + cliques: + - name: worker + spec: + replicas: 4 + minAvailable: 3 # Breach if < 3 pods available +``` + +### Disabling Gang Termination + +To disable gang termination, omit the `terminationDelay` field or set it to null: + +```yaml +apiVersion: grove.io/v1alpha1 +kind: PodCliqueSet +spec: + template: + # terminationDelay not set - gang termination disabled + cliques: + - name: worker + spec: + replicas: 4 + minAvailable: 3 +``` + +### PCSG-Level TerminationDelay Override + +When gang termination is enabled at the PCS level, individual PCSGs can override the delay: + +```yaml +apiVersion: grove.io/v1alpha1 +kind: PodCliqueSet +spec: + template: + terminationDelay: 4h # Default for all PCSGs + cliques: + - name: leader + spec: + replicas: 1 + minAvailable: 1 + - name: worker + spec: + replicas: 4 + minAvailable: 3 + podCliqueScalingGroups: + - name: inference-group + replicas: 2 + minAvailable: 1 + terminationDelay: 2h # Override: shorter delay for this PCSG + cliqueNames: [leader, worker] + - name: another-group + replicas: 3 + minAvailable: 2 + # No override: uses PCS-level 4h delay + cliqueNames: [other-clique] +``` + +**Note:** PCSG-level `terminationDelay` can only be set if PCS-level `terminationDelay` is set. If you try to set a PCSG override when PCS-level is nil, validation will fail. 
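+
+The precedence rules above can be summarized with a small sketch (the function name is hypothetical and only illustrates the stated behavior; `metav1` refers to `k8s.io/apimachinery/pkg/apis/meta/v1`):
+
+```go
+// effectiveTerminationDelay resolves the delay used for a scaling group's
+// gang-termination decisions: a PCSG-level override wins when present,
+// otherwise the PodCliqueSet-level value applies. A nil result means gang
+// termination is disabled.
+func effectiveTerminationDelay(pcsDelay, pcsgOverride *metav1.Duration) *metav1.Duration {
+	if pcsDelay == nil {
+		// Gang termination is disabled for the whole PodCliqueSet; validation
+		// rejects PCSG-level overrides in this case, so there is nothing to resolve.
+		return nil
+	}
+	if pcsgOverride != nil {
+		return pcsgOverride
+	}
+	return pcsDelay
+}
+```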
+ +### MinAvailable at Different Levels + +**PodClique level:** +```yaml +spec: + replicas: 4 + minAvailable: 3 # At least 3 of 4 pods must be available +``` + +**PodCliqueScalingGroupConfig level:** +```yaml +spec: + template: + podCliqueScalingGroups: + - name: inference-group + replicas: 2 + minAvailable: 1 # At least 1 of 2 PCSG replicas must be available + cliqueNames: [leader, worker] +``` + +## Source Files + +| Component | File | +|-----------|------| +| PodClique status/condition | `operator/internal/controller/podclique/reconcilestatus.go` | +| PCSG gang termination | `operator/internal/controller/podcliquescalinggroup/components/podclique/sync.go` | +| PCS replica termination | `operator/internal/controller/podcliqueset/components/podcliquesetreplica/gangterminate.go` | +| Shared utilities | `operator/internal/controller/common/component/utils/podclique.go` | +| Condition constants | `operator/api/common/constants/constants.go` | + +## Debugging Gang Termination Issues + +### Check MinAvailableBreached Conditions + +```bash +# Check PodClique conditions +kubectl get pclq -o jsonpath='{range .items[*]}{.metadata.name}: {.status.conditions[?(@.type=="MinAvailableBreached")]}{"\n"}{end}' + +# Check PCSG conditions +kubectl get pcsg -o jsonpath='{range .items[*]}{.metadata.name}: {.status.conditions[?(@.type=="MinAvailableBreached")]}{"\n"}{end}' +``` + +### Key Status Fields + +- `PodClique.Status.ReadyReplicas`: Number of ready pods +- `PodClique.Status.ScheduledReplicas`: Number of scheduled pods +- `PodClique.Status.WasAvailable`: Whether the PodClique has ever reached MinAvailable +- `PodCliqueScalingGroup.Status.AvailableReplicas`: Number of available PCSG replicas +- `PodCliqueSet.Status.AvailableReplicas`: Number of available PCS replicas + diff --git a/operator/api/common/constants/constants.go b/operator/api/common/constants/constants.go index 422eacbaf..984e3e1b4 100644 --- a/operator/api/common/constants/constants.go +++ b/operator/api/common/constants/constants.go @@ -112,6 +112,9 @@ const ( ConditionReasonSufficientAvailablePCSGReplicas = "SufficientAvailablePodCliqueScalingGroupReplicas" // ConditionReasonUpdateInProgress indicates that the resource is undergoing rolling update. ConditionReasonUpdateInProgress = "UpdateInProgress" + // ConditionReasonNeverAvailable indicates that the PodClique has never reached its MinAvailable threshold. + // A PodClique must have been available at least once before it can be considered in breach of MinAvailable. + ConditionReasonNeverAvailable = "NeverAvailable" // ConditionReasonClusterTopologyNotFound indicates that the ClusterTopology resource required for topology-aware scheduling was not found. ConditionReasonClusterTopologyNotFound = "ClusterTopologyNotFound" // ConditionReasonTopologyLevelsUnavailable indicates that the one or more required topology levels defined on a diff --git a/operator/api/core/v1alpha1/crds/grove.io_podcliques.yaml b/operator/api/core/v1alpha1/crds/grove.io_podcliques.yaml index ef1b50c84..3584e3895 100644 --- a/operator/api/core/v1alpha1/crds/grove.io_podcliques.yaml +++ b/operator/api/core/v1alpha1/crds/grove.io_podcliques.yaml @@ -9117,11 +9117,20 @@ spec: updated and are at the desired revision of the PodClique. format: int32 type: integer + wasAvailable: + default: false + description: |- + WasAvailable indicates whether the PodClique has ever reached its MinAvailable threshold. + Once set to true, it remains true for the lifetime of the PodClique. 
+ This field is used for gang termination: a PodClique can only be considered in breach of + MinAvailable if it was previously available (WasAvailable=true). + type: boolean required: - readyReplicas - scheduleGatedReplicas - scheduledReplicas - updatedReplicas + - wasAvailable type: object required: - spec diff --git a/operator/api/core/v1alpha1/crds/grove.io_podcliquesets.yaml b/operator/api/core/v1alpha1/crds/grove.io_podcliquesets.yaml index 4de29ccbd..e0cf327cc 100644 --- a/operator/api/core/v1alpha1/crds/grove.io_podcliquesets.yaml +++ b/operator/api/core/v1alpha1/crds/grove.io_podcliquesets.yaml @@ -9834,6 +9834,13 @@ spec: required: - maxReplicas type: object + terminationDelay: + description: |- + TerminationDelay overrides the PodCliqueSet-level terminationDelay for this scaling group. + Can only be set if PodCliqueSetTemplateSpec.TerminationDelay is set (gang termination is enabled). + When set, this value is used instead of the PodCliqueSet-level terminationDelay for gang termination + decisions affecting this scaling group's replicas. + type: string topologyConstraint: description: |- TopologyConstraint defines topology placement requirements for PodCliqueScalingGroup. @@ -9880,7 +9887,8 @@ spec: If a PodGang remains a candidate past TerminationDelay then it will be terminated. This allows additional time to the kube-scheduler to re-schedule sufficient pods in the PodGang that will result in having the total number of running pods go above the threshold. - Defaults to 4 hours. + When not set (nil), gang termination is disabled for the entire PodCliqueSet. + When set, gang termination is enabled and PodCliqueScalingGroups may optionally override this value. type: string topologyConstraint: description: TopologyConstraint defines topology placement requirements diff --git a/operator/api/core/v1alpha1/podclique.go b/operator/api/core/v1alpha1/podclique.go index ca694203d..06e675a78 100644 --- a/operator/api/core/v1alpha1/podclique.go +++ b/operator/api/core/v1alpha1/podclique.go @@ -121,6 +121,12 @@ type PodCliqueStatus struct { // ScheduledReplicas is the number of Pods that have been scheduled by the kube-scheduler. // +kubebuilder:default=0 ScheduledReplicas int32 `json:"scheduledReplicas"` + // WasAvailable indicates whether the PodClique has ever reached its MinAvailable threshold. + // Once set to true, it remains true for the lifetime of the PodClique. + // This field is used for gang termination: a PodClique can only be considered in breach of + // MinAvailable if it was previously available (WasAvailable=true). + // +kubebuilder:default=false + WasAvailable bool `json:"wasAvailable"` // Selector is the label selector that determines which pods are part of the PodClique. // PodClique is a unit of scale and this selector is used by HPA to scale the PodClique based on metrics captured for the pods that match this selector. Selector *string `json:"hpaPodSelector,omitempty"` diff --git a/operator/api/core/v1alpha1/podcliqueset.go b/operator/api/core/v1alpha1/podcliqueset.go index 0f43acc81..3ade27a75 100644 --- a/operator/api/core/v1alpha1/podcliqueset.go +++ b/operator/api/core/v1alpha1/podcliqueset.go @@ -151,7 +151,8 @@ type PodCliqueSetTemplateSpec struct { // If a PodGang remains a candidate past TerminationDelay then it will be terminated. This allows additional time // to the kube-scheduler to re-schedule sufficient pods in the PodGang that will result in having the total number of // running pods go above the threshold. - // Defaults to 4 hours. 
+ // When not set (nil), gang termination is disabled for the entire PodCliqueSet. + // When set, gang termination is enabled and PodCliqueScalingGroups may optionally override this value. // +optional TerminationDelay *metav1.Duration `json:"terminationDelay,omitempty"` // PodCliqueScalingGroupConfigs is a list of scaling groups for the PodCliqueSet. @@ -225,6 +226,12 @@ type PodCliqueScalingGroupConfig struct { // +optional // +kubebuilder:default=1 MinAvailable *int32 `json:"minAvailable,omitempty"` + // TerminationDelay overrides the PodCliqueSet-level terminationDelay for this scaling group. + // Can only be set if PodCliqueSetTemplateSpec.TerminationDelay is set (gang termination is enabled). + // When set, this value is used instead of the PodCliqueSet-level terminationDelay for gang termination + // decisions affecting this scaling group's replicas. + // +optional + TerminationDelay *metav1.Duration `json:"terminationDelay,omitempty"` // ScaleConfig is the horizontal pod autoscaler configuration for the pod clique scaling group. // +optional ScaleConfig *AutoScalingConfig `json:"scaleConfig,omitempty"` diff --git a/operator/api/core/v1alpha1/zz_generated.deepcopy.go b/operator/api/core/v1alpha1/zz_generated.deepcopy.go index 25dd28da1..bf5a56d20 100644 --- a/operator/api/core/v1alpha1/zz_generated.deepcopy.go +++ b/operator/api/core/v1alpha1/zz_generated.deepcopy.go @@ -319,6 +319,11 @@ func (in *PodCliqueScalingGroupConfig) DeepCopyInto(out *PodCliqueScalingGroupCo *out = new(int32) **out = **in } + if in.TerminationDelay != nil { + in, out := &in.TerminationDelay, &out.TerminationDelay + *out = new(v1.Duration) + **out = **in + } if in.ScaleConfig != nil { in, out := &in.ScaleConfig, &out.ScaleConfig *out = new(AutoScalingConfig) diff --git a/operator/e2e/tests/gang_termination_test.go b/operator/e2e/tests/gang_termination_test.go new file mode 100644 index 000000000..359696a66 --- /dev/null +++ b/operator/e2e/tests/gang_termination_test.go @@ -0,0 +1,995 @@ +//go:build e2e + +// /* +// Copyright 2025 The Grove Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// */ + +package tests + +import ( + "context" + "fmt" + "slices" + "strings" + "testing" + "time" + + "github.com/ai-dynamo/grove/operator/e2e/utils" + k8sutils "github.com/ai-dynamo/grove/operator/internal/utils/kubernetes" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + // TerminationDelay is a mirror of the value in the workload YAMLs. + TerminationDelay = 10 * time.Second +) + +// Test_GT1_GangTerminationFullReplicasPCSOwned tests gang-termination behavior when a PCS-owned PodClique is breached +// Scenario GT-1: +// 1. Initialize a 10-node Grove cluster +// 2. Deploy workload WL1, and verify 10 newly created pods +// 3. Wait for pods to get scheduled and become ready +// 4. Cordon node and then delete 1 ready pod from PCS-owned podclique pcs-0-pc-a +// 5. Wait for TerminationDelay seconds +// 6. 
Verify that all pods in the workload get terminated +func Test_GT1_GangTerminationFullReplicasPCSOwned(t *testing.T) { + ctx := context.Background() + + logger.Info("1. Initialize a 10-node Grove cluster") + totalPods := 10 // pc-a: 2 replicas, pc-b: 1*2 (scaling group), pc-c: 3*2 (scaling group) = 2+2+6=10 + clientset, restConfig, dynamicClient, cleanup := prepareTestCluster(ctx, t, totalPods) + defer cleanup() + + logger.Info("2. Deploy workload WL1, and verify 10 newly created pods") + tc := TestContext{ + T: t, + Ctx: ctx, + Clientset: clientset, + RestConfig: restConfig, + DynamicClient: dynamicClient, + Namespace: "default", + Timeout: defaultPollTimeout, + Interval: defaultPollInterval, + Workload: &WorkloadConfig{ + Name: "workload1", + YAMLPath: "../yaml/workload1-gt.yaml", + Namespace: "default", + ExpectedPods: totalPods, + }, + } + + pods, err := deployAndVerifyWorkload(tc) + if err != nil { + t.Fatalf("Failed to deploy workload: %v", err) + } + + logger.Info("3. Wait for pods to get scheduled and become ready") + if err := waitForReadyPods(tc, totalPods); err != nil { + t.Fatalf("Failed to wait for pods to be ready: %v", err) + } + + // Verify pods are distributed across distinct nodes + listPodsAndAssertDistinctNodes(tc) + + logger.Info("4. Cordon node and then delete 1 ready pod from PCS-owned podclique pcs-0-pc-a") + // Refresh pods list to get current ready status + pods, err = listPods(tc) + if err != nil { + t.Fatalf("Failed to refresh pod list: %v", err) + } + + // Find a pod from workload1-0-pc-a podclique (PCS-owned) + targetPod := findReadyPodFromPodClique(pods, "workload1-0-pc-a") + if targetPod == nil { + // Debug: List all pods and their cliques + logger.Errorf("Failed to find ready pod from workload1-0-pc-a. Available pods:") + for _, pod := range pods.Items { + clique := pod.Labels["grove.io/podclique"] + logger.Errorf(" Pod %s: clique=%s, phase=%s, ready=%v", pod.Name, clique, pod.Status.Phase, k8sutils.IsPodReady(&pod)) + } + t.Fatalf("Failed to find a ready pod from PCS-owned podclique workload1-0-pc-a") + } + + // Cordon the node where the target pod is running + if err := cordonNode(tc, targetPod.Spec.NodeName); err != nil { + t.Fatalf("Failed to cordon node %s: %v", targetPod.Spec.NodeName, err) + } + + // Capture pod UIDs before gang termination + originalPodUIDs := capturePodUIDs(pods) + + // Delete the target pod + logger.Debugf("Deleting pod %s from node %s", targetPod.Name, targetPod.Spec.NodeName) + if err := tc.Clientset.CoreV1().Pods(tc.Namespace).Delete(tc.Ctx, targetPod.Name, metav1.DeleteOptions{}); err != nil { + t.Fatalf("Failed to delete pod %s: %v", targetPod.Name, err) + } + + logger.Infof("5. Wait for TerminationDelay (%v) seconds", TerminationDelay) + time.Sleep(TerminationDelay) + + logger.Info("6. Verify that all pods in the workload get gang-terminated and recreated") + verifyGangTermination(tc, totalPods, originalPodUIDs) + + logger.Info("Gang-termination with full-replicas PCS-owned test (GT-1) completed successfully!") +} + +// Test_GT2_GangTerminationFullReplicasPCSGOwned tests gang-termination behavior when a PCSG-owned PodClique is breached +// Scenario GT-2: +// 1. Initialize a 10-node Grove cluster +// 2. Deploy workload WL1, and verify 10 newly created pods +// 3. Wait for pods to get scheduled and become ready +// 4. Cordon node and then delete 1 ready pod from PCSG-owned podclique pcs-0-sg-x-0-pc-c +// 5. Wait for TerminationDelay seconds +// 6. 
Verify that all pods in the workload get terminated +func Test_GT2_GangTerminationFullReplicasPCSGOwned(t *testing.T) { + ctx := context.Background() + + logger.Info("1. Initialize a 10-node Grove cluster") + totalPods := 10 // pc-a: 2 replicas, pc-b: 1*2 (scaling group), pc-c: 3*2 (scaling group) = 2+2+6=10 + clientset, restConfig, dynamicClient, cleanup := prepareTestCluster(ctx, t, totalPods) + defer cleanup() + + logger.Info("2. Deploy workload WL1, and verify 10 newly created pods") + tc := TestContext{ + T: t, + Ctx: ctx, + Clientset: clientset, + RestConfig: restConfig, + DynamicClient: dynamicClient, + Namespace: "default", + Timeout: defaultPollTimeout, + Interval: defaultPollInterval, + Workload: &WorkloadConfig{ + Name: "workload1", + YAMLPath: "../yaml/workload1-gt.yaml", + Namespace: "default", + ExpectedPods: totalPods, + }, + } + + pods, err := deployAndVerifyWorkload(tc) + if err != nil { + t.Fatalf("Failed to deploy workload: %v", err) + } + + logger.Info("3. Wait for pods to get scheduled and become ready") + if err := waitForReadyPods(tc, totalPods); err != nil { + t.Fatalf("Failed to wait for pods to be ready: %v", err) + } + + // Verify pods are distributed across distinct nodes + listPodsAndAssertDistinctNodes(tc) + + logger.Info("4. Cordon node and then delete 1 ready pod from PCSG-owned podclique pcs-0-sg-x-0-pc-c") + // Refresh pods list to get current ready status + pods, err = listPods(tc) + if err != nil { + t.Fatalf("Failed to refresh pod list: %v", err) + } + + // Find a pod from workload1-0-sg-x-0-pc-c podclique (PCSG-owned) + targetPod := findReadyPodFromPodClique(pods, "workload1-0-sg-x-0-pc-c") + if targetPod == nil { + // Debug: List all pods and their cliques + logger.Errorf("Failed to find ready pod from workload1-0-sg-x-0-pc-c. Available pods:") + for _, pod := range pods.Items { + clique := pod.Labels["grove.io/podclique"] + logger.Errorf(" Pod %s: clique=%s, phase=%s, ready=%v", pod.Name, clique, pod.Status.Phase, k8sutils.IsPodReady(&pod)) + } + t.Fatalf("Failed to find a ready pod from PCSG-owned podclique workload1-0-sg-x-0-pc-c") + } + + // Cordon the node where the target pod is running + if err := cordonNode(tc, targetPod.Spec.NodeName); err != nil { + t.Fatalf("Failed to cordon node %s: %v", targetPod.Spec.NodeName, err) + } + + // Capture pod UIDs before gang termination + originalPodUIDs := capturePodUIDs(pods) + + // Delete the target pod + logger.Debugf("Deleting pod %s from node %s", targetPod.Name, targetPod.Spec.NodeName) + if err := tc.Clientset.CoreV1().Pods(tc.Namespace).Delete(tc.Ctx, targetPod.Name, metav1.DeleteOptions{}); err != nil { + t.Fatalf("Failed to delete pod %s: %v", targetPod.Name, err) + } + + logger.Infof("5. Wait for TerminationDelay (%v) seconds", TerminationDelay) + time.Sleep(TerminationDelay) + + logger.Info("6. Verify that all pods in the workload get gang-terminated and recreated") + verifyGangTermination(tc, totalPods, originalPodUIDs) + + logger.Info("Gang-termination with full-replicas PCSG-owned test (GT-2) completed successfully!") +} + +// Test_GT3_GangTerminationMinReplicasPCSOwned tests gang-termination behavior with min-replicas when a PCS-owned PodClique is breached +// Scenario GT-3: +// 1. Initialize a 10-node Grove cluster +// 2. Deploy workload WL2, and verify 10 newly created pods +// 3. Wait for pods to get scheduled and become ready +// 4. Cordon node and then delete 1 pod from PCS-owned podclique pcs-0-pc-a +// 5. Wait for TerminationDelay seconds +// 6. 
Verify that workload pods do not get gang-terminated +// 7. Cordon node and then delete 1 ready pod from PCS-owned podclique pcs-0-pc-a +// 8. Wait for TerminationDelay seconds +// 9. Verify that all pods in the workload get terminated +func Test_GT3_GangTerminationMinReplicasPCSOwned(t *testing.T) { + ctx := context.Background() + + logger.Info("1. Initialize a 10-node Grove cluster") + totalPods := 10 // pc-a: 2 replicas, pc-b: 1*2 (scaling group), pc-c: 3*2 (scaling group) = 2+2+6=10 + clientset, restConfig, dynamicClient, cleanup := prepareTestCluster(ctx, t, totalPods) + defer cleanup() + + logger.Info("2. Deploy workload WL2, and verify 10 newly created pods") + tc := TestContext{ + T: t, + Ctx: ctx, + Clientset: clientset, + RestConfig: restConfig, + DynamicClient: dynamicClient, + Namespace: "default", + Timeout: defaultPollTimeout, + Interval: defaultPollInterval, + Workload: &WorkloadConfig{ + Name: "workload2", + YAMLPath: "../yaml/workload2-gt.yaml", + Namespace: "default", + ExpectedPods: totalPods, + }, + } + + pods, err := deployAndVerifyWorkload(tc) + if err != nil { + t.Fatalf("Failed to deploy workload: %v", err) + } + + logger.Info("3. Wait for pods to get scheduled and become ready") + if err := waitForReadyPods(tc, totalPods); err != nil { + t.Fatalf("Failed to wait for pods to be ready: %v", err) + } + + // Verify pods are distributed across distinct nodes + listPodsAndAssertDistinctNodes(tc) + + logger.Info("4. Cordon node and then delete 1 pod from PCS-owned podclique pcs-0-pc-a") + // Find the first pod from workload2-0-pc-a podclique (PCS-owned) + pods, err = listPods(tc) + if err != nil { + t.Fatalf("Failed to list workload pods: %v", err) + } + firstPod := findAndDeleteReadyPodFromPodClique(tc, pods, "workload2-0-pc-a", "first") + + // Capture pod UIDs after first deletion for later comparison + pods, err = listPods(tc) + if err != nil { + t.Fatalf("Failed to list workload pods: %v", err) + } + originalPodUIDs := capturePodUIDs(pods) + + logger.Infof("5. Wait for TerminationDelay (%v) seconds", TerminationDelay) + time.Sleep(TerminationDelay) + + logger.Info("6. Verify that workload pods do not get gang-terminated") + // Verify at least 9 original pods remain (only deleted 1, gang termination would replace all pod UIDs) + verifyNoGangTermination(tc, 9, originalPodUIDs) + + logger.Info("7. Cordon node and then delete 1 ready pod from PCS-owned podclique pcs-0-pc-a") + // Refresh pod list + pods, err = listPods(tc) + if err != nil { + t.Fatalf("Failed to list workload pods: %v", err) + } + + // Find and delete another pod from workload2-0-pc-a (excluding the first one we deleted) + findAndDeleteReadyPodFromPodCliqueExcluding(tc, pods, "workload2-0-pc-a", []string{firstPod}, "second") + + // Capture pod UIDs before gang termination + pods, err = listPods(tc) + if err != nil { + t.Fatalf("Failed to list workload pods: %v", err) + } + podUIDsBeforeFinalDeletion := capturePodUIDs(pods) + + logger.Infof("8. Wait for TerminationDelay (%v) seconds", TerminationDelay) + time.Sleep(TerminationDelay) + + logger.Info("9. Verify that all pods in the workload get gang-terminated and recreated") + verifyGangTermination(tc, totalPods, podUIDsBeforeFinalDeletion) + + logger.Info("Gang-termination with min-replicas PCS-owned test (GT-3) completed successfully!") +} + +// Test_GT4_GangTerminationMinReplicasPCSGOwned tests PCSG-level gang-termination behavior when a PCSG replica is breached +// but the PCSG's overall minAvailable is NOT breached. 
In this case, only the breached PCSG replica should be +// gang-terminated, not the entire PCS replica. +// +// Workload WL2 has: +// - pc-a: 2 replicas (PCS-owned, standalone) +// - sg-x: PCSG with 2 replicas (minAvailable=1), each containing pc-b (1 pod) and pc-c (3 pods, minAvailable=1) +// +// When sg-x-1's pc-c loses all pods (breaching pc-c's minAvailable), the PCSG should: +// - Gang-terminate only sg-x-1 (sg-x-1-pc-b and sg-x-1-pc-c) +// - NOT terminate sg-x-0 or pc-a (since PCSG's minAvailable=1 is still satisfied by sg-x-0) +// +// Scenario GT-4: +// 1. Initialize a 10-node Grove cluster +// 2. Deploy workload WL2, and verify 10 newly created pods +// 3. Wait for pods to get scheduled and become ready +// 4. Cordon node and then delete 1 ready pod from PCSG-owned podclique pcs-0-sg-x-0-pc-c +// 5. Wait for TerminationDelay seconds +// 6. Verify that workload pods do not get gang-terminated +// 7. Cordon node and then delete 1 ready pod from PCSG-owned podclique pcs-0-sg-x-0-pc-c +// 8. Wait for TerminationDelay seconds +// 9. Verify that workload pods do not get gang-terminated +// 10. Cordon node and then delete 1 ready pod from PCSG-owned podclique pcs-0-sg-x-1-pc-c +// 11. Wait for TerminationDelay seconds +// 12. Verify that workload pods do not get gang-terminated +// 13. Cordon nodes and then delete 2 remaining ready pods from PCSG-owned podclique pcs-0-sg-x-1-pc-c +// 14. Wait for TerminationDelay seconds +// 15. Verify that only PCSG replica sg-x-1 pods get gang-terminated and recreated (not the entire workload) +func Test_GT4_GangTerminationMinReplicasPCSGOwned(t *testing.T) { + ctx := context.Background() + + logger.Info("1. Initialize a 10-node Grove cluster") + totalPods := 10 // pc-a: 2 replicas, pc-b: 1*2 (scaling group), pc-c: 3*2 (scaling group) = 2+2+6=10 + clientset, restConfig, dynamicClient, cleanup := prepareTestCluster(ctx, t, totalPods) + defer cleanup() + + logger.Info("2. Deploy workload WL2, and verify 10 newly created pods") + tc := TestContext{ + T: t, + Ctx: ctx, + Clientset: clientset, + RestConfig: restConfig, + DynamicClient: dynamicClient, + Namespace: "default", + Timeout: defaultPollTimeout, + Interval: defaultPollInterval, + Workload: &WorkloadConfig{ + Name: "workload2", + YAMLPath: "../yaml/workload2-gt.yaml", + Namespace: "default", + ExpectedPods: totalPods, + }, + } + + pods, err := deployAndVerifyWorkload(tc) + if err != nil { + t.Fatalf("Failed to deploy workload: %v", err) + } + + logger.Info("3. Wait for pods to get scheduled and become ready") + if err := waitForReadyPods(tc, totalPods); err != nil { + t.Fatalf("Failed to wait for pods to be ready: %v", err) + } + + // Verify pods are distributed across distinct nodes + listPodsAndAssertDistinctNodes(tc) + + logger.Info("4. Cordon node and then delete 1 ready pod from PCSG-owned podclique pcs-0-sg-x-0-pc-c") + pods, err = listPods(tc) + if err != nil { + t.Fatalf("Failed to list workload pods: %v", err) + } + firstPodSgx0 := findAndDeleteReadyPodFromPodClique(tc, pods, "workload2-0-sg-x-0-pc-c", "first from sg-x-0") + + // Capture pod UIDs for sg-x-0 verification + pods, err = listPods(tc) + if err != nil { + t.Fatalf("Failed to list workload pods: %v", err) + } + originalPodUIDsSgx0 := capturePodUIDs(pods) + + logger.Infof("5. Wait for TerminationDelay (%v) seconds", TerminationDelay) + time.Sleep(TerminationDelay) + + logger.Info("6. Verify that workload pods do not get gang-terminated") + verifyNoGangTermination(tc, 9, originalPodUIDsSgx0) + + logger.Info("7. 
Cordon node and then delete 1 ready pod from PCSG-owned podclique pcs-0-sg-x-0-pc-c") + pods, err = listPods(tc) + if err != nil { + t.Fatalf("Failed to list workload pods: %v", err) + } + _ = findAndDeleteReadyPodFromPodCliqueExcluding(tc, pods, "workload2-0-sg-x-0-pc-c", []string{firstPodSgx0}, "second from sg-x-0") + + // Capture pod UIDs for verification + pods, err = listPods(tc) + if err != nil { + t.Fatalf("Failed to list workload pods: %v", err) + } + originalPodUIDsAfterSecond := capturePodUIDs(pods) + + logger.Infof("8. Wait for TerminationDelay (%v) seconds", TerminationDelay) + time.Sleep(TerminationDelay) + + logger.Info("9. Verify that workload pods do not get gang-terminated") + // sg-x-0-pc-c has 3 replicas with minAvailable=1, so losing 2 pods still keeps us at 1 >= minAvailable + verifyNoGangTermination(tc, 8, originalPodUIDsAfterSecond) + + logger.Info("10. Cordon node and then delete 1 ready pod from PCSG-owned podclique pcs-0-sg-x-1-pc-c") + pods, err = listPods(tc) + if err != nil { + t.Fatalf("Failed to list workload pods: %v", err) + } + firstPodSgx1 := findAndDeleteReadyPodFromPodClique(tc, pods, "workload2-0-sg-x-1-pc-c", "first from sg-x-1") + + // Capture pod UIDs for sg-x-1 verification + pods, err = listPods(tc) + if err != nil { + t.Fatalf("Failed to list workload pods: %v", err) + } + originalPodUIDsSgx1 := capturePodUIDs(pods) + + logger.Infof("11. Wait for TerminationDelay (%v) seconds", TerminationDelay) + time.Sleep(TerminationDelay) + + logger.Info("12. Verify that workload pods do not get gang-terminated") + // Verify at least 3 original pods remain (gang termination would replace all pod UIDs) + verifyNoGangTermination(tc, 3, originalPodUIDsSgx1) + + logger.Info("13. Cordon nodes and then delete 2 remaining ready pods from PCSG-owned podclique pcs-0-sg-x-1-pc-c") + pods, err = listPods(tc) + if err != nil { + t.Fatalf("Failed to list workload pods: %v", err) + } + + // Capture pod UIDs BEFORE deleting the remaining pods - we need to track which pods should be terminated + // and which should remain unchanged + podsBeforeFinalDeletion, err := listPods(tc) + if err != nil { + t.Fatalf("Failed to list workload pods: %v", err) + } + + // Identify pods that belong to sg-x-1 (should be terminated) vs other components (should remain) + sgx1PodUIDs := make(map[string]string) // Pods in sg-x-1 (should be gang-terminated) + otherPodUIDs := make(map[string]string) // Pods NOT in sg-x-1 (should remain unchanged) + for _, pod := range podsBeforeFinalDeletion.Items { + clique := pod.Labels["grove.io/podclique"] + // sg-x-1 pods have clique names like "workload2-0-sg-x-1-pc-b" or "workload2-0-sg-x-1-pc-c" + if strings.Contains(clique, "-sg-x-1-") { + sgx1PodUIDs[pod.Name] = string(pod.UID) + } else { + otherPodUIDs[pod.Name] = string(pod.UID) + } + } + logger.Debugf("Pods in sg-x-1 (to be terminated): %d, Other pods (should remain): %d", len(sgx1PodUIDs), len(otherPodUIDs)) + + // Find and delete remaining pods from workload2-0-sg-x-1-pc-c + deletedPodsSgx1 := []string{firstPodSgx1} + for i := range 2 { + // Same reasoning: delete Ready pods only, so we validate behavior after healthy losses + podName := findAndDeleteReadyPodFromPodCliqueExcluding(tc, pods, "workload2-0-sg-x-1-pc-c", deletedPodsSgx1, fmt.Sprintf("pod %d from sg-x-1", i+2)) + deletedPodsSgx1 = append(deletedPodsSgx1, podName) + // Refresh pod list + pods, err = listPods(tc) + if err != nil { + t.Fatalf("Failed to list workload pods: %v", err) + } + } + + logger.Infof("14. 
Wait for TerminationDelay (%v) seconds", TerminationDelay) + time.Sleep(TerminationDelay) + + logger.Info("15. Verify that only PCSG replica sg-x-1 pods get gang-terminated (not the entire workload)") + // sg-x-1 should have 4 pods: 1 from pc-b + 3 from pc-c + expectedSgx1Pods := 4 + verifyPCSGReplicaGangTermination(tc, totalPods, sgx1PodUIDs, otherPodUIDs, expectedSgx1Pods) + + logger.Info("PCSG-level gang-termination test (GT-4) completed successfully!") +} + +// Test_GT5_GangTerminationPCSGMinAvailableBreach tests PCSG-level minAvailable breach that delegates to PCS-level gang termination. +// When both PCSG replicas are breached, the PCSG's own minAvailable becomes unsatisfied, and it should delegate +// to PCS-level gang termination (terminating the entire workload, including standalone pc-a). +// +// Workload WL2 has: +// - pc-a: 2 replicas (PCS-owned, standalone) +// - sg-x: PCSG with 2 replicas (minAvailable=1), each containing pc-b (1 pod) and pc-c (3 pods, minAvailable=1) +// +// Key difference from GT-4: +// | Test | Breach | PCSG minAvailable | Result | +// |------|------------------------|-------------------------|-------------------------------| +// | GT-4 | Only sg-x-1 | Still satisfied (1 ≥ 1) | Only sg-x-1 terminated | +// | GT-5 | Both sg-x-0 AND sg-x-1 | Breached (0 < 1) | Entire PCS replica terminated | +// +// Scenario GT-5: +// 1. Initialize a 10-node Grove cluster +// 2. Deploy workload WL2, and verify 10 newly created pods +// 3. Wait for pods to get scheduled and become ready +// 4. Delete all 3 pods from sg-x-0-pc-c (breach sg-x-0) +// 5. Immediately delete all 3 pods from sg-x-1-pc-c (breach sg-x-1) +// (Must do steps 4-5 quickly, within TerminationDelay window) +// 6. Wait for TerminationDelay seconds +// 7. Verify that ALL pods in the workload get gang-terminated and recreated (including pc-a) +func Test_GT5_GangTerminationPCSGMinAvailableBreach(t *testing.T) { + ctx := context.Background() + + logger.Info("1. Initialize a 10-node Grove cluster") + totalPods := 10 // pc-a: 2 replicas, pc-b: 1*2 (scaling group), pc-c: 3*2 (scaling group) = 2+2+6=10 + clientset, restConfig, dynamicClient, cleanup := prepareTestCluster(ctx, t, totalPods) + defer cleanup() + + logger.Info("2. Deploy workload WL2, and verify 10 newly created pods") + tc := TestContext{ + T: t, + Ctx: ctx, + Clientset: clientset, + RestConfig: restConfig, + DynamicClient: dynamicClient, + Namespace: "default", + Timeout: defaultPollTimeout, + Interval: defaultPollInterval, + Workload: &WorkloadConfig{ + Name: "workload2", + YAMLPath: "../yaml/workload2-gt.yaml", + Namespace: "default", + ExpectedPods: totalPods, + }, + } + + pods, err := deployAndVerifyWorkload(tc) + if err != nil { + t.Fatalf("Failed to deploy workload: %v", err) + } + + logger.Info("3. Wait for pods to get scheduled and become ready") + if err := waitForReadyPods(tc, totalPods); err != nil { + t.Fatalf("Failed to wait for pods to be ready: %v", err) + } + + // Verify pods are distributed across distinct nodes + listPodsAndAssertDistinctNodes(tc) + + logger.Info("4. Delete all 3 pods from sg-x-0-pc-c (breach sg-x-0)") + pods, err = listPods(tc) + if err != nil { + t.Fatalf("Failed to list workload pods: %v", err) + } + + // Delete all 3 pods from sg-x-0-pc-c + deletedPodsSgx0 := deleteAllPodsFromPodClique(tc, pods, "workload2-0-sg-x-0-pc-c") + if len(deletedPodsSgx0) != 3 { + t.Fatalf("Expected to delete 3 pods from sg-x-0-pc-c, but deleted %d", len(deletedPodsSgx0)) + } + + logger.Info("5. 
Immediately delete all 3 pods from sg-x-1-pc-c (breach sg-x-1)") + // Refresh pod list + pods, err = listPods(tc) + if err != nil { + t.Fatalf("Failed to list workload pods: %v", err) + } + + // Delete all 3 pods from sg-x-1-pc-c + deletedPodsSgx1 := deleteAllPodsFromPodClique(tc, pods, "workload2-0-sg-x-1-pc-c") + if len(deletedPodsSgx1) != 3 { + t.Fatalf("Expected to delete 3 pods from sg-x-1-pc-c, but deleted %d", len(deletedPodsSgx1)) + } + + // Capture pod UIDs after deletions for later verification + // Note: We need to capture these AFTER the deletions to track what remains + pods, err = listPods(tc) + if err != nil { + t.Fatalf("Failed to list workload pods: %v", err) + } + originalPodUIDs := capturePodUIDs(pods) + + logger.Infof("6. Wait for TerminationDelay (%v) seconds", TerminationDelay) + time.Sleep(TerminationDelay) + + logger.Info("7. Verify that ALL pods in the workload get gang-terminated and recreated (including pc-a)") + // With both PCSG replicas breached, PCSG minAvailable (0 < 1) is violated + // This should trigger PCS-level gang termination, which terminates ALL pods including pc-a + verifyGangTermination(tc, totalPods, originalPodUIDs) + + logger.Info("PCSG minAvailable breach → PCS-level gang-termination test (GT-5) completed successfully!") +} + +// Helper functions + +// capturePodUIDs captures the UIDs of all pods in the list +func capturePodUIDs(pods *v1.PodList) map[string]string { + uidMap := make(map[string]string) + for _, pod := range pods.Items { + uidMap[pod.Name] = string(pod.UID) + } + return uidMap +} + +// findReadyPodFromPodClique finds a ready pod from the specified podclique. +// These helpers focus on Ready pods because the gang-termination tests must emulate +// the operator reacting to healthy replica loss, not pods that were already failing. +func findReadyPodFromPodClique(pods *v1.PodList, podCliqueName string) *v1.Pod { + for i := range pods.Items { + pod := &pods.Items[i] + if podCliqueLabel, exists := pod.Labels["grove.io/podclique"]; exists && podCliqueLabel == podCliqueName { + if pod.Status.Phase == v1.PodRunning && k8sutils.IsPodReady(pod) { + return pod + } + } + } + return nil +} + +// findReadyPodFromPodCliqueExcluding finds first ready pod from the specified podclique (excluding certain pods). +// Maintaining the Ready requirement keeps the test aligned with healthy-replica eviction scenarios. +func findReadyPodFromPodCliqueExcluding(pods *v1.PodList, podCliqueName string, excludePods []string) *v1.Pod { + for i := range pods.Items { + pod := &pods.Items[i] + if podCliqueLabel, exists := pod.Labels["grove.io/podclique"]; exists && podCliqueLabel == podCliqueName { + if pod.Status.Phase == v1.PodRunning && k8sutils.IsPodReady(pod) { + // Check if this pod should be excluded + if !slices.Contains(excludePods, pod.Name) { + return pod + } + } + } + } + return nil +} + +// findAndDeleteReadyPodFromPodClique finds a ready pod from the specified podclique and deletes it. +// Ready-only deletions mimic losing healthy replicas, which is what should trigger gang termination logic. 
+func findAndDeleteReadyPodFromPodClique(tc TestContext, pods *v1.PodList, podCliqueName, description string) string { + targetPod := findReadyPodFromPodClique(pods, podCliqueName) + if targetPod == nil { + tc.T.Fatalf("Failed to find %s ready pod from podclique %s", description, podCliqueName) + return "" + } + + // Cordon the node + if err := cordonNode(tc, targetPod.Spec.NodeName); err != nil { + tc.T.Fatalf("Failed to cordon node %s: %v", targetPod.Spec.NodeName, err) + } + + // Delete the pod + logger.Debugf("Deleting %s pod %s from node %s (podclique: %s)", description, targetPod.Name, targetPod.Spec.NodeName, podCliqueName) + if err := tc.Clientset.CoreV1().Pods(tc.Namespace).Delete(tc.Ctx, targetPod.Name, metav1.DeleteOptions{}); err != nil { + tc.T.Fatalf("Failed to delete pod %s: %v", targetPod.Name, err) + } + + return targetPod.Name +} + +// findAndDeleteReadyPodFromPodCliqueExcluding finds a ready pod from the specified podclique (excluding certain pods) and deletes it. +// This helper keeps the Ready constraint for the same reason—testing healthy replica loss semantics. +func findAndDeleteReadyPodFromPodCliqueExcluding(tc TestContext, pods *v1.PodList, podCliqueName string, excludePods []string, description string) string { + targetPod := findReadyPodFromPodCliqueExcluding(pods, podCliqueName, excludePods) + if targetPod == nil { + tc.T.Fatalf("Failed to find %s ready pod from podclique %s", description, podCliqueName) + return "" + } + + // Cordon the node + if err := cordonNode(tc, targetPod.Spec.NodeName); err != nil { + tc.T.Fatalf("Failed to cordon node %s: %v", targetPod.Spec.NodeName, err) + } + + // Delete the pod + logger.Debugf("Deleting %s pod %s from node %s (podclique: %s)", description, targetPod.Name, targetPod.Spec.NodeName, podCliqueName) + if err := tc.Clientset.CoreV1().Pods(tc.Namespace).Delete(tc.Ctx, targetPod.Name, metav1.DeleteOptions{}); err != nil { + tc.T.Fatalf("Failed to delete pod %s: %v", targetPod.Name, err) + } + + return targetPod.Name +} + +// deleteAllPodsFromPodClique finds all ready pods from the specified podclique, cordons their nodes, and deletes them. +// Returns the names of all deleted pods. This is used in GT-5 to quickly breach multiple PCSG replicas +// within the TerminationDelay window. 
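+// Each matching Ready pod's node is cordoned before the pod is deleted; the call fails the test
+// if the podclique has no Ready pods to delete.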
+func deleteAllPodsFromPodClique(tc TestContext, pods *v1.PodList, podCliqueName string) []string { + var deletedPods []string + + for i := range pods.Items { + pod := &pods.Items[i] + if podCliqueLabel, exists := pod.Labels["grove.io/podclique"]; exists && podCliqueLabel == podCliqueName { + if pod.Status.Phase == v1.PodRunning && k8sutils.IsPodReady(pod) { + // Cordon the node + if err := cordonNode(tc, pod.Spec.NodeName); err != nil { + tc.T.Fatalf("Failed to cordon node %s: %v", pod.Spec.NodeName, err) + } + + // Delete the pod + logger.Debugf("Deleting pod %s from node %s (podclique: %s)", pod.Name, pod.Spec.NodeName, podCliqueName) + if err := tc.Clientset.CoreV1().Pods(tc.Namespace).Delete(tc.Ctx, pod.Name, metav1.DeleteOptions{}); err != nil { + tc.T.Fatalf("Failed to delete pod %s: %v", pod.Name, err) + } + deletedPods = append(deletedPods, pod.Name) + } + } + } + + if len(deletedPods) == 0 { + tc.T.Fatalf("Failed to find any ready pods from podclique %s", podCliqueName) + } + + return deletedPods +} + +// verifyNoGangTermination verifies that gang-termination has not occurred by checking that original pod UIDs still exist +func verifyNoGangTermination(tc TestContext, minExpectedRunning int, originalPodUIDs map[string]string) { + tc.T.Helper() + + pollCount := 0 + err := pollForCondition(tc, func() (bool, error) { + pollCount++ + pods, err := tc.Clientset.CoreV1().Pods(tc.Namespace).List(tc.Ctx, metav1.ListOptions{ + LabelSelector: tc.getLabelSelector(), + }) + if err != nil { + return false, err + } + + runningCount := 0 + pendingCount := 0 + terminatingCount := 0 + currentUIDs := make(map[string]bool) + oldPodsRemaining := 0 + + for _, pod := range pods.Items { + currentUIDs[string(pod.UID)] = true + + switch pod.Status.Phase { + case v1.PodRunning: + runningCount++ + case v1.PodPending: + pendingCount++ + } + if pod.DeletionTimestamp != nil { + terminatingCount++ + } + } + + // Check that original UIDs still exist (gang termination would replace all UIDs) + for _, originalUID := range originalPodUIDs { + if currentUIDs[originalUID] { + oldPodsRemaining++ + } + } + + runningOrPendingCount := runningCount + pendingCount + // Success criteria: + // 1. At least minExpectedRunning pods are running/pending + // 2. 
Most original pods still exist (allowing for the few we intentionally deleted) + success := runningOrPendingCount >= minExpectedRunning && oldPodsRemaining >= minExpectedRunning + status := "OK" + if !success { + status = "WAITING" + } + logger.Debugf("%s [Poll %d] running=%d, pending=%d, terminating=%d, total=%d, original_remain=%d/%d (min_expected=%d)", + status, pollCount, runningCount, pendingCount, terminatingCount, len(pods.Items), oldPodsRemaining, len(originalPodUIDs), minExpectedRunning) + + return success, nil + }) + + if err != nil { + tc.T.Fatalf("Failed to verify no gang-termination: %v", err) + } +} + +// verifyGangTermination verifies that gang-termination has occurred and all pods were recreated +func verifyGangTermination(tc TestContext, expectedPods int, originalPodUIDs map[string]string) { + tc.T.Helper() + + // After gang-termination, pods should be recreated with new UIDs + // They don't need to all be Pending - they may have already started scheduling + pollCount := 0 + err := pollForCondition(tc, func() (bool, error) { + pollCount++ + pods, err := tc.Clientset.CoreV1().Pods(tc.Namespace).List(tc.Ctx, metav1.ListOptions{ + LabelSelector: tc.getLabelSelector(), + }) + if err != nil { + return false, err + } + + // Verify none of the original pod UIDs exist (all were deleted and recreated) + currentUIDs := make(map[string]bool) + pendingCount := 0 + runningCount := 0 + terminatingCount := 0 + oldPodsRemaining := 0 + nonTerminatingCount := 0 + + for _, pod := range pods.Items { + currentUIDs[string(pod.UID)] = true + + // Don't count terminating pods + if pod.DeletionTimestamp == nil { + nonTerminatingCount++ + } + + switch pod.Status.Phase { + case v1.PodPending: + pendingCount++ + case v1.PodRunning: + runningCount++ + } + if pod.DeletionTimestamp != nil { + terminatingCount++ + } + } + + // Check that no original UIDs exist in current pods (all recreated) + for _, originalUID := range originalPodUIDs { + if currentUIDs[originalUID] { + oldPodsRemaining++ + } + } + + // Success criteria: + // 1. We have the expected number of non-terminating pods + // 2. None of the old pod UIDs remain (all were recreated) + success := nonTerminatingCount == expectedPods && oldPodsRemaining == 0 + status := "OK" + if !success { + status = "WAITING" + } + logger.Debugf("%s [Poll %d] total=%d, non-terminating=%d/%d, pending=%d, running=%d, terminating=%d, old=%d", + status, pollCount, len(pods.Items), nonTerminatingCount, expectedPods, pendingCount, runningCount, terminatingCount, oldPodsRemaining) + + return success, nil + }) + + if err != nil { + // Add detailed diagnostics on failure + pods, listErr := utils.ListPods(tc.Ctx, tc.Clientset, tc.Namespace, tc.getLabelSelector()) + if listErr == nil { + logger.Errorf("Gang-termination verification failed. 
Current state: total_pods=%d, expected=%d", len(pods.Items), expectedPods)
+
+			// Count old UIDs still present
+			oldUIDs := 0
+			for _, pod := range pods.Items {
+				if _, exists := originalPodUIDs[pod.Name]; exists && originalPodUIDs[pod.Name] == string(pod.UID) {
+					oldUIDs++
+					logger.Debugf("  OLD Pod %s: phase=%s, uid=%s (terminating=%v)", pod.Name, pod.Status.Phase, pod.UID, pod.DeletionTimestamp != nil)
+				} else {
+					logger.Debugf("  NEW Pod %s: phase=%s, uid=%s (terminating=%v)", pod.Name, pod.Status.Phase, pod.UID, pod.DeletionTimestamp != nil)
+				}
+			}
+			logger.Errorf("Old pods remaining: %d/%d", oldUIDs, len(originalPodUIDs))
+		}
+		tc.T.Fatalf("Failed to verify gang-termination and recreation: %v", err)
+	}
+}
+
+// verifyPCSGReplicaGangTermination verifies that only the specified PCSG replica's pods were gang-terminated
+// while other pods in the workload remained unchanged. This tests PCSG-level gang termination where only
+// a single PCSG replica is breached but the PCSG's overall minAvailable is still satisfied.
+func verifyPCSGReplicaGangTermination(tc TestContext, expectedTotalPods int, terminatedPodUIDs, unchangedPodUIDs map[string]string, expectedTerminatedPods int) {
+	tc.T.Helper()
+
+	pollCount := 0
+	err := pollForCondition(tc, func() (bool, error) {
+		pollCount++
+		pods, err := tc.Clientset.CoreV1().Pods(tc.Namespace).List(tc.Ctx, metav1.ListOptions{
+			LabelSelector: tc.getLabelSelector(),
+		})
+		if err != nil {
+			return false, err
+		}
+
+		// Track current state
+		currentUIDs := make(map[string]string)
+		nonTerminatingCount := 0
+		terminatingCount := 0
+		unchangedPodsRemaining := 0 // Pods that were in unchangedPodUIDs and still have same UIDs
+
+		for _, pod := range pods.Items {
+			currentUIDs[pod.Name] = string(pod.UID)
+
+			if pod.DeletionTimestamp != nil {
+				terminatingCount++
+				continue
+			}
+			nonTerminatingCount++
+		}
+
+		// Check that unchanged pods still have same UIDs
+		for origName, origUID := range unchangedPodUIDs {
+			if currentUID, exists := currentUIDs[origName]; exists && currentUID == origUID {
+				unchangedPodsRemaining++
+			}
+		}
+
+		// Count how many sg-x-1 pods now exist (should be expectedTerminatedPods with new UIDs)
+		sgx1PodsCount := 0
+		for _, pod := range pods.Items {
+			if pod.DeletionTimestamp != nil {
+				continue
+			}
+			clique := pod.Labels["grove.io/podclique"]
+			if strings.Contains(clique, "-sg-x-1-") {
+				sgx1PodsCount++
+			}
+		}
+
+		// Success criteria:
+		// 1. Total non-terminating pods equals expected total
+		// 2. sg-x-1 has the expected number of pods (recreated)
+		// 3.
Unchanged pods (pc-a, sg-x-0-*) still have original UIDs + success := nonTerminatingCount == expectedTotalPods && + sgx1PodsCount == expectedTerminatedPods && + unchangedPodsRemaining == len(unchangedPodUIDs) + + status := "OK" + if !success { + status = "WAITING" + } + logger.Debugf("%s [Poll %d] total=%d, non-terminating=%d/%d, terminating=%d, sg-x-1=%d/%d, unchanged=%d/%d", + status, pollCount, len(pods.Items), nonTerminatingCount, expectedTotalPods, + terminatingCount, sgx1PodsCount, expectedTerminatedPods, + unchangedPodsRemaining, len(unchangedPodUIDs)) + + return success, nil + }) + + if err != nil { + // Add detailed diagnostics on failure + pods, listErr := utils.ListPods(tc.Ctx, tc.Clientset, tc.Namespace, tc.getLabelSelector()) + if listErr == nil { + logger.Errorf("PCSG replica gang-termination verification failed. Current pods:") + sgx1Count := 0 + otherCount := 0 + for _, pod := range pods.Items { + clique := pod.Labels["grove.io/podclique"] + isSgx1 := strings.Contains(clique, "-sg-x-1-") + if isSgx1 { + sgx1Count++ + } else { + otherCount++ + } + // Check if UID changed + var uidStatus string + if origUID, wasTerminated := terminatedPodUIDs[pod.Name]; wasTerminated { + if string(pod.UID) == origUID { + uidStatus = "SAME-UID (should be NEW)" + } else { + uidStatus = "NEW-UID (correct)" + } + } else if origUID, wasUnchanged := unchangedPodUIDs[pod.Name]; wasUnchanged { + if string(pod.UID) == origUID { + uidStatus = "SAME-UID (correct)" + } else { + uidStatus = "NEW-UID (should be SAME)" + } + } else { + uidStatus = "NEW-POD" + } + logger.Debugf(" Pod %s: clique=%s, phase=%s, uid=%s, %s, terminating=%v", + pod.Name, clique, pod.Status.Phase, pod.UID, uidStatus, pod.DeletionTimestamp != nil) + } + logger.Errorf("Summary: sg-x-1 pods=%d (expected %d), other pods=%d (expected %d)", + sgx1Count, expectedTerminatedPods, otherCount, len(unchangedPodUIDs)) + } + tc.T.Fatalf("Failed to verify PCSG replica gang-termination: %v", err) + } +} diff --git a/operator/e2e/yaml/workload1-gt.yaml b/operator/e2e/yaml/workload1-gt.yaml new file mode 100644 index 000000000..8ec7a9e78 --- /dev/null +++ b/operator/e2e/yaml/workload1-gt.yaml @@ -0,0 +1,111 @@ +# Workload 1 for Gang Termination Tests: Strict Startup (AnyOrder) +# This variant enables gang termination with a 10s delay +--- +apiVersion: grove.io/v1alpha1 +kind: PodCliqueSet +metadata: + name: workload1 + labels: + app: workload1 +spec: + replicas: 1 + template: + terminationDelay: 10s + cliques: + - name: pc-a + labels: + kai.scheduler/queue: test + spec: + roleName: role-a + replicas: 2 + minAvailable: 2 + podSpec: + schedulerName: kai-scheduler + terminationGracePeriodSeconds: 1 + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: node_role.e2e.grove.nvidia.com + operator: In + values: + - agent + tolerations: + - key: node_role.e2e.grove.nvidia.com + operator: Equal + value: agent + effect: NoSchedule + containers: + - name: pc-a + image: registry:5001/nginx:alpine-slim + resources: + requests: + memory: 80Mi + - name: pc-b + labels: + kai.scheduler/queue: test + spec: + roleName: role-b + replicas: 1 + minAvailable: 1 + podSpec: + schedulerName: kai-scheduler + terminationGracePeriodSeconds: 1 + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: node_role.e2e.grove.nvidia.com + operator: In + values: + - agent + tolerations: + - key: node_role.e2e.grove.nvidia.com + operator: 
Equal + value: agent + effect: NoSchedule + containers: + - name: pc-b + image: registry:5001/nginx:alpine-slim + resources: + requests: + memory: 80Mi + - name: pc-c + labels: + kai.scheduler/queue: test + spec: + roleName: role-c + replicas: 3 + minAvailable: 3 + podSpec: + schedulerName: kai-scheduler + terminationGracePeriodSeconds: 1 + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: node_role.e2e.grove.nvidia.com + operator: In + values: + - agent + tolerations: + - key: node_role.e2e.grove.nvidia.com + operator: Equal + value: agent + effect: NoSchedule + containers: + - name: pc-c + image: registry:5001/nginx:alpine-slim + resources: + requests: + memory: 80Mi + podCliqueScalingGroups: + - name: sg-x + replicas: 2 + minAvailable: 2 + cliqueNames: + - pc-b + - pc-c diff --git a/operator/e2e/yaml/workload2-gt.yaml b/operator/e2e/yaml/workload2-gt.yaml new file mode 100644 index 000000000..bc327d832 --- /dev/null +++ b/operator/e2e/yaml/workload2-gt.yaml @@ -0,0 +1,112 @@ +# Workload 2 for Gang Termination Tests: Flexible Startup +# This variant enables gang termination with a 10s delay +# Uses minAvailable < replicas to test threshold behavior +--- +apiVersion: grove.io/v1alpha1 +kind: PodCliqueSet +metadata: + name: workload2 + labels: + app: workload2 +spec: + replicas: 1 + template: + terminationDelay: 10s + cliques: + - name: pc-a + labels: + kai.scheduler/queue: test + spec: + roleName: role-a + replicas: 2 + minAvailable: 1 + podSpec: + schedulerName: kai-scheduler + terminationGracePeriodSeconds: 1 + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: node_role.e2e.grove.nvidia.com + operator: In + values: + - agent + tolerations: + - key: node_role.e2e.grove.nvidia.com + operator: Equal + value: agent + effect: NoSchedule + containers: + - name: pc-a + image: registry:5001/nginx:alpine-slim + resources: + requests: + memory: 80Mi + - name: pc-b + labels: + kai.scheduler/queue: test + spec: + roleName: role-b + replicas: 1 + minAvailable: 1 + podSpec: + schedulerName: kai-scheduler + terminationGracePeriodSeconds: 1 + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: node_role.e2e.grove.nvidia.com + operator: In + values: + - agent + tolerations: + - key: node_role.e2e.grove.nvidia.com + operator: Equal + value: agent + effect: NoSchedule + containers: + - name: pc-b + image: registry:5001/nginx:alpine-slim + resources: + requests: + memory: 80Mi + - name: pc-c + labels: + kai.scheduler/queue: test + spec: + roleName: role-c + replicas: 3 + minAvailable: 1 + podSpec: + schedulerName: kai-scheduler + terminationGracePeriodSeconds: 1 + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: node_role.e2e.grove.nvidia.com + operator: In + values: + - agent + tolerations: + - key: node_role.e2e.grove.nvidia.com + operator: Equal + value: agent + effect: NoSchedule + containers: + - name: pc-c + image: registry:5001/nginx:alpine-slim + resources: + requests: + memory: 80Mi + podCliqueScalingGroups: + - name: sg-x + replicas: 2 + minAvailable: 1 + cliqueNames: + - pc-b + - pc-c diff --git a/operator/internal/controller/common/component/utils/podclique.go b/operator/internal/controller/common/component/utils/podclique.go index f4af085f8..50a1b866f 100644 --- 
a/operator/internal/controller/common/component/utils/podclique.go +++ b/operator/internal/controller/common/component/utils/podclique.go @@ -87,8 +87,14 @@ func GroupPCLQsByPCSReplicaIndex(pclqs []grovecorev1alpha1.PodClique) map[string } // GetMinAvailableBreachedPCLQInfo filters PodCliques that have grovecorev1alpha1.ConditionTypeMinAvailableBreached set to true. -// For each such PodClique it returns the name of the PodClique a duration to wait for before terminationDelay is breached. -func GetMinAvailableBreachedPCLQInfo(pclqs []grovecorev1alpha1.PodClique, terminationDelay time.Duration, since time.Time) ([]string, time.Duration) { +// For each such PodClique it returns the name of the PodClique and a duration to wait for before terminationDelay is breached. +// If terminationDelay is nil (gang termination disabled), returns empty results. +func GetMinAvailableBreachedPCLQInfo(pclqs []grovecorev1alpha1.PodClique, terminationDelay *time.Duration, since time.Time) ([]string, time.Duration) { + // If terminationDelay is nil, gang termination is disabled - return empty results + if terminationDelay == nil { + return nil, 0 + } + pclqCandidateNames := make([]string, 0, len(pclqs)) waitForDurations := make([]time.Duration, 0, len(pclqs)) for _, pclq := range pclqs { @@ -98,7 +104,7 @@ func GetMinAvailableBreachedPCLQInfo(pclqs []grovecorev1alpha1.PodClique, termin } if cond.Status == metav1.ConditionTrue { pclqCandidateNames = append(pclqCandidateNames, pclq.Name) - waitFor := terminationDelay - since.Sub(cond.LastTransitionTime.Time) + waitFor := *terminationDelay - since.Sub(cond.LastTransitionTime.Time) waitForDurations = append(waitForDurations, waitFor) } } diff --git a/operator/internal/controller/common/component/utils/podcliquescalinggroup.go b/operator/internal/controller/common/component/utils/podcliquescalinggroup.go index c7382db34..ef2b32f06 100644 --- a/operator/internal/controller/common/component/utils/podcliquescalinggroup.go +++ b/operator/internal/controller/common/component/utils/podcliquescalinggroup.go @@ -19,11 +19,13 @@ package utils import ( "context" "slices" + "time" apicommon "github.com/ai-dynamo/grove/operator/api/common" grovecorev1alpha1 "github.com/ai-dynamo/grove/operator/api/core/v1alpha1" "github.com/samber/lo" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -41,6 +43,38 @@ func FindScalingGroupConfigForClique(scalingGroupConfigs []grovecorev1alpha1.Pod return &pcsgConfig } +// FindScalingGroupConfigByName searches through the scaling group configurations to find +// the one with the specified name. +// +// Returns the matching PodCliqueScalingGroupConfig if found, or nil if not found. +func FindScalingGroupConfigByName(scalingGroupConfigs []grovecorev1alpha1.PodCliqueScalingGroupConfig, name string) *grovecorev1alpha1.PodCliqueScalingGroupConfig { + pcsgConfig, ok := lo.Find(scalingGroupConfigs, func(pcsgConfig grovecorev1alpha1.PodCliqueScalingGroupConfig) bool { + return pcsgConfig.Name == name + }) + if !ok { + return nil + } + return &pcsgConfig +} + +// GetEffectiveTerminationDelay returns the effective termination delay for a PCSG. +// It uses the PCSG-level override if set, otherwise falls back to the PCS-level default. +// Returns nil if gang termination is disabled (both PCS and PCSG terminationDelay are nil). 
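+// Example (illustrative): with a PCS-level terminationDelay of 4h and a PCSG-level override of 2h,
+// this returns a pointer to 2h; with the same PCS-level delay and no override it returns a pointer
+// to 4h; with a nil PCS-level delay it returns nil even if a PCSG override is set.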
+func GetEffectiveTerminationDelay(pcsTerminationDelay *metav1.Duration, pcsgConfig *grovecorev1alpha1.PodCliqueScalingGroupConfig) *time.Duration { + // If PCS-level terminationDelay is nil, gang termination is disabled + if pcsTerminationDelay == nil { + return nil + } + + // If PCSG config has an override, use it + if pcsgConfig != nil && pcsgConfig.TerminationDelay != nil { + return &pcsgConfig.TerminationDelay.Duration + } + + // Otherwise use PCS-level default + return &pcsTerminationDelay.Duration +} + // GetPCSGsForPCS fetches all PodCliqueScalingGroups for a PodCliqueSet. func GetPCSGsForPCS(ctx context.Context, cl client.Client, pcsObjKey client.ObjectKey) ([]grovecorev1alpha1.PodCliqueScalingGroup, error) { pcsgList, err := doGetPCSGsForPCS(ctx, cl, pcsObjKey, nil) diff --git a/operator/internal/controller/common/component/utils/podcliquescalinggroup_test.go b/operator/internal/controller/common/component/utils/podcliquescalinggroup_test.go index 6a90be75b..69a3e80d5 100644 --- a/operator/internal/controller/common/component/utils/podcliquescalinggroup_test.go +++ b/operator/internal/controller/common/component/utils/podcliquescalinggroup_test.go @@ -18,10 +18,12 @@ package utils import ( "testing" + "time" grovecorev1alpha1 "github.com/ai-dynamo/grove/operator/api/core/v1alpha1" "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func TestFindScalingGroupConfigForClique(t *testing.T) { @@ -105,3 +107,130 @@ func TestFindScalingGroupConfigForClique(t *testing.T) { }) } } + +func TestFindScalingGroupConfigByName(t *testing.T) { + scalingGroupConfigs := []grovecorev1alpha1.PodCliqueScalingGroupConfig{ + { + Name: "sga", + CliqueNames: []string{"pca", "pcb"}, + }, + { + Name: "sgb", + CliqueNames: []string{"pcc", "pcd"}, + }, + } + + tests := []struct { + name string + configs []grovecorev1alpha1.PodCliqueScalingGroupConfig + searchName string + expectedFound bool + expectedConfigName string + }{ + { + name: "config found by name", + configs: scalingGroupConfigs, + searchName: "sga", + expectedFound: true, + expectedConfigName: "sga", + }, + { + name: "config found by name - second config", + configs: scalingGroupConfigs, + searchName: "sgb", + expectedFound: true, + expectedConfigName: "sgb", + }, + { + name: "config not found", + configs: scalingGroupConfigs, + searchName: "nonexistent", + expectedFound: false, + expectedConfigName: "", + }, + { + name: "empty configs", + configs: []grovecorev1alpha1.PodCliqueScalingGroupConfig{}, + searchName: "sga", + expectedFound: false, + expectedConfigName: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + config := FindScalingGroupConfigByName(tt.configs, tt.searchName) + assert.Equal(t, tt.expectedFound, config != nil) + if tt.expectedFound { + assert.Equal(t, tt.expectedConfigName, config.Name) + } else { + assert.Nil(t, config) + } + }) + } +} + +func TestGetEffectiveTerminationDelay(t *testing.T) { + pcsDelay := metav1.Duration{Duration: 4 * time.Hour} + pcsgDelay := metav1.Duration{Duration: 2 * time.Hour} + + tests := []struct { + name string + pcsTerminationDelay *metav1.Duration + pcsgConfig *grovecorev1alpha1.PodCliqueScalingGroupConfig + expectedDelay *time.Duration + expectedDelayDuration time.Duration + }{ + { + name: "nil PCS delay - gang termination disabled", + pcsTerminationDelay: nil, + pcsgConfig: nil, + expectedDelay: nil, + }, + { + name: "PCS delay set, no PCSG config - uses PCS delay", + pcsTerminationDelay: &pcsDelay, + pcsgConfig: nil, + 
expectedDelayDuration: 4 * time.Hour, + }, + { + name: "PCS delay set, PCSG config with nil override - uses PCS delay", + pcsTerminationDelay: &pcsDelay, + pcsgConfig: &grovecorev1alpha1.PodCliqueScalingGroupConfig{ + Name: "test-pcsg", + TerminationDelay: nil, + }, + expectedDelayDuration: 4 * time.Hour, + }, + { + name: "PCS delay set, PCSG config with override - uses PCSG delay", + pcsTerminationDelay: &pcsDelay, + pcsgConfig: &grovecorev1alpha1.PodCliqueScalingGroupConfig{ + Name: "test-pcsg", + TerminationDelay: &pcsgDelay, + }, + expectedDelayDuration: 2 * time.Hour, + }, + { + name: "nil PCS delay, PCSG config with override - still disabled (PCS nil takes precedence)", + pcsTerminationDelay: nil, + pcsgConfig: &grovecorev1alpha1.PodCliqueScalingGroupConfig{ + Name: "test-pcsg", + TerminationDelay: &pcsgDelay, + }, + expectedDelay: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := GetEffectiveTerminationDelay(tt.pcsTerminationDelay, tt.pcsgConfig) + if tt.expectedDelay == nil && tt.expectedDelayDuration == 0 { + assert.Nil(t, result) + } else { + assert.NotNil(t, result) + assert.Equal(t, tt.expectedDelayDuration, *result) + } + }) + } +} diff --git a/operator/internal/controller/podclique/components/pod/syncflow.go b/operator/internal/controller/podclique/components/pod/syncflow.go index 575bcc642..281cc8d53 100644 --- a/operator/internal/controller/podclique/components/pod/syncflow.go +++ b/operator/internal/controller/podclique/components/pod/syncflow.go @@ -300,13 +300,29 @@ func (r _resource) checkAndRemovePodSchedulingGates(sc *syncContext, logger logr return skippedScheduleGatedPods, nil } -// isBasePodGangScheduled checks if the base PodGang (identified by name) is scheduled, returning errors for API failures. +// isBasePodGangScheduled checks if the base PodGang (identified by name) is scheduled, returning errors only for actual API failures. // A base PodGang is considered "scheduled" when ALL of its constituent PodCliques have achieved // their minimum required number of scheduled pods (PodClique.Status.ScheduledReplicas >= PodGroup.MinReplicas). +// +// If the base PodGang does not exist (NotFound), this returns (false, nil) rather than an error. +// This is intentional because NotFound is a legitimate transient state that occurs during: +// - Initial PodCliqueSet creation (PodGang created after pods are assigned) +// - Gang termination and recreation (old PodGang deleted, new one not yet created) +// +// Returning nil allows the caller to handle this gracefully with a normal requeue interval, +// rather than triggering exponential backoff which would slow down recreation after gang termination. func (r _resource) isBasePodGangScheduled(ctx context.Context, logger logr.Logger, namespace, basePodGangName string) (bool, error) { - // Get the base PodGang - treat all errors (including NotFound) as requeue-able basePodGang, err := componentutils.GetPodGang(ctx, r.client, basePodGangName, namespace) if err != nil { + if apierrors.IsNotFound(err) { + // Base PodGang doesn't exist yet - treat same as "not scheduled". + // This happens during initial creation or recreation after gang termination. + // The PodCliqueSet controller will create it once all pods are assigned. + logger.Info("Base PodGang not found, treating as not scheduled", + "basePodGangName", basePodGangName) + return false, nil + } + // Real API errors (network, RBAC, etc.) 
should still trigger error handling return false, groveerr.WrapError(err, errCodeGetPodGang, component.OperationSync, diff --git a/operator/internal/controller/podclique/components/pod/syncflow_test.go b/operator/internal/controller/podclique/components/pod/syncflow_test.go index aeee3403c..d8b1bf591 100644 --- a/operator/internal/controller/podclique/components/pod/syncflow_test.go +++ b/operator/internal/controller/podclique/components/pod/syncflow_test.go @@ -94,9 +94,9 @@ func TestCheckAndRemovePodSchedulingGates_MinAvailableAware(t *testing.T) { podHasGate: true, podInPodGang: true, expectedGateRemoved: false, - expectedSkippedPods: 0, // No skips when error occurs - expectError: true, - description: "Scaled PodGang pods should cause requeue when base PodGang missing", + expectedSkippedPods: 1, // Pod is skipped (not error) - base PodGang NotFound is treated as "not scheduled" + expectError: false, + description: "Scaled PodGang pods should skip gate removal when base PodGang missing (transient state during creation/recreation)", }, { name: "Pod not in PodGang yet", @@ -356,8 +356,8 @@ func TestIsBasePodGangScheduled(t *testing.T) { basePodGangExists: false, podCliques: []testPodClique{}, expectedScheduled: false, - expectError: true, - description: "Missing base PodGang should return error for requeue", + expectError: false, + description: "Missing base PodGang should return not-scheduled (transient state during creation/recreation)", }, { name: "Base PodGang ready - single PodClique", diff --git a/operator/internal/controller/podclique/reconcilestatus.go b/operator/internal/controller/podclique/reconcilestatus.go index efebc5f1b..17bdbb745 100644 --- a/operator/internal/controller/podclique/reconcilestatus.go +++ b/operator/internal/controller/podclique/reconcilestatus.go @@ -69,9 +69,9 @@ func (r *Reconciler) reconcileStatus(ctx context.Context, logger logr.Logger, pc // This prevents prematurely setting incorrect conditions. if pclq.Status.ObservedGeneration != nil { mutatePodCliqueScheduledCondition(pclq) - mutateMinAvailableBreachedCondition(pclq, - len(podCategories[k8sutils.PodHasAtleastOneContainerWithNonZeroExitCode]), - len(podCategories[k8sutils.PodStartedButNotReady])) + // mutate WasAvailable before MinAvailableBreached condition as it's used in the breach calculation + mutateWasAvailable(pclq) + mutateMinAvailableBreachedCondition(pclq) } // mutate the selector that will be used by an autoscaler. @@ -165,16 +165,43 @@ func mutateSelector(pcsName string, pclq *grovecorev1alpha1.PodClique) error { return nil } +// mutateWasAvailable updates the WasAvailable status field based on current pod availability. +// Once set to true, it remains true for the lifetime of the PodClique (sticky bit). +// This field is used to determine if a PodClique can be considered in breach of MinAvailable - +// a PodClique must have been available at least once before it can be considered breached. 
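+// For example, a PodClique with minAvailable=3 that reaches 3 ready replicas gets WasAvailable=true;
+// if its ready replicas later drop to 1, WasAvailable stays true, which allows the
+// MinAvailableBreached condition to be set.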
+func mutateWasAvailable(pclq *grovecorev1alpha1.PodClique) { + // Once WasAvailable is true, it stays true + if pclq.Status.WasAvailable { + return + } + // Don't set WasAvailable during rolling updates to avoid false positives + if componentutils.IsPCLQUpdateInProgress(pclq) { + return + } + // Check if current ready replicas meets or exceeds MinAvailable threshold + minAvailable := int32(0) + if pclq.Spec.MinAvailable != nil { + minAvailable = *pclq.Spec.MinAvailable + } + if pclq.Status.ReadyReplicas >= minAvailable { + pclq.Status.WasAvailable = true + } +} + // mutateMinAvailableBreachedCondition updates the MinAvailableBreached condition based on pod availability -func mutateMinAvailableBreachedCondition(pclq *grovecorev1alpha1.PodClique, numNotReadyPodsWithContainersInError, numPodsStartedButNotReady int) { - newCondition := computeMinAvailableBreachedCondition(pclq, numNotReadyPodsWithContainersInError, numPodsStartedButNotReady) +func mutateMinAvailableBreachedCondition(pclq *grovecorev1alpha1.PodClique) { + newCondition := computeMinAvailableBreachedCondition(pclq) if k8sutils.HasConditionChanged(pclq.Status.Conditions, newCondition) { meta.SetStatusCondition(&pclq.Status.Conditions, newCondition) } } -// computeMinAvailableBreachedCondition calculates the MinAvailableBreached condition status based on pod availability -func computeMinAvailableBreachedCondition(pclq *grovecorev1alpha1.PodClique, numPodsHavingAtleastOneContainerWithNonZeroExitCode, numPodsStartedButNotReady int) metav1.Condition { +// computeMinAvailableBreachedCondition calculates the MinAvailableBreached condition status based on pod availability. +// The breach calculation considers: +// 1. Rolling update status - during updates, condition is Unknown to prevent false terminations +// 2. WasAvailable status - a PodClique must have been available once before it can be considered breached +// 3. Only ready pods count toward availability - starting pods do NOT count +func computeMinAvailableBreachedCondition(pclq *grovecorev1alpha1.PodClique) metav1.Condition { if componentutils.IsPCLQUpdateInProgress(pclq) { return metav1.Condition{ Type: constants.ConditionTypeMinAvailableBreached, @@ -183,43 +210,48 @@ func computeMinAvailableBreachedCondition(pclq *grovecorev1alpha1.PodClique, num Message: "Update is in progress", } } + // dereferencing is considered safe as MinAvailable will always be set by the defaulting webhook. If this changes in the future, // make sure that you check for nil explicitly. minAvailable := int(*pclq.Spec.MinAvailable) - scheduledReplicas := int(pclq.Status.ScheduledReplicas) now := metav1.Now() - // If the number of scheduled pods is less than the minimum available, then minAvailable is not considered as breached. - // Consider a case where none of the PodCliques have been scheduled yet, then it should not cause the PodGang to be recreated all the time. - if scheduledReplicas < minAvailable { + // Only ready pods count toward availability for breach detection. + // Starting pods (pods that have started but aren't ready yet) do NOT count. + // This ensures that a PodClique is considered in breach as soon as ready pods + // drop below MinAvailable, regardless of how many pods are still starting. 
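+	// For example, with minAvailable=3: 3 ready replicas keep the condition False (SufficientReadyPods);
+	// 2 ready replicas set it True only if the PodClique was previously available (WasAvailable),
+	// otherwise it stays False with reason NeverAvailable.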
+ readyReplicas := int(pclq.Status.ReadyReplicas) + + // If we have sufficient ready pods, there's no breach - no need to check WasAvailable + if readyReplicas >= minAvailable { return metav1.Condition{ Type: constants.ConditionTypeMinAvailableBreached, Status: metav1.ConditionFalse, - Reason: constants.ConditionReasonInsufficientScheduledPods, - Message: fmt.Sprintf("Insufficient scheduled pods. expected at least: %d, found: %d", minAvailable, scheduledReplicas), + Reason: constants.ConditionReasonSufficientReadyPods, + Message: fmt.Sprintf("Sufficient ready pods found. Expected at least: %d, found: %d", minAvailable, readyReplicas), LastTransitionTime: now, } } - readyOrStartingPods := scheduledReplicas - numPodsHavingAtleastOneContainerWithNonZeroExitCode - numPodsStartedButNotReady - // pclq.Status.ReadyReplicas do not account for Pods which are not yet ready and are in the process of starting/initializing. - // This allows sufficient time specially for pods that have long-running init containers or slow-to-start main containers. - // Therefore, we take Pods that are NotReady and at least one of their containers have exited with a non-zero exit code. Kubelet - // has attempted to start the containers within the Pod at least once and failed. These pods count towards unavailability. - if readyOrStartingPods < minAvailable { + // At this point, readyReplicas < minAvailable. Check if this is a real breach or just startup. + // A PodClique can only be considered in breach of MinAvailable if it was previously available. + // This prevents false breach detection during initial startup when pods are still being created/scheduled. + if !pclq.Status.WasAvailable { return metav1.Condition{ Type: constants.ConditionTypeMinAvailableBreached, - Status: metav1.ConditionTrue, - Reason: constants.ConditionReasonInsufficientReadyPods, - Message: fmt.Sprintf("Insufficient ready or starting pods. expected at least: %d, found: %d", minAvailable, readyOrStartingPods), + Status: metav1.ConditionFalse, + Reason: constants.ConditionReasonNeverAvailable, + Message: fmt.Sprintf("PodClique has never reached MinAvailable threshold of %d, cannot be considered breached", minAvailable), LastTransitionTime: now, } } + + // WasAvailable is true and readyReplicas < minAvailable - this is a real breach return metav1.Condition{ Type: constants.ConditionTypeMinAvailableBreached, - Status: metav1.ConditionFalse, - Reason: constants.ConditionReasonSufficientReadyPods, - Message: fmt.Sprintf("Either sufficient ready or starting pods found. expected at least: %d, found: %d", minAvailable, readyOrStartingPods), + Status: metav1.ConditionTrue, + Reason: constants.ConditionReasonInsufficientReadyPods, + Message: fmt.Sprintf("Insufficient ready pods. Expected at least: %d, found: %d", minAvailable, readyReplicas), LastTransitionTime: now, } } diff --git a/operator/internal/controller/podcliquescalinggroup/components/podclique/sync.go b/operator/internal/controller/podcliquescalinggroup/components/podclique/sync.go index 08192b0c1..5f148cb6c 100644 --- a/operator/internal/controller/podcliquescalinggroup/components/podclique/sync.go +++ b/operator/internal/controller/podcliquescalinggroup/components/podclique/sync.go @@ -47,6 +47,10 @@ type syncContext struct { pcsgIndicesToRequeue []string expectedPCLQFQNsPerPCSGReplica map[int][]string expectedPCLQPodTemplateHashMap map[string]string + // effectiveTerminationDelay is the termination delay to use for gang termination. 
+ // It is the PCSG-level override if set, otherwise the PCS-level default. + // When nil, gang termination is disabled for this PCSG. + effectiveTerminationDelay *time.Duration } // prepareSyncContext creates and initializes the synchronization context with all necessary data for PCSG reconciliation @@ -76,10 +80,17 @@ func (r _resource) prepareSyncContext(ctx context.Context, logger logr.Logger, p return nil, err } + // compute the effective termination delay for this PCSG (PCSG override if set, otherwise PCS default) + pcsgConfig := findMatchingPCSGConfig(syncCtx.pcs, pcsg) + syncCtx.effectiveTerminationDelay = componentutils.GetEffectiveTerminationDelay(syncCtx.pcs.Spec.Template.TerminationDelay, pcsgConfig) + // compute the PCSG indices that have their MinAvailableBreached condition set to true. Segregated these into two // pcsgIndicesToTerminate will have the indices for which the TerminationDelay has expired. // pcsgIndicesToRequeue will have the indices for which the TerminationDelay has not yet expired. - syncCtx.pcsgIndicesToTerminate, syncCtx.pcsgIndicesToRequeue = getMinAvailableBreachedPCSGIndices(logger, syncCtx.existingPCLQs, syncCtx.pcs.Spec.Template.TerminationDelay.Duration) + // Only compute if gang termination is enabled (terminationDelay is not nil) + if syncCtx.effectiveTerminationDelay != nil { + syncCtx.pcsgIndicesToTerminate, syncCtx.pcsgIndicesToRequeue = getMinAvailableBreachedPCSGIndices(logger, syncCtx.existingPCLQs, *syncCtx.effectiveTerminationDelay) + } // pre-compute expected PodTemplateHash for each PCLQ syncCtx.expectedPCLQPodTemplateHashMap = getExpectedPCLQPodTemplateHashMap(syncCtx.pcs, pcsg) @@ -87,6 +98,23 @@ func (r _resource) prepareSyncContext(ctx context.Context, logger logr.Logger, p return syncCtx, nil } +// findMatchingPCSGConfig finds the PodCliqueScalingGroupConfig that matches the given PCSG. +// It returns nil if no matching config is found. +func findMatchingPCSGConfig(pcs *grovecorev1alpha1.PodCliqueSet, pcsg *grovecorev1alpha1.PodCliqueScalingGroup) *grovecorev1alpha1.PodCliqueScalingGroupConfig { + pcsReplicaIndex, err := k8sutils.GetPodCliqueSetReplicaIndex(pcsg.ObjectMeta) + if err != nil { + return nil + } + matchingPCSGConfig, ok := lo.Find(pcs.Spec.Template.PodCliqueScalingGroupConfigs, func(pcsgConfig grovecorev1alpha1.PodCliqueScalingGroupConfig) bool { + pcsgFQN := apicommon.GeneratePodCliqueScalingGroupName(apicommon.ResourceNameReplica{Name: pcs.Name, Replica: pcsReplicaIndex}, pcsgConfig.Name) + return pcsgFQN == pcsg.Name + }) + if !ok { + return nil + } + return &matchingPCSGConfig +} + // runSyncFlow executes the main synchronization logic for PodCliqueScalingGroup including replica management and updates func (r _resource) runSyncFlow(logger logr.Logger, sc *syncContext) error { // If there are excess PodCliques than expected, delete the ones that are no longer expected but existing. @@ -117,7 +145,8 @@ func (r _resource) runSyncFlow(logger logr.Logger, sc *syncContext) error { // If there are any PCSG replicas which have minAvailableBreached but the terminationDelay has not yet expired, then // requeue the event after a fixed delay. 
- if len(sc.pcsgIndicesToRequeue) > 0 { + // Only requeue if gang termination is enabled (terminationDelay is not nil) + if sc.effectiveTerminationDelay != nil && len(sc.pcsgIndicesToRequeue) > 0 { return groveerr.New(groveerr.ErrCodeRequeueAfter, component.OperationSync, "Requeuing to re-process PCLQs that have breached MinAvailable but not crossed TerminationDelay", @@ -204,6 +233,10 @@ func (r _resource) createExpectedPCLQs(logger logr.Logger, sc *syncContext) erro // processMinAvailableBreachedPCSGReplicas handles gang termination of PCSG replicas that have breached minimum availability requirements func (r _resource) processMinAvailableBreachedPCSGReplicas(logger logr.Logger, sc *syncContext) error { + // If gang termination is disabled (terminationDelay is nil), skip processing + if sc.effectiveTerminationDelay == nil { + return nil + } // If pcsg.spec.minAvailable is breached, then delegate the responsibility to the PodCliqueSet reconciler which after // termination delay terminate the PodCliqueSet replica. No further processing is required to be done here. minAvailableBreachedPCSGReplicas := len(sc.pcsgIndicesToTerminate) + len(sc.pcsgIndicesToRequeue) @@ -214,7 +247,7 @@ func (r _resource) processMinAvailableBreachedPCSGReplicas(logger logr.Logger, s // its minAvailable breached for a duration > terminationDelay then gang terminate such PCSG replicas. if len(sc.pcsgIndicesToTerminate) > 0 { logger.Info("Identified PodCliqueScalingGroup indices for gang termination", "indices", sc.pcsgIndicesToTerminate) - reason := fmt.Sprintf("Delete PodCliques %v for PodCliqueScalingGroup %v which have breached MinAvailable longer than TerminationDelay: %s", sc.pcsgIndicesToTerminate, client.ObjectKeyFromObject(sc.pcsg), sc.pcs.Spec.Template.TerminationDelay.Duration) + reason := fmt.Sprintf("Delete PodCliques %v for PodCliqueScalingGroup %v which have breached MinAvailable longer than TerminationDelay: %s", sc.pcsgIndicesToTerminate, client.ObjectKeyFromObject(sc.pcsg), *sc.effectiveTerminationDelay) pclqGangTerminationTasks := r.createDeleteTasks(logger, sc.pcs, sc.pcsg.Name, sc.pcsgIndicesToTerminate, reason) if err := r.triggerDeletionOfPodCliques(sc.ctx, logger, client.ObjectKeyFromObject(sc.pcsg), pclqGangTerminationTasks); err != nil { return err @@ -234,7 +267,7 @@ func getMinAvailableBreachedPCSGIndices(logger logr.Logger, existingPCLQs []grov pcsgReplicaIndexPCLQs := componentutils.GroupPCLQsByPCSGReplicaIndex(existingPCLQs) // For each PCSG replica check if minAvailable for any constituent PCLQ has been violated. Those PCSG replicas should be marked for termination. 
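+	// The terminationDelay received here is the effective delay for this PCSG (the PCSG-level
+	// override if set, otherwise the PCS-level default), computed in prepareSyncContext.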
for pcsgReplicaIndex, pclqs := range pcsgReplicaIndexPCLQs { - pclqNames, minWaitFor := componentutils.GetMinAvailableBreachedPCLQInfo(pclqs, terminationDelay, now) + pclqNames, minWaitFor := componentutils.GetMinAvailableBreachedPCLQInfo(pclqs, &terminationDelay, now) if len(pclqNames) > 0 { logger.Info("minAvailable breached for PCLQs", "pcsgReplicaIndex", pcsgReplicaIndex, "pclqNames", pclqNames, "minWaitFor", minWaitFor) if minWaitFor <= 0 { diff --git a/operator/internal/controller/podcliqueset/components/podcliquesetreplica/gangterminate.go b/operator/internal/controller/podcliqueset/components/podcliquesetreplica/gangterminate.go index 23379906b..cd2ac26df 100644 --- a/operator/internal/controller/podcliqueset/components/podcliquesetreplica/gangterminate.go +++ b/operator/internal/controller/podcliqueset/components/podcliquesetreplica/gangterminate.go @@ -65,19 +65,26 @@ func (d deletionWork) hasPendingPCSReplicaDeletion() bool { } // getPCSReplicaDeletionWork identifies PCS replicas that need termination due to MinAvailable breaches. +// If terminationDelay is nil (gang termination disabled), returns empty work. func (r _resource) getPCSReplicaDeletionWork(ctx context.Context, logger logr.Logger, pcs *grovecorev1alpha1.PodCliqueSet) (*deletionWork, error) { + work := &deletionWork{ + minAvailableBreachedConstituents: make(map[int][]string), + } + + // If PCS-level terminationDelay is nil, gang termination is disabled - return empty work + if pcs.Spec.Template.TerminationDelay == nil { + return work, nil + } + var ( now = time.Now() pcsObjectKey = client.ObjectKeyFromObject(pcs) terminationDelay = pcs.Spec.Template.TerminationDelay.Duration deletionTasks = make([]utils.Task, 0, pcs.Spec.Replicas) - work = &deletionWork{ - minAvailableBreachedConstituents: make(map[int][]string), - } ) for pcsReplicaIndex := range int(pcs.Spec.Replicas) { - breachedPCSGNames, minPCSGWaitFor, err := r.getMinAvailableBreachedPCSGs(ctx, pcsObjectKey, pcsReplicaIndex, terminationDelay, now) + breachedPCSGNames, minPCSGWaitFor, err := r.getMinAvailableBreachedPCSGs(ctx, pcs, pcsReplicaIndex, now) if err != nil { return nil, err } @@ -105,7 +112,9 @@ func (r _resource) getPCSReplicaDeletionWork(ctx context.Context, logger logr.Lo } // getMinAvailableBreachedPCSGs retrieves PCSGs that have breached MinAvailable for a PCS replica. -func (r _resource) getMinAvailableBreachedPCSGs(ctx context.Context, pcsObjKey client.ObjectKey, pcsReplicaIndex int, terminationDelay time.Duration, since time.Time) ([]string, time.Duration, error) { +// It uses the effective terminationDelay for each PCSG (PCSG override if set, otherwise PCS default). +func (r _resource) getMinAvailableBreachedPCSGs(ctx context.Context, pcs *grovecorev1alpha1.PodCliqueSet, pcsReplicaIndex int, since time.Time) ([]string, time.Duration, error) { + pcsObjKey := client.ObjectKeyFromObject(pcs) pcsgList := &grovecorev1alpha1.PodCliqueScalingGroupList{} if err := r.client.List(ctx, pcsgList, @@ -119,11 +128,12 @@ func (r _resource) getMinAvailableBreachedPCSGs(ctx context.Context, pcsObjKey c ); err != nil { return nil, 0, err } - breachedPCSGNames, minWaitFor := getMinAvailableBreachedPCSGInfo(pcsgList.Items, terminationDelay, since) + breachedPCSGNames, minWaitFor := getMinAvailableBreachedPCSGInfoWithEffectiveDelay(pcsgList.Items, pcs, since) return breachedPCSGNames, minWaitFor, nil } // getMinAvailableBreachedPCLQsNotInPCSG retrieves standalone PCLQs that have breached MinAvailable. 
+// Standalone PCLQs use the PCS-level terminationDelay (no PCSG override available). func (r _resource) getMinAvailableBreachedPCLQsNotInPCSG(ctx context.Context, pcs *grovecorev1alpha1.PodCliqueSet, pcsReplicaIndex int, since time.Time) (breachedPCLQNames []string, minWaitFor time.Duration, skipPCSReplica bool, err error) { pclqFQNsNotInPCSG := make([]string, 0, len(pcs.Spec.Template.Cliques)) for _, pclqTemplateSpec := range pcs.Spec.Template.Cliques { @@ -143,7 +153,9 @@ func (r _resource) getMinAvailableBreachedPCLQsNotInPCSG(ctx context.Context, pc skipPCSReplica = true return } - breachedPCLQNames, minWaitFor = componentutils.GetMinAvailableBreachedPCLQInfo(pclqs, pcs.Spec.Template.TerminationDelay.Duration, since) + // Use PCS-level terminationDelay for standalone PCLQs (already validated non-nil at this point) + terminationDelay := &pcs.Spec.Template.TerminationDelay.Duration + breachedPCLQNames, minWaitFor = componentutils.GetMinAvailableBreachedPCLQInfo(pclqs, terminationDelay, since) return } @@ -163,9 +175,11 @@ func (r _resource) getExistingPCLQsByNames(ctx context.Context, namespace string return pclqs, notFoundPCLQFQNs, nil } -// getMinAvailableBreachedPCSGInfo filters PodCliqueScalingGroups that have grovecorev1alpha1.ConditionTypeMinAvailableBreached set to true. +// getMinAvailableBreachedPCSGInfoWithEffectiveDelay filters PodCliqueScalingGroups that have +// grovecorev1alpha1.ConditionTypeMinAvailableBreached set to true, using the effective terminationDelay +// for each PCSG (PCSG override if set, otherwise PCS default). // It returns the names of all such PodCliqueScalingGroups and minimum of all the waitDurations. -func getMinAvailableBreachedPCSGInfo(pcsgs []grovecorev1alpha1.PodCliqueScalingGroup, terminationDelay time.Duration, since time.Time) ([]string, time.Duration) { +func getMinAvailableBreachedPCSGInfoWithEffectiveDelay(pcsgs []grovecorev1alpha1.PodCliqueScalingGroup, pcs *grovecorev1alpha1.PodCliqueSet, since time.Time) ([]string, time.Duration) { pcsgCandidateNames := make([]string, 0, len(pcsgs)) waitForDurations := make([]time.Duration, 0, len(pcsgs)) for _, pcsg := range pcsgs { @@ -174,8 +188,24 @@ func getMinAvailableBreachedPCSGInfo(pcsgs []grovecorev1alpha1.PodCliqueScalingG continue } if cond.Status == metav1.ConditionTrue { + // Get the PCSG short name from the label to find the matching config + pcsgShortName, ok := pcsg.Labels[apicommon.LabelPodCliqueScalingGroup] + if !ok { + // If no label, skip this PCSG (shouldn't happen for properly created PCSGs) + continue + } + + // Get the PCSG config to check for override + pcsgConfig := componentutils.FindScalingGroupConfigByName(pcs.Spec.Template.PodCliqueScalingGroupConfigs, pcsgShortName) + effectiveDelay := componentutils.GetEffectiveTerminationDelay(pcs.Spec.Template.TerminationDelay, pcsgConfig) + + // If effective delay is nil (shouldn't happen at this point since we checked at PCS level), skip this PCSG + if effectiveDelay == nil { + continue + } + pcsgCandidateNames = append(pcsgCandidateNames, pcsg.Name) - waitFor := terminationDelay - since.Sub(cond.LastTransitionTime.Time) + waitFor := *effectiveDelay - since.Sub(cond.LastTransitionTime.Time) waitForDurations = append(waitForDurations, waitFor) } } diff --git a/operator/internal/webhook/admission/pcs/defaulting/handler_test.go b/operator/internal/webhook/admission/pcs/defaulting/handler_test.go index b5b7a8ab3..05a7246ce 100644 --- a/operator/internal/webhook/admission/pcs/defaulting/handler_test.go +++ 
b/operator/internal/webhook/admission/pcs/defaulting/handler_test.go @@ -147,8 +147,8 @@ func TestDefault(t *testing.T) { verify: func(t *testing.T, obj runtime.Object) { pcs, ok := obj.(*grovecorev1alpha1.PodCliqueSet) require.True(t, ok) - // Verify that the termination delay is set (this is one of the defaults applied) - assert.NotNil(t, pcs.Spec.Template.TerminationDelay) + // TerminationDelay should be nil by default (gang termination disabled) + assert.Nil(t, pcs.Spec.Template.TerminationDelay) }, }, { diff --git a/operator/internal/webhook/admission/pcs/defaulting/podcliqueset.go b/operator/internal/webhook/admission/pcs/defaulting/podcliqueset.go index ac6343808..cf4d0179f 100644 --- a/operator/internal/webhook/admission/pcs/defaulting/podcliqueset.go +++ b/operator/internal/webhook/admission/pcs/defaulting/podcliqueset.go @@ -17,20 +17,13 @@ package defaulting import ( - "time" - grovecorev1alpha1 "github.com/ai-dynamo/grove/operator/api/core/v1alpha1" "github.com/ai-dynamo/grove/operator/internal/utils" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/utils/ptr" ) -const ( - defaultTerminationDelay = 4 * time.Hour -) - // defaultPodCliqueSet adds defaults to a PodCliqueSet. func defaultPodCliqueSet(pcs *grovecorev1alpha1.PodCliqueSet) { if utils.IsEmptyStringType(pcs.Namespace) { @@ -45,13 +38,10 @@ func defaultPodCliqueSetSpec(spec *grovecorev1alpha1.PodCliqueSetSpec) { } // defaultPodCliqueSetTemplateSpec applies defaults to the template specification including cliques, scaling groups, and service configuration. +// Note: TerminationDelay is intentionally not defaulted - when nil, gang termination is disabled. func defaultPodCliqueSetTemplateSpec(spec *grovecorev1alpha1.PodCliqueSetTemplateSpec) { spec.Cliques = defaultPodCliqueTemplateSpecs(spec.Cliques) spec.PodCliqueScalingGroupConfigs = defaultPodCliqueScalingGroupConfigs(spec.PodCliqueScalingGroupConfigs) - if spec.TerminationDelay == nil { - spec.TerminationDelay = &metav1.Duration{Duration: defaultTerminationDelay} - } - spec.HeadlessServiceConfig = defaultHeadlessServiceConfig(spec.HeadlessServiceConfig) } diff --git a/operator/internal/webhook/admission/pcs/defaulting/podcliqueset_test.go b/operator/internal/webhook/admission/pcs/defaulting/podcliqueset_test.go index 613afe8c3..e68f589f6 100644 --- a/operator/internal/webhook/admission/pcs/defaulting/podcliqueset_test.go +++ b/operator/internal/webhook/admission/pcs/defaulting/podcliqueset_test.go @@ -18,7 +18,6 @@ package defaulting import ( "testing" - "time" grovecorev1alpha1 "github.com/ai-dynamo/grove/operator/api/core/v1alpha1" @@ -55,7 +54,7 @@ func TestDefaultPodCliqueSet(t *testing.T) { HeadlessServiceConfig: &grovecorev1alpha1.HeadlessServiceConfig{ PublishNotReadyAddresses: true, }, - TerminationDelay: &metav1.Duration{Duration: 4 * time.Hour}, + // TerminationDelay is nil by default, meaning gang termination is disabled }, }, } diff --git a/operator/internal/webhook/admission/pcs/validation/podcliqueset.go b/operator/internal/webhook/admission/pcs/validation/podcliqueset.go index 303dccac1..b768b2c2a 100644 --- a/operator/internal/webhook/admission/pcs/validation/podcliqueset.go +++ b/operator/internal/webhook/admission/pcs/validation/podcliqueset.go @@ -230,6 +230,16 @@ func (v *pcsValidator) validatePodCliqueScalingGroupConfigs(fldPath *field.Path) allErrs = append(allErrs, field.Invalid(fldPath.Index(i).Child("scaleConfig", "minReplicas"), *scalingGroupConfig.ScaleConfig.MinReplicas, "scaleConfig.minReplicas must be 
greater than or equal to minAvailable")) } } + + // validate TerminationDelay field + if scalingGroupConfig.TerminationDelay != nil { + // PCSG terminationDelay can only be set if PCS-level terminationDelay is set (gang termination is enabled) + if v.pcs.Spec.Template.TerminationDelay == nil { + allErrs = append(allErrs, field.Forbidden(fldPath.Index(i).Child("terminationDelay"), "terminationDelay can only be set on PodCliqueScalingGroupConfig when PodCliqueSetTemplateSpec.terminationDelay is set (gang termination is enabled)")) + } else if scalingGroupConfig.TerminationDelay.Duration <= 0 { + allErrs = append(allErrs, field.Invalid(fldPath.Index(i).Child("terminationDelay"), scalingGroupConfig.TerminationDelay, "terminationDelay must be greater than 0")) + } + } } // validate that the scaling group names are unique @@ -248,13 +258,14 @@ func (v *pcsValidator) validatePodCliqueScalingGroupConfigs(fldPath *field.Path) return allErrs } -// validateTerminationDelay validates that terminationDelay is set and greater than zero. +// validateTerminationDelay validates that terminationDelay, if set, is greater than zero. +// When terminationDelay is nil, gang termination is disabled for the entire PodCliqueSet. func (v *pcsValidator) validateTerminationDelay(fldPath *field.Path) field.ErrorList { allErrs := field.ErrorList{} - // This should ideally not happen, the defaulting webhook will always set the default value for terminationDelay. + // terminationDelay is optional - when nil, gang termination is disabled if v.pcs.Spec.Template.TerminationDelay == nil { - return append(allErrs, field.Required(fldPath, "terminationDelay is required")) + return allErrs } if v.pcs.Spec.Template.TerminationDelay.Duration <= 0 { allErrs = append(allErrs, field.Invalid(fldPath, v.pcs.Spec.Template.TerminationDelay, "terminationDelay must be greater than 0"))