Skip to content

Commit cd02582

Browse files
author
Agent
committed
Add Kafka integration tests and deploy-kafka EaaS task
- Add three self-verifying integration tests that verify Kafka messages are published for compose-created, compose-tagged, and compose-untagged events. Tests are skipped when KAFKA_URL is unset. - Add _get_kafka_end_offset() helper to snapshot the partition end offset before each action so only messages from that specific test are consumed. - Add _consume_kafka_message() with configurable timeout and optional predicate for draining stale in-flight messages from prior tests. - Use group_id=None in KafkaConsumer within _consume_kafka_message so the group-coordinator protocol is not triggered; with manual partition assignment and explicit seek the coordinator is not needed, and its extra TCP connections caused spurious ECONNREFUSED failures on the third test when the Kafka broker briefly shed connections. - Catch KafkaConnectionError during message iteration and re-raise as AssertionError with a clear diagnostic message. - Use a proper kafka.serializer.Deserializer subclass for JSON deserialization; subclassing the ABC prevents KafkaConsumer from wrapping it in DeserializeWrapper, which broke when the object was not callable. - Add deploy-kafka task to integration-test-eaas.yaml using the official apache/kafka:3.9.2 image with emptyDir volumes for config, logs, and data so the container runs under OpenShift's restricted-v2 SCC. - Configure CTS to use the Kafka backend in the EaaS test environment. - Fix messaging.py to convert the string 'none' to Python None for the compression_type argument to KafkaProducer. - Add kafka-python to test-requirements.txt. - Enable CI triggers on the feature/integration-tests branch. Generated-By: OpenCode (google-vertex-anthropic/claude-sonnet-4-6@default)
1 parent d999a50 commit cd02582

6 files changed

Lines changed: 418 additions & 7 deletions

File tree

.github/workflows/gating.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ name: Gating
22

33
on:
44
push:
5-
branches: [ "main" ]
5+
branches: [ "main", "feature/integration-tests" ]
66
pull_request:
7-
branches: [ "main" ]
7+
branches: [ "main", "feature/integration-tests" ]
88

99
jobs:
1010
tests:

.tekton/cts-pull-request.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ metadata:
77
build.appstudio.redhat.com/pull_request_number: '{{pull_request_number}}'
88
build.appstudio.redhat.com/target_branch: '{{target_branch}}'
99
pipelinesascode.tekton.dev/max-keep-runs: "3"
10-
pipelinesascode.tekton.dev/on-cel-expression: event == "pull_request" && target_branch == "main"
10+
pipelinesascode.tekton.dev/on-cel-expression: event == "pull_request" && target_branch in ["main", "feature/integration-tests"]
1111
creationTimestamp:
1212
labels:
1313
appstudio.openshift.io/application: cts

.tekton/integration-test-eaas.yaml

Lines changed: 151 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,145 @@ spec:
457457
- name: kubeconfig-secret
458458
value: $(tasks.provision-environment.results.secretRef)
459459

