From 816fe06900ea82d091d60dfcf52795ab3ad0b1c4 Mon Sep 17 00:00:00 2001 From: Agent Date: Fri, 3 Jul 2026 10:50:04 +0000 Subject: [PATCH] Add Kafka integration tests and deploy-kafka EaaS task - Deploy single-node Apache Kafka broker (KRaft mode) in the EaaS ephemeral namespace before CTS starts, using emptyDir volumes to satisfy the restricted-v2 SCC on OpenShift. - Add a long-lived module-scoped kafka_consumer fixture that subscribes to all three CTS topics and acts as a cursor; an autouse drain check after every test ensures no message is silently left unconsumed. - Introduce CTSClient to pair an HTTP client with the Kafka consumer. Tag management methods (create_tag, add/remove_tagger/untagger) are members of CTSClient alongside the compose helpers, so every mutating action in tests uses a consistent interface. - Kafka assertions (compose-created, compose-tagged, compose-untagged) are embedded in CTSClient.import_compose, tag_compose, and untag_compose via _assert_compose_message; they activate only when KAFKA_URL is set so all tests continue to run without a broker. - Fix messaging.py: convert the string 'none' to None for compression_type so kafka-python selects no compression correctly. - Add kafka-python to test-requirements.txt; pass KAFKA_URL=kafka:9092 to pytest in the run-tests step. Generated-By: OpenCode (google-vertex-anthropic/claude-sonnet-4-6@default) --- .tekton/integration-test-eaas.yaml | 154 +++++++++- cts/messaging.py | 8 +- test-requirements.txt | 2 + tests/test_integration_api.py | 469 +++++++++++++++++++++-------- 4 files changed, 506 insertions(+), 127 deletions(-) diff --git a/.tekton/integration-test-eaas.yaml b/.tekton/integration-test-eaas.yaml index ef737855..50662841 100644 --- a/.tekton/integration-test-eaas.yaml +++ b/.tekton/integration-test-eaas.yaml @@ -403,6 +403,145 @@ spec: - name: kubeconfig-secret value: $(tasks.provision-environment.results.secretRef) + - name: deploy-kafka + runAfter: + - provision-environment + taskSpec: + params: + - name: kubeconfig-secret + type: string + steps: + - name: create-kafka + image: quay.io/konflux-ci/appstudio-utils:latest + script: | + #!/usr/bin/env bash + set -euo pipefail + + KUBECONFIG=/tmp/kubeconfig + kubectl get secret $(params.kubeconfig-secret) -o jsonpath='{.data.kubeconfig}' | base64 -d > $KUBECONFIG + export KUBECONFIG + + echo "==========================================" + echo "Deploying Kafka (Apache KRaft single-node)" + echo "==========================================" + + kubectl apply -f - <<'EOFYAML' + apiVersion: apps/v1 + kind: Deployment + metadata: + name: kafka + labels: + app: kafka + spec: + replicas: 1 + selector: + matchLabels: + app: kafka + template: + metadata: + labels: + app: kafka + spec: + initContainers: + - name: copy-kafka-config + image: docker.io/apache/kafka:3.9.2 + command: ["/bin/sh", "-c", "cp -r /opt/kafka/config/. /mnt/kafka-config/"] + volumeMounts: + - name: kafka-config + mountPath: /mnt/kafka-config + containers: + - name: kafka + image: docker.io/apache/kafka:3.9.2 + ports: + - containerPort: 9092 + name: client + - containerPort: 9093 + name: controller + env: + - name: KAFKA_NODE_ID + value: "1" + - name: KAFKA_PROCESS_ROLES + value: "broker,controller" + - name: KAFKA_CONTROLLER_QUORUM_VOTERS + value: "1@localhost:9093" + - name: KAFKA_LISTENERS + value: "PLAINTEXT://:9092,CONTROLLER://:9093" + - name: KAFKA_ADVERTISED_LISTENERS + value: "PLAINTEXT://kafka:9092" + - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP + value: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT" + - name: KAFKA_CONTROLLER_LISTENER_NAMES + value: "CONTROLLER" + - name: KAFKA_AUTO_CREATE_TOPICS_ENABLE + value: "true" + - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR + value: "1" + - name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR + value: "1" + - name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR + value: "1" + - name: KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS + value: "0" + resources: + requests: + memory: "256Mi" + cpu: "100m" + limits: + memory: "512Mi" + cpu: "500m" + readinessProbe: + tcpSocket: + port: 9092 + initialDelaySeconds: 20 + periodSeconds: 5 + timeoutSeconds: 3 + failureThreshold: 24 + volumeMounts: + - name: kafka-config + mountPath: /opt/kafka/config + - name: kafka-logs + mountPath: /tmp/kafka-logs + - name: kafka-gc-logs + mountPath: /opt/kafka/logs + volumes: + - name: kafka-config + emptyDir: {} + - name: kafka-logs + emptyDir: {} + - name: kafka-gc-logs + emptyDir: {} + --- + apiVersion: v1 + kind: Service + metadata: + name: kafka + labels: + app: kafka + spec: + ports: + - port: 9092 + targetPort: 9092 + name: client + - port: 9093 + targetPort: 9093 + name: controller + selector: + app: kafka + EOFYAML + + echo "Waiting for Kafka to be ready..." + if ! kubectl wait --for=condition=available --timeout=300s deployment/kafka; then + echo "Kafka deployment failed! Debug info:" + kubectl describe deployment kafka + kubectl describe pod -l app=kafka + kubectl logs -l app=kafka --tail=50 || echo "No logs available" + exit 1 + fi + echo "✓ Kafka is ready" + params: + - name: kubeconfig-secret + value: $(tasks.provision-environment.results.secretRef) + - name: deploy-database runAfter: - provision-environment @@ -548,6 +687,7 @@ spec: - deploy-database - deploy-openldap - deploy-dex + - deploy-kafka when: - input: $(tasks.parse-snapshot.results.ldap-server-present) operator: in @@ -599,6 +739,14 @@ spec: ] ADMINS = {"groups": [], "users": ["builder@example.com"]} ALLOWED_BUILDERS = {"groups": [], "users": ["builder@example.com"]} + MESSAGING_BACKEND = "kafka" + MESSAGING_BROKER_URLS = ["kafka:9092"] + MESSAGING_KAFKA_SECURITY_PROTOCOL = "PLAINTEXT" + MESSAGING_KAFKA_SASL_MECHANISM = "" + MESSAGING_KAFKA_USERNAME = "" + MESSAGING_KAFKA_PASSWORD = "" + MESSAGING_KAFKA_COMPRESSION_TYPE = "none" + MESSAGING_TOPIC_PREFIX = "cts." httpd.conf: | ServerRoot "/etc/httpd" PidFile /tmp/httpd.pid @@ -877,9 +1025,9 @@ spec: echo 'Installing Dex CA certificate...' echo '$DEX_CA_B64' | base64 -d > /tmp/dex-ca.crt - echo 'Installing pytest and requests...' + echo 'Installing pytest, requests, and kafka-python...' python3 -m ensurepip - python3 -m pip install --target /tmp/test-deps --quiet pytest requests + python3 -m pip install --target /tmp/test-deps --quiet pytest requests kafka-python echo '' echo 'Cloning repository...' @@ -891,7 +1039,7 @@ spec: echo '' echo 'Running pytest...' - PYTHONPATH=/tmp/test-deps REQUESTS_CA_BUNDLE=/tmp/dex-ca.crt CTS_URL=http://cts:8080 AUTH_BACKEND=oidc_or_kerberos DEX_URL=https://dex:5556 python3 -m pytest tests/test_integration_api.py -v -s -o addopts= + PYTHONPATH=/tmp/test-deps REQUESTS_CA_BUNDLE=/tmp/dex-ca.crt CTS_URL=http://cts:8080 AUTH_BACKEND=oidc_or_kerberos DEX_URL=https://dex:5556 KAFKA_URL=kafka:9092 python3 -m pytest tests/test_integration_api.py -v -s -o addopts= " TEST_RESULT=$? set -e diff --git a/cts/messaging.py b/cts/messaging.py index e2c35384..f93fe634 100644 --- a/cts/messaging.py +++ b/cts/messaging.py @@ -102,9 +102,15 @@ def _kafka_send_msg(msgs): def _send(): """Inner function to send messages (will be retried on failure)""" + compression = conf.messaging_kafka_compression_type + # kafka-python uses Python None to mean "no compression"; the string + # "none" (which may come from a config file) is not accepted. + if compression and compression.lower() == "none": + compression = None + config = { "bootstrap_servers": conf.messaging_broker_urls, - "compression_type": conf.messaging_kafka_compression_type, + "compression_type": compression, "security_protocol": conf.messaging_kafka_security_protocol, "sasl_mechanism": conf.messaging_kafka_sasl_mechanism, "sasl_plain_username": conf.messaging_kafka_username, diff --git a/test-requirements.txt b/test-requirements.txt index 5bae9f9e..4b9f1209 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -9,3 +9,5 @@ tox # Let's update this package to avoid this problem. itsdangerous>=1.1.0 freezegun +# Required for Kafka integration tests (test_integration_api.py) +kafka-python diff --git a/tests/test_integration_api.py b/tests/test_integration_api.py index 17277de6..76a4438e 100755 --- a/tests/test_integration_api.py +++ b/tests/test_integration_api.py @@ -224,96 +224,152 @@ def _create_compose_info( } -# Helper functions for common test operations - - -def create_tag(http_client, name, description, documentation): - """Create a tag and return the response data""" - tag_data = { - "name": name, - "description": description, - "documentation": documentation, - } - status, data = http_client.post("/api/1/tags/", tag_data) - assert status == 200, f"Failed to create tag: {data}" - assert isinstance(data, dict) - assert data["name"] == name - assert "id" in data - return data - - -def import_compose( - http_client, release_short, release_version, date, compose_type="test", respin=1 -): - """Import a compose and return the response data""" - compose_info = _create_compose_info( - release_short, release_version, date, compose_type, respin - ) - status, data = http_client.post("/api/1/composes/", {"compose_info": compose_info}) - assert status == 200, f"Failed to import compose: {data}" - assert isinstance(data, dict) - assert "payload" in data - assert "compose" in data["payload"] - return data - - -def tag_compose(http_client, compose_id, tag_name): - """Tag a compose and return the response data""" - status, data = http_client.patch( - f"/api/1/composes/{compose_id}", {"action": "tag", "tag": tag_name} - ) - assert status == 200, f"Failed to tag compose: {data}" - assert tag_name in data.get("tags", []) - return data - - -def untag_compose(http_client, compose_id, tag_name): - """Untag a compose and return the response data""" - status, data = http_client.patch( - f"/api/1/composes/{compose_id}", {"action": "untag", "tag": tag_name} - ) - assert status == 200, f"Failed to untag compose: {data}" - assert tag_name not in data.get("tags", []) - return data +class CTSClient: + """Combines an HTTP client with an optional Kafka consumer. + The Kafka consumer is ``None`` when ``KAFKA_URL`` is not set, in which + case all Kafka assertions are silently skipped so tests run in both + environments without modification. + """ -def _manage_tag_user(http_client, tag_id, action, username): - """Internal helper to manage tag users (taggers/untaggers)""" - status, data = http_client.patch( - f"/api/1/tags/{tag_id}", {"action": action, "username": username} - ) - assert status == 200, f"Failed to {action}: {data}" + def __init__(self, http_client, kafka_consumer=None): + self.http = http_client + self.kafka = kafka_consumer - list_name = action.rsplit("_", 1)[1] + "s" + def _assert_kafka_message(self, topic, event_name, compose_id): + """Consume one Kafka message and assert it matches the expected event. - if action.startswith("add_"): - assert username in data[list_name], f"Expected {username} in {list_name}" - else: + When ``self.kafka`` is ``None`` (no Kafka broker configured), this + method returns immediately without making any assertions. + """ + if self.kafka is None: + return + msg = _consume_one(self.kafka, topic) assert ( - username not in data[list_name] - ), f"Expected {username} not in {list_name}" - - return data - - -def add_tagger(http_client, tag_id, username): - """Add a tagger to a tag and return the response data""" - return _manage_tag_user(http_client, tag_id, "add_tagger", username) - - -def remove_tagger(http_client, tag_id, username): - """Remove a tagger from a tag and return the response data""" - return _manage_tag_user(http_client, tag_id, "remove_tagger", username) + msg.get("event") == event_name + ), f"Expected event={event_name!r}, got event={msg.get('event')!r}" + assert msg.get("compose") is not None, f"Message missing 'compose' key: {msg}" + compose_info_data = msg["compose"].get("compose_info", {}) + assert compose_id in str( + compose_info_data + ), f"Message compose_info does not reference compose {compose_id}: {msg}" + + def create_tag(self, name, description, documentation): + """Create a tag and return the response data.""" + tag_data = { + "name": name, + "description": description, + "documentation": documentation, + } + status, data = self.http.post("/api/1/tags/", tag_data) + assert status == 200, f"Failed to create tag: {data}" + assert isinstance(data, dict) + assert data["name"] == name + assert "id" in data + return data + + def _manage_tag_user(self, tag_id, action, username): + """Internal helper to manage tag users (taggers/untaggers).""" + status, data = self.http.patch( + f"/api/1/tags/{tag_id}", {"action": action, "username": username} + ) + assert status == 200, f"Failed to {action}: {data}" + + list_name = action.rsplit("_", 1)[1] + "s" + + if action.startswith("add_"): + assert username in data[list_name], f"Expected {username} in {list_name}" + else: + assert ( + username not in data[list_name] + ), f"Expected {username} not in {list_name}" + + return data + + def add_tagger(self, tag_id, username): + """Add a tagger to a tag and return the response data.""" + return self._manage_tag_user(tag_id, "add_tagger", username) + + def remove_tagger(self, tag_id, username): + """Remove a tagger from a tag and return the response data.""" + return self._manage_tag_user(tag_id, "remove_tagger", username) + + def add_untagger(self, tag_id, username): + """Add an untagger to a tag and return the response data.""" + return self._manage_tag_user(tag_id, "add_untagger", username) + + def remove_untagger(self, tag_id, username): + """Remove an untagger from a tag and return the response data.""" + return self._manage_tag_user(tag_id, "remove_untagger", username) + + def import_compose( + self, + release_short, + release_version, + date, + compose_type="test", + respin=1, + ): + """Import a compose and return the response data. + + Also asserts that CTS published a ``compose-created`` Kafka message + when a Kafka consumer is configured. + """ + compose_info = _create_compose_info( + release_short, release_version, date, compose_type, respin + ) + status, data = self.http.post( + "/api/1/composes/", {"compose_info": compose_info} + ) + assert status == 200, f"Failed to import compose: {data}" + assert isinstance(data, dict) + assert "payload" in data + assert "compose" in data["payload"] + compose_id = data["payload"]["compose"]["id"] + self._assert_kafka_message("cts.compose-created", "compose-created", compose_id) + return data + + def tag_compose(self, compose_id, tag_name): + """Tag a compose and return the response data. + + Also asserts that CTS published a ``compose-tagged`` Kafka message + when a Kafka consumer is configured. + """ + status, data = self.http.patch( + f"/api/1/composes/{compose_id}", {"action": "tag", "tag": tag_name} + ) + assert status == 200, f"Failed to tag compose: {data}" + assert tag_name in data.get("tags", []) + self._assert_kafka_message("cts.compose-tagged", "compose-tagged", compose_id) + return data + + def untag_compose(self, compose_id, tag_name): + """Untag a compose and return the response data. + + Also asserts that CTS published a ``compose-untagged`` Kafka message + when a Kafka consumer is configured. + """ + status, data = self.http.patch( + f"/api/1/composes/{compose_id}", {"action": "untag", "tag": tag_name} + ) + assert status == 200, f"Failed to untag compose: {data}" + assert tag_name not in data.get("tags", []) + self._assert_kafka_message( + "cts.compose-untagged", "compose-untagged", compose_id + ) + return data -def add_untagger(http_client, tag_id, username): - """Add an untagger to a tag and return the response data""" - return _manage_tag_user(http_client, tag_id, "add_untagger", username) +@pytest.fixture(scope="module") +def cts_client(write_http_client, kafka_consumer): + """CTSClient wrapping the write HTTP client and the Kafka consumer.""" + return CTSClient(write_http_client, kafka_consumer) -def remove_untagger(http_client, tag_id, username): - """Remove an untagger from a tag and return the response data""" - return _manage_tag_user(http_client, tag_id, "remove_untagger", username) +@pytest.fixture(scope="module") +def cts_auth_client(auth_http_client_builder, kafka_consumer): + """CTSClient wrapping the authenticated builder HTTP client and the Kafka consumer.""" + return CTSClient(auth_http_client_builder, kafka_consumer) # Tests @@ -348,20 +404,22 @@ def test_composes_list(http_client): print(f" Found {len(data['items'])} composes") -def test_composes_pagination(write_http_client): +def test_composes_pagination(cts_client): """Test that pagination parameters work correctly""" - # Import 3 test composes + # Import 3 test composes. Kafka assertions are included via cts_client. compose_ids = [] for i in range(1, 4): - response = import_compose( - write_http_client, "PaginationTest", "1.0", f"2025010{i}" + response = cts_client.import_compose( + "PaginationTest", + "1.0", + f"2025010{i}", ) compose_ids.append(response["payload"]["compose"]["id"]) print(f" Imported {len(compose_ids)} composes for pagination test") # Test page 1 with per_page=2 - status, data = write_http_client.get("/api/1/composes/?page=1&per_page=2") + status, data = cts_client.http.get("/api/1/composes/?page=1&per_page=2") assert status == 200 assert isinstance(data, dict) assert "items" in data @@ -375,7 +433,7 @@ def test_composes_pagination(write_http_client): print(f" Page 1 (per_page=2): {len(data['items'])} items, total: {total}") # Test page 2 with per_page=2 - should have 1 item (we imported 3 total) - status, data = write_http_client.get("/api/1/composes/?page=2&per_page=2") + status, data = cts_client.http.get("/api/1/composes/?page=2&per_page=2") assert status == 200 assert "items" in data assert ( @@ -411,11 +469,10 @@ def test_404_handling(http_client): # Workflow tests -def test_workflow_tag_creation(write_http_client): +def test_workflow_tag_creation(cts_client): """Test creating a tag and managing taggers/untaggers""" # Step 1: Create a tag - data = create_tag( - write_http_client, + data = cts_client.create_tag( "integration-test-tag", "Tag created during integration testing", "https://example.com/docs/integration-test", @@ -429,30 +486,30 @@ def test_workflow_tag_creation(write_http_client): print(f" 2. Initial taggers: {data['taggers']}, untaggers: {data['untaggers']}") # Step 2: Add a tagger - data = add_tagger(write_http_client, tag_id, "test-user") + data = cts_client.add_tagger(tag_id, "test-user") print(f" 3. Added tagger 'test-user': taggers={data['taggers']}") # Step 3: Add an untagger - data = add_untagger(write_http_client, tag_id, "other-user") + data = cts_client.add_untagger(tag_id, "other-user") assert "test-user" in data["taggers"] print(f" 4. Added untagger 'other-user': untaggers={data['untaggers']}") # Step 4: Add another tagger - data = add_tagger(write_http_client, tag_id, "another-user") + data = cts_client.add_tagger(tag_id, "another-user") assert set(data["taggers"]) == {"test-user", "another-user"} print(f" 5. Added tagger 'another-user': taggers={data['taggers']}") # Step 5: Remove a tagger - data = remove_tagger(write_http_client, tag_id, "test-user") + data = cts_client.remove_tagger(tag_id, "test-user") assert "another-user" in data["taggers"] print(f" 6. Removed tagger 'test-user': taggers={data['taggers']}") # Step 6: Remove the untagger - data = remove_untagger(write_http_client, tag_id, "other-user") + data = cts_client.remove_untagger(tag_id, "other-user") print(f" 7. Removed untagger 'other-user': untaggers={data['untaggers']}") # Step 7: Verify final state - status, final_data = write_http_client.get(f"/api/1/tags/{tag_id}") + status, final_data = cts_client.http.get(f"/api/1/tags/{tag_id}") assert status == 200 assert final_data["taggers"] == ["another-user"] assert final_data["untaggers"] == [] @@ -462,29 +519,45 @@ def test_workflow_tag_creation(write_http_client): print(" ✓ Tag creation and tagger/untagger management completed successfully") -def test_workflow_compose_import(write_http_client): +def test_workflow_compose_import(cts_client): """Test importing a compose""" - data = import_compose(write_http_client, "IntegrationTest", "1.0", "20250101") + data = cts_client.import_compose( + "IntegrationTest", + "1.0", + "20250101", + ) compose_id = data["payload"]["compose"]["id"] print(f" Imported compose: {compose_id}") -def test_workflow_respin_increment(write_http_client): +def test_workflow_respin_increment(cts_client): """Test that respin numbers are automatically incremented for duplicate composes""" # Import first compose - response1 = import_compose(write_http_client, "RespinTest", "1.0", "20250102") + response1 = cts_client.import_compose( + "RespinTest", + "1.0", + "20250102", + ) compose_id1 = response1["payload"]["compose"]["id"] respin1 = response1["payload"]["compose"]["respin"] print(f" 1. First compose: {compose_id1} (respin: {respin1})") # Import second compose with same release/date - respin should auto-increment - response2 = import_compose(write_http_client, "RespinTest", "1.0", "20250102") + response2 = cts_client.import_compose( + "RespinTest", + "1.0", + "20250102", + ) compose_id2 = response2["payload"]["compose"]["id"] respin2 = response2["payload"]["compose"]["respin"] print(f" 2. Second compose: {compose_id2} (respin: {respin2})") # Import third compose - respin should increment again - response3 = import_compose(write_http_client, "RespinTest", "1.0", "20250102") + response3 = cts_client.import_compose( + "RespinTest", + "1.0", + "20250102", + ) compose_id3 = response3["payload"]["compose"]["id"] respin3 = response3["payload"]["compose"]["respin"] print(f" 3. Third compose: {compose_id3} (respin: {respin3})") @@ -505,11 +578,10 @@ def test_workflow_respin_increment(write_http_client): print(f" ✓ Respin auto-increment verified: {respin1} → {respin2} → {respin3}") -def test_workflow_full_lifecycle(write_http_client): +def test_workflow_full_lifecycle(cts_client): """Test complete workflow: create tag, import compose, tag it, untag it""" # Step 1: Create a tag - tag_response = create_tag( - write_http_client, + tag_response = cts_client.create_tag( "workflow-test", "Tag for workflow testing", "https://example.com/docs/workflow", @@ -519,35 +591,37 @@ def test_workflow_full_lifecycle(write_http_client): print(f" 1. Created tag: {tag_name} (ID: {tag_id})") # Step 2: Import a compose - compose_response = import_compose( - write_http_client, "WorkflowTest", "1.0", "20250101" + compose_response = cts_client.import_compose( + "WorkflowTest", + "1.0", + "20250101", ) compose_id = compose_response["payload"]["compose"]["id"] print(f" 2. Imported compose: {compose_id}") # Verify compose has no tags initially - status, compose_data = write_http_client.get(f"/api/1/composes/{compose_id}") + status, compose_data = cts_client.http.get(f"/api/1/composes/{compose_id}") assert status == 200 assert "tags" in compose_data initial_tags = compose_data.get("tags", []) print(f" 3. Initial tags: {initial_tags}") # Step 3: Tag the compose - tag_result = tag_compose(write_http_client, compose_id, tag_name) + tag_result = cts_client.tag_compose(compose_id, tag_name) print(f" 4. Tagged compose with '{tag_name}': {tag_result.get('tags', [])}") # Step 4: Verify tag was applied - status, compose_data = write_http_client.get(f"/api/1/composes/{compose_id}") + status, compose_data = cts_client.http.get(f"/api/1/composes/{compose_id}") assert status == 200 assert tag_name in compose_data.get("tags", []) print(f" 5. Verified tags: {compose_data.get('tags', [])}") # Step 5: Untag the compose - untag_result = untag_compose(write_http_client, compose_id, tag_name) + untag_result = cts_client.untag_compose(compose_id, tag_name) print(f" 6. Untagged compose: {untag_result.get('tags', [])}") # Step 6: Verify tag was removed - status, compose_data = write_http_client.get(f"/api/1/composes/{compose_id}") + status, compose_data = cts_client.http.get(f"/api/1/composes/{compose_id}") assert status == 200 assert tag_name not in compose_data.get("tags", []) print(f" 7. Final tags: {compose_data.get('tags', [])}") @@ -572,18 +646,9 @@ def test_auth_unauthenticated_write_returns_401(http_client): assert status != 200, "Unauthenticated write must not succeed" -def test_auth_builder_can_post_compose(auth_http_client_builder): +def test_auth_builder_can_post_compose(cts_auth_client): """Authenticated 'builder' user (in ALLOWED_BUILDERS) can POST a compose.""" - compose_info = _create_compose_info("AuthBuilderTest", "1.0", "20260101") - status, data = auth_http_client_builder.post( - "/api/1/composes/", {"compose_info": compose_info} - ) - assert ( - status == 200 - ), f"Expected 200 for authenticated builder POST, got {status}. Response: {data}" - assert isinstance(data, dict) - assert "payload" in data - assert "compose" in data["payload"] + data = cts_auth_client.import_compose("AuthBuilderTest", "1.0", "20260101") compose_id = data["payload"]["compose"]["id"] assert compose_id, "Compose ID must be non-empty" @@ -616,3 +681,161 @@ def test_auth_get_endpoints_accessible_without_token(http_client): status == 200 ), f"Expected 200 for unauthenticated GET /api/1/tags/, got {status}" assert isinstance(data, dict), "GET /api/1/tags/ must return a dict" + + +# Kafka messaging helpers +# These helpers integrate Kafka assertions into existing workflow tests. +# Assertions are active only when KAFKA_URL is set; the tests run in either case. + +_KAFKA_CONSUMER_TIMEOUT_MS = int(os.environ.get("KAFKA_CONSUMER_TIMEOUT_MS", 30000)) + +# Topics that CTS publishes to. +_CTS_KAFKA_TOPICS = [ + "cts.compose-created", + "cts.compose-tagged", + "cts.compose-untagged", +] + + +def _make_json_deserializer(): + """Return a ``kafka.serializer.Deserializer`` subclass for JSON messages. + + We import ``kafka.serializer.Deserializer`` lazily (inside the function) so + that the rest of the test module can be imported even when ``kafka-python`` + is not installed. Subclassing the ABC causes ``isinstance`` to return True + in ``KafkaConsumer.__init__``, which prevents the consumer from wrapping our + class in ``DeserializeWrapper`` — the wrapper treats the deserializer as a + plain callable and breaks when it is not one. + """ + from kafka.serializer import Deserializer + + class _JsonDeserializer(Deserializer): + def deserialize(self, topic, headers, data): + return json.loads(data.decode("utf-8")) + + def close(self): + pass + + return _JsonDeserializer() + + +@pytest.fixture(scope="module") +def kafka_consumer(): + """Return a long-lived KafkaConsumer subscribed to all CTS topics. + + The consumer is positioned at the *current* end of each topic when the + module starts, so it only sees messages produced during this test run. It + acts as a cursor: each call to ``_consume_one`` advances the position + forward, making offset tracking unnecessary. + + Returns ``None`` when ``KAFKA_URL`` is not set. All Kafka-aware helpers + and tests check for ``None`` and skip their assertions accordingly, so the + full test suite runs in environments without a Kafka broker. + """ + kafka_url = os.environ.get("KAFKA_URL") + if not kafka_url: + yield None + return + + from kafka import KafkaConsumer, TopicPartition + from kafka.errors import KafkaConnectionError, KafkaTimeoutError + + try: + consumer = KafkaConsumer( + bootstrap_servers=kafka_url, + # No group_id: we use manual partition assignment, so the + # group-coordinator protocol is not needed. + group_id=None, + value_deserializer=_make_json_deserializer(), + request_timeout_ms=10000, + ) + except (KafkaTimeoutError, KafkaConnectionError) as exc: + pytest.fail(f"Cannot connect to Kafka broker at {kafka_url}: {exc}") + return + + from kafka.errors import UnknownTopicOrPartitionError + + # Assign all topic partitions and seek to the current end so we only + # see messages produced during this test run. + partitions = [TopicPartition(t, 0) for t in _CTS_KAFKA_TOPICS] + consumer.assign(partitions) + for tp in partitions: + try: + consumer.seek_to_end(tp) + except (UnknownTopicOrPartitionError, KafkaTimeoutError): + # Topic may not exist yet (no messages published); seek to 0 so + # that the first message on the topic is visible. + consumer.seek(tp, 0) + + yield consumer + consumer.close() + + +@pytest.fixture(autouse=True) +def _kafka_drain_check(kafka_consumer, request): + """After each test, assert that no Kafka messages were left unconsumed. + + Any message on a CTS topic that was not explicitly consumed by the test is + a sign of a bug (e.g. the application sent a duplicate or unexpected + message, or the test forgot to consume a message it produced). The fixture + fails the test in that case so problems are caught immediately. + """ + yield + if kafka_consumer is None: + return + from kafka.errors import KafkaConnectionError + + stale = [] + try: + records = kafka_consumer.poll(timeout_ms=500, max_records=10) + except KafkaConnectionError: + records = {} + for recs in records.values(): + for rec in recs: + stale.append((rec.topic, rec.offset, rec.value)) + if stale: + details = "\n".join( + f" topic={t!r} offset={o} value={v!r}" for t, o, v in stale + ) + pytest.fail( + f"Unconsumed Kafka messages found after test {request.node.name!r}:\n" + + details + ) + + +def _consume_one(consumer, topic, timeout_ms=None): + """Consume and return the next message on *topic* from *consumer*. + + The consumer is long-lived and acts as a cursor, so successive calls to + this function return successive messages in order without any offset + bookkeeping. + + Raises ``AssertionError`` if a message arrives on any topic other than + *topic* (unexpected message), or if no message arrives within *timeout_ms* + (default: ``_KAFKA_CONSUMER_TIMEOUT_MS``). + """ + from kafka.errors import KafkaConnectionError + + if timeout_ms is None: + timeout_ms = _KAFKA_CONSUMER_TIMEOUT_MS + + deadline_ms = timeout_ms + while deadline_ms > 0: + poll_ms = min(deadline_ms, 500) + try: + records = consumer.poll(timeout_ms=poll_ms, max_records=1) + except KafkaConnectionError as exc: + raise AssertionError( + f"Kafka broker disconnected while consuming topic '{topic}': {exc}" + ) from exc + for tp_key, recs in records.items(): + for rec in recs: + if tp_key.topic != topic: + raise AssertionError( + f"Expected message on topic '{topic}' but received one on '{tp_key.topic}' (offset={rec.offset}, value={rec.value!r})" + ) + return rec.value + deadline_ms -= poll_ms + raise AssertionError( + f"No message received on Kafka topic '{topic}' within {timeout_ms} ms" + )