Skip to content

Commit 9d34e10

Browse files
committed
feat(api): allow turning off node evacuations for rolling updates
1 parent 1343e0d commit 9d34e10

7 files changed

Lines changed: 57 additions & 2 deletions

File tree

api/v3alpha1/emqx_types_spec.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,19 @@ type ReplicantsUpdateStrategy struct {
150150
MaxSurge *intstr.IntOrString `json:"maxSurge,omitempty"`
151151
}
152152

153+
// EvacuationStrategyType names the evacuation strategy selected through
// EvacuationStrategy.Type. The values accepted by the CRD validation are
// NodeEvacuationStrategy and DisabledEvacuationStrategy.
type EvacuationStrategyType string
154+
155+
// Supported values of EvacuationStrategyType.
const (
156+
// NodeEvacuationStrategy is the kubebuilder default for
// EvacuationStrategy.Type.
NodeEvacuationStrategy EvacuationStrategyType = "NodeEvacuation"
157+
// DisabledEvacuationStrategy turns evacuation off: it is the only value
// for which EMQXSpec.IsEvacuationEnabled returns false.
DisabledEvacuationStrategy EvacuationStrategyType = "Disabled"
158+
)
159+
153160
type EvacuationStrategy struct {
161+
// Type of the evacuation policy.
162+
// +kubebuilder:validation:Enum=NodeEvacuation;Disabled
163+
// +kubebuilder:default=NodeEvacuation
164+
Type EvacuationStrategyType `json:"type"`
165+
154166
// Client disconnect rate (number per second).
155167
// Same as `conn-evict-rate` in [EMQX Node Evacuation](https://docs.emqx.com/en/emqx/v5.10/deploy/cluster/rebalancing.html#node-evacuation).
156168
// +kubebuilder:validation:Minimum=1
@@ -342,6 +354,10 @@ func (spec *EMQXSpec) HasReplicants() bool {
342354
return spec.ReplicantTemplate != nil && spec.ReplicantTemplate.Spec.Replicas != nil && *spec.ReplicantTemplate.Spec.Replicas > 0
343355
}
344356

357+
// IsEvacuationEnabled reports whether evacuation is enabled for this spec.
// It returns false only when the evacuation strategy type is exactly
// DisabledEvacuationStrategy; any other value, including an unset (empty)
// type, counts as enabled.
func (spec *EMQXSpec) IsEvacuationEnabled() bool {
358+
return spec.UpdateStrategy.EvacuationStrategy.Type != DisabledEvacuationStrategy
359+
}
360+
345361
// IsEnabled reports whether the Enabled field is set and true.
// A nil Enabled pointer is treated as disabled.
func (s *ServiceTemplate) IsEnabled() bool {
346362
return s.Enabled != nil && *s.Enabled
347363
}

config/crd/bases/apps.emqx.io_emqxes.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15986,6 +15986,13 @@ spec:
1598615986
format: int32
1598715987
minimum: 1
1598815988
type: integer
15989+
type:
15990+
default: NodeEvacuation
15991+
description: Type of the evacuation policy.
15992+
enum:
15993+
- NodeEvacuation
15994+
- Disabled
15995+
type: string
1598915996
waitHealthCheck:
1599015997
default: 60
1599115998
description: |-
@@ -16002,6 +16009,8 @@ spec:
1600216009
format: int32
1600316010
minimum: 0
1600416011
type: integer
16012+
required:
16013+
- type
1600516014
type: object
1600616015
replicants:
1600716016
description: |-

internal/controller/suite_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,12 @@ var emqx *crd.EMQX = &crd.EMQX{
7777
},
7878
Spec: crd.EMQXSpec{
7979
Image: "emqx",
80+
UpdateStrategy: crd.UpdateStrategy{
81+
Type: "RollingUpdate",
82+
EvacuationStrategy: crd.EvacuationStrategy{
83+
Type: "NodeEvacuation",
84+
},
85+
},
8086
},
8187
}
8288

