Skip to content

Commit bc4952d

Browse files
author
Agent
committed
Add Kafka integration tests and deploy-kafka EaaS task
- Add three integration tests (compose-created, compose-tagged, compose-untagged) that consume from Kafka to verify messages are published after each operation. - _get_kafka_end_offset now catches UnknownTopicOrPartitionError and returns 0 when the topic does not yet exist, avoiding a test failure on the very first publish to a fresh broker. - Fix KafkaProducer receiving the string 'none' as compression_type: messaging.py now converts 'none' -> None so kafka-python accepts it. - Deploy a single-broker Apache Kafka 3.9.2 KRaft instance in the EaaS pipeline; use emptyDir volumes so the container runs under OpenShift's restricted-v2 SCC (arbitrary non-root UID). - Configure CTS in the EaaS environment to use the Kafka backend with PLAINTEXT security and no compression. - Enable CI triggers (GitHub Actions + Konflux PR pipeline) for the feature/integration-tests branch. Generated-By: OpenCode (google-vertex-anthropic/claude-sonnet-4-6@default)
1 parent d999a50 commit bc4952d

6 files changed

Lines changed: 348 additions & 7 deletions

File tree

.github/workflows/gating.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ name: Gating
22

33
on:
44
push:
5-
branches: [ "main" ]
5+
branches: [ "main", "feature/integration-tests" ]
66
pull_request:
7-
branches: [ "main" ]
7+
branches: [ "main", "feature/integration-tests" ]
88

99
jobs:
1010
tests:

.tekton/cts-pull-request.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ metadata:
77
build.appstudio.redhat.com/pull_request_number: '{{pull_request_number}}'
88
build.appstudio.redhat.com/target_branch: '{{target_branch}}'
99
pipelinesascode.tekton.dev/max-keep-runs: "3"
10-
pipelinesascode.tekton.dev/on-cel-expression: event == "pull_request" && target_branch == "main"
10+
pipelinesascode.tekton.dev/on-cel-expression: event == "pull_request" && target_branch in ["main", "feature/integration-tests"]
1111
creationTimestamp:
1212
labels:
1313
appstudio.openshift.io/application: cts

.tekton/integration-test-eaas.yaml

Lines changed: 151 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,145 @@ spec:
457457
- name: kubeconfig-secret
458458
value: $(tasks.provision-environment.results.secretRef)
459459

