diff --git a/python/destinations/kafka/README.md b/python/destinations/kafka/README.md new file mode 100644 index 00000000..c2e96bd0 --- /dev/null +++ b/python/destinations/kafka/README.md @@ -0,0 +1,52 @@ +# Kafka Replicator Sink + +[This connector](https://github.com/quixio/quix-samples/tree/main/python/destinations/kafka) demonstrates how to consume data from a Quix topic and produce it to an external Kafka cluster. + +This sink uses the `KafkaReplicatorSink` to serialize and produce messages to an external Kafka cluster, making it easy to replicate data between Kafka clusters or export data from Quix to other Kafka-based systems. + +## How to run + +Create a [Quix](https://portal.cloud.quix.io/signup?utm_campaign=github) account or log-in and visit the `Connectors` tab to use this connector. + +Clicking `Set up connector` allows you to enter your connection details and runtime parameters. + +Then either: +* click `Test connection & deploy` to deploy the pre-built and configured container into Quix. + +* or click `Customise connector` to inspect or alter the code before deployment. + +## Requirements / Prerequisites + +You'll need to have an external Kafka cluster accessible either locally or in the cloud. + +## Environment Variables + +The connector uses the following environment variables: + +### Required +- **input**: Name of the input topic to listen to. +- **SINK_OUTPUT_TOPIC**: The target Kafka topic name to produce to on the external Kafka cluster. +- **SINK_BOOTSTRAP_SERVERS**: The external Kafka broker address (e.g., localhost:9092 or broker.example.com:9092). + +### Optional +- **CONSUMER_GROUP**: Name of the consumer group for consuming from Quix. Default: "kafka_sink" +- **SINK_KEY_SERIALIZER**: Serializer to use for the message key. Options: json, bytes, string, double, integer. Default: "bytes" +- **SINK_VALUE_SERIALIZER**: Serializer to use for the message value. Options: json, bytes, string, double, integer. 
Default: "json" +- **SINK_AUTO_CREATE_TOPIC**: Whether to attempt to create the sink topic upon startup. Default: "true" + +### Authentication (Optional) +- **SINK_SECURITY_PROTOCOL**: Protocol used to communicate with brokers. Options: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL +- **SINK_SASL_MECHANISM**: SASL mechanism for authentication. Options: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, OAUTHBEARER, AWS_MSK_IAM +- **SINK_SASL_USERNAME**: SASL username for external Kafka authentication. +- **SINK_SASL_PASSWORD**: SASL password for external Kafka authentication. +- **SINK_SSL_CA_LOCATION**: Path to the SSL CA certificate file for secure connections. + +## Contribute + +Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit. + +## Open source + +This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo. + +Please star us and mention us on social to show your appreciation. diff --git a/python/destinations/kafka/dockerfile b/python/destinations/kafka/dockerfile new file mode 100644 index 00000000..51b504f9 --- /dev/null +++ b/python/destinations/kafka/dockerfile @@ -0,0 +1,28 @@ +FROM python:3.12.5-slim-bookworm + +# Set environment variables for non-interactive setup and unbuffered output +ENV DEBIAN_FRONTEND=noninteractive \ + PYTHONUNBUFFERED=1 \ + PYTHONIOENCODING=UTF-8 \ + PYTHONPATH="/app" + +# Build argument for setting the main app path +ARG MAINAPPPATH=. + +# Set working directory inside the container +WORKDIR /app + +# Copy requirements to leverage Docker cache +COPY "${MAINAPPPATH}/requirements.txt" "${MAINAPPPATH}/requirements.txt" + +# Install dependencies without caching +RUN pip install --no-cache-dir -r "${MAINAPPPATH}/requirements.txt" + +# Copy entire application into container +COPY . . 
    {
      "Name": "SINK_AUTO_CREATE_TOPIC",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "Whether to attempt to create the sink topic upon startup",
      "DefaultValue": "true",
      "Required": false
    },
import os
from typing import Tuple, Type

from pydantic_settings import (
    BaseSettings as PydanticBaseSettings,
    PydanticBaseSettingsSource,
    SettingsConfigDict
)

from quixstreams import Application
from quixstreams.kafka.configuration import ConnectionConfig

from sink import KafkaReplicatorSink