internal/controller/sync_core_set.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,9 @@ func (s *syncCoreSet) reconcile(r *reconcileRound, instance *crd.EMQX) subResult
8181
return s.scaleDown(r, instance, currentReplicas)
8282
}
8383

84+
// Stop evacuations of any updated pods.
85+
// This is done irrespective of whether Node Evacuation is enabled or not,
86+
// to avoid ending up in a transient state if it was disabled mid-update.
8487
err := s.updateEvacuationState(r, instance)
8588
if err != nil {
8689
return reconcileError(emperror.Wrap(err, "failed to update evacuation state"))
@@ -270,7 +273,7 @@ func checkCorePodRemoval(
270273
return coreAdmission{Action: admissionWait, Reason: "node evacuation is still in progress"}
271274
}
272275

273-
if nodeInfo.Sessions > 0 {
276+
if nodeInfo.Sessions > 0 && instance.Spec.IsEvacuationEnabled() {
274277
if instance.Spec.NumCoreReplicas() == 1 && !instance.Spec.HasReplicants() {
275278
return coreAdmission{
276279
Action: admissionRemove,

internal/controller/sync_core_set_suite_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,15 @@ var _ = Describe("Reconciler syncCoreSet", Ordered, func() {
170170
))
171171
})
172172

173+
It("node session > 0 & node evacuation is disabled", func() {
174+
instance.Spec.UpdateStrategy.EvacuationStrategy.Type = crd.DisabledEvacuationStrategy
175+
instance.Status.CoreNodes[1].Sessions = 99999
176+
admission := checkCorePodRemoval(round, instance, pod1, false)
177+
Expect(admission).Should(And(
178+
HaveField("Action", Equal(admissionRemove)),
179+
))
180+
})
181+
173182
It("node session is 0", func() {
174183
instance.Status.CoreNodes[1].Sessions = 0
175184
admission := checkCorePodRemoval(round, instance, pod1, false)

internal/controller/sync_replicant_sets.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,8 @@ func (s *syncReplicantSets) ensureConsistency(
153153
}
154154

155155
// 2. Stop any ongoing node evacuation.
156+
// This is attempted irrespective of whether Node Evacuation is enabled or not,
157+
// to avoid ending up in a transient state if it was disabled mid-update.
156158
stopped, err := s.stopStaleReplicantEvacuation(r, instance, pod)
157159
if err != nil {
158160
return err
@@ -474,7 +476,7 @@ func checkReplicantPodRemoval(instance *crd.EMQX, pod *corev1.Pod) replicantAdmi
474476
}
475477
}
476478

477-
if nodeInfo.Sessions > 0 {
479+
if nodeInfo.Sessions > 0 && instance.Spec.IsEvacuationEnabled() {
478480
if instance.Spec.NumReplicantReplicas() == 1 && instance.Spec.NumMaxSurgeReplicantReplicas() == 0 {
479481
return replicantAdmission{
480482
Action: admissionRemove,

internal/controller/sync_replicant_sets_suite_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -824,6 +824,16 @@ var _ = Describe("Reconciler syncReplicantSets admission", Ordered, func() {
824824
))
825825
})
826826

827+
It("node session > 0 & evacuation is disabled", func() {
828+
instance.Spec.UpdateStrategy.EvacuationStrategy.Type = crd.DisabledEvacuationStrategy
829+
instance.Status.ReplicantNodes[0].Sessions = 99999
830+
admission := checkReplicantPodRemoval(instance, currentPod)
831+
Expect(admission).Should(And(
832+
HaveField("Action", Equal(admissionRemove)),
833+
HaveField("Reason", ContainSubstring("safe to stop")),
834+
))
835+
})
836+
827837
It("node session is 0", func() {
828838
instance.Status.ReplicantNodes[0].Sessions = 0
829839
admission := checkReplicantPodRemoval(instance, currentPod)

0 commit comments

Comments
 (0)