|
15 | 15 | import json |
16 | 16 | import os |
17 | 17 | import ssl |
| 18 | +import time |
| 19 | +import uuid |
18 | 20 | import urllib.parse |
19 | 21 | from urllib.request import urlopen, Request |
20 | 22 | from urllib.error import HTTPError, URLError |
@@ -645,3 +647,177 @@ def test_auth_get_endpoints_accessible_without_token(http_client): |
645 | 647 | status == 200 |
646 | 648 | ), f"Expected 200 for unauthenticated GET /api/1/tags/, got {status}" |
647 | 649 | assert isinstance(data, dict), "GET /api/1/tags/ must return a dict" |
| 650 | + |
| 651 | + |
| 652 | +# Kafka messaging integration tests |
| 653 | +# These tests require a live Kafka broker and are skipped when KAFKA_URL is not set. |
| 654 | + |
| 655 | +_KAFKA_CONSUMER_TIMEOUT_MS = int(os.environ.get("KAFKA_CONSUMER_TIMEOUT_MS", 30000)) |
| 656 | + |
| 657 | + |
| 658 | +@pytest.fixture(scope="module") |
| 659 | +def kafka_url(): |
| 660 | + """Return the Kafka broker URL from the KAFKA_URL environment variable. |
| 661 | +
|
| 662 | + Skips all tests that depend on this fixture when the variable is not set, |
| 663 | + preserving backward compatibility with environments that have no Kafka broker. |
| 664 | + """ |
| 665 | + url = os.environ.get("KAFKA_URL") |
| 666 | + if not url: |
| 667 | + pytest.skip("requires KAFKA_URL") |
| 668 | + return url |
| 669 | + |
| 670 | + |
| 671 | +def _get_kafka_end_offset(kafka_url, topic): |
| 672 | + """Return the current end offset for partition 0 of *topic*. |
| 673 | +
|
| 674 | + Call this **before** performing an action so that the subsequent |
| 675 | + :func:`_consume_kafka_message` call will skip any pre-existing messages |
| 676 | + and only return messages produced by that specific action. |
| 677 | + """ |
| 678 | + from kafka import KafkaConsumer, TopicPartition |
| 679 | + |
| 680 | + probe = KafkaConsumer(bootstrap_servers=kafka_url, consumer_timeout_ms=1000) |
| 681 | + tp = TopicPartition(topic, 0) |
| 682 | + probe.assign([tp]) |
| 683 | + probe.seek_to_end(tp) |
| 684 | + offset = probe.position(tp) |
| 685 | + probe.close() |
| 686 | + return offset |
| 687 | + |
| 688 | + |
| 689 | +def _consume_kafka_message(kafka_url, topic, start_offset, timeout_ms=None): |
| 690 | + """Poll *topic* for the first message at or after *start_offset*. |
| 691 | +
|
| 692 | + *start_offset* must be obtained by calling :func:`_get_kafka_end_offset` |
| 693 | + **before** the action that is expected to produce the message, so that |
| 694 | + messages published by earlier tests are not mistakenly returned. |
| 695 | +
|
| 696 | + Returns the decoded JSON dict of the first matching message, or raises |
| 697 | + ``AssertionError`` if no message arrives within *timeout_ms*. |
| 698 | + """ |
| 699 | + from kafka import KafkaConsumer, TopicPartition |
| 700 | + |
| 701 | + if timeout_ms is None: |
| 702 | + timeout_ms = _KAFKA_CONSUMER_TIMEOUT_MS |
| 703 | + |
| 704 | + consumer = KafkaConsumer( |
| 705 | + bootstrap_servers=kafka_url, |
| 706 | + auto_offset_reset="earliest", |
| 707 | + group_id=f"cts-test-{uuid.uuid4()}", |
| 708 | + value_deserializer=lambda b: json.loads(b.decode("utf-8")), |
| 709 | + consumer_timeout_ms=timeout_ms, |
| 710 | + ) |
| 711 | + tp = TopicPartition(topic, 0) |
| 712 | + consumer.assign([tp]) |
| 713 | + consumer.seek(tp, start_offset) |
| 714 | + try: |
| 715 | + for record in consumer: |
| 716 | + return record.value |
| 717 | + raise AssertionError( |
| 718 | + f"No message received on Kafka topic '{topic}' at offset >={start_offset}" |
| 719 | + f" within {timeout_ms} ms" |
| 720 | + ) |
| 721 | + finally: |
| 722 | + consumer.close() |
| 723 | + |
| 724 | + |
| 725 | +def test_kafka_compose_created(write_http_client, kafka_url): |
| 726 | + """Importing a compose must publish a message on the cts.compose-created topic.""" |
| 727 | + # Snapshot the end offset before the action so we skip any pre-existing messages. |
| 728 | + start_offset = _get_kafka_end_offset(kafka_url, "cts.compose-created") |
| 729 | + |
| 730 | + data = import_compose(write_http_client, "KafkaCreatedTest", "1.0", "20260601") |
| 731 | + compose_id = data["payload"]["compose"]["id"] |
| 732 | + print(f" Imported compose: {compose_id}") |
| 733 | + |
| 734 | + msg = _consume_kafka_message(kafka_url, "cts.compose-created", start_offset) |
| 735 | + |
| 736 | + assert msg is not None, "Expected a message on cts.compose-created, got None" |
| 737 | + assert ( |
| 738 | + msg.get("event") == "compose-created" |
| 739 | + ), f"Expected event='compose-created', got event={msg.get('event')!r}" |
| 740 | + assert msg.get("compose") is not None, f"Message missing 'compose' key: {msg}" |
| 741 | + compose_info = msg["compose"].get("compose_info", {}) |
| 742 | + assert compose_id in str( |
| 743 | + compose_info |
| 744 | + ), f"Message compose_info does not reference expected compose {compose_id}: {msg}" |
| 745 | + print(f" ✓ Received compose-created message: {msg}") |
| 746 | + |
| 747 | + |
| 748 | +def test_kafka_compose_tagged(write_http_client, kafka_url): |
| 749 | + """Tagging a compose must publish a message on the cts.compose-tagged topic.""" |
| 750 | + # Create a tag and import a compose |
| 751 | + tag_data = create_tag( |
| 752 | + write_http_client, |
| 753 | + "kafka-tag-test", |
| 754 | + "Tag for Kafka tagged test", |
| 755 | + "https://example.com/docs/kafka-tag-test", |
| 756 | + ) |
| 757 | + tag_name = tag_data["name"] |
| 758 | + |
| 759 | + compose_data = import_compose( |
| 760 | + write_http_client, "KafkaTaggedTest", "1.0", "20260602" |
| 761 | + ) |
| 762 | + compose_id = compose_data["payload"]["compose"]["id"] |
| 763 | + print(f" Imported compose: {compose_id}, tag: {tag_name}") |
| 764 | + |
| 765 | + # Snapshot the end offset before tagging so we skip earlier messages. |
| 766 | + start_offset = _get_kafka_end_offset(kafka_url, "cts.compose-tagged") |
| 767 | + |
| 768 | + # Tag the compose — this should trigger a cts.compose-tagged message. |
| 769 | + tag_compose(write_http_client, compose_id, tag_name) |
| 770 | + print(f" Tagged compose with '{tag_name}'") |
| 771 | + |
| 772 | + msg = _consume_kafka_message(kafka_url, "cts.compose-tagged", start_offset) |
| 773 | + |
| 774 | + assert msg is not None, "Expected a message on cts.compose-tagged, got None" |
| 775 | + assert ( |
| 776 | + msg.get("event") == "compose-tagged" |
| 777 | + ), f"Expected event='compose-tagged', got event={msg.get('event')!r}" |
| 778 | + assert msg.get("compose") is not None, f"Message missing 'compose' key: {msg}" |
| 779 | + compose_info = msg["compose"].get("compose_info", {}) |
| 780 | + assert compose_id in str( |
| 781 | + compose_info |
| 782 | + ), f"Message compose_info does not reference expected compose {compose_id}: {msg}" |
| 783 | + print(f" ✓ Received compose-tagged message: {msg}") |
| 784 | + |
| 785 | + |
| 786 | +def test_kafka_compose_untagged(write_http_client, kafka_url): |
| 787 | + """Untagging a compose must publish a message on the cts.compose-untagged topic.""" |
| 788 | + # Create a tag, import a compose, tag it, then untag it. |
| 789 | + tag_data = create_tag( |
| 790 | + write_http_client, |
| 791 | + "kafka-untag-test", |
| 792 | + "Tag for Kafka untagged test", |
| 793 | + "https://example.com/docs/kafka-untag-test", |
| 794 | + ) |
| 795 | + tag_name = tag_data["name"] |
| 796 | + |
| 797 | + compose_data = import_compose( |
| 798 | + write_http_client, "KafkaUntaggedTest", "1.0", "20260603" |
| 799 | + ) |
| 800 | + compose_id = compose_data["payload"]["compose"]["id"] |
| 801 | + print(f" Imported compose: {compose_id}, tag: {tag_name}") |
| 802 | + |
| 803 | + tag_compose(write_http_client, compose_id, tag_name) |
| 804 | + print(f" Tagged compose with '{tag_name}'") |
| 805 | + |
| 806 | + # Snapshot the end offset before untagging so we skip earlier messages. |
| 807 | + start_offset = _get_kafka_end_offset(kafka_url, "cts.compose-untagged") |
| 808 | + |
| 809 | + untag_compose(write_http_client, compose_id, tag_name) |
| 810 | + print(f" Untagged compose") |
| 811 | + |
| 812 | + msg = _consume_kafka_message(kafka_url, "cts.compose-untagged", start_offset) |
| 813 | + |
| 814 | + assert msg is not None, "Expected a message on cts.compose-untagged, got None" |
| 815 | + assert ( |
| 816 | + msg.get("event") == "compose-untagged" |
| 817 | + ), f"Expected event='compose-untagged', got event={msg.get('event')!r}" |
| 818 | + assert msg.get("compose") is not None, f"Message missing 'compose' key: {msg}" |
| 819 | + compose_info = msg["compose"].get("compose_info", {}) |
| 820 | + assert compose_id in str( |
| 821 | + compose_info |
| 822 | + ), f"Message compose_info does not reference expected compose {compose_id}: {msg}" |
| 823 | + print(f" ✓ Received compose-untagged message: {msg}") |
0 commit comments