Skip to content

Commit a94a676

Browse files
author
Agent
committed
Add Kafka integration tests and deploy-kafka EaaS task
- Add deploy-kafka Tekton task deploying Bitnami Kafka 3.7.0 single-broker KRaft instance (kafka:9092, PLAINTEXT) in the ephemeral EaaS namespace - Configure CTS ProdConfiguration to use the Kafka messaging backend - Add offset-anchored _consume_kafka_message helper and _get_kafka_end_offset so each test only sees messages produced by its own action - Add test_kafka_compose_created/tagged/untagged integration tests, all skipped when KAFKA_URL is unset - Add kafka-python to test-requirements.txt - Pass KAFKA_URL=kafka:9092 in the run-tests task - Extend CI branch filters to include feature/integration-tests Generated-By: OpenCode (google-vertex-anthropic/claude-sonnet-4-6@default)
1 parent d999a50 commit a94a676

5 files changed

Lines changed: 305 additions & 6 deletions

File tree

.github/workflows/gating.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ name: Gating
22

33
on:
44
push:
5-
branches: [ "main" ]
5+
branches: [ "main", "feature/integration-tests" ]
66
pull_request:
7-
branches: [ "main" ]
7+
branches: [ "main", "feature/integration-tests" ]
88

99
jobs:
1010
tests:

.tekton/cts-pull-request.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ metadata:
77
build.appstudio.redhat.com/pull_request_number: '{{pull_request_number}}'
88
build.appstudio.redhat.com/target_branch: '{{target_branch}}'
99
pipelinesascode.tekton.dev/max-keep-runs: "3"
10-
pipelinesascode.tekton.dev/on-cel-expression: event == "pull_request" && target_branch == "main"
10+
pipelinesascode.tekton.dev/on-cel-expression: event == "pull_request" && target_branch in ["main", "feature/integration-tests"]
1111
creationTimestamp:
1212
labels:
1313
appstudio.openshift.io/application: cts

.tekton/integration-test-eaas.yaml

Lines changed: 124 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,118 @@ spec:
457457
- name: kubeconfig-secret
458458
value: $(tasks.provision-environment.results.secretRef)
459459

