Skip to content
This repository was archived by the owner on Mar 26, 2025. It is now read-only.

Commit c470369

Browse files
authored
Prevent kafka controller from running into NodePort service deletion and re-creation cycles indefinitely (#928)
* Prevent kafka controller from running into NodePort service deletion and re-creation cycles indefinitely * Rename variable * Refactor existing implementation to improve readability * Fix unintended change * Update logs and comments to reduce confusion
1 parent 5a64fb0 commit c470369

File tree

4 files changed

+399
-81
lines changed

4 files changed

+399
-81
lines changed

pkg/resources/kafka/allBrokerService.go

Lines changed: 0 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,8 @@
1515
package kafka
1616

1717
import (
18-
"context"
1918
"fmt"
2019

21-
"emperror.dev/errors"
22-
23-
"github.com/banzaicloud/koperator/api/v1beta1"
24-
25-
"k8s.io/apimachinery/pkg/labels"
26-
"k8s.io/apimachinery/pkg/selection"
27-
28-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29-
"sigs.k8s.io/controller-runtime/pkg/client"
30-
3120
corev1 "k8s.io/api/core/v1"
3221
"k8s.io/apimachinery/pkg/runtime"
3322

@@ -64,67 +53,3 @@ func (r *Reconciler) allBrokerService() runtime.Object {
6453
},
6554
}
6655
}
67-
68-
// deleteNonHeadlessServices deletes the all-broker service that was created for the current KafkaCluster
69-
// if there is any and also the service of each broker
70-
func (r *Reconciler) deleteNonHeadlessServices() error {
71-
ctx := context.Background()
72-
73-
svc := corev1.Service{
74-
TypeMeta: metav1.TypeMeta{
75-
APIVersion: corev1.SchemeGroupVersion.String(),
76-
Kind: "Service",
77-
},
78-
ObjectMeta: metav1.ObjectMeta{
79-
Namespace: r.KafkaCluster.GetNamespace(),
80-
Name: fmt.Sprintf(kafkautils.AllBrokerServiceTemplate, r.KafkaCluster.GetName()),
81-
},
82-
}
83-
84-
err := r.Client.Delete(ctx, &svc)
85-
if err != nil && client.IgnoreNotFound(err) != nil {
86-
return err
87-
}
88-
89-
// delete broker services
90-
labelSelector := labels.NewSelector()
91-
for k, v := range apiutil.LabelsForKafka(r.KafkaCluster.GetName()) {
92-
req, err := labels.NewRequirement(k, selection.Equals, []string{v})
93-
if err != nil {
94-
return err
95-
}
96-
labelSelector = labelSelector.Add(*req)
97-
}
98-
99-
// add "has label 'brokerId' to matching labels selector expression
100-
req, err := labels.NewRequirement(v1beta1.BrokerIdLabelKey, selection.Exists, nil)
101-
if err != nil {
102-
return err
103-
}
104-
labelSelector = labelSelector.Add(*req)
105-
106-
var services corev1.ServiceList
107-
err = r.Client.List(ctx, &services,
108-
client.InNamespace(r.KafkaCluster.GetNamespace()),
109-
client.MatchingLabelsSelector{Selector: labelSelector},
110-
)
111-
112-
if err != nil {
113-
return errors.WrapIfWithDetails(err, "failed to list services",
114-
"namespace", r.KafkaCluster.GetNamespace(),
115-
"label selector", labelSelector.String())
116-
}
117-
118-
for i := range services.Items {
119-
svc = services.Items[i]
120-
if !svc.GetDeletionTimestamp().IsZero() {
121-
continue
122-
}
123-
err = r.Client.Delete(ctx, &svc)
124-
if err != nil && client.IgnoreNotFound(err) != nil {
125-
return err
126-
}
127-
}
128-
129-
return nil
130-
}

