Skip to content

Commit c065174

Browse files
authored
liveness/readiness probe for Kraft controllers (#108)
1 parent 73a458e commit c065174

File tree

10 files changed

+292
-58
lines changed

10 files changed

+292
-58
lines changed

api/assets/assets.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,7 @@ var (
2525

2626
//go:embed kafka/jmx-exporter.yml
2727
KafkaJmxExporterYaml string
28+
29+
//go:embed kafka/kraft-controller-healthcheck.sh
30+
KraftControllerHealthcheckSh string
2831
)

api/assets/kafka/jmx-exporter.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
lowercaseOutputName: true
22
rules:
33
# Special cases and very specific rules
4+
# Export kraft current state metrics
5+
- pattern: 'kafka.server<type=raft-metrics><>current-state: (.+)'
6+
name: kafka_server_raft_metrics_current_state_$1
7+
type: GAUGE
8+
value: 1
49
- pattern: 'kafka.server<type=(app-info), id=(\d+)><>(Version): ([-.~+\w\d]+)'
510
name: kafka_server_$1_$3
611
type: COUNTER
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
#!/bin/bash
2+
# Copyright 2025 Cisco Systems, Inc. and/or its affiliates
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
17+
# This script returns a successful exit code (0) if the controller is a follower or leader. For any other state, it returns a failure exit code (1).
18+
# In addition, if the environment variable KRAFT_HEALTH_CHECK_SKIP is set to "true" (case insensitive), the script will exit successfully without performing any checks.
19+
20+
skip_check=$(echo "$KRAFT_HEALTH_CHECK_SKIP" | tr '[:upper:]' '[:lower:]')
21+
22+
if [ "$skip_check" = "true" ]; then
23+
echo "KRAFT_HEALTH_CHECK_SKIP is set to TRUE. Exiting health check."
24+
exit 0
25+
fi
26+
27+
JMX_ENDPOINT="http://localhost:9020/metrics"
28+
METRIC_PREFIX="kafka_server_raft_metrics_current_state_"
29+
30+
# Fetch the matching current-state metric with value of 1.0 from the JMX endpoint
31+
MATCHING_METRIC=$(curl -s "$JMX_ENDPOINT" | grep "^${METRIC_PREFIX}" | awk '$2 == 1.0 {print $1}')
32+
33+
# If it's not empty, it means we found a metric with a value of 1.0.
34+
if [ -n "$MATCHING_METRIC" ]; then
35+
# Determine the state of the controller using the last field name of the metric
36+
# Possible values are leader, candidate, voted, follower, unattached, observer
37+
STATE=$(echo "$MATCHING_METRIC" | rev | cut -d'_' -f1 | rev)
38+
39+
# Check if the extracted state is 'leader' or 'follower'
40+
if [ "$STATE" == "leader" ] || [ "$STATE" == "follower" ]; then
41+
echo "The controller is in a healthy quorum state."
42+
exit 0
43+
else
44+
# Any other state (e.g., 'candidate', 'unattached', 'observer') is not considered healthy
45+
echo "Failure: The controller is in an unexpected state: $STATE. Expecting 'leader' or 'follower'."
46+
exit 1
47+
fi
48+
else
49+
echo "JMX Exporter endpoint is not avaiable or kafka_server_raft_metrics_current_state_ was not found."
50+
exit 0
51+
fi

controllers/tests/kafkacluster_controller_kafka_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"k8s.io/apimachinery/pkg/util/intstr"
2929
"sigs.k8s.io/controller-runtime/pkg/client"
3030

31+
"github.com/banzaicloud/koperator/api/assets"
3132
"github.com/banzaicloud/koperator/api/v1beta1"
3233
"github.com/banzaicloud/koperator/pkg/resources/kafkamonitoring"
3334
"github.com/banzaicloud/koperator/pkg/util"
@@ -386,6 +387,17 @@ func expectKafkaBrokerPod(ctx context.Context, kafkaCluster *v1beta1.KafkaCluste
386387
Expect(container.Image).To(Equal("ghcr.io/banzaicloud/kafka:2.13-3.4.1"))
387388
Expect(container.Lifecycle).NotTo(BeNil())
388389
Expect(container.Lifecycle.PreStop).NotTo(BeNil())
390+
if kafkaCluster.Spec.KRaftMode && broker.BrokerConfig.IsControllerNode() {
391+
Expect(container.LivenessProbe).NotTo(BeNil())
392+
Expect(container.LivenessProbe.Exec.Command).NotTo(BeEmpty())
393+
Expect(container.LivenessProbe.Exec.Command[0]).To(Equal("/bin/bash"))
394+
Expect(container.LivenessProbe.Exec.Command[1]).To(Equal("-c"))
395+
Expect(container.LivenessProbe.Exec.Command[2]).To(Equal(assets.KraftControllerHealthcheckSh))
396+
Expect(container.ReadinessProbe).NotTo(BeNil())
397+
} else {
398+
Expect(container.LivenessProbe).To(BeNil())
399+
Expect(container.ReadinessProbe).To(BeNil())
400+
}
389401
getEnvName := func(c corev1.EnvVar) string { return c.Name }
390402

391403
// when passing a slice to ConsistOf(), the slice needs to be the only argument, which is not applicable here,

pkg/resources/kafka/configmap.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -442,12 +442,7 @@ func generateListenerSpecificConfig(kcs *v1beta1.KafkaClusterSpec, serverPasses
442442
b := broker.ReadOnlyConfig
443443
trimmedConfig := strings.TrimSpace(b)
444444

445-
if strings.Contains(trimmedConfig, kafkautils.KafkaConfigSecurityInterBrokerProtocol+"=") {
446-
log.Info("Security InterBrokerProtocol is set for this broker, skipping config update", "broker", broker)
447-
} else {
448-
log.Info("Security InterBrokerProtocol NOT found for broker, setting inter.broker.listener.name",
449-
"interBrokerListenerName", interBrokerListenerName, "broker", broker)
450-
445+
if !strings.Contains(trimmedConfig, kafkautils.KafkaConfigSecurityInterBrokerProtocol+"=") {
451446
if err := brokerConfig.Set(kafkautils.KafkaConfigInterBrokerListenerName, interBrokerListenerName); err != nil {
452447
log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted in an error",
453448
kafkautils.KafkaConfigInterBrokerListenerName))

pkg/resources/kafka/pod.go

Lines changed: 97 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ import (
2525
corev1 "k8s.io/api/core/v1"
2626
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2727
"k8s.io/apimachinery/pkg/runtime"
28+
"k8s.io/apimachinery/pkg/util/intstr"
2829

30+
"github.com/banzaicloud/koperator/api/assets"
2931
apiutil "github.com/banzaicloud/koperator/api/util"
3032
"github.com/banzaicloud/koperator/api/v1beta1"
3133
"github.com/banzaicloud/koperator/pkg/k8sutil"
@@ -55,6 +57,86 @@ func (r *Reconciler) pod(id int32, brokerConfig *v1beta1.BrokerConfig, pvcs []co
5557
podname = fmt.Sprintf("%s-controller-%d-", r.KafkaCluster.Name, id)
5658
}
5759

60+
kafkaContainer := corev1.Container{
61+
Name: kafkaContainerName,
62+
Image: util.GetBrokerImage(brokerConfig, r.KafkaCluster.Spec.GetClusterImage()),
63+
Lifecycle: &corev1.Lifecycle{
64+
PreStop: &corev1.LifecycleHandler{
65+
Exec: &corev1.ExecAction{
66+
Command: []string{"bash", "-c", `
67+
if [[ -n "$ENVOY_SIDECAR_STATUS" ]]; then
68+
HEALTHYSTATUSCODE="200"
69+
SC=$(curl -s -o /dev/null -w "%{http_code}" http://localhost:15000/ready)
70+
if [[ "$SC" == "$HEALTHYSTATUSCODE" ]]; then
71+
kill -s TERM $(pidof java)
72+
else
73+
kill -s KILL $(pidof java)
74+
fi
75+
else
76+
kill -s TERM $(pidof java)
77+
fi`},
78+
},
79+
},
80+
},
81+
SecurityContext: brokerConfig.SecurityContext,
82+
Env: generateEnvConfig(brokerConfig, []corev1.EnvVar{
83+
{
84+
Name: "CLASSPATH",
85+
Value: "/opt/kafka/libs/extensions/*",
86+
},
87+
{
88+
Name: "KAFKA_OPTS",
89+
Value: "-javaagent:/opt/jmx-exporter/jmx_prometheus.jar=9020:/etc/jmx-exporter/config.yaml",
90+
},
91+
{
92+
Name: "ENVOY_SIDECAR_STATUS",
93+
ValueFrom: &corev1.EnvVarSource{
94+
FieldRef: &corev1.ObjectFieldSelector{
95+
FieldPath: `metadata.annotations['sidecar.istio.io/status']`,
96+
},
97+
},
98+
},
99+
}),
100+
101+
Command: command,
102+
Ports: r.generateKafkaContainerPorts(log),
103+
VolumeMounts: getVolumeMounts(brokerConfig.VolumeMounts, dataVolumeMount, r.KafkaCluster.Spec, r.KafkaCluster.Name),
104+
Resources: *brokerConfig.GetResources(),
105+
}
106+
107+
if r.KafkaCluster.Spec.KRaftMode && brokerConfig.IsControllerNode() {
108+
controllerlistenerPort, err := findControllerListenerPort(r.KafkaCluster)
109+
if err != nil {
110+
log.Error(err, "failed to find controller listener port")
111+
} else {
112+
kafkaContainer.ReadinessProbe = &corev1.Probe{
113+
ProbeHandler: corev1.ProbeHandler{
114+
TCPSocket: &corev1.TCPSocketAction{
115+
Port: intstr.IntOrString{
116+
Type: intstr.Int,
117+
IntVal: controllerlistenerPort,
118+
},
119+
},
120+
},
121+
InitialDelaySeconds: 0,
122+
PeriodSeconds: 5,
123+
TimeoutSeconds: 5,
124+
FailureThreshold: 20,
125+
}
126+
}
127+
kafkaContainer.LivenessProbe = &corev1.Probe{
128+
ProbeHandler: corev1.ProbeHandler{
129+
Exec: &corev1.ExecAction{
130+
Command: []string{"/bin/bash", "-c", assets.KraftControllerHealthcheckSh},
131+
},
132+
},
133+
InitialDelaySeconds: 30,
134+
PeriodSeconds: 10,
135+
TimeoutSeconds: 5,
136+
FailureThreshold: 6,
137+
}
138+
}
139+
58140
pod := &corev1.Pod{
59141
ObjectMeta: templates.ObjectMetaWithGeneratedNameAndAnnotations(
60142
podname,
@@ -63,57 +145,10 @@ func (r *Reconciler) pod(id int32, brokerConfig *v1beta1.BrokerConfig, pvcs []co
63145
r.KafkaCluster,
64146
),
65147
Spec: corev1.PodSpec{
66-
SecurityContext: brokerConfig.PodSecurityContext,
67-
InitContainers: getInitContainers(brokerConfig, r.KafkaCluster.Spec),
68-
Affinity: getAffinity(brokerConfig, r.KafkaCluster),
69-
Containers: append([]corev1.Container{
70-
{
71-
Name: kafkaContainerName,
72-
Image: util.GetBrokerImage(brokerConfig, r.KafkaCluster.Spec.GetClusterImage()),
73-
Lifecycle: &corev1.Lifecycle{
74-
PreStop: &corev1.LifecycleHandler{
75-
Exec: &corev1.ExecAction{
76-
Command: []string{"bash", "-c", `
77-
if [[ -n "$ENVOY_SIDECAR_STATUS" ]]; then
78-
HEALTHYSTATUSCODE="200"
79-
SC=$(curl -s -o /dev/null -w "%{http_code}" http://localhost:15000/ready)
80-
if [[ "$SC" == "$HEALTHYSTATUSCODE" ]]; then
81-
kill -s TERM $(pidof java)
82-
else
83-
kill -s KILL $(pidof java)
84-
fi
85-
else
86-
kill -s TERM $(pidof java)
87-
fi`},
88-
},
89-
},
90-
},
91-
SecurityContext: brokerConfig.SecurityContext,
92-
Env: generateEnvConfig(brokerConfig, []corev1.EnvVar{
93-
{
94-
Name: "CLASSPATH",
95-
Value: "/opt/kafka/libs/extensions/*",
96-
},
97-
{
98-
Name: "KAFKA_OPTS",
99-
Value: "-javaagent:/opt/jmx-exporter/jmx_prometheus.jar=9020:/etc/jmx-exporter/config.yaml",
100-
},
101-
{
102-
Name: "ENVOY_SIDECAR_STATUS",
103-
ValueFrom: &corev1.EnvVarSource{
104-
FieldRef: &corev1.ObjectFieldSelector{
105-
FieldPath: `metadata.annotations['sidecar.istio.io/status']`,
106-
},
107-
},
108-
},
109-
}),
110-
111-
Command: command,
112-
Ports: r.generateKafkaContainerPorts(log),
113-
VolumeMounts: getVolumeMounts(brokerConfig.VolumeMounts, dataVolumeMount, r.KafkaCluster.Spec, r.KafkaCluster.Name),
114-
Resources: *brokerConfig.GetResources(),
115-
},
116-
}, brokerConfig.Containers...),
148+
SecurityContext: brokerConfig.PodSecurityContext,
149+
InitContainers: getInitContainers(brokerConfig, r.KafkaCluster.Spec),
150+
Affinity: getAffinity(brokerConfig, r.KafkaCluster),
151+
Containers: append([]corev1.Container{kafkaContainer}, brokerConfig.Containers...),
117152
Volumes: getVolumes(brokerConfig.Volumes, dataVolume, r.KafkaCluster.Spec, r.KafkaCluster.Name, id),
118153
RestartPolicy: corev1.RestartPolicyNever,
119154
TerminationGracePeriodSeconds: util.Int64Pointer(brokerConfig.GetTerminationGracePeriod()),
@@ -605,3 +640,14 @@ func generateEnvConfig(brokerConfig *v1beta1.BrokerConfig, defaultEnvVars []core
605640

606641
return mergedEnv
607642
}
643+
644+
func findControllerListenerPort(kc *v1beta1.KafkaCluster) (int32, error) {
645+
for _, listener := range kc.Spec.ListenersConfig.InternalListeners {
646+
if listener.UsedForControllerCommunication {
647+
return listener.ContainerPort, nil
648+
}
649+
}
650+
651+
// If no controller listener is found, return an error
652+
return 0, fmt.Errorf("no controller listener found")
653+
}

tests/e2e/const.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,11 @@ const (
6868
zookeeperClusterTemplate = "templates/zookeeper_cluster.yaml.tmpl"
6969

7070
kubectlNotFoundErrorMsg = "NotFound"
71+
72+
kafkaLabelSelectorBrokers = "app=kafka,isControllerNode=false"
73+
kafkaLabelSelectorControllers = "app=kafka,isControllerNode=true"
74+
kafkaLabelSelectorAll = "app=kafka"
75+
jmxExporterPort = "9020"
7176
)
7277

7378
func apiGroupKoperatorDependencies() map[string]string {

tests/e2e/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ require (
1212
github.com/onsi/ginkgo/v2 v2.13.1
1313
github.com/onsi/gomega v1.30.0
1414
github.com/twmb/franz-go v1.15.2
15+
k8s.io/api v0.28.4
1516
k8s.io/apiextensions-apiserver v0.28.4
1617
k8s.io/apimachinery v0.28.4
1718
sigs.k8s.io/yaml v1.4.0
@@ -118,7 +119,6 @@ require (
118119
gopkg.in/inf.v0 v0.9.1 // indirect
119120
gopkg.in/yaml.v2 v2.4.0 // indirect
120121
gopkg.in/yaml.v3 v3.0.1 // indirect
121-
k8s.io/api v0.28.4 // indirect
122122
k8s.io/client-go v0.28.4 // indirect
123123
k8s.io/klog/v2 v2.110.1 // indirect
124124
k8s.io/kube-openapi v0.0.0-20231113174909-778a5567bc1e // indirect

tests/e2e/koperator_suite_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,16 @@ var _ = When("Testing e2e test altogether", Ordered, func() {
6060
testInstallZookeeperCluster()
6161
testInstallKafkaCluster("../../config/samples/simplekafkacluster.yaml")
6262
testProduceConsumeInternal()
63+
testJmxExporter()
6364
testUninstallKafkaCluster()
6465
testInstallKafkaCluster("../../config/samples/simplekafkacluster_ssl.yaml")
6566
testProduceConsumeInternalSSL(defaultTLSSecretName)
67+
testJmxExporter()
6668
testUninstallKafkaCluster()
6769
testUninstallZookeeperCluster()
6870
testInstallKafkaCluster("../../config/samples/kraft/simplekafkacluster_kraft.yaml")
6971
testProduceConsumeInternal()
72+
testJmxExporter()
7073
testUninstallKafkaCluster()
7174
testUninstall()
7275
snapshotClusterAndCompare(snapshottedInfo)

0 commit comments

Comments
 (0)