@@ -25,6 +25,7 @@ import (
2525 corev1 "k8s.io/api/core/v1"
2626 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2727 "k8s.io/apimachinery/pkg/runtime"
28+ "k8s.io/apimachinery/pkg/util/intstr"
2829
2930 apiutil "github.com/banzaicloud/koperator/api/util"
3031 "github.com/banzaicloud/koperator/api/v1beta1"
@@ -55,6 +56,90 @@ func (r *Reconciler) pod(id int32, brokerConfig *v1beta1.BrokerConfig, pvcs []co
5556 podname = fmt .Sprintf ("%s-controller-%d-" , r .KafkaCluster .Name , id )
5657 }
5758
59+ kafkaContainer := corev1.Container {
60+ Name : kafkaContainerName ,
61+ Image : util .GetBrokerImage (brokerConfig , r .KafkaCluster .Spec .GetClusterImage ()),
62+ Lifecycle : & corev1.Lifecycle {
63+ PreStop : & corev1.LifecycleHandler {
64+ Exec : & corev1.ExecAction {
65+ Command : []string {"bash" , "-c" , `
66+ if [[ -n "$ENVOY_SIDECAR_STATUS" ]]; then
67+ HEALTHYSTATUSCODE="200"
68+ SC=$(curl -s -o /dev/null -w "%{http_code}" http://localhost:15000/ready)
69+ if [[ "$SC" == "$HEALTHYSTATUSCODE" ]]; then
70+ kill -s TERM $(pidof java)
71+ else
72+ kill -s KILL $(pidof java)
73+ fi
74+ else
75+ kill -s TERM $(pidof java)
76+ fi` },
77+ },
78+ },
79+ },
80+ SecurityContext : brokerConfig .SecurityContext ,
81+ Env : generateEnvConfig (brokerConfig , []corev1.EnvVar {
82+ {
83+ Name : "CLASSPATH" ,
84+ Value : "/opt/kafka/libs/extensions/*" ,
85+ },
86+ {
87+ Name : "KAFKA_OPTS" ,
88+ Value : "-javaagent:/opt/jmx-exporter/jmx_prometheus.jar=9020:/etc/jmx-exporter/config.yaml" ,
89+ },
90+ {
91+ Name : "ENVOY_SIDECAR_STATUS" ,
92+ ValueFrom : & corev1.EnvVarSource {
93+ FieldRef : & corev1.ObjectFieldSelector {
94+ FieldPath : `metadata.annotations['sidecar.istio.io/status']` ,
95+ },
96+ },
97+ },
98+ }),
99+
100+ Command : command ,
101+ Ports : r .generateKafkaContainerPorts (log ),
102+ VolumeMounts : getVolumeMounts (brokerConfig .VolumeMounts , dataVolumeMount , r .KafkaCluster .Spec , r .KafkaCluster .Name ),
103+ Resources : * brokerConfig .GetResources (),
104+ }
105+
106+ if r .KafkaCluster .Spec .KRaftMode && brokerConfig .IsControllerNode () {
107+ controllerlistenerPort , err := findControllerListenerPort (r .KafkaCluster )
108+ if err != nil {
109+ log .Error (err , "failed to find controller listener port" )
110+ } else {
111+ kafkaContainer .LivenessProbe = & corev1.Probe {
112+ ProbeHandler : corev1.ProbeHandler {
113+ TCPSocket : & corev1.TCPSocketAction {
114+ Port : intstr.IntOrString {
115+ Type : intstr .Int ,
116+ IntVal : controllerlistenerPort ,
117+ },
118+ },
119+ },
120+ InitialDelaySeconds : 15 ,
121+ PeriodSeconds : 10 ,
122+ TimeoutSeconds : 5 ,
123+ FailureThreshold : 6 ,
124+ }
125+
126+ kafkaContainer .ReadinessProbe = & corev1.Probe {
127+ ProbeHandler : corev1.ProbeHandler {
128+ TCPSocket : & corev1.TCPSocketAction {
129+ Port : intstr.IntOrString {
130+ Type : intstr .Int ,
131+ IntVal : controllerlistenerPort ,
132+ },
133+ },
134+ },
135+ InitialDelaySeconds : 15 ,
136+ PeriodSeconds : 10 ,
137+ TimeoutSeconds : 5 ,
138+ FailureThreshold : 20 ,
139+ }
140+ }
141+ }
142+
58143 pod := & corev1.Pod {
59144 ObjectMeta : templates .ObjectMetaWithGeneratedNameAndAnnotations (
60145 podname ,
@@ -63,57 +148,10 @@ func (r *Reconciler) pod(id int32, brokerConfig *v1beta1.BrokerConfig, pvcs []co
63148 r .KafkaCluster ,
64149 ),
65150 Spec : corev1.PodSpec {
66- SecurityContext : brokerConfig .PodSecurityContext ,
67- InitContainers : getInitContainers (brokerConfig , r .KafkaCluster .Spec ),
68- Affinity : getAffinity (brokerConfig , r .KafkaCluster ),
69- Containers : append ([]corev1.Container {
70- {
71- Name : kafkaContainerName ,
72- Image : util .GetBrokerImage (brokerConfig , r .KafkaCluster .Spec .GetClusterImage ()),
73- Lifecycle : & corev1.Lifecycle {
74- PreStop : & corev1.LifecycleHandler {
75- Exec : & corev1.ExecAction {
76- Command : []string {"bash" , "-c" , `
77- if [[ -n "$ENVOY_SIDECAR_STATUS" ]]; then
78- HEALTHYSTATUSCODE="200"
79- SC=$(curl -s -o /dev/null -w "%{http_code}" http://localhost:15000/ready)
80- if [[ "$SC" == "$HEALTHYSTATUSCODE" ]]; then
81- kill -s TERM $(pidof java)
82- else
83- kill -s KILL $(pidof java)
84- fi
85- else
86- kill -s TERM $(pidof java)
87- fi` },
88- },
89- },
90- },
91- SecurityContext : brokerConfig .SecurityContext ,
92- Env : generateEnvConfig (brokerConfig , []corev1.EnvVar {
93- {
94- Name : "CLASSPATH" ,
95- Value : "/opt/kafka/libs/extensions/*" ,
96- },
97- {
98- Name : "KAFKA_OPTS" ,
99- Value : "-javaagent:/opt/jmx-exporter/jmx_prometheus.jar=9020:/etc/jmx-exporter/config.yaml" ,
100- },
101- {
102- Name : "ENVOY_SIDECAR_STATUS" ,
103- ValueFrom : & corev1.EnvVarSource {
104- FieldRef : & corev1.ObjectFieldSelector {
105- FieldPath : `metadata.annotations['sidecar.istio.io/status']` ,
106- },
107- },
108- },
109- }),
110-
111- Command : command ,
112- Ports : r .generateKafkaContainerPorts (log ),
113- VolumeMounts : getVolumeMounts (brokerConfig .VolumeMounts , dataVolumeMount , r .KafkaCluster .Spec , r .KafkaCluster .Name ),
114- Resources : * brokerConfig .GetResources (),
115- },
116- }, brokerConfig .Containers ... ),
151+ SecurityContext : brokerConfig .PodSecurityContext ,
152+ InitContainers : getInitContainers (brokerConfig , r .KafkaCluster .Spec ),
153+ Affinity : getAffinity (brokerConfig , r .KafkaCluster ),
154+ Containers : append ([]corev1.Container {kafkaContainer }, brokerConfig .Containers ... ),
117155 Volumes : getVolumes (brokerConfig .Volumes , dataVolume , r .KafkaCluster .Spec , r .KafkaCluster .Name , id ),
118156 RestartPolicy : corev1 .RestartPolicyNever ,
119157 TerminationGracePeriodSeconds : util .Int64Pointer (brokerConfig .GetTerminationGracePeriod ()),
@@ -605,3 +643,14 @@ func generateEnvConfig(brokerConfig *v1beta1.BrokerConfig, defaultEnvVars []core
605643
606644 return mergedEnv
607645}
646+
647+ func findControllerListenerPort (kc * v1beta1.KafkaCluster ) (int32 , error ) {
648+ for _ , listener := range kc .Spec .ListenersConfig .InternalListeners {
649+ if listener .UsedForControllerCommunication {
650+ return listener .ContainerPort , nil
651+ }
652+ }
653+
654+ // If no controller listener is found, return an error
655+ return 0 , fmt .Errorf ("no controller listener found" )
656+ }
0 commit comments