Skip to content

Commit b8c0f4d

Browse files
authored
fix: handle edge cases in compartment-based deployment rollouts (#118)
* fix: cleanup stale compartments from status
* fix: handle incorrect deployment policy
* fix: blocked nodes in batches
* fix: switching compartments mid-rollout may cause faulty deltas
1 parent f5e0d80 commit b8c0f4d

File tree

4 files changed

+430
-79
lines changed

4 files changed

+430
-79
lines changed

operator/internal/controller/cluster_state_v2.go

Lines changed: 195 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -123,32 +123,62 @@ func BuildState(skyhooks *v1alpha1.SkyhookList, nodes *corev1.NodeList, deployme
123123
for _, node := range ret.skyhooks[idx].GetNodes() {
124124
ret.skyhooks[idx].AddCompartmentNode(v1alpha1.DefaultCompartmentName, node)
125125
}
126-
}
127-
128-
for _, deploymentPolicy := range deploymentPolicies.Items {
129-
if deploymentPolicy.Name == skyhook.Spec.DeploymentPolicy {
130-
for _, compartment := range deploymentPolicy.Spec.Compartments {
131-
// Load persisted batch state from CompartmentStatuses if it exists
132-
var batchState *v1alpha1.BatchProcessingState
126+
} else {
127+
// Deployment policy is specified - try to find it
128+
policyFound := false
129+
for _, deploymentPolicy := range deploymentPolicies.Items {
130+
if deploymentPolicy.Name == skyhook.Spec.DeploymentPolicy {
131+
policyFound = true
132+
for _, compartment := range deploymentPolicy.Spec.Compartments {
133+
// Load persisted batch state from CompartmentStatuses if it exists
134+
var batchState *v1alpha1.BatchProcessingState
135+
if skyhook.Status.CompartmentStatuses != nil {
136+
if status, exists := skyhook.Status.CompartmentStatuses[compartment.Name]; exists && status.BatchState != nil {
137+
batchState = status.BatchState
138+
}
139+
}
140+
ret.skyhooks[idx].AddCompartment(compartment.Name, wrapper.NewCompartmentWrapper(&compartment, batchState))
141+
}
142+
// use policy default
143+
var defaultBatchState *v1alpha1.BatchProcessingState
133144
if skyhook.Status.CompartmentStatuses != nil {
134-
if status, exists := skyhook.Status.CompartmentStatuses[compartment.Name]; exists && status.BatchState != nil {
135-
batchState = status.BatchState
145+
if status, exists := skyhook.Status.CompartmentStatuses[v1alpha1.DefaultCompartmentName]; exists && status.BatchState != nil {
146+
defaultBatchState = status.BatchState
136147
}
137148
}
138-
ret.skyhooks[idx].AddCompartment(compartment.Name, wrapper.NewCompartmentWrapper(&compartment, batchState))
149+
ret.skyhooks[idx].AddCompartment(v1alpha1.DefaultCompartmentName, wrapper.NewCompartmentWrapper(&v1alpha1.Compartment{
150+
Name: v1alpha1.DefaultCompartmentName,
151+
Budget: deploymentPolicy.Spec.Default.Budget,
152+
Strategy: deploymentPolicy.Spec.Default.Strategy,
153+
}, defaultBatchState))
154+
break
139155
}
140-
// use policy default
141-
var defaultBatchState *v1alpha1.BatchProcessingState
142-
if skyhook.Status.CompartmentStatuses != nil {
143-
if status, exists := skyhook.Status.CompartmentStatuses[v1alpha1.DefaultCompartmentName]; exists && status.BatchState != nil {
144-
defaultBatchState = status.BatchState
156+
}
157+
158+
// If deployment policy was specified but not found, mark it for error handling
159+
if !policyFound {
160+
ret.skyhooks[idx].GetSkyhook().AddCondition(metav1.Condition{
161+
Type: fmt.Sprintf("%s/DeploymentPolicyNotFound", v1alpha1.METADATA_PREFIX),
162+
Status: metav1.ConditionTrue,
163+
ObservedGeneration: skyhook.Generation,
164+
LastTransitionTime: metav1.Now(),
165+
Reason: "DeploymentPolicyNotFound",
166+
Message: fmt.Sprintf("DeploymentPolicy %q not found", skyhook.Spec.DeploymentPolicy),
167+
})
168+
ret.skyhooks[idx].GetSkyhook().Updated = true
169+
} else {
170+
// Policy found - clear any previous error condition if it exists
171+
if ret.skyhooks[idx].GetSkyhook().Status.Conditions != nil {
172+
for i, cond := range ret.skyhooks[idx].GetSkyhook().Status.Conditions {
173+
if cond.Type == fmt.Sprintf("%s/DeploymentPolicyNotFound", v1alpha1.METADATA_PREFIX) {
174+
// Remove the condition by creating a new slice without it
175+
conditions := ret.skyhooks[idx].GetSkyhook().Status.Conditions
176+
ret.skyhooks[idx].GetSkyhook().Status.Conditions = append(conditions[:i], conditions[i+1:]...)
177+
ret.skyhooks[idx].GetSkyhook().Updated = true
178+
break
179+
}
145180
}
146181
}
147-
ret.skyhooks[idx].AddCompartment(v1alpha1.DefaultCompartmentName, wrapper.NewCompartmentWrapper(&v1alpha1.Compartment{
148-
Name: v1alpha1.DefaultCompartmentName,
149-
Budget: deploymentPolicy.Spec.Default.Budget,
150-
Strategy: deploymentPolicy.Spec.Default.Strategy,
151-
}, defaultBatchState))
152182
}
153183
}
154184
}
@@ -567,7 +597,18 @@ func IntrospectSkyhook(skyhook SkyhookNodes, allSkyhooks []SkyhookNodes) bool {
567597
scrStatus := skyhook.Status()
568598
collectNodeStatus := skyhook.CollectNodeStatus()
569599

570-
// override the node status if the skyhook is in a skyhook controlled state. (e.g. disabled, paused, waiting)
600+
// Check if deployment policy is missing - this should block the skyhook
601+
hasMissingPolicy := false
602+
if skyhook.GetSkyhook().Status.Conditions != nil {
603+
for _, cond := range skyhook.GetSkyhook().Status.Conditions {
604+
if cond.Type == fmt.Sprintf("%s/DeploymentPolicyNotFound", v1alpha1.METADATA_PREFIX) && cond.Status == metav1.ConditionTrue {
605+
hasMissingPolicy = true
606+
break
607+
}
608+
}
609+
}
610+
611+
// override the node status if the skyhook is in a skyhook controlled state. (e.g. disabled, paused, waiting, blocked)
571612
if collectNodeStatus != v1alpha1.StatusComplete {
572613
switch {
573614
case skyhook.IsDisabled():
@@ -576,11 +617,17 @@ func IntrospectSkyhook(skyhook SkyhookNodes, allSkyhooks []SkyhookNodes) bool {
576617
case skyhook.IsPaused():
577618
collectNodeStatus = v1alpha1.StatusPaused
578619

620+
case hasMissingPolicy:
621+
collectNodeStatus = v1alpha1.StatusBlocked
622+
579623
default:
580624
if nextSkyhook := GetNextSkyhook(allSkyhooks); nextSkyhook != nil && nextSkyhook != skyhook {
581625
collectNodeStatus = v1alpha1.StatusWaiting
582626
}
583627
}
628+
} else if hasMissingPolicy {
629+
// Even if all nodes are complete, if policy is missing, we should still be blocked
630+
collectNodeStatus = v1alpha1.StatusBlocked
584631
}
585632

586633
if scrStatus != collectNodeStatus {
@@ -617,6 +664,40 @@ func evaluateCompletedBatches(skyhook SkyhookNodes) bool {
617664
if isComplete, successCount, failureCount := compartment.EvaluateCurrentBatch(); isComplete {
618665
batchSize := successCount + failureCount
619666

667+
// Count blocked nodes to determine if we should skip batch evaluation
668+
blockedCount := 0
669+
for _, node := range compartment.GetNodes() {
670+
if node.Status() == v1alpha1.StatusBlocked {
671+
blockedCount++
672+
}
673+
}
674+
675+
// If batchSize is 0 but batch is complete, check if all nodes are blocked
676+
// If all nodes are blocked, don't advance the batch - wait for them to become unblocked
677+
// Blocked nodes are not failures, they're just temporarily unable to proceed
678+
if batchSize == 0 {
679+
// If all nodes in the compartment are blocked, skip batch evaluation
680+
// The batch will be re-evaluated when nodes become unblocked
681+
if blockedCount > 0 && blockedCount == len(compartment.GetNodes()) {
682+
continue // Skip this compartment - all nodes blocked, wait for them to unblock
683+
}
684+
// If some nodes are blocked but not all, use blocked count as batch size
685+
// This handles mixed batches (some blocked, some completed/failed)
686+
if blockedCount > 0 {
687+
batchSize = blockedCount
688+
} else if compartment.GetBatchState().LastBatchSize > 0 {
689+
batchSize = compartment.GetBatchState().LastBatchSize
690+
}
691+
}
692+
693+
// If batch has blocked nodes but no successes/failures, don't treat as failure
694+
// Blocked nodes should not increment consecutive failures
695+
// Only evaluate if we have actual progress (successes or failures)
696+
if batchSize > 0 && successCount == 0 && failureCount == 0 && blockedCount == batchSize {
697+
// All nodes in batch are blocked - skip evaluation to avoid false failures
698+
continue
699+
}
700+
620701
// Update the compartment's batch state using strategy logic
621702
compartment.EvaluateAndUpdateBatchState(batchSize, successCount, failureCount)
622703

@@ -806,66 +887,14 @@ func (skyhook *skyhookNodes) ReportState() {
806887
}
807888
}
808889

809-
// Update compartment statuses if compartments exist
810-
if len(skyhook.compartments) > 0 {
811-
if skyhook.skyhook.Status.CompartmentStatuses == nil {
812-
skyhook.skyhook.Status.CompartmentStatuses = make(map[string]v1alpha1.CompartmentStatus)
813-
}
814-
815-
for name, compartment := range skyhook.compartments {
816-
newStatus := buildCompartmentStatus(compartment)
817-
if existing, ok := skyhook.skyhook.Status.CompartmentStatuses[name]; !ok || !compartmentStatusEqual(existing, newStatus) {
818-
skyhook.skyhook.Status.CompartmentStatuses[name] = newStatus
819-
skyhook.skyhook.Updated = true
820-
}
821-
}
822-
}
823-
824-
// reset metrics to zero
825-
ResetSkyhookMetricsToZero(skyhook)
826-
827-
// Set skyhook status metrics
828-
SetSkyhookStatusMetrics(skyhookName, skyhook.Status(), true)
829-
830-
// Set target count and node status metrics
831-
SetNodeTargetCountMetrics(skyhookName, float64(nodeCount))
832-
for status, count := range nodeStatusCounts {
833-
SetNodeStatusMetrics(skyhookName, status, float64(count))
834-
}
835-
836-
// Set package state and stage metrics
837-
for _package, versions := range packageStateStageCounts {
838-
for version, states := range versions {
839-
for state, stages := range states {
840-
for stage, count := range stages {
841-
SetPackageStateMetrics(skyhookName, _package, version, state, float64(count))
842-
SetPackageStageMetrics(skyhookName, _package, version, stage, float64(count))
843-
}
844-
}
845-
}
846-
}
890+
// Update compartment statuses
891+
updateCompartmentStatuses(skyhook)
847892

848-
// Set package restarts metrics
849-
for _package, versions := range packageRestarts {
850-
for version, restarts := range versions {
851-
SetPackageRestartsMetrics(skyhookName, _package, version, restarts)
852-
}
853-
}
854-
855-
// Set rollout metrics for each compartment (follows same pattern as other metrics)
856-
if len(skyhook.compartments) > 0 {
857-
policyName := skyhook.GetSkyhook().Spec.DeploymentPolicy
858-
if policyName == "" {
859-
policyName = LegacyPolicyName
860-
}
893+
// Clean up stale compartment statuses
894+
cleanupStaleCompartmentStatuses(skyhook)
861895

862-
for name, compartment := range skyhook.compartments {
863-
if status, ok := skyhook.skyhook.Status.CompartmentStatuses[name]; ok {
864-
strategy := getStrategyType(compartment)
865-
SetRolloutMetrics(skyhookName, policyName, name, strategy, status)
866-
}
867-
}
868-
}
896+
// Set all metrics
897+
setAllMetrics(skyhookName, skyhook, nodeStatusCounts, packageStateStageCounts, packageRestarts, nodeCount)
869898

870899
// Set current count of completed nodes
871900
completeNodes := fmt.Sprintf("%d/%d", nodeStatusCounts[v1alpha1.StatusComplete], nodeCount)
@@ -1041,6 +1070,93 @@ func (skyhook *skyhookNodes) AssignNodeToCompartment(node wrapper.SkyhookNode) (
10411070
return matches[0].name, nil
10421071
}
10431072

1073+
// updateCompartmentStatuses updates compartment statuses for all current compartments
1074+
func updateCompartmentStatuses(skyhook *skyhookNodes) {
1075+
if len(skyhook.compartments) == 0 {
1076+
return
1077+
}
1078+
if skyhook.skyhook.Status.CompartmentStatuses == nil {
1079+
skyhook.skyhook.Status.CompartmentStatuses = make(map[string]v1alpha1.CompartmentStatus)
1080+
}
1081+
1082+
for name, compartment := range skyhook.compartments {
1083+
newStatus := buildCompartmentStatus(compartment)
1084+
if existing, ok := skyhook.skyhook.Status.CompartmentStatuses[name]; !ok || !compartmentStatusEqual(existing, newStatus) {
1085+
skyhook.skyhook.Status.CompartmentStatuses[name] = newStatus
1086+
skyhook.skyhook.Updated = true
1087+
}
1088+
}
1089+
}
1090+
1091+
// cleanupStaleCompartmentStatuses removes compartment statuses that are no longer in the policy
1092+
func cleanupStaleCompartmentStatuses(skyhook *skyhookNodes) {
1093+
if skyhook.skyhook.Status.CompartmentStatuses == nil {
1094+
return
1095+
}
1096+
for compartmentName := range skyhook.skyhook.Status.CompartmentStatuses {
1097+
if _, exists := skyhook.compartments[compartmentName]; !exists {
1098+
delete(skyhook.skyhook.Status.CompartmentStatuses, compartmentName)
1099+
skyhook.skyhook.Updated = true
1100+
}
1101+
}
1102+
}
1103+
1104+
// setAllMetrics sets all metrics for the skyhook
1105+
func setAllMetrics(
1106+
skyhookName string,
1107+
skyhook *skyhookNodes,
1108+
nodeStatusCounts map[v1alpha1.Status]int,
1109+
packageStateStageCounts map[string]map[string]map[v1alpha1.State]map[v1alpha1.Stage]int,
1110+
packageRestarts map[string]map[string]int32,
1111+
nodeCount int,
1112+
) {
1113+
// reset metrics to zero
1114+
ResetSkyhookMetricsToZero(skyhook)
1115+
1116+
// Set skyhook status metrics
1117+
SetSkyhookStatusMetrics(skyhookName, skyhook.Status(), true)
1118+
1119+
// Set target count and node status metrics
1120+
SetNodeTargetCountMetrics(skyhookName, float64(nodeCount))
1121+
for status, count := range nodeStatusCounts {
1122+
SetNodeStatusMetrics(skyhookName, status, float64(count))
1123+
}
1124+
1125+
// Set package state and stage metrics
1126+
for _package, versions := range packageStateStageCounts {
1127+
for version, states := range versions {
1128+
for state, stages := range states {
1129+
for stage, count := range stages {
1130+
SetPackageStateMetrics(skyhookName, _package, version, state, float64(count))
1131+
SetPackageStageMetrics(skyhookName, _package, version, stage, float64(count))
1132+
}
1133+
}
1134+
}
1135+
}
1136+
1137+
// Set package restarts metrics
1138+
for _package, versions := range packageRestarts {
1139+
for version, restarts := range versions {
1140+
SetPackageRestartsMetrics(skyhookName, _package, version, restarts)
1141+
}
1142+
}
1143+
1144+
// Set rollout metrics for each compartment
1145+
if len(skyhook.compartments) > 0 {
1146+
policyName := skyhook.GetSkyhook().Spec.DeploymentPolicy
1147+
if policyName == "" {
1148+
policyName = LegacyPolicyName
1149+
}
1150+
1151+
for name, compartment := range skyhook.compartments {
1152+
if status, ok := skyhook.skyhook.Status.CompartmentStatuses[name]; ok {
1153+
strategy := getStrategyType(compartment)
1154+
SetRolloutMetrics(skyhookName, policyName, name, strategy, status)
1155+
}
1156+
}
1157+
}
1158+
}
1159+
10441160
// cleanupNodeMap removes nodes from the given map that no longer exist in currentNodes
10451161
// Returns false if nodeMap is nil, otherwise returns true if any nodes were removed
10461162
func cleanupNodeMap[T any](nodeMap map[string]T, currentNodes map[string]struct{}) bool {

0 commit comments

Comments
 (0)