Skip to content
This repository was archived by the owner on Mar 26, 2025. It is now read-only.

Commit ce29e7d

Browse files
committed
Fix remaining bugs on service mesh support
1 parent c73e5a9 commit ce29e7d

File tree

5 files changed

+14
-12
lines changed

5 files changed

+14
-12
lines changed

pkg/apis/banzaicloud/v1alpha1/kafkacluster_types.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ type KafkaClusterSpec struct {
4242

4343
// KafkaClusterStatus defines the observed state of KafkaCluster
4444
type KafkaClusterStatus struct {
45-
BrokersState map[int32]*BrokerState `json:"brokersState,omitempty"`
46-
CruiseControlTopicStatus *CruiseControlTopicStatus `json:"cruiseControlTopicStatus,omitempty"`
45+
BrokersState map[int32]*BrokerState `json:"brokersState,omitempty"`
46+
CruiseControlTopicStatus CruiseControlTopicStatus `json:"cruiseControlTopicStatus,omitempty"`
4747
}
4848

4949
// BrokerConfig defines the broker configuration

pkg/apis/banzaicloud/v1alpha1/zz_generated.deepcopy.go

Lines changed: 0 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/k8sutil/status.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ func updateGracefulScaleStatus(c client.Client, brokerId int32, cluster *banzaic
132132
func UpdateCCTopicStatus(c client.Client, cluster *banzaicloudv1alpha1.KafkaCluster, ccTopicStatus banzaicloudv1alpha1.CruiseControlTopicStatus, logger logr.Logger) error {
133133
typeMeta := cluster.TypeMeta
134134

135-
cluster.Status.CruiseControlTopicStatus = &ccTopicStatus
135+
cluster.Status.CruiseControlTopicStatus = ccTopicStatus
136136

137137
err := c.Status().Update(context.Background(), cluster)
138138
if errors.IsNotFound(err) {
@@ -150,7 +150,7 @@ func UpdateCCTopicStatus(c client.Client, cluster *banzaicloudv1alpha1.KafkaClus
150150
return emperror.Wrap(err, "could not get config for updating status")
151151
}
152152

153-
cluster.Status.CruiseControlTopicStatus = &ccTopicStatus
153+
cluster.Status.CruiseControlTopicStatus = ccTopicStatus
154154

155155
err = c.Status().Update(context.Background(), cluster)
156156
if errors.IsNotFound(err) {

pkg/resources/cruisecontrol/cruisecontrol.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
6363

6464
if r.KafkaCluster.Spec.CruiseControlConfig.CruiseControlEndpoint == "" {
6565

66-
if *r.KafkaCluster.Status.CruiseControlTopicStatus != banzaicloudv1alpha1.CruiseControlTopicReady {
66+
if r.KafkaCluster.Status.CruiseControlTopicStatus == "" || r.KafkaCluster.Status.CruiseControlTopicStatus == banzaicloudv1alpha1.CruiseControlTopicNotReady {
6767
err := generateCCTopic(r.KafkaCluster)
6868
if err != nil {
6969
k8sutil.UpdateCCTopicStatus(r.Client, r.KafkaCluster, banzaicloudv1alpha1.CruiseControlTopicNotReady, log)
@@ -75,7 +75,7 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
7575
}
7676
}
7777

78-
if *r.KafkaCluster.Status.CruiseControlTopicStatus == banzaicloudv1alpha1.CruiseControlTopicReady {
78+
if r.KafkaCluster.Status.CruiseControlTopicStatus == banzaicloudv1alpha1.CruiseControlTopicReady {
7979
for _, res := range []resources.ResourceWithLogs{
8080
r.service,
8181
r.configMap,

pkg/resources/cruisecontrol/topicManager.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
func generateCCTopic(cluster *banzaicloudv1alpha1.KafkaCluster) error {
2727

2828
conn, err := kafkaGo.Dial("tcp",
29-
fmt.Sprintf("%s.%s.svc.cluster.local:%d", fmt.Sprintf(kafka.AllBrokerServiceTemplate, cluster.Name), cluster.Namespace, cluster.Spec.ListenersConfig.InternalListeners[0].ContainerPort))
29+
generateKafkaAddress(cluster))
3030
if err != nil {
3131
return emperror.Wrap(err, "could not create topic for CC because kafka is unavailable")
3232
}
@@ -42,3 +42,10 @@ func generateCCTopic(cluster *banzaicloudv1alpha1.KafkaCluster) error {
4242
}
4343
return nil
4444
}
45+
46+
func generateKafkaAddress(cluster *banzaicloudv1alpha1.KafkaCluster) string {
47+
if cluster.Spec.HeadlessServiceEnabled {
48+
return fmt.Sprintf("%s.%s.svc.cluster.local:%d", fmt.Sprintf(kafka.HeadlessServiceTemplate, cluster.Name), cluster.Namespace, cluster.Spec.ListenersConfig.InternalListeners[0].ContainerPort)
49+
}
50+
return fmt.Sprintf("%s.%s.svc.cluster.local:%d", fmt.Sprintf(kafka.AllBrokerServiceTemplate, cluster.Name), cluster.Namespace, cluster.Spec.ListenersConfig.InternalListeners[0].ContainerPort)
51+
}

0 commit comments

Comments
 (0)