Skip to content

Commit d2c035c

Browse files
authored
Merge pull request #1185 from emqx/feat/EMQX-14563/hpa-compat
feat(scale): fix subresource compatibility + HPA integration
2 parents 951db30 + 80a8691 commit d2c035c

12 files changed

Lines changed: 545 additions & 108 deletions

File tree

api/v3alpha1/emqx_types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323
// +kubebuilder:object:root=true
2424
// +kubebuilder:subresource:status
2525
// +kubebuilder:resource:shortName=emqx,path=emqxes
26-
// +kubebuilder:subresource:scale:specpath=.spec.replicantTemplate.spec.replicas,statuspath=.status.replicantNodeReplicas
26+
// +kubebuilder:subresource:scale:specpath=.spec.replicantTemplate.spec.replicas,statuspath=.status.replicantReplicas,selectorpath=.status.replicantSelector
2727
// +kubebuilder:printcolumn:name="Status",type="string",JSONPath=".status.conditions[?(@.status==\"True\")].type"
2828
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
2929

api/v3alpha1/emqx_types_status.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,20 @@ type EMQXStatus struct {
2929
// +listMapKey=type
3030
Conditions []metav1.Condition `json:"conditions,omitempty"`
3131

32+
// Observed replica count for the core set.
33+
CoreReplicas int32 `json:"coreReplicas"`
34+
35+
// Serialized label selector matching core pods.
36+
CoreSelector string `json:"coreSelector,omitempty"`
37+
38+
// Observed replica count for the replicant set pods.
39+
// Used by the scale subresource.
40+
ReplicantReplicas int32 `json:"replicantReplicas"`
41+
42+
// Serialized label selector matching replicant pods.
43+
// Used by the scale subresource for HPA pod discovery.
44+
ReplicantSelector string `json:"replicantSelector,omitempty"`
45+
3246
// Status of each core node in the cluster.
3347
CoreNodes []EMQXNode `json:"coreNodes,omitempty"`
3448
// Summary status of the set of core nodes.

config/crd/bases/apps.emqx.io_emqxes.yaml

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16193,6 +16193,13 @@ spec:
1619316193
- readyReplicas
1619416194
- updatedReplicas
1619516195
type: object
16196+
coreReplicas:
16197+
description: Observed replica count for the core set.
16198+
format: int32
16199+
type: integer
16200+
coreSelector:
16201+
description: Serialized label selector matching core pods.
16202+
type: string
1619616203
dsReplication:
1619716204
description: Status of EMQX Durable Storage replication.
1619816205
properties:
@@ -16351,12 +16358,27 @@ spec:
1635116358
- readyReplicas
1635216359
- updateReplicas
1635316360
type: object
16361+
replicantReplicas:
16362+
description: |-
16363+
Observed replica count for the replicant set pods.
16364+
Used by the scale subresource.
16365+
format: int32
16366+
type: integer
16367+
replicantSelector:
16368+
description: |-
16369+
Serialized label selector matching replicant pods.
16370+
Used by the scale subresource for HPA pod discovery.
16371+
type: string
16372+
required:
16373+
- coreReplicas
16374+
- replicantReplicas
1635416375
type: object
1635516376
type: object
1635616377
served: true
1635716378
storage: true
1635816379
subresources:
1635916380
scale:
16381+
labelSelectorPath: .status.replicantSelector
1636016382
specReplicasPath: .spec.replicantTemplate.spec.replicas
16361-
statusReplicasPath: .status.replicantNodeReplicas
16383+
statusReplicasPath: .status.replicantReplicas
1636216384
status: {}

internal/controller/add_replicant_set.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -155,8 +155,6 @@ func generateReplicaSet(instance *crd.EMQX) *appsv1.ReplicaSet {
155155
template := instance.Spec.ReplicantTemplate
156156

157157
// Add a PreStop hook to leave the cluster when the pod is asked to stop.
158-
// This is especially important when DS Raft is enabled, otherwise there will be a
159-
// lot of leftover records in the DS cluster metadata.
160158
lifecycle := &corev1.Lifecycle{}
161159
if template.Spec.Lifecycle != nil {
162160
lifecycle = template.Spec.Lifecycle.DeepCopy()
@@ -167,9 +165,8 @@ func generateReplicaSet(instance *crd.EMQX) *appsv1.ReplicaSet {
167165
},
168166
}
169167

170-
readinessProbe := resources.EvacuationReadinessProbe()
171-
172168
// Prefer evacuation-aware probe over older-version defaults.
169+
readinessProbe := resources.EvacuationReadinessProbe()
173170
if template.Spec.ReadinessProbe != nil {
174171
if template.Spec.ReadinessProbe.HTTPGet != nil &&
175172
template.Spec.ReadinessProbe.HTTPGet.Path != "/status" {

internal/controller/load_state.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package controller
22

33
import (
44
"context"
5+
"reflect"
56

67
emperror "emperror.dev/errors"
78
crd "github.com/emqx/emqx-operator/api/v3alpha1"
@@ -28,11 +29,14 @@ type reconcileStatePodFilter interface {
2829
}
2930

3031
type podsManagedBy struct {
31-
metav1.Object
32+
manager metav1.Object
3233
}
3334

3435
func (self podsManagedBy) passes(pod *corev1.Pod) bool {
35-
return self.Object != nil && util.IsPodManagedBy(pod, self.Object)
36+
if self.manager == nil && reflect.ValueOf(self.manager).IsNil() {
37+
return false
38+
}
39+
return util.IsPodManagedBy(pod, self.manager)
3640
}
3741

3842
type podsWithRole struct {
@@ -191,6 +195,14 @@ func (r *reconcileState) outdatedReplicantPods(instance *crd.EMQX) []*corev1.Pod
191195
return out
192196
}
193197

198+
func (r *reconcileState) numReplicants() int32 {
199+
out := int32(0)
200+
for _, rs := range r.replicantSets {
201+
out += rs.Status.Replicas
202+
}
203+
return out
204+
}
205+
194206
func (r *reconcileState) numReadyReplicants() int32 {
195207
out := int32(0)
196208
for _, rs := range r.replicantSets {

internal/controller/sync_replicant_sets.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,31 +45,34 @@ func (s *syncReplicantSets) reconcile(r *reconcileRound, instance *crd.EMQX) sub
4545
}
4646

4747
// Steady state: handle scale-up and scale-down.
48+
specReplicas := ptr.Deref(updateRs.Spec.Replicas, 0)
4849
desiredReplicas := instance.Spec.NumReplicantReplicas()
49-
currentReplicas := ptr.Deref(updateRs.Spec.Replicas, 0)
50+
currentReplicas := updateRs.Status.Replicas
5051

51-
if currentReplicas < desiredReplicas {
52+
if specReplicas < desiredReplicas {
5253
r.log.V(1).Info("scaling up replicantSet",
5354
"replicaSet", klog.KObj(updateRs),
54-
"from", currentReplicas,
55+
"from", specReplicas,
5556
"to", desiredReplicas,
5657
)
5758
return s.scaleUp(r, updateRs, desiredReplicas)
5859
}
5960

60-
if currentReplicas > desiredReplicas {
61+
if specReplicas > desiredReplicas {
6162
r.log.V(1).Info("scaling down replicantSet",
6263
"replicaSet", klog.KObj(updateRs),
63-
"from", currentReplicas,
64+
"from", specReplicas,
6465
"to", desiredReplicas,
6566
)
66-
return s.scaleDown(r, instance, updateRs, currentReplicas, desiredReplicas)
67+
return s.scaleDown(r, instance, updateRs, specReplicas, desiredReplicas)
6768
}
6869

6970
// Steady state: clean up stale artifacts.
70-
err := s.ensureConsistency(r, instance, updateRs)
71-
if err != nil {
72-
return reconcileError(emperror.Wrap(err, "failed to restore replicant consistency"))
71+
if currentReplicas == specReplicas {
72+
err := s.ensureConsistency(r, instance, updateRs)
73+
if err != nil {
74+
return reconcileError(emperror.Wrap(err, "failed to restore replicant consistency"))
75+
}
7376
}
7477

7578
return subResult{}
@@ -142,7 +145,7 @@ func (s *syncReplicantSets) ensureConsistency(
142145
instance *crd.EMQX,
143146
updateRs *appsv1.ReplicaSet,
144147
) error {
145-
for _, pod := range r.state.podsManagedBy(updateRs) {
148+
for _, pod := range r.state.listPods(podsManagedBy{updateRs}, podsAlive{}) {
146149
// 1. Check if pod has stale scale-down annotations.
147150
dirty := s.removeStaleReplicantAnnotations(pod)
148151
if !dirty {

internal/controller/update_emqx_status.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/emqx/emqx-operator/internal/emqx/api"
1212
appsv1 "k8s.io/api/apps/v1"
1313
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14+
"k8s.io/apimachinery/pkg/labels"
1415
"k8s.io/klog/v2"
1516
)
1617

@@ -23,17 +24,28 @@ func (u *updateStatus) reconcile(r *reconcileRound, instance *crd.EMQX) subResul
2324

2425
// Core: count pods on each revision for rolling update progress.
2526
coreSet := r.state.coreSet()
27+
status.CoreReplicas = 0
28+
status.CoreSelector = labels.Set(instance.DefaultLabelsWith(crd.CoreLabels())).String()
2629
status.CoreNodesStatus.UpdatedReplicas = 0
2730
status.CoreNodesStatus.CurrentReplicas = 0
28-
for _, pod := range r.state.podsManagedBy(r.state.coreSet()) {
29-
if r.state.partOfCoreSetRevision(pod, coreSet.Status.UpdateRevision) {
30-
status.CoreNodesStatus.UpdatedReplicas++
31-
}
32-
if r.state.partOfCoreSetRevision(pod, coreSet.Status.CurrentRevision) {
33-
status.CoreNodesStatus.CurrentReplicas++
31+
if coreSet != nil {
32+
for _, pod := range r.state.podsManagedBy(coreSet) {
33+
if r.state.partOfCoreSetRevision(pod, coreSet.Status.UpdateRevision) {
34+
status.CoreNodesStatus.UpdatedReplicas++
35+
}
36+
if r.state.partOfCoreSetRevision(pod, coreSet.Status.CurrentRevision) {
37+
status.CoreNodesStatus.CurrentReplicas++
38+
}
3439
}
3540
}
3641

42+
status.ReplicantReplicas = 0
43+
status.ReplicantSelector = ""
44+
if instance.Spec.HasReplicants() {
45+
status.ReplicantReplicas = r.state.numReplicants()
46+
status.ReplicantSelector = labels.Set(instance.DefaultLabelsWith(crd.ReplicantLabels())).String()
47+
}
48+
3749
// Replicant: multi-ReplicaSet pattern retained.
3850
currentReplicantSet, updateReplicantSet := switchReplicantSet(r, instance)
3951

test/e2e/e2e_suite_test.go

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -85,32 +85,26 @@ var _ = BeforeSuite(func() {
8585
By("load emqx-operator docker image into kind cluster")
8686
Expect(util.LoadImageToKindClusterWithName(projectImage)).To(Succeed())
8787

88+
By("install Metrics Server")
89+
Expect(util.InstallMetricsServer()).To(Succeed())
90+
8891
// The tests-e2e are intended to run on a temporary cluster that is created and destroyed for testing.
8992
// To prevent errors when tests run in environments with Prometheus or CertManager already installed,
9093
// we check for their presence before execution.
91-
// Setup Prometheus and CertManager before the suite if not skipped and if not already installed
9294
if !skipPrometheusInstall {
93-
if !util.IsPrometheusCRDsInstalled() {
94-
By("install Prometheus Operator")
95-
Expect(util.InstallPrometheusOperator()).To(Succeed())
96-
isPrometheusInstalled = true
97-
} else {
98-
GinkgoWriter.Println("WARNING: Prometheus Operator is already installed, skipping installation.")
99-
}
95+
By("install Prometheus Operator")
96+
Expect(util.InstallPrometheusOperator()).To(Succeed())
97+
isPrometheusInstalled = true
10098
}
10199
if !skipCertManagerInstall {
102-
if !util.IsCertManagerCRDsInstalled() {
103-
By("install CertManager")
104-
Expect(util.InstallCertManager()).To(Succeed())
105-
isCertManagerInstalled = true
106-
} else {
107-
GinkgoWriter.Println("WARNING: CertManager is already installed, skipping installation.")
108-
}
100+
By("install CertManager")
101+
Expect(util.InstallCertManager()).To(Succeed())
102+
isCertManagerInstalled = true
109103
}
110104
})
111105

112106
var _ = AfterSuite(func() {
113-
// Teardown Prometheus and CertManager after the suite if not skipped and if they were not already installed
107+
util.UnnstallMetricsServer()
114108
if !skipPrometheusInstall && isPrometheusInstalled {
115109
By("uninstall Prometheus Operator")
116110
util.UninstallPrometheusOperator()

0 commit comments

Comments
 (0)