class SinkConnectionConfig(ConnectionConfig):
    """
    A ConnectionConfig subclass that reads configuration from environment variables
    with a SINK_ prefix.

    This allows users to configure the sink's Kafka connection using environment
    variables like SINK_BOOTSTRAP_SERVERS, SINK_SASL_USERNAME, etc.

    Example:
        export SINK_BOOTSTRAP_SERVERS=kafka:9092
        export SINK_SECURITY_PROTOCOL=SASL_SSL
        export SINK_SASL_MECHANISM=PLAIN
        export SINK_SASL_USERNAME=myuser
        export SINK_SASL_PASSWORD=mypass

        # Then create the config
        config = SinkConnectionConfig()
        sink = KafkaReplicatorSink(broker_address=config, topic_name="output-topic")
    """

    model_config = SettingsConfigDict(
        env_prefix="SINK_",
    )

    @classmethod
    def settings_customise_sources(
        cls,
        settings_cls: Type[PydanticBaseSettings],
        init_settings: PydanticBaseSettingsSource,
        env_settings: PydanticBaseSettingsSource,
        dotenv_settings: PydanticBaseSettingsSource,
        file_secret_settings: PydanticBaseSettingsSource,
    ) -> Tuple[PydanticBaseSettingsSource, ...]:
        """
        Enable reading values from environment variables with SINK_ prefix.
        """
        return init_settings, env_settings


# CONSUMER_GROUP is documented as optional (default "kafka_sink") in the README
# and library.json, so fall back to that default instead of raising KeyError
# when the variable is unset.
app = Application(
    consumer_group=os.getenv("CONSUMER_GROUP", "kafka_sink"),
    auto_offset_reset="earliest",
)
input_topic = app.topic(os.environ["input"])
kafka_sink = KafkaReplicatorSink(
    broker_address=SinkConnectionConfig(),
    topic_name=os.environ["SINK_OUTPUT_TOPIC"],
    key_serializer=os.getenv("SINK_KEY_SERIALIZER", "bytes"),
    value_serializer=os.getenv("SINK_VALUE_SERIALIZER", "json"),
    origin_topic=input_topic,
    auto_create_sink_topic=os.getenv("SINK_AUTO_CREATE_TOPIC", "true").lower() == "true",
)
app.dataframe(input_topic).sink(kafka_sink)


if __name__ == '__main__':
    app.run()
+{"_key": "test1", "_value": {"k0": "v0", "k1": "v1"}} +{"_key": "test2", "_value": {"k0": "v0", "k1": "v1"}} diff --git a/tests/destinations/kafka/docker-compose.test.yml b/tests/destinations/kafka/docker-compose.test.yml new file mode 100644 index 00000000..f00c6d6c --- /dev/null +++ b/tests/destinations/kafka/docker-compose.test.yml @@ -0,0 +1,130 @@ +# timeout: 60 +services: + # Quix Kafka instance (source) + quix-kafka: + image: docker.redpanda.com/redpandadata/redpanda:v24.2.4 + command: + - redpanda + - start + - --kafka-addr internal://0.0.0.0:9092 + - --advertise-kafka-addr internal://quix-kafka:9092 + - --mode dev-container + - --smp 1 + healthcheck: + test: ["CMD-SHELL", "rpk cluster health | grep -E 'Healthy:.+true' || exit 1"] + interval: 5s + timeout: 10s + retries: 10 + networks: + - test-network + stop_grace_period: 3s + + # External Kafka instance (destination) + external-kafka: + image: docker.redpanda.com/redpandadata/redpanda:v24.2.4 + command: + - redpanda + - start + - --kafka-addr internal://0.0.0.0:9092 + - --advertise-kafka-addr internal://external-kafka:9092 + - --mode dev-container + - --smp 1 + healthcheck: + test: ["CMD-SHELL", "rpk cluster health | grep -E 'Healthy:.+true' || exit 1"] + interval: 5s + timeout: 10s + retries: 10 + networks: + - test-network + stop_grace_period: 3s + + # Generate data to Quix Kafka + data-generator: + build: + context: ../../framework + dockerfile: Dockerfile + command: > + sh -c " + echo 'Waiting for Quix Kafka to be ready...' && + sleep 5 && + echo 'Generating test data to Quix Kafka...' && + python /tests/generate_data.py && + echo 'Test data generated successfully' && + echo 'Keeping data-generator alive to prevent test abort...' 
&& + tail -f /dev/null + " + volumes: + - ./generate_data.py:/tests/generate_data.py:ro + - ./data.jsonlines:/tests/data.jsonlines:ro + working_dir: /tests + networks: + - test-network + depends_on: + quix-kafka: + condition: service_healthy + stop_grace_period: 3s + + # Kafka replicator sink (consumes from quix, produces to external) + kafka-sink: + build: + context: ../../../python/destinations/kafka + dockerfile: dockerfile + entrypoint: > + sh -c " + echo 'Waiting for data generator to populate Quix Kafka...' && + sleep 10 && + echo 'Starting Kafka replicator sink...' && + python3 main.py + " + environment: + - Quix__Broker__Address=quix-kafka:9092 + - Quix__Consumer__Group=kafka-sink-test + - Quix__Deployment__Id=test-kafka-sink + - input=source-topic + - CONSUMER_GROUP=kafka-sink-test + - SINK_OUTPUT_TOPIC=replicated-topic + - SINK_BOOTSTRAP_SERVERS=external-kafka:9092 + - SINK_KEY_SERIALIZER=bytes + - SINK_VALUE_SERIALIZER=json + networks: + - test-network + depends_on: + quix-kafka: + condition: service_healthy + external-kafka: + condition: service_healthy + data-generator: + condition: service_started + stop_grace_period: 3s + + # Verify output in external Kafka + test-verifier: + build: + context: ../../framework + dockerfile: Dockerfile + command: > + sh -c " + echo 'Waiting for kafka sink to replicate data...' && + sleep 10 && + echo 'Verifying replicated data in external Kafka...' 
import os
import sys
import time

