Skip to content

Commit 2029c40

Browse files
authored
feat(main): add test for automq (#58)
Signed-off-by: cuisongliu <[email protected]>
1 parent d832db1 commit 2029c40

File tree

7 files changed

+151
-14
lines changed

7 files changed

+151
-14
lines changed

.github/workflows/test.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,8 @@ jobs:
8484
sudo sealos run labring/minio:RELEASE.2024-01-11T07-46-16Z labring/kube-prometheus-stack:v0.63.0
8585
sudo sealos run labring/kafka-ui:v0.7.1
8686
sleep 10
87-
sudo kubectl get pods -A --show-labels
87+
sudo kubectl get pods -A
88+
sudo kubectl get svc -A
8889
- name: build
8990
run: |
9091
sudo make e2e

e2e/automq_cluster_controller_test.go

Lines changed: 130 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,15 @@ package e2e
1818

1919
import (
2020
"context"
21+
"fmt"
2122
"os"
2223
"time"
2324

25+
"github.com/cuisongliu/automq-operator/internal/controller"
26+
v2 "k8s.io/api/apps/v1"
27+
"k8s.io/apimachinery/pkg/labels"
28+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
29+
2430
infrav1beta1 "github.com/cuisongliu/automq-operator/api/v1beta1"
2531
. "github.com/onsi/ginkgo/v2"
2632
. "github.com/onsi/gomega"
@@ -33,7 +39,7 @@ import (
3339
var _ = Describe("automq_controller", func() {
3440
Context("automq_controller cr tests", func() {
3541
ctx := context.Background()
36-
namespaceName := "automq-operator"
42+
namespaceName := "automq-cr"
3743
namespace := &v1.Namespace{
3844
ObjectMeta: metav1.ObjectMeta{
3945
Name: namespaceName,
@@ -44,22 +50,23 @@ var _ = Describe("automq_controller", func() {
4450
automq.Name = "automq-s1"
4551
automq.Namespace = namespaceName
4652
automq.Spec.ClusterID = "rZdE0DjZSrqy96PXrMUZVw"
47-
48-
BeforeEach(func() {
53+
It("create cr namespace", func() {
4954
By("Creating the Namespace to perform the tests")
5055
err := k8sClient.Create(ctx, namespace)
5156
Expect(err).To(Not(HaveOccurred()))
52-
By("Setting the NAMESPACE_NAME ENV VAR which stores the Operand image")
53-
err = os.Setenv("NAMESPACE_NAME", namespaceName)
54-
Expect(err).To(Not(HaveOccurred()))
5557
})
56-
It("Update Endpoint", func() {
58+
It("create cr", func() {
59+
By("get minio ip and port")
60+
minioService := &v1.Service{}
61+
err := k8sClient.Get(ctx, client.ObjectKey{Namespace: "minio", Name: "minio"}, minioService)
62+
Expect(err).To(Not(HaveOccurred()))
63+
ip := minioService.Spec.ClusterIP
5764
By("creating the custom resource for the automq")
58-
err := k8sClient.Get(ctx, client.ObjectKeyFromObject(automq), automq)
65+
err = k8sClient.Get(ctx, client.ObjectKeyFromObject(automq), automq)
5966
if err != nil && errors.IsNotFound(err) {
6067
// Let's mock our custom resource at the same way that we would
6168
// apply on the cluster the manifest under config/samples
62-
automq.Spec.S3.Endpoint = "http://minio.minio.svc.cluster.local:9000"
69+
automq.Spec.S3.Endpoint = fmt.Sprintf("http://%s:9000", ip)
6370
automq.Spec.S3.Bucket = "ko3"
6471
automq.Spec.S3.AccessKeyID = "admin"
6572
automq.Spec.S3.SecretAccessKey = "minio123"
@@ -72,7 +79,120 @@ var _ = Describe("automq_controller", func() {
7279
Expect(err).To(Not(HaveOccurred()))
7380
}
7481
})
75-
AfterEach(func() {
82+
It("should successfully reconcile the resource", func() {
83+
By("Reconciling the created resource")
84+
controllerReconciler := &controller.AutoMQReconciler{
85+
Client: k8sClient,
86+
Scheme: k8sClient.Scheme(),
87+
Finalizer: "apps.cuisongliu.com/automq.finalizer",
88+
MountTZ: true,
89+
}
90+
_, err := controllerReconciler.Reconcile(ctx, reconcile.Request{
91+
NamespacedName: client.ObjectKeyFromObject(automq),
92+
})
93+
Expect(err).NotTo(HaveOccurred())
94+
})
95+
It("get automq deployment", func() {
96+
ctx := context.Background()
97+
Eventually(func() error {
98+
deployment := &v2.DeploymentList{}
99+
labelSelector := labels.Set(map[string]string{"app.kubernetes.io/owner-by": "automq", "app.kubernetes.io/instance": automq.Name}).AsSelector()
100+
err := k8sClient.List(ctx, deployment, &client.ListOptions{Namespace: automq.Namespace, LabelSelector: labelSelector})
101+
if err != nil {
102+
return err
103+
}
104+
if len(deployment.Items) != 4 {
105+
return fmt.Errorf("expected 4 deploy, found %d", len(deployment.Items))
106+
}
107+
for i, deploy := range deployment.Items {
108+
if deploy.Status.ReadyReplicas != 1 {
109+
return fmt.Errorf("expected deploy %d ready replicas to be 1, got '%d'", i, deploy.Status.ReadyReplicas)
110+
}
111+
}
112+
return nil
113+
}, "60s", "1s").Should(Succeed())
114+
})
115+
It("check controller status", func() {
116+
ctx := context.Background()
117+
Eventually(func() error {
118+
podList := &v1.PodList{}
119+
labelSelector := labels.Set(map[string]string{"app.kubernetes.io/owner-by": "automq", "app.kubernetes.io/instance": automq.Name, "app.kubernetes.io/role": "controller"}).AsSelector()
120+
err := k8sClient.List(ctx, podList, &client.ListOptions{Namespace: automq.Namespace, LabelSelector: labelSelector})
121+
if err != nil {
122+
return err
123+
}
124+
if len(podList.Items) != 1 {
125+
return fmt.Errorf("expected 3 pod, found %d", len(podList.Items))
126+
}
127+
for i, pod := range podList.Items {
128+
if pod.Status.Phase != v1.PodRunning {
129+
return fmt.Errorf("expected pod %d phase to be 'Running', got '%s'", i, pod.Status.Phase)
130+
}
131+
}
132+
return nil
133+
}, "60s", "1s").Should(Succeed())
134+
})
135+
136+
It("check broker status", func() {
137+
ctx := context.Background()
138+
Eventually(func() error {
139+
podList := &v1.PodList{}
140+
labelSelector := labels.Set(map[string]string{"app.kubernetes.io/owner-by": "automq", "app.kubernetes.io/instance": automq.Name, "app.kubernetes.io/role": "broker"}).AsSelector()
141+
err := k8sClient.List(ctx, podList, &client.ListOptions{Namespace: automq.Namespace, LabelSelector: labelSelector})
142+
if err != nil {
143+
return err
144+
}
145+
if len(podList.Items) != 3 {
146+
return fmt.Errorf("expected 1 pod, found %d", len(podList.Items))
147+
}
148+
for i, pod := range podList.Items {
149+
if pod.Status.Phase != v1.PodRunning {
150+
return fmt.Errorf("expected pod %d phase to be 'Running', got '%s'", i, pod.Status.Phase)
151+
}
152+
}
153+
return nil
154+
}, "60s", "1s").Should(Succeed())
155+
})
156+
It("check automq status", func() {
157+
ctx := context.Background()
158+
Eventually(func() error {
159+
err := k8sClient.Get(ctx, client.ObjectKeyFromObject(automq), automq)
160+
if err != nil {
161+
return err
162+
}
163+
if automq.Status.Phase != infrav1beta1.AutoMQReady {
164+
return fmt.Errorf("expected automq phase to be 'Ready', got '%s'", automq.Status.Phase)
165+
}
166+
if automq.Status.ControllerReplicas != automq.Spec.Controller.Replicas {
167+
return fmt.Errorf("expected automq controller replicas to be %d, got '%d'", automq.Spec.Controller.Replicas, automq.Status.ControllerReplicas)
168+
}
169+
if automq.Status.BrokerReplicas != automq.Spec.Broker.Replicas {
170+
return fmt.Errorf("expected automq broker replicas to be %d, got '%d'", automq.Spec.Broker.Replicas, automq.Status.BrokerReplicas)
171+
}
172+
showReadyPods := automq.Spec.Controller.Replicas + automq.Spec.Broker.Replicas
173+
if automq.Status.ReadyPods != showReadyPods {
174+
return fmt.Errorf("expected automq ready pods to be %d, got '%d'", showReadyPods, automq.Status.ReadyPods)
175+
}
176+
if len(automq.Status.ControllerAddresses) != int(automq.Spec.Controller.Replicas) {
177+
return fmt.Errorf("expected automq controller addresses to have %d elements, got '%d'", automq.Spec.Controller.Replicas, len(automq.Status.ControllerAddresses))
178+
}
179+
if automq.Status.BootstrapInternalAddress == "" {
180+
return fmt.Errorf("expected automq bootstrap internal address to be set")
181+
}
182+
bootstrapService := fmt.Sprintf("%s.%s.svc:%d", "automq-"+"broker-bootstrap", automq.Namespace, 9092)
183+
if automq.Status.BootstrapInternalAddress != bootstrapService {
184+
return fmt.Errorf("expected automq bootstrap internal address to be '%s', got '%s'", bootstrapService, automq.Status.BootstrapInternalAddress)
185+
}
186+
for i, address := range automq.Status.ControllerAddresses {
187+
controllerService := fmt.Sprintf("%d@%s.%s.svc:%d", i, "automq-controller-"+fmt.Sprintf("%d", i), automq.Namespace, 9093)
188+
if address != controllerService {
189+
return fmt.Errorf("expected automq controller address %d to be '%s', got '%s'", i, controllerService, address)
190+
}
191+
}
192+
return nil
193+
}, "60s", "1s").Should(Succeed())
194+
})
195+
It("clean automq", func() {
76196
By("removing the custom resource for the automq")
77197
found := &infrav1beta1.AutoMQ{}
78198
err := k8sClient.Get(ctx, client.ObjectKeyFromObject(automq), found)

e2e/automq_cluster_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,8 @@ var _ = BeforeSuite(func() {
212212
go func() {
213213
controller.APIRegistry(context.Background(), k8sClient)
214214
}()
215+
err = os.Setenv("NAMESPACE_NAME", "default")
216+
Expect(err).To(Not(HaveOccurred()))
215217
})
216218

217219
var _ = AfterSuite(func() {

internal/controller/automq_apis.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package controller
1818

1919
import (
2020
"context"
21+
2122
"github.com/gin-gonic/gin"
2223
v1 "k8s.io/api/core/v1"
2324
ctrl "sigs.k8s.io/controller-runtime"

internal/controller/automq_controller.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,9 @@ func (r *AutoMQReconciler) reconcile(ctx context.Context, obj client.Object) (ct
172172
r.syncKafkaBootstrapService,
173173
}
174174
var ifRunning bool
175-
for _, fn := range pipelines {
175+
for index, fn := range pipelines {
176176
ifRunning = fn(ctx, automq)
177+
log.V(1).Info("update reconcile controller automq", "ifRunning", ifRunning, "index", index)
177178
if !ifRunning {
178179
break
179180
}
@@ -220,6 +221,7 @@ func (r *AutoMQReconciler) syncStatus(ctx context.Context, automq *infrav1beta1.
220221
}
221222

222223
func (r *AutoMQReconciler) s3Service(ctx context.Context, obj *infrav1beta1.AutoMQ) bool {
224+
log := log.FromContext(ctx)
223225
conditionType := "SyncS3ServiceReady"
224226
sg, err := storage.NewBucket(storage.Config{
225227
Type: "s3",
@@ -229,6 +231,7 @@ func (r *AutoMQReconciler) s3Service(ctx context.Context, obj *infrav1beta1.Auto
229231
Endpoint: obj.Spec.S3.Endpoint,
230232
})
231233
if err != nil {
234+
log.Error(err, "Failed to create S3 Bucket interface for the custom resource", "name", obj.Name, "namespace", obj.Namespace)
232235
meta.SetStatusCondition(&obj.Status.Conditions, metav1.Condition{
233236
Type: conditionType,
234237
Status: metav1.ConditionFalse,
@@ -240,6 +243,7 @@ func (r *AutoMQReconciler) s3Service(ctx context.Context, obj *infrav1beta1.Auto
240243
}
241244
err = sg.MkBucket(ctx, obj.Spec.S3.Bucket)
242245
if err != nil && !strings.Contains(err.Error(), "BucketAlready") {
246+
log.Error(err, "Failed to create S3 Bucket interface for the custom resource", "name", obj.Name, "namespace", obj.Namespace)
243247
meta.SetStatusCondition(&obj.Status.Conditions, metav1.Condition{
244248
Type: conditionType,
245249
Status: metav1.ConditionFalse,
@@ -264,6 +268,7 @@ func (r *AutoMQReconciler) scriptConfigmap(ctx context.Context, obj *infrav1beta
264268
conditionType := "SyncConfigmapReady"
265269
data, err := defaults.Asset("defaults/up.sh")
266270
if err != nil {
271+
log.Error(err, "Failed to create script configmap for the custom resource", "name", obj.Name, "namespace", obj.Namespace)
267272
meta.SetStatusCondition(&obj.Status.Conditions, metav1.Condition{
268273
Type: conditionType,
269274
Status: metav1.ConditionFalse,
@@ -291,6 +296,7 @@ func (r *AutoMQReconciler) scriptConfigmap(ctx context.Context, obj *infrav1beta
291296
log.V(1).Info("create or update configmap by AutoMQ", "OperationResult", change)
292297
return nil
293298
}); err != nil {
299+
log.Error(err, "Failed to create script configmap for the custom resource", "name", obj.Name, "namespace", obj.Namespace)
294300
meta.SetStatusCondition(&obj.Status.Conditions, metav1.Condition{
295301
Type: conditionType,
296302
Status: metav1.ConditionFalse,

internal/controller/automq_controller_b.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ func (r *AutoMQReconciler) syncBrokerScale(ctx context.Context, obj *infrav1beta
100100

101101
func (r *AutoMQReconciler) syncBrokers(ctx context.Context, obj *infrav1beta1.AutoMQ) bool {
102102
conditionType := "SyncBrokerReady"
103-
103+
log := log.FromContext(ctx)
104104
// 1. sync pvc
105105
// 2. sync deploy
106106
// 3. sync svc
@@ -115,6 +115,7 @@ func (r *AutoMQReconciler) syncBrokers(ctx context.Context, obj *infrav1beta1.Au
115115
Reason: "BrokerPVCReconciling",
116116
Message: fmt.Sprintf("Failed to create pvc for the custom resource (%s): (%s)", obj.Name, err),
117117
})
118+
log.Error(err, "Failed to create pvc for the custom resource", "name", obj.Name, "role", brokerRole)
118119
return true
119120
}
120121
if err := r.syncBrokerService(ctx, obj, int32(i)); err != nil {
@@ -125,6 +126,7 @@ func (r *AutoMQReconciler) syncBrokers(ctx context.Context, obj *infrav1beta1.Au
125126
Reason: "BrokerServiceReconciling",
126127
Message: fmt.Sprintf("Failed to create service for the custom resource (%s): (%s)", obj.Name, err),
127128
})
129+
log.Error(err, "Failed to create service for the custom resource", "name", obj.Name, "role", brokerRole)
128130
return true
129131
}
130132
if err := r.syncBrokerDeploy(ctx, obj, int32(i)); err != nil {
@@ -135,6 +137,7 @@ func (r *AutoMQReconciler) syncBrokers(ctx context.Context, obj *infrav1beta1.Au
135137
Reason: "BrokerSTSReconciling",
136138
Message: fmt.Sprintf("Failed to create deploy for the custom resource (%s): (%s)", obj.Name, err),
137139
})
140+
log.Error(err, "Failed to create deploy for the custom resource", "name", obj.Name, "role", brokerRole)
138141
return true
139142
}
140143
}

internal/controller/automq_controller_c.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package controller
1919
import (
2020
"context"
2121
"fmt"
22+
"sigs.k8s.io/controller-runtime/pkg/log"
2223
"strings"
2324

2425
"github.com/aws/aws-sdk-go-v2/aws"
@@ -90,7 +91,7 @@ func (r *AutoMQReconciler) syncControllersScale(ctx context.Context, obj *infrav
9091

9192
func (r *AutoMQReconciler) syncControllers(ctx context.Context, obj *infrav1beta1.AutoMQ) bool {
9293
conditionType := "SyncControllerReady"
93-
94+
log := log.FromContext(ctx)
9495
// 1. sync pvc
9596
// 2. sync deploy
9697
// 3. sync svc
@@ -105,6 +106,7 @@ func (r *AutoMQReconciler) syncControllers(ctx context.Context, obj *infrav1beta
105106
Reason: "ControllerPVCReconciling",
106107
Message: fmt.Sprintf("Failed to create pvc for the custom resource (%s): (%s)", obj.Name, err),
107108
})
109+
log.Error(err, "Failed to create pvc for the custom resource (%s)", obj.Name, "role", controllerRole)
108110
return true
109111
}
110112
if err := r.syncControllerDeploy(ctx, obj, int32(i)); err != nil {
@@ -115,6 +117,7 @@ func (r *AutoMQReconciler) syncControllers(ctx context.Context, obj *infrav1beta
115117
Reason: "ControllerSTSReconciling",
116118
Message: fmt.Sprintf("Failed to create deploy for the custom resource (%s): (%s)", obj.Name, err),
117119
})
120+
log.Error(err, "Failed to create deploy for the custom resource (%s)", obj.Name, "role", controllerRole)
118121
return true
119122
}
120123
if err := r.syncControllerService(ctx, obj, int32(i)); err != nil {
@@ -125,6 +128,7 @@ func (r *AutoMQReconciler) syncControllers(ctx context.Context, obj *infrav1beta
125128
Reason: "ControllerServiceReconciling",
126129
Message: fmt.Sprintf("Failed to create service for the custom resource (%s): (%s)", obj.Name, err),
127130
})
131+
log.Error(err, "Failed to create service for the custom resource (%s)", obj.Name, "role", controllerRole)
128132
return true
129133
}
130134
}

0 commit comments

Comments
 (0)