460+
- name: deploy-kafka
461+
runAfter:
462+
- provision-environment
463+
taskSpec:
464+
params:
465+
- name: kubeconfig-secret
466+
type: string
467+
steps:
468+
- name: create-kafka
469+
image: quay.io/konflux-ci/appstudio-utils:latest
470+
script: |
471+
#!/usr/bin/env bash
472+
set -euo pipefail
473+
474+
KUBECONFIG=/tmp/kubeconfig
475+
kubectl get secret $(params.kubeconfig-secret) -o jsonpath='{.data.kubeconfig}' | base64 -d > $KUBECONFIG
476+
export KUBECONFIG
477+
478+
echo "=========================================="
479+
echo "Deploying Kafka (Apache KRaft single-node)"
480+
echo "=========================================="
481+
482+
kubectl apply -f - <<'EOFYAML'
483+
apiVersion: apps/v1
484+
kind: Deployment
485+
metadata:
486+
name: kafka
487+
labels:
488+
app: kafka
489+
spec:
490+
replicas: 1
491+
selector:
492+
matchLabels:
493+
app: kafka
494+
template:
495+
metadata:
496+
labels:
497+
app: kafka
498+
spec:
499+
initContainers:
500+
- name: copy-kafka-config
501+
image: docker.io/apache/kafka:3.9.2
502+
command: ["/bin/sh", "-c", "cp -r /opt/kafka/config/. /mnt/kafka-config/"]
503+
volumeMounts:
504+
- name: kafka-config
505+
mountPath: /mnt/kafka-config
506+
containers:
507+
- name: kafka
508+
image: docker.io/apache/kafka:3.9.2
509+
ports:
510+
- containerPort: 9092
511+
name: client
512+
- containerPort: 9093
513+
name: controller
514+
env:
515+
- name: KAFKA_NODE_ID
516+
value: "1"
517+
- name: KAFKA_PROCESS_ROLES
518+
value: "broker,controller"
519+
- name: KAFKA_CONTROLLER_QUORUM_VOTERS
520+
value: "1@localhost:9093"
521+
- name: KAFKA_LISTENERS
522+
value: "PLAINTEXT://:9092,CONTROLLER://:9093"
523+
- name: KAFKA_ADVERTISED_LISTENERS
524+
value: "PLAINTEXT://kafka:9092"
525+
- name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
526+
value: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
527+
- name: KAFKA_CONTROLLER_LISTENER_NAMES
528+
value: "CONTROLLER"
529+
- name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
530+
value: "true"
531+
- name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
532+
value: "1"
533+
- name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
534+
value: "1"
535+
- name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR
536+
value: "1"
537+
- name: KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS
538+
value: "0"
539+
resources:
540+
requests:
541+
memory: "256Mi"
542+
cpu: "100m"
543+
limits:
544+
memory: "512Mi"
545+
cpu: "500m"
546+
readinessProbe:
547+
tcpSocket:
548+
port: 9092
549+
initialDelaySeconds: 20
550+
periodSeconds: 5
551+
timeoutSeconds: 3
552+
failureThreshold: 24
553+
volumeMounts:
554+
- name: kafka-config
555+
mountPath: /opt/kafka/config
556+
- name: kafka-logs
557+
mountPath: /tmp/kafka-logs
558+
- name: kafka-gc-logs
559+
mountPath: /opt/kafka/logs
560+
volumes:
561+
- name: kafka-config
562+
emptyDir: {}
563+
- name: kafka-logs
564+
emptyDir: {}
565+
- name: kafka-gc-logs
566+
emptyDir: {}
567+
---
568+
apiVersion: v1
569+
kind: Service
570+
metadata:
571+
name: kafka
572+
labels:
573+
app: kafka
574+
spec:
575+
ports:
576+
- port: 9092
577+
targetPort: 9092
578+
name: client
579+
- port: 9093
580+
targetPort: 9093
581+
name: controller
582+
selector:
583+
app: kafka
584+
EOFYAML
585+
586+
echo "Waiting for Kafka to be ready..."
587+
if ! kubectl wait --for=condition=available --timeout=300s deployment/kafka; then
588+
echo "Kafka deployment failed! Debug info:"
589+
kubectl describe deployment kafka
590+
kubectl describe pod -l app=kafka
591+
kubectl logs -l app=kafka --tail=50 || echo "No logs available"
592+
exit 1
593+
fi
594+
echo "✓ Kafka is ready"
595+
params:
596+
- name: kubeconfig-secret
597+
value: $(tasks.provision-environment.results.secretRef)
598+
460599
- name: deploy-database
461600
runAfter:
462601
- provision-environment
@@ -597,6 +736,7 @@ spec:
597736
- deploy-database
598737
- deploy-openldap
599738
- deploy-dex
739+
- deploy-kafka
600740
taskSpec:
601741
params:
602742
- name: kubeconfig-secret
@@ -643,6 +783,14 @@ spec:
643783
]
644784
ADMINS = {"groups": [], "users": ["builder@example.com"]}
645785
ALLOWED_BUILDERS = {"groups": [], "users": ["builder@example.com"]}
786+
MESSAGING_BACKEND = "kafka"
787+
MESSAGING_BROKER_URLS = ["kafka:9092"]
788+
MESSAGING_KAFKA_SECURITY_PROTOCOL = "PLAINTEXT"
789+
MESSAGING_KAFKA_SASL_MECHANISM = ""
790+
MESSAGING_KAFKA_USERNAME = ""
791+
MESSAGING_KAFKA_PASSWORD = ""
792+
MESSAGING_KAFKA_COMPRESSION_TYPE = "none"
793+
MESSAGING_TOPIC_PREFIX = "cts."
646794
httpd.conf: |
647795
ServerRoot "/etc/httpd"
648796
PidFile /tmp/httpd.pid
@@ -916,9 +1064,9 @@ spec:
9161064
echo 'Installing Dex CA certificate...'
9171065
echo '$DEX_CA_B64' | base64 -d > /tmp/dex-ca.crt
9181066
919-
echo 'Installing pytest and requests...'
1067+
echo 'Installing pytest, requests, and kafka-python...'
9201068
python3 -m ensurepip
921-
python3 -m pip install --target /tmp/test-deps --quiet pytest requests
1069+
python3 -m pip install --target /tmp/test-deps --quiet pytest requests kafka-python
9221070
9231071
echo ''
9241072
echo 'Cloning repository...'
@@ -930,7 +1078,7 @@ spec:
9301078
9311079
echo ''
9321080
echo 'Running pytest...'
933-
PYTHONPATH=/tmp/test-deps REQUESTS_CA_BUNDLE=/tmp/dex-ca.crt CTS_URL=http://cts:8080 AUTH_BACKEND=oidc_or_kerberos DEX_URL=https://dex:5556 python3 -m pytest tests/test_integration_api.py -v -s -o addopts=
1081+
PYTHONPATH=/tmp/test-deps REQUESTS_CA_BUNDLE=/tmp/dex-ca.crt CTS_URL=http://cts:8080 AUTH_BACKEND=oidc_or_kerberos DEX_URL=https://dex:5556 KAFKA_URL=kafka:9092 python3 -m pytest tests/test_integration_api.py -v -s -o addopts=
9341082
"
9351083
TEST_RESULT=$?
9361084
set -e

cts/messaging.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,15 @@ def _kafka_send_msg(msgs):
102102

103103
def _send():
104104
"""Inner function to send messages (will be retried on failure)"""
105+
compression = conf.messaging_kafka_compression_type
106+
# kafka-python uses Python None to mean "no compression"; the string
107+
# "none" (which may come from a config file) is not accepted.
108+
if compression and compression.lower() == "none":
109+
compression = None
110+
105111
config = {
106112
"bootstrap_servers": conf.messaging_broker_urls,
107-
"compression_type": conf.messaging_kafka_compression_type,
113+
"compression_type": compression,
108114
"security_protocol": conf.messaging_kafka_security_protocol,
109115
"sasl_mechanism": conf.messaging_kafka_sasl_mechanism,
110116
"sasl_plain_username": conf.messaging_kafka_username,

test-requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,5 @@ tox
99
# Let's update this package to avoid this problem.
1010
itsdangerous>=1.1.0
1111
freezegun
12+
# Required for Kafka integration tests (test_integration_api.py)
13+
kafka-python

0 commit comments

Comments
 (0)