460+
- name: deploy-kafka
461+
runAfter:
462+
- provision-environment
463+
taskSpec:
464+
params:
465+
- name: kubeconfig-secret
466+
type: string
467+
steps:
468+
- name: create-kafka
469+
image: quay.io/konflux-ci/appstudio-utils:latest
470+
script: |
471+
#!/usr/bin/env bash
472+
set -euo pipefail
473+
474+
KUBECONFIG=/tmp/kubeconfig
475+
kubectl get secret $(params.kubeconfig-secret) -o jsonpath='{.data.kubeconfig}' | base64 -d > $KUBECONFIG
476+
export KUBECONFIG
477+
478+
echo "=========================================="
479+
echo "Deploying Kafka (Bitnami KRaft single-node)"
480+
echo "=========================================="
481+
482+
kubectl apply -f - <<'EOFYAML'
483+
apiVersion: apps/v1
484+
kind: Deployment
485+
metadata:
486+
name: kafka
487+
labels:
488+
app: kafka
489+
spec:
490+
replicas: 1
491+
selector:
492+
matchLabels:
493+
app: kafka
494+
template:
495+
metadata:
496+
labels:
497+
app: kafka
498+
spec:
499+
containers:
500+
- name: kafka
501+
image: docker.io/bitnami/kafka:3.7.0
502+
ports:
503+
- containerPort: 9092
504+
name: client
505+
- containerPort: 9093
506+
name: controller
507+
env:
508+
- name: KAFKA_CFG_NODE_ID
509+
value: "0"
510+
- name: KAFKA_CFG_PROCESS_ROLES
511+
value: "controller,broker"
512+
- name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
513+
value: "0@kafka:9093"
514+
- name: KAFKA_CFG_LISTENERS
515+
value: "PLAINTEXT://:9092,CONTROLLER://:9093"
516+
- name: KAFKA_CFG_ADVERTISED_LISTENERS
517+
value: "PLAINTEXT://kafka:9092"
518+
- name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
519+
value: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
520+
- name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
521+
value: "CONTROLLER"
522+
- name: KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE
523+
value: "true"
524+
- name: ALLOW_PLAINTEXT_LISTENER
525+
value: "yes"
526+
resources:
527+
requests:
528+
memory: "256Mi"
529+
cpu: "100m"
530+
limits:
531+
memory: "512Mi"
532+
cpu: "500m"
533+
readinessProbe:
534+
tcpSocket:
535+
port: 9092
536+
initialDelaySeconds: 20
537+
periodSeconds: 5
538+
timeoutSeconds: 3
539+
failureThreshold: 24
540+
---
541+
apiVersion: v1
542+
kind: Service
543+
metadata:
544+
name: kafka
545+
labels:
546+
app: kafka
547+
spec:
548+
ports:
549+
- port: 9092
550+
targetPort: 9092
551+
name: client
552+
- port: 9093
553+
targetPort: 9093
554+
name: controller
555+
selector:
556+
app: kafka
557+
EOFYAML
558+
559+
echo "Waiting for Kafka to be ready..."
560+
if ! kubectl wait --for=condition=available --timeout=300s deployment/kafka; then
561+
echo "Kafka deployment failed! Debug info:"
562+
kubectl describe deployment kafka
563+
kubectl describe pod -l app=kafka
564+
kubectl logs -l app=kafka --tail=50 || echo "No logs available"
565+
exit 1
566+
fi
567+
echo "✓ Kafka is ready"
568+
params:
569+
- name: kubeconfig-secret
570+
value: $(tasks.provision-environment.results.secretRef)
571+
460572
- name: deploy-database
461573
runAfter:
462574
- provision-environment
@@ -597,6 +709,7 @@ spec:
597709
- deploy-database
598710
- deploy-openldap
599711
- deploy-dex
712+
- deploy-kafka
600713
taskSpec:
601714
params:
602715
- name: kubeconfig-secret
@@ -643,6 +756,14 @@ spec:
643756
]
644757
ADMINS = {"groups": [], "users": ["builder@example.com"]}
645758
ALLOWED_BUILDERS = {"groups": [], "users": ["builder@example.com"]}
759+
MESSAGING_BACKEND = "kafka"
760+
MESSAGING_BROKER_URLS = ["kafka:9092"]
761+
MESSAGING_KAFKA_SECURITY_PROTOCOL = "PLAINTEXT"
762+
MESSAGING_KAFKA_SASL_MECHANISM = ""
763+
MESSAGING_KAFKA_USERNAME = ""
764+
MESSAGING_KAFKA_PASSWORD = ""
765+
MESSAGING_KAFKA_COMPRESSION_TYPE = "none"
766+
MESSAGING_TOPIC_PREFIX = "cts."
646767
httpd.conf: |
647768
ServerRoot "/etc/httpd"
648769
PidFile /tmp/httpd.pid
@@ -916,9 +1037,9 @@ spec:
9161037
echo 'Installing Dex CA certificate...'
9171038
echo '$DEX_CA_B64' | base64 -d > /tmp/dex-ca.crt
9181039
919-
echo 'Installing pytest and requests...'
1040+
echo 'Installing pytest, requests, and kafka-python...'
9201041
python3 -m ensurepip
921-
python3 -m pip install --target /tmp/test-deps --quiet pytest requests
1042+
python3 -m pip install --target /tmp/test-deps --quiet pytest requests kafka-python
9221043
9231044
echo ''
9241045
echo 'Cloning repository...'
@@ -930,7 +1051,7 @@ spec:
9301051
9311052
echo ''
9321053
echo 'Running pytest...'
933-
PYTHONPATH=/tmp/test-deps REQUESTS_CA_BUNDLE=/tmp/dex-ca.crt CTS_URL=http://cts:8080 AUTH_BACKEND=oidc_or_kerberos DEX_URL=https://dex:5556 python3 -m pytest tests/test_integration_api.py -v -s -o addopts=
1054+
PYTHONPATH=/tmp/test-deps REQUESTS_CA_BUNDLE=/tmp/dex-ca.crt CTS_URL=http://cts:8080 AUTH_BACKEND=oidc_or_kerberos DEX_URL=https://dex:5556 KAFKA_URL=kafka:9092 python3 -m pytest tests/test_integration_api.py -v -s -o addopts=
9341055
"
9351056
TEST_RESULT=$?
9361057
set -e

test-requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,5 @@ tox
99
# Let's update this package to avoid this problem.
1010
itsdangerous>=1.1.0
1111
freezegun
12+
# Required for Kafka integration tests (test_integration_api.py)
13+
kafka-python

