The Konflux ITS should verify that publishing messages to Kafka works for each image built.
Agent-Generated Plan
Plan: Add Integration Tests for Kafka Messaging
Context
The repository already has a well-structured integration test framework:
- tests/test_integration_api.py: pytest-based tests that exercise the live CTS API over HTTP.
- .tekton/integration-test-eaas.yaml: the Konflux EaaS pipeline that provisions an ephemeral namespace, deploys CTS (and its dependencies), then runs those tests inside a pod.
- cts/messaging.py: the Kafka (and UMB) messaging backend. The publish() function is called asynchronously after every compose create/tag/untag operation.
- tests/test_events.py: existing unit tests that mock KafkaProducer to verify that _kafka_send_msg is invoked correctly.
The issue asks the integration-test suite (i.e., the EaaS pipeline) to verify that publishing messages to Kafka actually works for each image built. This is distinct from the unit tests in test_events.py, which mock the broker entirely.
What is Missing
- A running Kafka broker is not deployed in the EaaS pipeline. There is no deploy-kafka task.
- No integration test in tests/test_integration_api.py verifies that a Kafka message is emitted when CTS performs an operation (compose creation, tagging, untagging).
- CTS is not configured to use the Kafka backend in the EaaS environment: the ProdConfiguration in the test namespace has MESSAGING_BACKEND unset.
Design
Kafka deployment in EaaS
Deploy a single-broker Kafka instance (Apache Kafka or Redpanda) in the ephemeral namespace. Requirements:
- Must be accessible inside the pod as kafka:9092 (PLAINTEXT listener; no TLS or SASL needed in the ephemeral test environment).
- Must be ready before CTS starts.
- Must be lightweight enough to run in the EaaS resource envelope (≤ 512 Mi RAM).
Recommended image: docker.io/bitnami/kafka:3.7 (single-broker, no ZooKeeper required with KRaft mode). The Bitnami image runs as a non-root user, which is compatible with OpenShift's restricted-v2 SCC.
Minimal environment variables to bring up a KRaft single-node broker:
KAFKA_CFG_NODE_ID=0
KAFKA_CFG_PROCESS_ROLES=controller,broker
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
ALLOW_PLAINTEXT_LISTENER=yes
A Service named kafka exposes port 9092.
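As a rough illustration of the deployment described above, the broker and its Service can be written as plain Python dictionaries and serialized to JSON, which kubectl apply -f - accepts. This is only a sketch: the app=kafka label, the choice of a Deployment over a StatefulSet, and the function name kafka_manifests are assumptions made for the example, not part of the existing pipeline.

```python
import json

# Environment variables copied from the list above.
KAFKA_ENV = {
    "KAFKA_CFG_NODE_ID": "0",
    "KAFKA_CFG_PROCESS_ROLES": "controller,broker",
    "KAFKA_CFG_CONTROLLER_QUORUM_VOTERS": "0@kafka:9093",
    "KAFKA_CFG_LISTENERS": "PLAINTEXT://:9092,CONTROLLER://:9093",
    "KAFKA_CFG_ADVERTISED_LISTENERS": "PLAINTEXT://kafka:9092",
    "KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP": "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT",
    "KAFKA_CFG_CONTROLLER_LISTENER_NAMES": "CONTROLLER",
    "KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE": "true",
    "ALLOW_PLAINTEXT_LISTENER": "yes",
}

# Assumption: this label exists only to tie the Service to the broker pod.
LABELS = {"app": "kafka"}


def kafka_manifests(image="docker.io/bitnami/kafka:3.7", memory_limit="512Mi"):
    """Build a Kubernetes List holding a single-broker Deployment and its Service."""
    container = {
        "name": "kafka",
        "image": image,
        "env": [{"name": key, "value": value} for key, value in KAFKA_ENV.items()],
        "ports": [{"containerPort": 9092}, {"containerPort": 9093}],
        "resources": {"limits": {"memory": memory_limit}},
        # TCP readiness probe on the client listener port.
        "readinessProbe": {"tcpSocket": {"port": 9092}},
    }
    deployment = {
        "apiVersion": "apps/v1",
        "kind": "Deployment",
        "metadata": {"name": "kafka", "labels": LABELS},
        "spec": {
            "replicas": 1,
            "selector": {"matchLabels": LABELS},
            "template": {
                "metadata": {"labels": LABELS},
                "spec": {"containers": [container]},
            },
        },
    }
    service = {
        "apiVersion": "v1",
        "kind": "Service",
        "metadata": {"name": "kafka"},
        "spec": {
            "selector": LABELS,
            "ports": [{"name": "plaintext", "port": 9092, "targetPort": 9092}],
        },
    }
    return {"apiVersion": "v1", "kind": "List", "items": [deployment, service]}


if __name__ == "__main__":
    # Pipe this output into: kubectl apply -f -
    print(json.dumps(kafka_manifests(), indent=2))
```

Keeping the environment variables in one dictionary makes it easy to compare the generated manifest against the list above when the broker settings change.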
CTS configuration change in EaaS
Update the cts-config ConfigMap in the deploy-cts task to add:
MESSAGING_BACKEND = "kafka"
MESSAGING_BROKER_URLS = ["kafka:9092"]
MESSAGING_KAFKA_SECURITY_PROTOCOL = "PLAINTEXT"
MESSAGING_KAFKA_SASL_MECHANISM = ""
MESSAGING_KAFKA_USERNAME = ""
MESSAGING_KAFKA_PASSWORD = ""
MESSAGING_KAFKA_COMPRESSION_TYPE = "none"
MESSAGING_TOPIC_PREFIX = "cts."
Because PLAINTEXT does not use SASL, the sasl_plain_username and sasl_plain_password fields passed to KafkaProducer will be empty strings, which is harmless when security_protocol is PLAINTEXT.
New integration tests
Add a new test class (or standalone test functions) in tests/test_integration_api.py. These tests:
- Import a compose via POST /api/1/composes/.
- Connect to Kafka at KAFKA_URL (an environment variable, set to kafka:9092 in the EaaS pipeline; the tests are skipped when it is unset).
- Use kafka-python's KafkaConsumer to consume from the topic cts.compose-created (and cts.compose-tagged, cts.compose-untagged for tagging tests), with a short poll timeout.
- Assert that the expected message payload was received.
The consumer should use auto_offset_reset="earliest" and a unique group_id per test run (e.g. f"test-{uuid.uuid4()}") so that messages published before the consumer subscribes are still visible.
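The consumer settings described above might be collected in a small helper like the one below. It is a sketch only: consumer_settings and make_consumer are hypothetical names, and kafka-python is imported lazily so that the settings can be inspected without the library installed.

```python
import uuid


def consumer_settings(kafka_url, run_id=None):
    """Keyword arguments for kafka-python's KafkaConsumer.

    A fresh group_id per run means no committed offsets exist for the group,
    so auto_offset_reset="earliest" makes the consumer start from the oldest
    retained message instead of only seeing messages published after it joins.
    """
    run_id = run_id or uuid.uuid4()
    return {
        "bootstrap_servers": [kafka_url],
        "group_id": f"test-{run_id}",
        "auto_offset_reset": "earliest",
    }


def make_consumer(kafka_url, *topics):
    """Create a consumer subscribed to the given topics (hypothetical helper)."""
    from kafka import KafkaConsumer  # provided by the kafka-python package

    return KafkaConsumer(*topics, **consumer_settings(kafka_url))
```

A test would then call, for example, make_consumer(kafka_url, "cts.compose-created") after the compose has been imported.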
New tests to add:
| Test name | Action | Kafka topic verified |
|---|---|---|
| test_kafka_compose_created | POST /api/1/composes/ | cts.compose-created |
| test_kafka_compose_tagged | POST /api/1/composes/ then PATCH … tag | cts.compose-tagged |
| test_kafka_compose_untagged | Tag then PATCH … untag | cts.compose-untagged |
All three should be skipped when KAFKA_URL is not set (preserving the existing behaviour for runs that do not have a Kafka broker).
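One possible shape for the skip logic is sketched below. The helper names are hypothetical, and pytest is imported lazily inside the skip branch so the module still loads where pytest is absent; the skip reason matches the one quoted in the acceptance criteria.

```python
import os

SKIP_REASON = "requires KAFKA_URL"


def kafka_url_from_env(environ=None):
    """Return the broker address from KAFKA_URL, or None when unset or blank."""
    environ = os.environ if environ is None else environ
    return environ.get("KAFKA_URL", "").strip() or None


def require_kafka_url(environ=None):
    """Return KAFKA_URL, or skip the calling test when it is not configured."""
    url = kafka_url_from_env(environ)
    if url is None:
        import pytest  # only needed on the skip path

        pytest.skip(SKIP_REASON)
    return url
```

A kafka_url fixture could simply return require_kafka_url(), so every test that requests the fixture is skipped automatically when no broker is configured.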
EaaS pipeline changes
- Add deploy-kafka task: runs after provision-environment, before deploy-cts. Deploys the Bitnami Kafka StatefulSet/Deployment and Service. Waits for the broker to be ready (TCP probe on port 9092).
- Update deploy-cts task: add runAfter: [deploy-kafka] dependency and inject the Kafka config into the cts-config ConfigMap.
- Update run-tests task: install kafka-python alongside pytest, and pass KAFKA_URL=kafka:9092 into the kubectl exec environment.
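The TCP readiness wait mentioned for deploy-kafka could look roughly like the following. This is a sketch under stated assumptions: wait_for_tcp is a hypothetical helper, the timeout values are illustrative, and a successful connection only shows that the listener is open, not that the broker has finished starting.

```python
import socket
import time


def wait_for_tcp(host, port, timeout_s=120.0, interval_s=2.0):
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            # create_connection raises OSError while nothing is listening yet.
            with socket.create_connection((host, port), timeout=interval_s):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval_s)


if __name__ == "__main__":
    raise SystemExit(0 if wait_for_tcp("kafka", 9092) else 1)
```

Exiting non-zero on timeout lets the pipeline step fail early instead of letting deploy-cts start against a broker that never came up.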
Files to change
| File | Change |
|---|---|
| .tekton/integration-test-eaas.yaml | Add deploy-kafka task; update deploy-cts to depend on it and inject Kafka config; update run-tests to install kafka-python and set KAFKA_URL |
| tests/test_integration_api.py | Add kafka_url fixture and three new test functions |
No changes are needed to cts/messaging.py, cts/config.py, or any existing unit test.
Alternatives considered
Mock Kafka in the integration test — the issue explicitly asks to verify that publishing works for each built image, so a real broker is required.
Use Redpanda — lighter weight than Kafka, but requires --privileged or a specific SCC that may not be available in the EaaS namespace. Bitnami Kafka is safer.
Edge Cases and Constraints
- The CTS publish() function is asynchronous (background thread). Tests must allow a few seconds for the message to arrive before asserting. A KafkaConsumer.poll(timeout_ms=10000) call is sufficient.
- Kafka auto-creates topics (KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true), so no topic pre-creation step is needed.
- The existing unit tests (test_events.py) mock KafkaProducer and are not affected by this change.
- The Kafka consumer in the test must close cleanly (use a try/finally or context manager) to avoid leaving open connections.
kafka-python is already a runtime dependency of CTS (it is imported in cts/messaging.py), but it is not listed in test-requirements.txt. The EaaS run-tests step installs packages on the fly, so adding kafka-python to the pip install line in the pipeline script is sufficient. For completeness, it can also be added to test-requirements.txt.
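The waiting and cleanup constraints above can be combined in one helper, sketched below. Instead of a single poll, this variant keeps polling until a deadline, because with auto_offset_reset="earliest" the first batch may contain older messages rather than the one under test. The name wait_for_message and the matches callback are hypothetical; the only consumer methods assumed are poll(timeout_ms=...) and close(), as provided by kafka-python's KafkaConsumer.

```python
import time


def wait_for_message(consumer, matches, timeout_s=10.0, poll_ms=500):
    """Poll until a record satisfying matches(record) arrives or timeout_s elapses.

    Returns the matching record, or None on timeout. The consumer is always
    closed, whether a record was found, the wait timed out, or an error occurred.
    """
    deadline = time.monotonic() + timeout_s
    try:
        while time.monotonic() < deadline:
            # poll() returns a dict mapping each partition to a list of records.
            batches = consumer.poll(timeout_ms=poll_ms)
            for records in batches.values():
                for record in records:
                    if matches(record):
                        return record
        return None
    finally:
        consumer.close()
```

A test would pass a predicate that inspects record.value for the compose it just created and assert that the result is not None.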
Acceptance Criteria
- Running pytest tests/test_integration_api.py -v -k "not kafka" -o addopts= against a CTS instance without KAFKA_URL set produces no new errors (all three new Kafka tests are skipped with "requires KAFKA_URL").
- Running pytest tests/test_integration_api.py::test_kafka_compose_created -v -o addopts= with CTS_URL and KAFKA_URL both pointing to live services exits with code 0.
- Running pytest tests/test_integration_api.py::test_kafka_compose_tagged -v -o addopts= with CTS_URL and KAFKA_URL pointing to live services exits with code 0.
- Running pytest tests/test_integration_api.py::test_kafka_compose_untagged -v -o addopts= with CTS_URL and KAFKA_URL pointing to live services exits with code 0.
- The deploy-kafka task in .tekton/integration-test-eaas.yaml exists and appears before deploy-cts in the dependency graph (i.e., deploy-cts lists deploy-kafka in its runAfter field).
- The run-tests task's kubectl exec script includes kafka-python in its pip install command and passes KAFKA_URL=kafka:9092 to pytest.
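The dependency-graph criterion could be checked mechanically along these lines. This is a minimal sketch that assumes the pipeline file has already been parsed into a dictionary (for example with a YAML parser) and that the tasks sit under spec.tasks; if the file embeds them elsewhere, such as under a pipelineSpec, the lookup path would need adjusting. The function name runs_after is hypothetical.

```python
def runs_after(pipeline, task_name, dependency):
    """Return True if task_name lists dependency in its runAfter field."""
    for task in pipeline.get("spec", {}).get("tasks", []):
        if task.get("name") == task_name:
            return dependency in task.get("runAfter", [])
    # The task itself is missing, so the criterion cannot hold.
    return False


if __name__ == "__main__":
    # Hand-written stand-in for the parsed pipeline, for illustration only.
    example = {
        "spec": {
            "tasks": [
                {"name": "provision-environment"},
                {"name": "deploy-kafka", "runAfter": ["provision-environment"]},
                {"name": "deploy-cts", "runAfter": ["deploy-kafka"]},
            ]
        }
    }
    print(runs_after(example, "deploy-cts", "deploy-kafka"))
```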