Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,52 @@ jobs:
if-no-files-found: ignore
retention-days: 1

test-stress:
name: E2E Stress Tests
strategy:
fail-fast: true
matrix:
include:
- profile: short
steps: 5
interval: 5s
- profile: long
steps: 15
interval: 10s

runs-on: ubuntu-latest
env:
TEST_E2E_DIAGNOSTIC_REPORT_PATH: ${{ github.workspace }}/test-report
steps:
- uses: actions/checkout@v4

- uses: actions/setup-go@v5
with:
go-version: ${{ env.GO_VERSION }}

- run: go mod tidy

- run: make test-e2e-stress
env:
TEST_E2E_STRESS_STEPS: ${{ matrix.steps }}
TEST_E2E_STRESS_STEP_INTERVAL: ${{ matrix.interval }}

- uses: actions/upload-artifact@v4
if: failure()
with:
name: test-report-stress-${{ matrix.profile }}
path: ${{ env.TEST_E2E_DIAGNOSTIC_REPORT_PATH }}
if-no-files-found: ignore

- run: go tool covdata textfmt -i /tmp/kind/data/emqx-operator-system/test-e2e-coverage -o ${{ github.workspace }}/cover.stress.out

- uses: actions/upload-artifact@v4
with:
name: coverprofile-stress-${{ matrix.profile }}
path: ${{ github.workspace }}/cover.stress.out
if-no-files-found: ignore
retention-days: 1

test-upgrade:
name: E2E Upgrade Tests - ${{ matrix.profile }}
strategy:
Expand Down
25 changes: 16 additions & 9 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -82,24 +82,31 @@ lint-fix: golangci-lint ## Run golangci-lint linter and perform fixes
TEST_E2E_UPGRADE_IMAGE_INITIAL ?= emqx/emqx:5.10.2
TEST_E2E_UPGRADE_IMAGE_UPGRADE ?= emqx/emqx:6.1.0

TEST_E2E_STRESS_STEPS ?= 8
TEST_E2E_STRESS_STEP_INTERVAL ?= 5s
TEST_E2E_STRESS_IMAGE ?= emqx/emqx:6.1.0

.PHONY: test
test: manifests generate fmt vet envtest ## Run tests.
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test -v $$(go list ./... | grep -v /e2e) -coverprofile ./cover.out

# TODO(user): To use a different vendor for e2e tests, modify the setup under 'tests/e2e'.
# The default setup assumes Kind is pre-installed and builds/loads the Manager Docker image locally.
# Prometheus and CertManager are installed by default; skip with:
# - PROMETHEUS_INSTALL_SKIP=true
# - CERT_MANAGER_INSTALL_SKIP=true
# Prometheus is installed by default; skip with:
# - TEST_E2E_SKIP_PROMETHEUS_INSTALL=true
.PHONY: test-e2e
test-e2e: manifests generate e2e-test-cluster ## Run general E2E tests. Expected an isolated environment using Kind.
go test ./test/e2e/ -v -ginkgo.v -timeout 60m

test-e2e-upgrade: manifests generate e2e-test-cluster ## Run E2E upgrade tests. Expected an isolated environment using Kind.
go test ./test/e2e/ -v -ginkgo.v -timeout 20m -ginkgo.focus="EMQX Upgrade Test" \
go test ./test/e2e/upgrade -v -ginkgo.v -timeout 20m \
-emqx-image-initial=$(TEST_E2E_UPGRADE_IMAGE_INITIAL) \
-emqx-image-upgrade=$(TEST_E2E_UPGRADE_IMAGE_UPGRADE)

test-e2e-stress: manifests generate e2e-test-cluster ## Run E2E stress tests. Expected an isolated environment using Kind.
go test ./test/e2e/stress -v -ginkgo.v -timeout 20m \
-stress-steps=$(TEST_E2E_STRESS_STEPS) \
-step-interval=$(TEST_E2E_STRESS_STEP_INTERVAL) \
-emqx-image=$(TEST_E2E_STRESS_IMAGE)

