Skip to content

Commit c45c06b

Browse files
author
Agent
committed
Add Kafka integration tests and deploy-kafka EaaS task
Deploys a single-node Apache Kafka 3.9.2 broker (KRaft mode) in the EaaS ephemeral namespace and adds Kafka message assertions to the existing workflow integration tests. Pipeline changes (.tekton/integration-test-eaas.yaml): - Add deploy-kafka task that starts apache/kafka:3.9.2 in KRaft mode. Three emptyDir volumes (kafka-config, kafka-logs, kafka-gc-logs) and an initContainer (copy-kafka-config) satisfy OpenShift restricted-v2 SCC. KAFKA_CONTROLLER_QUORUM_VOTERS uses localhost:9093 to avoid the bootstrap deadlock. deploy-cts lists deploy-kafka in runAfter. - Pass KAFKA_URL=kafka:9092 and install kafka-python in run-tests. Test changes (tests/test_integration_api.py): - kafka_url module fixture reads KAFKA_URL; returns None (not skip) when unset so all existing workflow tests always run. - kafka_message_on(kafka_url, topic) context manager snapshots the partition-0 end offset before its body, then consumes the first message at or after that offset after the body completes. No predicates: every received message is returned as-is and asserted by the caller, so unexpected messages cause test failures. - kafka_messages_on(kafka_url, topic, count) batch variant for scenarios that perform N actions on the same topic. - import_compose, tag_compose, untag_compose each accept an optional kafka_url parameter; when set, wraps the HTTP call in the context manager and asserts the event name and compose_info reference. - test_workflow_compose_import, test_workflow_respin_increment, test_workflow_full_lifecycle all pass kafka_url to the helpers so Kafka assertions run as part of the existing scenarios (not as a separate test). The respin test uses kafka_messages_on to capture all three compose-created messages in one batch. Bug fix (cts/messaging.py): - Convert the string "none" to Python None for compression_type so KafkaProducer does not reject the value with "Not supported codec". Generated-By: OpenCode (google-vertex-anthropic/claude-sonnet-4-6@default)
1 parent 37bad18 commit c45c06b

4 files changed

Lines changed: 481 additions & 42 deletions

File tree

.tekton/integration-test-eaas.yaml

Lines changed: 151 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,145 @@ spec:
457457
- name: kubeconfig-secret
458458
value: $(tasks.provision-environment.results.secretRef)
459459