pkg/resources/kafka/headlessService.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,17 @@ import (
1818
"context"
1919
"fmt"
2020

21+
"emperror.dev/errors"
2122
corev1 "k8s.io/api/core/v1"
2223
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24+
"k8s.io/apimachinery/pkg/labels"
2325
"k8s.io/apimachinery/pkg/runtime"
26+
"k8s.io/apimachinery/pkg/selection"
2427
"k8s.io/apimachinery/pkg/util/intstr"
2528
"sigs.k8s.io/controller-runtime/pkg/client"
2629

30+
"github.com/banzaicloud/koperator/api/v1beta1"
31+
2732
apiutil "github.com/banzaicloud/koperator/api/util"
2833
"github.com/banzaicloud/koperator/pkg/resources/templates"
2934
kafkautils "github.com/banzaicloud/koperator/pkg/util/kafka"
@@ -92,3 +97,94 @@ func (r *Reconciler) deleteHeadlessService() error {
9297

9398
return err
9499
}
100+
101+
// deleteNonHeadlessServices deletes the all-broker service that was created for the current KafkaCluster
102+
// if there is any and also the service of each broker
103+
func (r *Reconciler) deleteNonHeadlessServices(ctx context.Context) error {
104+
svc := corev1.Service{
105+
TypeMeta: metav1.TypeMeta{
106+
APIVersion: corev1.SchemeGroupVersion.String(),
107+
Kind: "Service",
108+
},
109+
ObjectMeta: metav1.ObjectMeta{
110+
Namespace: r.KafkaCluster.GetNamespace(),
111+
Name: fmt.Sprintf(kafkautils.AllBrokerServiceTemplate, r.KafkaCluster.GetName()),
112+
},
113+
}
114+
115+
err := r.Client.Delete(ctx, &svc)
116+
if err != nil && client.IgnoreNotFound(err) != nil {
117+
return err
118+
}
119+
120+
// delete broker services
121+
labelSelector := labels.NewSelector()
122+
for k, v := range apiutil.LabelsForKafka(r.KafkaCluster.GetName()) {
123+
req, err := labels.NewRequirement(k, selection.Equals, []string{v})
124+
if err != nil {
125+
return err
126+
}
127+
labelSelector = labelSelector.Add(*req)
128+
}
129+
130+
// add "has label 'brokerId' to matching labels selector expression
131+
req, err := labels.NewRequirement(v1beta1.BrokerIdLabelKey, selection.Exists, nil)
132+
if err != nil {
133+
return err
134+
}
135+
labelSelector = labelSelector.Add(*req)
136+
137+
var services corev1.ServiceList
138+
err = r.Client.List(ctx, &services,
139+
client.InNamespace(r.KafkaCluster.GetNamespace()),
140+
client.MatchingLabelsSelector{Selector: labelSelector},
141+
)
142+
143+
if err != nil {
144+
return errors.WrapIfWithDetails(err, "failed to list services",
145+
"namespace", r.KafkaCluster.GetNamespace(),
146+
"label selector", labelSelector.String())
147+
}
148+
149+
// if NodePort is used for any of the external listeners, the corresponding services need to remain
150+
// so that clients from outside the Kubernetes cluster can reach the brokers
151+
filteredSvcsToDelete := services
152+
if isNodePortAccessMethodInUseAmongExternalListeners(r.KafkaCluster.Spec.ListenersConfig.ExternalListeners) {
153+
filteredSvcsToDelete = nonNodePortServices(services)
154+
}
155+
156+
for _, svc := range filteredSvcsToDelete.Items {
157+
if !svc.GetDeletionTimestamp().IsZero() {
158+
continue
159+
}
160+
err = r.Client.Delete(ctx, &svc)
161+
if err != nil && client.IgnoreNotFound(err) != nil {
162+
return err
163+
}
164+
}
165+
166+
return nil
167+
}
168+
169+
// isNodePortAccessMethodInUseAmongExternalListeners returns true when users specify any of the external listeners to use NodePort
170+
func isNodePortAccessMethodInUseAmongExternalListeners(externalListeners []v1beta1.ExternalListenerConfig) bool {
171+
for _, externalListener := range externalListeners {
172+
if externalListener.GetAccessMethod() == corev1.ServiceTypeNodePort {
173+
return true
174+
}
175+
}
176+
177+
return false
178+
}
179+
180+
func nonNodePortServices(services corev1.ServiceList) corev1.ServiceList {
181+
var nonNodePortSvc corev1.ServiceList
182+
183+
for _, svc := range services.Items {
184+
if svc.Spec.Type != corev1.ServiceTypeNodePort {
185+
nonNodePortSvc.Items = append(nonNodePortSvc.Items, svc)
186+
}
187+
}
188+
189+
return nonNodePortSvc
190+
}

0 commit comments

Comments
 (0)