Skip to content

Commit 71ca4b6

Browse files
author
Agent
committed
Add Kafka integration tests and deploy-kafka EaaS task
- Add three integration tests in tests/test_integration_api.py: test_kafka_compose_created, test_kafka_compose_tagged, test_kafka_compose_untagged. Each test verifies that the corresponding Kafka topic receives a message after a CTS compose operation. All three tests are skipped when KAFKA_URL is not set. - Add a kafka_url module-scoped fixture that skips when KAFKA_URL is unset. - Add a _consume_kafka_message helper that uses kafka-python KafkaConsumer with auto_offset_reset=earliest and a unique group_id per call so that messages published before the consumer subscribes are still visible. - Add kafka-python to test-requirements.txt. - Add deploy-kafka task to .tekton/integration-test-eaas.yaml deploying a Bitnami Kafka 3.7 single-node KRaft broker accessible at kafka:9092. - Add deploy-kafka to deploy-cts runAfter dependency list. - Inject MESSAGING_BACKEND and related Kafka settings into the cts-config ConfigMap inside deploy-cts so CTS publishes to the ephemeral Kafka broker. - Update run-tests kubectl exec script to install kafka-python alongside pytest and to pass KAFKA_URL=kafka:9092 to the pytest invocation. Closes #84 Generated-By: OpenCode (google-vertex-anthropic/claude-sonnet-4-6@default)
1 parent d999a50 commit 71ca4b6

3 files changed

Lines changed: 267 additions & 3 deletions

File tree

.tekton/integration-test-eaas.yaml

Lines changed: 124 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,118 @@ spec:
457457
- name: kubeconfig-secret
458458
value: $(tasks.provision-environment.results.secretRef)
459459

460+
- name: deploy-kafka
461+
runAfter:
462+
- provision-environment
463+
taskSpec:
464+
params:
465+
- name: kubeconfig-secret
466+
type: string
467+
steps:
468+
- name: create-kafka
469+
image: quay.io/konflux-ci/appstudio-utils:latest
470+
script: |
471+
#!/usr/bin/env bash
472+
set -euo pipefail
473+
474+
KUBECONFIG=/tmp/kubeconfig
475+
kubectl get secret $(params.kubeconfig-secret) -o jsonpath='{.data.kubeconfig}' | base64 -d > $KUBECONFIG
476+
export KUBECONFIG
477+
478+
echo "=========================================="
479+
echo "Deploying Kafka (Bitnami KRaft single-node)"
480+
echo "=========================================="
481+
482+
kubectl apply -f - <<'EOFYAML'
483+
apiVersion: apps/v1
484+
kind: Deployment
485+
metadata:
486+
name: kafka
487+
labels:
488+
app: kafka
489+
spec:
490+
replicas: 1
491+
selector:
492+
matchLabels:
493+
app: kafka
494+
template:
495+
metadata:
496+
labels:
497+
app: kafka
498+
spec:
499+
containers:
500+
- name: kafka
501+
image: docker.io/bitnami/kafka:3.7
502+
ports:
503+
- containerPort: 9092
504+
name: client
505+
- containerPort: 9093
506+
name: controller
507+
env:
508+
- name: KAFKA_CFG_NODE_ID
509+
value: "0"
510+
- name: KAFKA_CFG_PROCESS_ROLES
511+
value: "controller,broker"
512+
- name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
513+
value: "0@kafka:9093"
514+
- name: KAFKA_CFG_LISTENERS
515+
value: "PLAINTEXT://:9092,CONTROLLER://:9093"
516+
- name: KAFKA_CFG_ADVERTISED_LISTENERS
517+
value: "PLAINTEXT://kafka:9092"
518+
- name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
519+
value: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
520+
- name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
521+
value: "CONTROLLER"
522+
- name: KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE
523+
value: "true"
524+
- name: ALLOW_PLAINTEXT_LISTENER
525+
value: "yes"
526+
resources:
527+
requests:
528+
memory: "256Mi"
529+
cpu: "100m"
530+
limits:
531+
memory: "512Mi"
532+
cpu: "500m"
533+
readinessProbe:
534+
tcpSocket:
535+
port: 9092
536+
initialDelaySeconds: 20
537+
periodSeconds: 5
538+
timeoutSeconds: 3
539+
failureThreshold: 24
540+
---
541+
apiVersion: v1
542+
kind: Service
543+
metadata:
544+
name: kafka
545+
labels:
546+
app: kafka
547+
spec:
548+
ports:
549+
- port: 9092
550+
targetPort: 9092
551+
name: client
552+
- port: 9093
553+
targetPort: 9093
554+
name: controller
555+
selector:
556+
app: kafka
557+
EOFYAML
558+
559+
echo "Waiting for Kafka to be ready..."
560+
if ! kubectl wait --for=condition=available --timeout=300s deployment/kafka; then
561+
echo "Kafka deployment failed! Debug info:"
562+
kubectl describe deployment kafka
563+
kubectl describe pod -l app=kafka
564+
kubectl logs -l app=kafka --tail=50 || echo "No logs available"
565+
exit 1
566+
fi
567+
echo "✓ Kafka is ready"
568+
params:
569+
- name: kubeconfig-secret
570+
value: $(tasks.provision-environment.results.secretRef)
571+
460572
- name: deploy-database
461573
runAfter:
462574
- provision-environment
@@ -597,6 +709,7 @@ spec:
597709
- deploy-database
598710
- deploy-openldap
599711
- deploy-dex
712+
- deploy-kafka
600713
taskSpec:
601714
params:
602715
- name: kubeconfig-secret
@@ -643,6 +756,14 @@ spec:
643756
]
644757
ADMINS = {"groups": [], "users": ["builder@example.com"]}
645758
ALLOWED_BUILDERS = {"groups": [], "users": ["builder@example.com"]}
759+
MESSAGING_BACKEND = "kafka"
760+
MESSAGING_BROKER_URLS = ["kafka:9092"]
761+
MESSAGING_KAFKA_SECURITY_PROTOCOL = "PLAINTEXT"
762+
MESSAGING_KAFKA_SASL_MECHANISM = ""
763+
MESSAGING_KAFKA_USERNAME = ""
764+
MESSAGING_KAFKA_PASSWORD = ""
765+
MESSAGING_KAFKA_COMPRESSION_TYPE = "none"
766+
MESSAGING_TOPIC_PREFIX = "cts."
646767
httpd.conf: |
647768
ServerRoot "/etc/httpd"
648769
PidFile /tmp/httpd.pid
@@ -916,9 +1037,9 @@ spec:
9161037
echo 'Installing Dex CA certificate...'
9171038
echo '$DEX_CA_B64' | base64 -d > /tmp/dex-ca.crt
9181039
919-
echo 'Installing pytest and requests...'
1040+
echo 'Installing pytest, requests, and kafka-python...'
9201041
python3 -m ensurepip
921-
python3 -m pip install --target /tmp/test-deps --quiet pytest requests
1042+
python3 -m pip install --target /tmp/test-deps --quiet pytest requests kafka-python
9221043
9231044
echo ''
9241045
echo 'Cloning repository...'
@@ -930,7 +1051,7 @@ spec:
9301051
9311052
echo ''
9321053
echo 'Running pytest...'
933-
PYTHONPATH=/tmp/test-deps REQUESTS_CA_BUNDLE=/tmp/dex-ca.crt CTS_URL=http://cts:8080 AUTH_BACKEND=oidc_or_kerberos DEX_URL=https://dex:5556 python3 -m pytest tests/test_integration_api.py -v -s -o addopts=
1054+
PYTHONPATH=/tmp/test-deps REQUESTS_CA_BUNDLE=/tmp/dex-ca.crt CTS_URL=http://cts:8080 AUTH_BACKEND=oidc_or_kerberos DEX_URL=https://dex:5556 KAFKA_URL=kafka:9092 python3 -m pytest tests/test_integration_api.py -v -s -o addopts=
9341055
"
9351056
TEST_RESULT=$?
9361057
set -e