460+
- name: deploy-kafka
461+
runAfter:
462+
- provision-environment
463+
taskSpec:
464+
params:
465+
- name: kubeconfig-secret
466+
type: string
467+
steps:
468+
- name: create-kafka
469+
image: quay.io/konflux-ci/appstudio-utils:latest
470+
script: |
471+
#!/usr/bin/env bash
472+
set -euo pipefail
473+
474+
KUBECONFIG=/tmp/kubeconfig
475+
kubectl get secret $(params.kubeconfig-secret) -o jsonpath='{.data.kubeconfig}' | base64 -d > $KUBECONFIG
476+
export KUBECONFIG
477+
478+
echo "=========================================="
479+
echo "Deploying Kafka (Apache KRaft single-node)"
480+
echo "=========================================="
481+
482+
kubectl apply -f - <<'EOFYAML'
483+
apiVersion: apps/v1
484+
kind: Deployment
485+
metadata:
486+
name: kafka
487+
labels:
488+
app: kafka
489+
spec:
490+
replicas: 1
491+
selector:
492+
matchLabels:
493+
app: kafka
494+
template:
495+
metadata:
496+
labels:
497+
app: kafka
498+
spec:
499+
initContainers:
500+
- name: copy-kafka-config
501+
image: docker.io/apache/kafka:3.9.2
502+
command: ["/bin/sh", "-c", "cp -r /opt/kafka/config/. /mnt/kafka-config/"]
503+
volumeMounts:
504+
- name: kafka-config
505+
mountPath: /mnt/kafka-config
506+
containers:
507+
- name: kafka
508+
image: docker.io/apache/kafka:3.9.2
509+
ports:
510+
- containerPort: 9092
511+
name: client
512+
- containerPort: 9093
513+
name: controller
514+
env:
515+
- name: KAFKA_NODE_ID
516+
value: "1"
517+
- name: KAFKA_PROCESS_ROLES
518+
value: "broker,controller"
519+
- name: KAFKA_CONTROLLER_QUORUM_VOTERS
520+
value: "1@localhost:9093"
521+
- name: KAFKA_LISTENERS
522+
value: "PLAINTEXT://:9092,CONTROLLER://:9093"
523+
- name: KAFKA_ADVERTISED_LISTENERS
524+
value: "PLAINTEXT://kafka:9092"
525+
- name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
526+
value: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
527+
- name: KAFKA_CONTROLLER_LISTENER_NAMES
528+
value: "CONTROLLER"
529+
- name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
530+
value: "true"
531+
- name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
532+
value: "1"
533+
- name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
534+
value: "1"
535+
- name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR
536+
value: "1"
537+
- name: KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS
538+
value: "0"
539+
resources:
540+
requests:
541+
memory: "256Mi"
542+
cpu: "100m"
543+
limits:
544+
memory: "512Mi"
545+
cpu: "500m"
546+
readinessProbe:
547+
tcpSocket:
548+
port: 9092
549+
initialDelaySeconds: 20
550+
periodSeconds: 5
551+
timeoutSeconds: 3
552+
failureThreshold: 24
553+
volumeMounts:
554+
- name: kafka-config
555+
mountPath: /opt/kafka/config
556+
- name: kafka-logs
557+
mountPath: /tmp/kafka-logs
558+
- name: kafka-gc-logs
559+
mountPath: /opt/kafka/logs
560+
volumes:
561+
- name: kafka-config
562+
emptyDir: {}
563+
- name: kafka-logs
564+
emptyDir: {}
565+
- name: kafka-gc-logs
566+
emptyDir: {}
567+
---
568+
apiVersion: v1
569+
kind: Service
570+
metadata:
571+
name: kafka
572+
labels:
573+
app: kafka
574+
spec:
575+
ports:
576+
- port: 9092
577+
targetPort: 9092
578+
name: client
579+
- port: 9093
580+
targetPort: 9093
581+
name: controller
582+
selector:
583+
app: kafka
584+
EOFYAML
585+
586+
echo "Waiting for Kafka to be ready..."
587+
if ! kubectl wait --for=condition=available --timeout=300s deployment/kafka; then
588+
echo "Kafka deployment failed! Debug info:"
589+
kubectl describe deployment kafka
590+
kubectl describe pod -l app=kafka
591+
kubectl logs -l app=kafka --tail=50 || echo "No logs available"
592+
exit 1
593+
fi
594+
echo "✓ Kafka is ready"
595+
params:
596+
- name: kubeconfig-secret
597+
value: $(tasks.provision-environment.results.secretRef)
598+
460599
- name: deploy-database
461600
runAfter:
462601
- provision-environment
@@ -597,6 +736,7 @@ spec:
597736
- deploy-database
598737
- deploy-openldap
599738
- deploy-dex
739+
- deploy-kafka
600740
taskSpec:
601741
params:
602742
- name: kubeconfig-secret
@@ -643,6 +783,14 @@ spec:
643783
]
644784
ADMINS = {"groups": [], "users": ["builder@example.com"]}
645785
ALLOWED_BUILDERS = {"groups": [], "users": ["builder@example.com"]}
786+
MESSAGING_BACKEND = "kafka"
787+
MESSAGING_BROKER_URLS = ["kafka:9092"]
788+
MESSAGING_KAFKA_SECURITY_PROTOCOL = "PLAINTEXT"
789+
MESSAGING_KAFKA_SASL_MECHANISM = ""
790+
MESSAGING_KAFKA_USERNAME = ""
791+
MESSAGING_KAFKA_PASSWORD = ""
792+
MESSAGING_KAFKA_COMPRESSION_TYPE = "none"
793+
MESSAGING_TOPIC_PREFIX = "cts."
646794
httpd.conf: |
647795
ServerRoot "/etc/httpd"
648796
PidFile /tmp/httpd.pid
@@ -916,9 +1064,9 @@ spec:
9161064
echo 'Installing Dex CA certificate...'
9171065
echo '$DEX_CA_B64' | base64 -d > /tmp/dex-ca.crt
9181066
919-
echo 'Installing pytest and requests...'
1067+
echo 'Installing pytest, requests, and kafka-python...'
9201068
python3 -m ensurepip
921-
python3 -m pip install --target /tmp/test-deps --quiet pytest requests
1069+
python3 -m pip install --target /tmp/test-deps --quiet pytest requests kafka-python
9221070
9231071
echo ''
9241072
echo 'Cloning repository...'
@@ -930,7 +1078,7 @@ spec:
9301078
9311079
echo ''
9321080
echo 'Running pytest...'
933-
PYTHONPATH=/tmp/test-deps REQUESTS_CA_BUNDLE=/tmp/dex-ca.crt CTS_URL=http://cts:8080 AUTH_BACKEND=oidc_or_kerberos DEX_URL=https://dex:5556 python3 -m pytest tests/test_integration_api.py -v -s -o addopts=
1081+
PYTHONPATH=/tmp/test-deps REQUESTS_CA_BUNDLE=/tmp/dex-ca.crt CTS_URL=http://cts:8080 AUTH_BACKEND=oidc_or_kerberos DEX_URL=https://dex:5556 KAFKA_URL=kafka:9092 python3 -m pytest tests/test_integration_api.py -v -s -o addopts=
9341082
"
9351083
TEST_RESULT=$?
9361084
set -e

cts/messaging.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,15 @@ def _kafka_send_msg(msgs):
102102

103103
def _send():
104104
"""Inner function to send messages (will be retried on failure)"""
105+
compression = conf.messaging_kafka_compression_type
106+
# kafka-python uses Python None to mean "no compression"; the string
107+
# "none" (which may come from a config file) is not accepted.
108+
if compression and compression.lower() == "none":
109+
compression = None
110+
105111
config = {
106112
"bootstrap_servers": conf.messaging_broker_urls,
107-
"compression_type": conf.messaging_kafka_compression_type,
113+
"compression_type": compression,
108114
"security_protocol": conf.messaging_kafka_security_protocol,
109115
"sasl_mechanism": conf.messaging_kafka_sasl_mechanism,
110116
"sasl_plain_username": conf.messaging_kafka_username,

test-requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,5 @@ tox
99
# Let's update this package to avoid this problem.
1010
itsdangerous>=1.1.0
1111
freezegun
12+
# Required for Kafka integration tests (test_integration_api.py)
13+
kafka-python

0 commit comments

Comments
 (0)