.PHONY: test-e2e-helm
test-e2e-helm: e2e-test-cluster ## Run Helm chart E2E tests. Expected an isolated environment using Kind.
go test ./test/e2e-helm/ -v -ginkgo.v -timeout 20m
Expand Down Expand Up @@ -143,11 +150,11 @@ doc-crd-v3: ## Generate documentation for the `apps.emqx.io/v3beta1` CRD.
# (i.e. docker build --platform linux/arm64). However, you must enable docker buildKit for it.
# More info: https://docs.docker.com/develop/develop-images/build_enhancements/
.PHONY: docker-build
docker-build: ## Build docker image with the manager.
docker-build: generate ## Build docker image with the manager.
$(CONTAINER_TOOL) build -t ${OPERATOR_IMAGE} .

.PHONY: docker-build-coverage
docker-build-coverage: Dockerfile.coverage ## Build docker image with the manager and code coverage enabled.
docker-build-coverage: Dockerfile.coverage generate ## Build docker image with the manager and code coverage enabled.
$(CONTAINER_TOOL) build -t ${OPERATOR_IMAGE} -f Dockerfile.coverage .

.PHONY: docker-push
Expand All @@ -162,7 +169,7 @@ docker-push: ## Push docker image with the manager.
# OPERATOR_IMAGE=<myregistry/image:<tag>> then the export will fail)
PLATFORMS ?= linux/arm64,linux/amd64
.PHONY: docker-buildx
docker-buildx: Dockerfile.cross ## Build and push docker image for the manager for cross-platform support
docker-buildx: Dockerfile.cross generate ## Build and push docker image for the manager for cross-platform support
- $(CONTAINER_TOOL) buildx create --name emqx-operator-builder
$(CONTAINER_TOOL) buildx use emqx-operator-builder
- $(CONTAINER_TOOL) buildx build --push --platform=$(PLATFORMS) --tag ${OPERATOR_IMAGE} -f Dockerfile.cross .
Expand Down
10 changes: 0 additions & 10 deletions internal/controller/load_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,16 +151,6 @@ func (r *reconcileState) updateReplicantSet(instance *crd.EMQX) *appsv1.ReplicaS
return nil
}

// partOfReplicantSets checks if a pod belongs to any replicant ReplicaSet.
func (r *reconcileState) partOfReplicantSet(pod *corev1.Pod) bool {
for _, rs := range r.replicantSets {
if util.IsPodManagedBy(pod, rs) {
return true
}
}
return false
}

