Skip to content
This repository was archived by the owner on Mar 26, 2025. It is now read-only.

Commit 39172c4

Browse files
committed
Fix topic creation in case of TLS
1 parent ce29e7d commit 39172c4

File tree

2 files changed

+44
-4
lines changed

2 files changed

+44
-4
lines changed

pkg/resources/cruisecontrol/cruisecontrol.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
6464
if r.KafkaCluster.Spec.CruiseControlConfig.CruiseControlEndpoint == "" {
6565

6666
if r.KafkaCluster.Status.CruiseControlTopicStatus == "" || r.KafkaCluster.Status.CruiseControlTopicStatus == banzaicloudv1alpha1.CruiseControlTopicNotReady {
67-
err := generateCCTopic(r.KafkaCluster)
67+
err := generateCCTopic(r.KafkaCluster, r.Client)
6868
if err != nil {
6969
k8sutil.UpdateCCTopicStatus(r.Client, r.KafkaCluster, banzaicloudv1alpha1.CruiseControlTopicNotReady, log)
7070
return err

pkg/resources/cruisecontrol/topicManager.go

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,61 @@
1515
package cruisecontrol
1616

1717
import (
18+
"context"
19+
"crypto/tls"
20+
"crypto/x509"
1821
"fmt"
22+
"time"
1923

2024
banzaicloudv1alpha1 "github.com/banzaicloud/kafka-operator/pkg/apis/banzaicloud/v1alpha1"
2125
"github.com/banzaicloud/kafka-operator/pkg/resources/kafka"
2226
"github.com/goph/emperror"
2327
kafkaGo "github.com/segmentio/kafka-go"
28+
corev1 "k8s.io/api/core/v1"
29+
"k8s.io/apimachinery/pkg/types"
30+
"sigs.k8s.io/controller-runtime/pkg/client"
2431
)
2532

26-
func generateCCTopic(cluster *banzaicloudv1alpha1.KafkaCluster) error {
33+
func generateCCTopic(cluster *banzaicloudv1alpha1.KafkaCluster, client client.Client) error {
2734

28-
conn, err := kafkaGo.Dial("tcp",
35+
dialer := &kafkaGo.Dialer{}
36+
37+
if cluster.Spec.ListenersConfig.SSLSecrets != nil {
38+
tlsKeys := &corev1.Secret{}
39+
err := client.Get(context.TODO(), types.NamespacedName{Namespace: cluster.Namespace, Name: cluster.Spec.ListenersConfig.SSLSecrets.TLSSecretName}, tlsKeys)
40+
if err != nil {
41+
return emperror.Wrap(err, "could not get TLS secret to create CC topic")
42+
}
43+
serverCert := tlsKeys.Data["serverCert"]
44+
serverKey := tlsKeys.Data["serverKey"]
45+
caCert := tlsKeys.Data["caCert"]
46+
x509ServerCert, err := tls.X509KeyPair(serverCert, serverKey)
47+
if err != nil {
48+
return err
49+
}
50+
rootCAs := x509.NewCertPool()
51+
rootCAs.AppendCertsFromPEM(caCert)
52+
53+
dialer = &kafkaGo.Dialer{
54+
Timeout: 10 * time.Second,
55+
DualStack: true,
56+
TLS: &tls.Config{
57+
Certificates: []tls.Certificate{x509ServerCert},
58+
RootCAs: rootCAs,
59+
},
60+
}
61+
} else {
62+
dialer = &kafkaGo.Dialer{
63+
Timeout: 10 * time.Second,
64+
DualStack: true,
65+
}
66+
}
67+
conn, err := dialer.Dial("tcp",
2968
generateKafkaAddress(cluster))
3069
if err != nil {
3170
return emperror.Wrap(err, "could not create topic for CC because kafka is unavailable")
3271
}
72+
defer conn.Close()
3373
tConfig := kafkaGo.TopicConfig{
3474
Topic: "__CruiseControlMetrics",
3575
NumPartitions: 12,
@@ -45,7 +85,7 @@ func generateCCTopic(cluster *banzaicloudv1alpha1.KafkaCluster) error {
4585

4686
func generateKafkaAddress(cluster *banzaicloudv1alpha1.KafkaCluster) string {
4787
if cluster.Spec.HeadlessServiceEnabled {
48-
return fmt.Sprintf("%s.%s.svc.cluster.local:%d", fmt.Sprintf(kafka.HeadlessServiceTemplate, cluster.Name), cluster.Namespace, cluster.Spec.ListenersConfig.InternalListeners[0].ContainerPort)
88+
return fmt.Sprintf("%s:%d", fmt.Sprintf(kafka.HeadlessServiceTemplate, cluster.Name), cluster.Spec.ListenersConfig.InternalListeners[0].ContainerPort)
4989
}
5090
return fmt.Sprintf("%s.%s.svc.cluster.local:%d", fmt.Sprintf(kafka.AllBrokerServiceTemplate, cluster.Name), cluster.Namespace, cluster.Spec.ListenersConfig.InternalListeners[0].ContainerPort)
5191
}

0 commit comments

Comments
 (0)