test-requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,5 @@ tox
99
# Let's update this package to avoid this problem.
1010
itsdangerous>=1.1.0
1111
freezegun
12+
# Required for Kafka integration tests (test_integration_api.py)
13+
kafka-python

tests/test_integration_api.py

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import json
1616
import os
1717
import ssl
18+
import time
19+
import uuid
1820
import urllib.parse
1921
from urllib.request import urlopen, Request
2022
from urllib.error import HTTPError, URLError
@@ -645,3 +647,142 @@ def test_auth_get_endpoints_accessible_without_token(http_client):
645647
status == 200
646648
), f"Expected 200 for unauthenticated GET /api/1/tags/, got {status}"
647649
assert isinstance(data, dict), "GET /api/1/tags/ must return a dict"
650+
651+
652+
# Kafka messaging integration tests
653+
# These tests require a live Kafka broker and are skipped when KAFKA_URL is not set.
654+
655+
656+
@pytest.fixture(scope="module")
657+
def kafka_url():
658+
"""Return the Kafka broker URL from the KAFKA_URL environment variable.
659+
660+
Skips all tests that depend on this fixture when the variable is not set,
661+
preserving backward compatibility with environments that have no Kafka broker.
662+
"""
663+
url = os.environ.get("KAFKA_URL")
664+
if not url:
665+
pytest.skip("requires KAFKA_URL")
666+
return url
667+
668+
669+
def _consume_kafka_message(kafka_url, topic, timeout_ms=10000):
670+
"""Poll *topic* for a single message and return its value (decoded JSON dict).
671+
672+
Uses ``auto_offset_reset='earliest'`` and a per-call UUID group_id so that
673+
messages published before the consumer subscribes are still visible.
674+
675+
Returns the first message value received within *timeout_ms*, or raises
676+
``AssertionError`` if no message arrives in time.
677+
"""
678+
from kafka import KafkaConsumer
679+
680+
group_id = f"cts-test-{uuid.uuid4()}"
681+
consumer = KafkaConsumer(
682+
topic,
683+
bootstrap_servers=kafka_url,
684+
auto_offset_reset="earliest",
685+
group_id=group_id,
686+
value_deserializer=lambda b: json.loads(b.decode("utf-8")),
687+
consumer_timeout_ms=timeout_ms,
688+
)
689+
try:
690+
for record in consumer:
691+
return record.value
692+
raise AssertionError(
693+
f"No message received on Kafka topic '{topic}' within {timeout_ms} ms"
694+
)
695+
finally:
696+
consumer.close()
697+
698+
699+
def test_kafka_compose_created(write_http_client, kafka_url):
700+
"""Importing a compose must publish a message on the cts.compose-created topic."""
701+
data = import_compose(write_http_client, "KafkaCreatedTest", "1.0", "20260601")
702+
compose_id = data["payload"]["compose"]["id"]
703+
print(f" Imported compose: {compose_id}")
704+
705+
# Allow a short grace period for the async publisher to send the message.
706+
time.sleep(2)
707+
708+
msg = _consume_kafka_message(kafka_url, "cts.compose-created")
709+
710+
assert msg is not None, "Expected a message on cts.compose-created, got None"
711+
assert msg.get("event") == "compose-created", (
712+
f"Expected event='compose-created', got event={msg.get('event')!r}"
713+
)
714+
# Verify the message is not empty / a silent zero-payload stub
715+
assert msg.get("compose_id") or msg.get("id") or "compose" in str(msg), (
716+
f"Message does not reference any compose: {msg}"
717+
)
718+
print(f" ✓ Received compose-created message: {msg}")
719+
720+
721+
def test_kafka_compose_tagged(write_http_client, kafka_url):
722+
"""Tagging a compose must publish a message on the cts.compose-tagged topic."""
723+
# Create a tag and import a compose
724+
tag_data = create_tag(
725+
write_http_client,
726+
"kafka-tag-test",
727+
"Tag for Kafka tagged test",
728+
"https://example.com/docs/kafka-tag-test",
729+
)
730+
tag_name = tag_data["name"]
731+
732+
compose_data = import_compose(write_http_client, "KafkaTaggedTest", "1.0", "20260602")
733+
compose_id = compose_data["payload"]["compose"]["id"]
734+
print(f" Imported compose: {compose_id}, tag: {tag_name}")
735+
736+
# Tag the compose — this should trigger a cts.compose-tagged message.
737+
tag_compose(write_http_client, compose_id, tag_name)
738+
print(f" Tagged compose with '{tag_name}'")
739+
740+
# Allow a short grace period for the async publisher.
741+
time.sleep(2)
742+
743+
msg = _consume_kafka_message(kafka_url, "cts.compose-tagged")
744+
745+
assert msg is not None, "Expected a message on cts.compose-tagged, got None"
746+
assert msg.get("event") == "compose-tagged", (
747+
f"Expected event='compose-tagged', got event={msg.get('event')!r}"
748+
)
749+
assert msg.get("compose_id") or msg.get("id") or "compose" in str(msg), (
750+
f"Message does not reference any compose: {msg}"
751+
)
752+
print(f" ✓ Received compose-tagged message: {msg}")
753+
754+
755+
def test_kafka_compose_untagged(write_http_client, kafka_url):
756+
"""Untagging a compose must publish a message on the cts.compose-untagged topic."""
757+
# Create a tag, import a compose, tag it, then untag it.
758+
tag_data = create_tag(
759+
write_http_client,
760+
"kafka-untag-test",
761+
"Tag for Kafka untagged test",
762+
"https://example.com/docs/kafka-untag-test",
763+
)
764+
tag_name = tag_data["name"]
765+
766+
compose_data = import_compose(write_http_client, "KafkaUntaggedTest", "1.0", "20260603")
767+
compose_id = compose_data["payload"]["compose"]["id"]
768+
print(f" Imported compose: {compose_id}, tag: {tag_name}")
769+
770+
tag_compose(write_http_client, compose_id, tag_name)
771+
print(f" Tagged compose with '{tag_name}'")
772+
773+
untag_compose(write_http_client, compose_id, tag_name)
774+
print(f" Untagged compose")
775+
776+
# Allow a short grace period for the async publisher.
777+
time.sleep(2)
778+
779+
msg = _consume_kafka_message(kafka_url, "cts.compose-untagged")
780+
781+
assert msg is not None, "Expected a message on cts.compose-untagged, got None"
782+
assert msg.get("event") == "compose-untagged", (
783+
f"Expected event='compose-untagged', got event={msg.get('event')!r}"
784+
)
785+
assert msg.get("compose_id") or msg.get("id") or "compose" in str(msg), (
786+
f"Message does not reference any compose: {msg}"
787+
)
788+
print(f" ✓ Received compose-untagged message: {msg}")

0 commit comments

Comments
 (0)