from quixstreams import Application
from quixstreams.sinks.core.list import ListSink


def _fail(message: str) -> None:
    """Print a failure message and exit with a non-zero status."""
    print(message)
    sys.exit(1)


def main():
    """
    Consume the replicated topic on the external Kafka cluster and verify that
    the sink delivered every expected message with the right key and value.

    Exits 0 on success, 1 on any verification failure.
    """
    broker_address = os.getenv("EXTERNAL_BROKER_ADDRESS", "external-kafka:9092")
    output_topic = os.getenv("TEST_OUTPUT_TOPIC", "replicated-topic")
    timeout = int(os.getenv("TEST_TIMEOUT", "40"))
    expected_count = 3

    print(f"Consuming from output topic: {output_topic}")
    print(f"Expected {expected_count} messages")

    # Unique consumer group per run so reruns always read from the beginning.
    app = Application(
        broker_address=broker_address,
        consumer_group=f"test-consumer-{int(time.time())}",
        auto_offset_reset="earliest"
    )

    topic = app.topic(output_topic)

    # metadata=True makes ListSink merge Kafka metadata (e.g. "_key") into each record.
    list_sink = ListSink(metadata=True)

    sdf = app.dataframe(topic)
    sdf.sink(list_sink)

    app.run(count=expected_count, timeout=timeout)

    message_count = len(list_sink)
    print(f"Received {message_count} messages from output topic")

    if message_count < expected_count:
        _fail(f"FAILED: Expected {expected_count} messages, got {message_count}")

    print("Verifying message structure...")

    # Expected data from data.jsonlines
    expected_keys = ["test0", "test1", "test2"]
    received_keys = []

    for i, message in enumerate(list_sink):
        print(f"Message {i}: {message}")

        # With metadata=True, the Kafka message key is exposed under "_key".
        if "_key" not in message:
            _fail(f"FAILED: Message {i} missing '_key' field")

        received_keys.append(message["_key"].decode())

        # The record itself is the deserialized message value (plus metadata
        # fields), so the value checks are applied directly to `message`.
        if not isinstance(message, dict):
            _fail(f"FAILED: Message {i} value should be a dict, got {type(message)}")

        if "k0" not in message or "k1" not in message:
            _fail(f"FAILED: Message {i} value missing expected fields (k0, k1)")

        if message["k0"] != "v0" or message["k1"] != "v1":
            _fail(f"FAILED: Message {i} value has unexpected values")

    # Verify all expected keys were received
    print(f"Received keys: {received_keys}")
    print(f"Expected keys: {expected_keys}")

    for expected_key in expected_keys:
        if expected_key not in received_keys:
            _fail(f"FAILED: Expected key '{expected_key}' not found in messages")

    print(f"Success: Verified {message_count} messages with correct structure")
    print(f"- All messages have required fields: _key, _value")
    print(f"- All expected keys present: {expected_keys}")
    print(f"- All values have correct structure and content")
    sys.exit(0)


if __name__ == "__main__":
    main()