// outdatedReplicantReplicaSets returns all replicant ReplicaSets except the update revision set,
// sorted by creation timestamp (oldest first).
func (r *reconcileState) outdatedReplicantSets(instance *crd.EMQX) []*appsv1.ReplicaSet {
Expand Down
13 changes: 7 additions & 6 deletions internal/controller/sync_core_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,27 +372,28 @@ func (s *syncCoreSet) startEvacuation(
// migrationTargetNodes returns the list of EMQX nodes to migrate workloads to.
// * In core-only cluster, targets are pods of the core set.
// * In core-replicant cluster, targets are:
// - pods in the "update" replicant set, if it has at least 1 ready replica,
// - pods in the "update" replicant set, if it has at least 1 node up and running,
// - pods in any replicant set otherwise.
func migrationTargetNodes(r *reconcileRound, instance *crd.EMQX) []string {
targets := []string{}
fallback := []string{}
if instance.Spec.HasReplicants() {
updateReplicantSet := r.state.updateReplicantSet(instance)
if updateReplicantSet == nil {
return targets
}
updateReady := updateReplicantSet.Status.ReadyReplicas > 0
for _, node := range instance.Status.ReplicantNodes {
pod := r.state.podWithName(node.PodName)
if pod == nil {
continue
}
if updateReady && util.IsPodManagedBy(pod, updateReplicantSet) {
targets = append(targets, node.Name)
}
if r.state.partOfReplicantSet(pod) {
if util.IsPodManagedBy(pod, updateReplicantSet) {
targets = append(targets, node.Name)
}
fallback = append(fallback, node.Name)
}
if len(targets) == 0 {
return fallback
}
} else {
for _, node := range instance.Status.CoreNodes {
Expand Down
36 changes: 24 additions & 12 deletions test/e2e/emqx.go → test/e2e/assert.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
Copyright 2026.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2e

import (
Expand All @@ -8,7 +24,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func checkEMQXReady(g Gomega, afterTime ...metav1.Time) {
func EMQXReady(g Gomega, afterTime ...metav1.Time) {
var cond metav1.Condition
g.Expect(KubectlOut("get", "emqx", "emqx", "-o", "jsonpath={.status.conditions[?(@.type==\"Ready\")]}")).
To(UnmarshalInto(&cond), "Failed to get emqx status")
Expand All @@ -24,7 +40,7 @@ func checkEMQXReady(g Gomega, afterTime ...metav1.Time) {
}
}

func checkEMQXStatus(g Gomega, coreReplicas int) {
func CoresStable(g Gomega, coreReplicas int) {
var status crd.CoreNodesStatus
var nodes []crd.EMQXNode
var podList corev1.PodList
Expand Down Expand Up @@ -75,7 +91,7 @@ func checkEMQXStatus(g Gomega, coreReplicas int) {
)
}

func checkNoReplicants(g Gomega) {
func NoReplicants(g Gomega) {
g.Expect(KubectlOut("get", "emqx", "emqx",
"-o", "jsonpath={.status.replicantNodesStatus.currentReplicas}",
)).To(Equal("0"), "EMQX cluster status has replicant replicas")
Expand All @@ -84,7 +100,7 @@ func checkNoReplicants(g Gomega) {
)).To(BeEmpty(), "EMQX cluster status lists replicant nodes")
}

func checkReplicantStatus(g Gomega, replicantReplicas int) {
func ReplicantsStable(g Gomega, replicantReplicas int) {
var status crd.ReplicantNodesStatus
g.Expect(KubectlOut("get", "emqx", "emqx", "-o", "jsonpath={.status.replicantNodesStatus}")).
To(UnmarshalInto(&status), "Failed to get EMQX replicant nodes status")
Expand All @@ -96,10 +112,6 @@ func checkReplicantStatus(g Gomega, replicantReplicas int) {
),
"EMQX status does not have expected number of replicant nodes",
)
checkReplicantNodesStatusRevision(g, status, replicantReplicas)
}

func checkReplicantNodesStatusRevision(g Gomega, status crd.ReplicantNodesStatus, replicas int) {
var podList corev1.PodList
g.Expect(status).To(
And(
Expand All @@ -118,12 +130,12 @@ func checkReplicantNodesStatusRevision(g Gomega, status crd.ReplicantNodesStatus
"-o", "json",
)).To(UnmarshalInto(&podList), "Failed to list replicant pods")
g.Expect(podList.Items).To(
HaveLen(replicas),
"EMQX cluster does not have %d current revision replicant pods", replicas,
HaveLen(replicantReplicas),
"EMQX cluster does not have %d current revision replicant pods", replicantReplicas,
)
}

func checkDSReplicationStatus(g Gomega, coreReplicas int) {
func DSReplicationStable(g Gomega, coreReplicas int) {
status := &crd.DSReplicationStatus{}
replicationFactor := min(3, coreReplicas)
g.Expect(KubectlOut("get", "emqx", "emqx", "-o", "jsonpath={.status.dsReplication}")).
Expand All @@ -146,7 +158,7 @@ func checkDSReplicationStatus(g Gomega, coreReplicas int) {
)
}

func checkDSReplicationHealthy(g Gomega) {
func DSReplicationHealthy(g Gomega) {
g.Expect(KubectlOut("exec", "service/emqx-listeners", "--", "emqx", "ctl", "ds", "info")).
NotTo(
ContainSubstring("(!)"),
Expand Down
22 changes: 22 additions & 0 deletions test/e2e/const.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
Copyright 2026.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2e

const (
// namespace where the project is deployed in
Namespace = "emqx-operator-system"
)
Loading
Loading