460+
- name: deploy-kafka
461+
runAfter:
462+
- provision-environment
463+
taskSpec:
464+
params:
465+
- name: kubeconfig-secret
466+
type: string
467+
steps:
468+
- name: create-kafka
469+
image: quay.io/konflux-ci/appstudio-utils:latest
470+
script: |
471+
#!/usr/bin/env bash
472+
set -euo pipefail
473+
474+
KUBECONFIG=/tmp/kubeconfig
475+
kubectl get secret $(params.kubeconfig-secret) -o jsonpath='{.data.kubeconfig}' | base64 -d > $KUBECONFIG
476+
export KUBECONFIG
477+
478+
echo "=========================================="
479+
echo "Deploying Kafka (Apache KRaft single-node)"
480+
echo "=========================================="
481+
482+
kubectl apply -f - <<'EOFYAML'
483+
apiVersion: apps/v1
484+
kind: Deployment
485+
metadata:
486+
name: kafka
487+
labels:
488+
app: kafka
489+
spec:
490+
replicas: 1
491+
selector:
492+
matchLabels:
493+
app: kafka
494+
template:
495+
metadata:
496+
labels:
497+
app: kafka
498+
spec:
499+
initContainers:
500+
- name: copy-kafka-config
501+
image: docker.io/apache/kafka:3.9.2
502+
command: ["/bin/sh", "-c", "cp -r /opt/kafka/config/. /mnt/kafka-config/"]
503+
volumeMounts:
504+
- name: kafka-config
505+
mountPath: /mnt/kafka-config
506+
containers:
507+
- name: kafka
508+
image: docker.io/apache/kafka:3.9.2
509+
ports:
510+
- containerPort: 9092
511+
name: client
512+
- containerPort: 9093
513+
name: controller
514+
env:
515+
- name: KAFKA_NODE_ID
516+
value: "1"
517+
- name: KAFKA_PROCESS_ROLES
518+
value: "broker,controller"
519+
- name: KAFKA_CONTROLLER_QUORUM_VOTERS
520+
value: "1@localhost:9093"
521+
- name: KAFKA_LISTENERS
522+
value: "PLAINTEXT://:9092,CONTROLLER://:9093"
523+
- name: KAFKA_ADVERTISED_LISTENERS
524+
value: "PLAINTEXT://kafka:9092"
525+
- name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
526+
value: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
527+
- name: KAFKA_CONTROLLER_LISTENER_NAMES
528+
value: "CONTROLLER"
529+
- name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
530+
value: "true"
531+
- name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
532+
value: "1"
533+
- name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
534+
value: "1"
535+
- name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR
536+
value: "1"
537+
- name: KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS
538+
value: "0"
539+
resources:
540+
requests:
541+
memory: "256Mi"
542+
cpu: "100m"
543+
limits:
544+
memory: "512Mi"
545+
cpu: "500m"
546+
readinessProbe:
547+
tcpSocket:
548+
port: 9092
549+
initialDelaySeconds: 20
550+
periodSeconds: 5
551+
timeoutSeconds: 3
552+
failureThreshold: 24
553+
volumeMounts:
554+
- name: kafka-config
555+
mountPath: /opt/kafka/config
556+
- name: kafka-logs
557+
mountPath: /tmp/kafka-logs
558+
- name: kafka-gc-logs
559+
mountPath: /opt/kafka/logs
560+
volumes:
561+
- name: kafka-config
562+
emptyDir: {}
563+
- name: kafka-logs
564+
emptyDir: {}
565+
- name: kafka-gc-logs
566+
emptyDir: {}
567+
---
568+
apiVersion: v1
569+
kind: Service
570+
metadata:
571+
name: kafka
572+
labels:
573+
app: kafka
574+
spec:
575+
ports:
576+
- port: 9092
577+
targetPort: 9092
578+
name: client
579+
- port: 9093
580+
targetPort: 9093
581+
name: controller
582+
selector:
583+
app: kafka
584+
EOFYAML
585+
586+
echo "Waiting for Kafka to be ready..."
587+
if ! kubectl wait --for=condition=available --timeout=300s deployment/kafka; then
588+
echo "Kafka deployment failed! Debug info:"
589+
kubectl describe deployment kafka
590+
kubectl describe pod -l app=kafka
591+
kubectl logs -l app=kafka --tail=50 || echo "No logs available"
592+
exit 1
593+
fi
594+
echo "✓ Kafka is ready"
595+
params:
596+
- name: kubeconfig-secret
597+
value: $(tasks.provision-environment.results.secretRef)
598+
460599
- name: deploy-database
461600
runAfter:
462601
- provision-environment
@@ -597,6 +736,7 @@ spec:
597736
- deploy-database
598737
- deploy-openldap
599738
- deploy-dex
739+
- deploy-kafka
600740
taskSpec:
601741
params:
602742
- name: kubeconfig-secret
@@ -643,6 +783,14 @@ spec:
643783
]
644784
ADMINS = {"groups": [], "users": ["builder@example.com"]}
645785
ALLOWED_BUILDERS = {"groups": [], "users": ["builder@example.com"]}
786+
MESSAGING_BACKEND = "kafka"
787+
MESSAGING_BROKER_URLS = ["kafka:9092"]
788+
MESSAGING_KAFKA_SECURITY_PROTOCOL = "PLAINTEXT"
789+
MESSAGING_KAFKA_SASL_MECHANISM = ""
790+
MESSAGING_KAFKA_USERNAME = ""
791+
MESSAGING_KAFKA_PASSWORD = ""
792+
MESSAGING_KAFKA_COMPRESSION_TYPE = "none"
793+
MESSAGING_TOPIC_PREFIX = "cts."
646794
httpd.conf: |
647795
ServerRoot "/etc/httpd"
648796
PidFile /tmp/httpd.pid
@@ -916,9 +1064,9 @@ spec:
9161064
echo 'Installing Dex CA certificate...'
9171065
echo '$DEX_CA_B64' | base64 -d > /tmp/dex-ca.crt
9181066
919-
echo 'Installing pytest and requests...'
1067+
echo 'Installing pytest, requests, and kafka-python...'
9201068
python3 -m ensurepip
921-
python3 -m pip install --target /tmp/test-deps --quiet pytest requests
1069+
python3 -m pip install --target /tmp/test-deps --quiet pytest requests kafka-python
9221070
9231071
echo ''
9241072
echo 'Cloning repository...'
@@ -930,7 +1078,7 @@ spec:
9301078
9311079
echo ''
9321080
echo 'Running pytest...'
933-
PYTHONPATH=/tmp/test-deps REQUESTS_CA_BUNDLE=/tmp/dex-ca.crt CTS_URL=http://cts:8080 AUTH_BACKEND=oidc_or_kerberos DEX_URL=https://dex:5556 python3 -m pytest tests/test_integration_api.py -v -s -o addopts=
1081+
PYTHONPATH=/tmp/test-deps REQUESTS_CA_BUNDLE=/tmp/dex-ca.crt CTS_URL=http://cts:8080 AUTH_BACKEND=oidc_or_kerberos DEX_URL=https://dex:5556 KAFKA_URL=kafka:9092 python3 -m pytest tests/test_integration_api.py -v -s -o addopts=
9341082
"
9351083
TEST_RESULT=$?
9361084
set -e

cts/messaging.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,15 @@ def _kafka_send_msg(msgs):
102102

103103
def _send():
104104
"""Inner function to send messages (will be retried on failure)"""
105+
compression = conf.messaging_kafka_compression_type
106+
# kafka-python uses Python None to mean "no compression"; the string
107+
# "none" (which may come from a config file) is not accepted.
108+
if compression and compression.lower() == "none":
109+
compression = None
110+
105111
config = {
106112
"bootstrap_servers": conf.messaging_broker_urls,
107-
"compression_type": conf.messaging_kafka_compression_type,
113+
"compression_type": compression,
108114
"security_protocol": conf.messaging_kafka_security_protocol,
109115
"sasl_mechanism": conf.messaging_kafka_sasl_mechanism,
110116
"sasl_plain_username": conf.messaging_kafka_username,

test-requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,5 @@ tox
99
# Let's update this package to avoid this problem.
1010
itsdangerous>=1.1.0
1111
freezegun
12+
# Required for Kafka integration tests (test_integration_api.py)
13+
kafka-python

0 commit comments

Comments
 (0)