Skip to content

Commit 191a010

Browse files
reljadevMilan Reljin
andauthored
fix(kafka): topic provisioning for external Kafka cluster (#2070)
* fix(kafka): topic provisioning for external Kafka cluster * fix: replace bitnamilegacy/kafka with apache/kafka in provisioning job * fix: move helm hook annotations to job metadata, remove ttlSecondsAfterFinished --------- Co-authored-by: Milan Reljin <milanreljin@rivianvw.tech>
1 parent 845504d commit 191a010

File tree

2 files changed

+207
-1
lines changed

2 files changed

+207
-1
lines changed
Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
{{- if and (not .Values.kafka.enabled) .Values.externalKafka.provisioning.enabled }}
2+
{{- $provisioning := .Values.externalKafka.provisioning }}
3+
{{- $sasl := .Values.externalKafka.sasl | default dict }}
4+
{{- $saslEnabled := and $sasl.mechanism (ne $sasl.mechanism "None") }}
5+
{{- $saslFromSecret := $sasl.existingSecret }}
6+
{{- $securityProtocol := .Values.externalKafka.security.protocol | default "PLAINTEXT" | upper }}
7+
8+
{{/* Build bootstrap servers from cluster or host/port */}}
9+
{{- $brokers := list }}
10+
{{- if .Values.externalKafka.cluster }}
11+
{{- range .Values.externalKafka.cluster }}
12+
{{- $brokers = append $brokers (dict "host" .host "port" (int .port)) }}
13+
{{- end }}
14+
{{- else }}
15+
{{- $brokers = append $brokers (dict "host" .Values.externalKafka.host "port" (int .Values.externalKafka.port)) }}
16+
{{- end }}
17+
18+
{{- $bootstrapServers := list }}
19+
{{- range $brokers }}
20+
{{- $bootstrapServers = append $bootstrapServers (printf "%s:%d" .host .port) }}
21+
{{- end }}
22+
{{- $bootstrapServersString := join "," $bootstrapServers }}
23+
24+
{{/* Get topic list from externalKafka.provisioning.topics or fall back to kafka.provisioning.topics */}}
25+
{{- $topics := $provisioning.topics | default .Values.kafka.provisioning.topics }}
26+
{{- $replicationFactor := int ($provisioning.replicationFactor | default 1) }}
27+
{{- $defaultPartitions := int ($provisioning.numPartitions | default 1) }}
28+
29+
{{- $image := printf "%s:%s" ($provisioning.image.repository | default "apache/kafka") ($provisioning.image.tag | default "3.7.1") }}
30+
31+
apiVersion: batch/v1
32+
kind: Job
33+
metadata:
34+
name: {{ .Release.Name }}-kafka-provisioning
35+
labels:
36+
app.kubernetes.io/instance: {{ .Release.Name }}
37+
app.kubernetes.io/managed-by: {{ .Release.Service }}
38+
app.kubernetes.io/name: kafka
39+
app.kubernetes.io/component: kafka-provisioning
40+
annotations:
41+
"helm.sh/hook": "pre-install,pre-upgrade"
42+
"helm.sh/hook-delete-policy": "before-hook-creation"
43+
"helm.sh/hook-weight": "0"
44+
spec:
45+
{{- if .Values.hooks.activeDeadlineSeconds }}
46+
activeDeadlineSeconds: {{ .Values.hooks.activeDeadlineSeconds }}
47+
{{- end}}
48+
backoffLimit: 10
49+
template:
50+
metadata:
51+
labels:
52+
app.kubernetes.io/instance: {{ .Release.Name }}
53+
app.kubernetes.io/managed-by: {{ .Release.Service }}
54+
app.kubernetes.io/name: kafka
55+
app.kubernetes.io/component: kafka-provisioning
56+
spec:
57+
restartPolicy: OnFailure
58+
terminationGracePeriodSeconds: 0
59+
securityContext:
60+
fsGroup: 1000
61+
seccompProfile:
62+
type: RuntimeDefault
63+
{{- if $provisioning.nodeSelector }}
64+
nodeSelector:
65+
{{ toYaml $provisioning.nodeSelector | indent 8 }}
66+
{{- end }}
67+
{{- if $provisioning.tolerations }}
68+
tolerations:
69+
{{ toYaml $provisioning.tolerations | indent 8 }}
70+
{{- end }}
71+
initContainers:
72+
- name: wait-for-kafka
73+
image: {{ $image }}
74+
imagePullPolicy: IfNotPresent
75+
securityContext:
76+
allowPrivilegeEscalation: false
77+
capabilities:
78+
drop:
79+
- ALL
80+
readOnlyRootFilesystem: true
81+
runAsGroup: 1000
82+
runAsNonRoot: true
83+
runAsUser: 1000
84+
command:
85+
- /bin/bash
86+
args:
87+
- -ec
88+
- |
89+
{{- range $brokers }}
90+
timeout=120; until nc -z {{ .host }} {{ .port }} 2>/dev/null; do
91+
timeout=$((timeout - 2)); [ $timeout -le 0 ] && echo "Timed out waiting for {{ .host }}:{{ .port }}" && exit 1;
92+
sleep 2;
93+
done;
94+
{{- end }}
95+
echo "Kafka is available";
96+
{{- if $provisioning.resources }}
97+
resources:
98+
{{ toYaml $provisioning.resources | indent 12 }}
99+
{{- end }}
100+
containers:
101+
- name: kafka-provisioning
102+
image: {{ $image }}
103+
imagePullPolicy: IfNotPresent
104+
securityContext:
105+
allowPrivilegeEscalation: false
106+
capabilities:
107+
drop:
108+
- ALL
109+
readOnlyRootFilesystem: true
110+
runAsGroup: 1000
111+
runAsNonRoot: true
112+
runAsUser: 1000
113+
{{- if $saslFromSecret }}
114+
env:
115+
- name: KAFKA_SASL_MECHANISM
116+
valueFrom:
117+
secretKeyRef:
118+
name: {{ $sasl.existingSecret }}
119+
key: {{ ($sasl.existingSecretKeys).mechanism | default "mechanism" }}
120+
- name: KAFKA_SASL_USERNAME
121+
valueFrom:
122+
secretKeyRef:
123+
name: {{ $sasl.existingSecret }}
124+
key: {{ ($sasl.existingSecretKeys).username | default "username" }}
125+
- name: KAFKA_SASL_PASSWORD
126+
valueFrom:
127+
secretKeyRef:
128+
name: {{ $sasl.existingSecret }}
129+
key: {{ ($sasl.existingSecretKeys).password | default "password" }}
130+
{{- end }}
131+
command:
132+
- /bin/bash
133+
args:
134+
- -ec
135+
- |
136+
export CLIENT_CONF="${CLIENT_CONF:-/tmp/client.properties}"
137+
if [ ! -f "$CLIENT_CONF" ]; then
138+
echo "security.protocol={{ $securityProtocol }}" > "$CLIENT_CONF"
139+
{{- if $saslFromSecret }}
140+
echo "sasl.mechanism=${KAFKA_SASL_MECHANISM}" >> "$CLIENT_CONF"
141+
case "${KAFKA_SASL_MECHANISM}" in
142+
PLAIN)
143+
LOGIN_MODULE="org.apache.kafka.common.security.plain.PlainLoginModule"
144+
;;
145+
*)
146+
LOGIN_MODULE="org.apache.kafka.common.security.scram.ScramLoginModule"
147+
;;
148+
esac
149+
echo "sasl.jaas.config=${LOGIN_MODULE} required username=\"${KAFKA_SASL_USERNAME}\" password=\"${KAFKA_SASL_PASSWORD}\";" >> "$CLIENT_CONF"
150+
{{- else if $saslEnabled }}
151+
echo "sasl.mechanism={{ $sasl.mechanism }}" >> "$CLIENT_CONF"
152+
{{- if and $sasl.username $sasl.password }}
153+
{{- if eq $sasl.mechanism "PLAIN" }}
154+
echo "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{{ $sasl.username }}\" password=\"{{ $sasl.password }}\";" >> "$CLIENT_CONF"
155+
{{- else }}
156+
echo "sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{{ $sasl.username }}\" password=\"{{ $sasl.password }}\";" >> "$CLIENT_CONF"
157+
{{- end }}
158+
{{- end }}
159+
{{- end }}
160+
fi
161+
162+
kafka_provisioning_commands=(
163+
{{- range $topics }}
164+
"/opt/kafka/bin/kafka-topics.sh \
165+
--create \
166+
--if-not-exists \
167+
--bootstrap-server {{ $bootstrapServersString }} \
168+
--replication-factor {{ $replicationFactor }} \
169+
--partitions {{ .partitions | default $defaultPartitions }} \
170+
{{- range $key, $value := .config }}
171+
--config {{ $key }}={{ $value }} \
172+
{{- end }}
173+
--command-config ${CLIENT_CONF} \
174+
--topic {{ .name }}"
175+
{{- end }}
176+
)
177+
178+
echo "Starting provisioning of ${#kafka_provisioning_commands[@]} topics"
179+
for ((i=0; i < ${#kafka_provisioning_commands[@]}; i++)); do
180+
${kafka_provisioning_commands[i]}
181+
done
182+
echo "Provisioning succeeded"
183+
{{- if $provisioning.resources }}
184+
resources:
185+
{{ toYaml $provisioning.resources | indent 12 }}
186+
{{- end }}
187+
volumeMounts:
188+
- name: tmp
189+
mountPath: /tmp
190+
volumes:
191+
- name: tmp
192+
emptyDir: {}
193+
{{- end }}

charts/sentry/values.yaml

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2998,7 +2998,7 @@ externalKafka:
29982998
username: None
29992999
password: None
30003000
# (Optional) Use existing secret for SASL credentials instead of inline values
3001-
# existingSecret: ""
3001+
# existingSecret: "my-kafka-secret"
30023002
# existingSecretKeys:
30033003
# mechanism: "mechanism"
30043004
# username: "username"
@@ -3008,6 +3008,19 @@ externalKafka:
30083008
socket:
30093009
timeout:
30103010
ms: 1000
3011+
## Kafka topic provisioning for external Kafka
3012+
## When enabled, creates a Job that provisions Kafka topics
3013+
provisioning:
3014+
enabled: false
3015+
image:
3016+
repository: apache/kafka
3017+
tag: "3.7.1"
3018+
# topics: [] # If empty, uses kafka.provisioning.topics
3019+
replicationFactor: 3
3020+
numPartitions: 1
3021+
resources: {}
3022+
nodeSelector: {}
3023+
tolerations: []
30113024

30123025
sourcemaps:
30133026
enabled: false

0 commit comments

Comments
 (0)