tests/test_integration_api.py

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import json
1616
import os
1717
import ssl
18+
import time
19+
import uuid
1820
import urllib.parse
1921
from urllib.request import urlopen, Request
2022
from urllib.error import HTTPError, URLError
@@ -645,3 +647,177 @@ def test_auth_get_endpoints_accessible_without_token(http_client):
645647
status == 200
646648
), f"Expected 200 for unauthenticated GET /api/1/tags/, got {status}"
647649
assert isinstance(data, dict), "GET /api/1/tags/ must return a dict"
650+
651+
652+
# Kafka messaging integration tests
653+
# These tests require a live Kafka broker and are skipped when KAFKA_URL is not set.
654+
655+
_KAFKA_CONSUMER_TIMEOUT_MS = int(os.environ.get("KAFKA_CONSUMER_TIMEOUT_MS", 30000))
656+
657+
658+
@pytest.fixture(scope="module")
659+
def kafka_url():
660+
"""Return the Kafka broker URL from the KAFKA_URL environment variable.
661+
662+
Skips all tests that depend on this fixture when the variable is not set,
663+
preserving backward compatibility with environments that have no Kafka broker.
664+
"""
665+
url = os.environ.get("KAFKA_URL")
666+
if not url:
667+
pytest.skip("requires KAFKA_URL")
668+
return url
669+
670+
671+
def _get_kafka_end_offset(kafka_url, topic):
672+
"""Return the current end offset for partition 0 of *topic*.
673+
674+
Call this **before** performing an action so that the subsequent
675+
:func:`_consume_kafka_message` call will skip any pre-existing messages
676+
and only return messages produced by that specific action.
677+
"""
678+
from kafka import KafkaConsumer, TopicPartition
679+
680+
probe = KafkaConsumer(bootstrap_servers=kafka_url, consumer_timeout_ms=1000)
681+
tp = TopicPartition(topic, 0)
682+
probe.assign([tp])
683+
probe.seek_to_end(tp)
684+
offset = probe.position(tp)
685+
probe.close()
686+
return offset
687+
688+
689+
def _consume_kafka_message(kafka_url, topic, start_offset, timeout_ms=None):
690+
"""Poll *topic* for the first message at or after *start_offset*.
691+
692+
*start_offset* must be obtained by calling :func:`_get_kafka_end_offset`
693+
**before** the action that is expected to produce the message, so that
694+
messages published by earlier tests are not mistakenly returned.
695+
696+
Returns the decoded JSON dict of the first matching message, or raises
697+
``AssertionError`` if no message arrives within *timeout_ms*.
698+
"""
699+
from kafka import KafkaConsumer, TopicPartition
700+
701+
if timeout_ms is None:
702+
timeout_ms = _KAFKA_CONSUMER_TIMEOUT_MS
703+
704+
consumer = KafkaConsumer(
705+
bootstrap_servers=kafka_url,
706+
auto_offset_reset="earliest",
707+
group_id=f"cts-test-{uuid.uuid4()}",
708+
value_deserializer=lambda b: json.loads(b.decode("utf-8")),
709+
consumer_timeout_ms=timeout_ms,
710+
)
711+
tp = TopicPartition(topic, 0)
712+
consumer.assign([tp])
713+
consumer.seek(tp, start_offset)
714+
try:
715+
for record in consumer:
716+
return record.value
717+
raise AssertionError(
718+
f"No message received on Kafka topic '{topic}' at offset >={start_offset}"
719+
f" within {timeout_ms} ms"
720+
)
721+
finally:
722+
consumer.close()
723+
724+
725+
def test_kafka_compose_created(write_http_client, kafka_url):
726+
"""Importing a compose must publish a message on the cts.compose-created topic."""
727+
# Snapshot the end offset before the action so we skip any pre-existing messages.
728+
start_offset = _get_kafka_end_offset(kafka_url, "cts.compose-created")
729+
730+
data = import_compose(write_http_client, "KafkaCreatedTest", "1.0", "20260601")
731+
compose_id = data["payload"]["compose"]["id"]
732+
print(f" Imported compose: {compose_id}")
733+
734+
msg = _consume_kafka_message(kafka_url, "cts.compose-created", start_offset)
735+
736+
assert msg is not None, "Expected a message on cts.compose-created, got None"
737+
assert (
738+
msg.get("event") == "compose-created"
739+
), f"Expected event='compose-created', got event={msg.get('event')!r}"
740+
assert msg.get("compose") is not None, f"Message missing 'compose' key: {msg}"
741+
compose_info = msg["compose"].get("compose_info", {})
742+
assert compose_id in str(
743+
compose_info
744+
), f"Message compose_info does not reference expected compose {compose_id}: {msg}"
745+
print(f" ✓ Received compose-created message: {msg}")
746+
747+
748+
def test_kafka_compose_tagged(write_http_client, kafka_url):
749+
"""Tagging a compose must publish a message on the cts.compose-tagged topic."""
750+
# Create a tag and import a compose
751+
tag_data = create_tag(
752+
write_http_client,
753+
"kafka-tag-test",
754+
"Tag for Kafka tagged test",
755+
"https://example.com/docs/kafka-tag-test",
756+
)
757+
tag_name = tag_data["name"]
758+
759+
compose_data = import_compose(
760+
write_http_client, "KafkaTaggedTest", "1.0", "20260602"
761+
)
762+
compose_id = compose_data["payload"]["compose"]["id"]
763+
print(f" Imported compose: {compose_id}, tag: {tag_name}")
764+
765+
# Snapshot the end offset before tagging so we skip earlier messages.
766+
start_offset = _get_kafka_end_offset(kafka_url, "cts.compose-tagged")
767+
768+
# Tag the compose — this should trigger a cts.compose-tagged message.
769+
tag_compose(write_http_client, compose_id, tag_name)
770+
print(f" Tagged compose with '{tag_name}'")
771+
772+
msg = _consume_kafka_message(kafka_url, "cts.compose-tagged", start_offset)
773+
774+
assert msg is not None, "Expected a message on cts.compose-tagged, got None"
775+
assert (
776+
msg.get("event") == "compose-tagged"
777+
), f"Expected event='compose-tagged', got event={msg.get('event')!r}"
778+
assert msg.get("compose") is not None, f"Message missing 'compose' key: {msg}"
779+
compose_info = msg["compose"].get("compose_info", {})
780+
assert compose_id in str(
781+
compose_info
782+
), f"Message compose_info does not reference expected compose {compose_id}: {msg}"
783+
print(f" ✓ Received compose-tagged message: {msg}")
784+
785+
786+
def test_kafka_compose_untagged(write_http_client, kafka_url):
787+
"""Untagging a compose must publish a message on the cts.compose-untagged topic."""
788+
# Create a tag, import a compose, tag it, then untag it.
789+
tag_data = create_tag(
790+
write_http_client,
791+
"kafka-untag-test",
792+
"Tag for Kafka untagged test",
793+
"https://example.com/docs/kafka-untag-test",
794+
)
795+
tag_name = tag_data["name"]
796+
797+
compose_data = import_compose(
798+
write_http_client, "KafkaUntaggedTest", "1.0", "20260603"
799+
)
800+
compose_id = compose_data["payload"]["compose"]["id"]
801+
print(f" Imported compose: {compose_id}, tag: {tag_name}")
802+
803+
tag_compose(write_http_client, compose_id, tag_name)
804+
print(f" Tagged compose with '{tag_name}'")
805+
806+
# Snapshot the end offset before untagging so we skip earlier messages.
807+
start_offset = _get_kafka_end_offset(kafka_url, "cts.compose-untagged")
808+
809+
untag_compose(write_http_client, compose_id, tag_name)
810+
print(f" Untagged compose")
811+
812+
msg = _consume_kafka_message(kafka_url, "cts.compose-untagged", start_offset)
813+
814+
assert msg is not None, "Expected a message on cts.compose-untagged, got None"
815+
assert (
816+
msg.get("event") == "compose-untagged"
817+
), f"Expected event='compose-untagged', got event={msg.get('event')!r}"
818+
assert msg.get("compose") is not None, f"Message missing 'compose' key: {msg}"
819+
compose_info = msg["compose"].get("compose_info", {})
820+
assert compose_id in str(
821+
compose_info
822+
), f"Message compose_info does not reference expected compose {compose_id}: {msg}"
823+
print(f" ✓ Received compose-untagged message: {msg}")

0 commit comments

Comments
 (0)