Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ COPY pyxis /home/pyxis
COPY utils /home/utils
COPY scripts /home/scripts
COPY templates /home/templates
COPY kafka /home/kafka
COPY pubtools-pulp-wrapper /home/pubtools-pulp-wrapper
COPY pubtools-marketplacesvm-wrapper /home/pubtools-marketplacesvm-wrapper
COPY developer-portal-wrapper /home/developer-portal-wrapper
Expand Down
107 changes: 107 additions & 0 deletions kafka/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#!/usr/bin/env python3

import argparse
import datetime
import json
import os
from pathlib import Path

from confluent_kafka import Consumer


def _parse_args() -> argparse.Namespace:
    """Parse the consumer's command line (three required credential-file paths)."""
    parser = argparse.ArgumentParser(description="Consume messages from Kafka")
    for flag, what in (
        ("--bootstrap-servers-file", "Kafka bootstrap servers"),
        ("--username-file", "SASL username"),
        ("--password-file", "SASL password"),
    ):
        parser.add_argument(
            flag,
            type=Path,
            required=True,
            metavar="FILE",
            help=f"Path to a file containing the {what}",
        )
    return parser.parse_args()


def _read_file(path: Path, name: str) -> str:
    """Return the whitespace-stripped text of *path*.

    Raises:
        SystemExit: if the file cannot be read; *name* labels it in the message.
    """
    try:
        return path.read_text().strip()
    except OSError as e:
        raise SystemExit(f"Failed to read {name} file {path}: {e}") from e


def _to_text(raw):
    """Decode *raw* to ``str`` when it is ``bytes``; return anything else unchanged.

    Undecodable bytes are replaced rather than raised so that a single
    malformed message cannot terminate the consumer loop.
    """
    if isinstance(raw, bytes):
        return raw.decode("utf-8", errors="replace")
    return raw


def _format_timestamp(msg) -> str:
    """Return the message timestamp as a second-precision UTC ISO-8601 string."""
    # Imported here because only this helper needs the constant.
    from confluent_kafka import TIMESTAMP_NOT_AVAILABLE

    ts_type, ts_ms = msg.timestamp()
    if ts_type == TIMESTAMP_NOT_AVAILABLE:
        # The accompanying value is documented as meaningless in this case.
        return "<unknown time>"
    moment = datetime.datetime.fromtimestamp(ts_ms / 1000, tz=datetime.timezone.utc)
    # Drop tzinfo so the rendered text stays "YYYY-MM-DDTHH:MM:SS" (no offset).
    return moment.replace(microsecond=0, tzinfo=None).isoformat()


def _format_value(raw) -> str:
    """Render a message value: pretty-printed when it is JSON, raw text otherwise."""
    if raw is None:
        # Tombstone / key-only messages carry no value at all.
        return "<no value>"
    text = _to_text(raw)
    try:
        return json.dumps(json.loads(text), indent=2)
    except json.JSONDecodeError:
        return text


def _print_message(msg) -> None:
    """Print topic, timestamp, headers and value of one consumed message."""
    # NOTE: repeated header keys collapse to the last value in this dict view.
    headers = {key: _to_text(value) for key, value in (msg.headers() or [])}
    print(
        "--- Consumed event from topic {} at {} ---".format(
            msg.topic(), _format_timestamp(msg)
        )
    )
    print("Headers:\n{}".format(json.dumps(headers, indent=2)))
    print("Message:\n{}".format(_format_value(msg.value())))
    print("---")


def main():
    """Consume messages from the topic named by ``KAFKA_TOPIC`` and print them.

    Connection settings are read from the files given on the command line.
    The loop runs until interrupted with Ctrl-C; the consumer is always closed
    on the way out.

    Raises:
        SystemExit: if a credential file cannot be read or ``KAFKA_TOPIC`` is
            unset or empty (argparse also exits on invalid arguments).
    """
    args = _parse_args()

    bootstrap_servers = _read_file(args.bootstrap_servers_file, "bootstrap servers")
    username = _read_file(args.username_file, "username")
    password = _read_file(args.password_file, "password")
    topic = os.environ.get("KAFKA_TOPIC")
    if not topic:
        raise SystemExit("Set KAFKA_TOPIC env variable")

    config = {
        "bootstrap.servers": bootstrap_servers,
        "sasl.username": username,
        "sasl.password": password,
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "SCRAM-SHA-512",
        "group.id": "kafka-python-getting-started",
        "auto.offset.reset": "earliest",
    }

    consumer = Consumer(config)

    # Poll for new messages from Kafka and print them.
    try:
        # Subscribing inside the try guarantees close() runs if it fails.
        consumer.subscribe([topic])
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                # Initial message consumption may take up to
                # `session.timeout.ms` for the consumer group to
                # rebalance and start consuming
                print("Waiting...")
            elif msg.error():
                print("ERROR: {}".format(msg.error()))
            else:
                _print_message(msg)
    except KeyboardInterrupt:
        pass
    finally:
        # Leave group and commit final offsets
        consumer.close()


if __name__ == "__main__":
    main()
117 changes: 117 additions & 0 deletions kafka/producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
#!/usr/bin/env python3

import argparse
import json
import os
from pathlib import Path

from confluent_kafka import Producer


def _parse_args() -> argparse.Namespace:
    """Parse the producer's command line."""
    parser = argparse.ArgumentParser(description="Produce messages to Kafka")
    parser.add_argument(
        "--json-file",
        type=Path,
        required=True,
        metavar="FILE",
        help="Path to a JSON file whose content will be sent as the message",
    )
    for flag, what in (
        ("--bootstrap-servers-file", "Kafka bootstrap servers"),
        ("--username-file", "SASL username"),
        ("--password-file", "SASL password"),
    ):
        parser.add_argument(
            flag,
            type=Path,
            required=True,
            metavar="FILE",
            help=f"Path to a file containing the {what}",
        )
    parser.add_argument(
        "--header",
        action="append",
        default=[],
        metavar="KEY=VALUE",
        help="Message header (can be repeated). Example: --header advisory_state=updated",
    )
    return parser.parse_args()


def _read_file(path: Path, name: str) -> str:
    """Return the whitespace-stripped text of *path*.

    Raises:
        SystemExit: if the file cannot be read; *name* labels it in the message.
    """
    try:
        return path.read_text().strip()
    except OSError as e:
        raise SystemExit(f"Failed to read {name} file {path}: {e}") from e


def _parse_headers(raw_headers):
    """Convert ``KEY=VALUE`` strings into ``(key, utf-8 bytes)`` header tuples.

    Raises:
        SystemExit: if an entry has no ``=`` or its key is empty after stripping.
    """
    headers = []
    for item in raw_headers:
        key, sep, value = item.partition("=")
        key = key.strip()
        if not sep or not key:
            raise SystemExit(f"Invalid --header {item!r}: expected KEY=VALUE")
        headers.append((key, value.encode("utf-8")))
    return headers


def _event_name(raw_value) -> str:
    """Return ``metadata.name`` from a JSON message value, or ``"<no name>"``.

    Never raises: the result is only used for a log line inside the delivery
    callback, so any payload shape other than ``{"metadata": {"name": ...}}``
    simply yields the placeholder.
    """
    try:
        payload = json.loads(raw_value)
    except (TypeError, ValueError):
        return "<no name>"
    metadata = payload.get("metadata") if isinstance(payload, dict) else None
    if not isinstance(metadata, dict):
        return "<no name>"
    return metadata.get("name", "<no name>")


def main():
    """Send the content of ``--json-file`` as one message to ``KAFKA_TOPIC``.

    Connection settings are read from the files given on the command line and
    optional ``--header KEY=VALUE`` pairs are attached to the message. All
    inputs are validated before the producer is created.

    Raises:
        SystemExit: if a credential file cannot be read, ``KAFKA_TOPIC`` is
            unset or empty, a header is malformed, or (exit status 1) the
            broker reports a permanent delivery failure.
    """
    args = _parse_args()

    bootstrap_servers = _read_file(args.bootstrap_servers_file, "bootstrap servers")
    username = _read_file(args.username_file, "username")
    password = _read_file(args.password_file, "password")
    topic = os.environ.get("KAFKA_TOPIC")
    if not topic:
        raise SystemExit("Set KAFKA_TOPIC env variable")

    with open(args.json_file, encoding="utf-8") as f:
        myjson = json.load(f)
    headers = _parse_headers(args.header)

    config = {
        "bootstrap.servers": bootstrap_servers,
        "sasl.username": username,
        "sasl.password": password,
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "SCRAM-SHA-512",
        "acks": "all",
        "retries": 5,
        "message.timeout.ms": 60000,  # max ms from produce() to delivery/failure (1 min)
    }

    # Create Producer instance
    producer = Producer(config)

    # Errors reported by the delivery callback; checked after flush() so the
    # process exit status reflects whether the message actually arrived.
    failures = []

    # Optional per-message delivery callback (triggered by poll() or flush())
    # when a message has been successfully delivered or permanently
    # failed delivery (after retries).
    def delivery_callback(err, msg):
        if err:
            failures.append(err)
            print("ERROR: Message failed delivery: {}".format(err))
        else:
            print(
                "Produced event to topic {topic}: metadata.name = {name}".format(
                    topic=msg.topic(), name=_event_name(msg.value())
                )
            )

    producer.produce(
        topic=topic,
        value=json.dumps(myjson),
        headers=headers if headers else None,
        callback=delivery_callback,
    )

    # Trigger any outstanding delivery report callbacks.
    producer.poll(0)

    # Block until the messages are delivered.
    producer.flush()

    if failures:
        raise SystemExit(1)


if __name__ == "__main__":
    main()
77 changes: 77 additions & 0 deletions kafka/test_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
"""Unit tests for kafka.consumer."""

import os
import sys
import tempfile
from pathlib import Path
from unittest.mock import MagicMock, patch

import pytest

# Ensure kafka dir is on path when running pytest from repo root
sys.path.insert(0, str(Path(__file__).resolve().parent))
import consumer as consumer_module # noqa: E402


@pytest.fixture
def temp_files():
    """Yield a temporary directory populated with the three credential files.

    The directory contains ``bootstrap.txt``, ``username.txt`` and
    ``password.txt`` and is removed once the test finishes.
    """
    contents = {
        "bootstrap.txt": "broker1:9096,broker2:9096",
        "username.txt": "test-user",
        "password.txt": "test-password",
    }
    with tempfile.TemporaryDirectory() as tmpdir:
        root = Path(tmpdir)
        for filename, text in contents.items():
            (root / filename).write_text(text)
        yield root


@pytest.fixture
def consumer_argv(temp_files):
    """Return a complete argv list for the consumer, pointing at ``temp_files``."""
    argv = ["consumer.py"]
    for flag, filename in (
        ("--bootstrap-servers-file", "bootstrap.txt"),
        ("--username-file", "username.txt"),
        ("--password-file", "password.txt"),
    ):
        argv.extend([flag, str(temp_files / filename)])
    return argv


@patch.dict(os.environ, {"KAFKA_TOPIC": "test.topic"})
@patch("consumer.Consumer")
def test_consumer_main_builds_config_from_files(MockConsumer, consumer_argv, temp_files):
    """main() feeds the file contents into the Consumer config and subscribes to KAFKA_TOPIC."""
    fake_consumer = MagicMock()
    # Raising from the very first poll() breaks out of the endless loop.
    fake_consumer.poll.side_effect = KeyboardInterrupt()
    MockConsumer.return_value = fake_consumer

    with patch.object(sys, "argv", consumer_argv):
        consumer_module.main()

    MockConsumer.assert_called_once()
    config = MockConsumer.call_args[0][0]
    expected = {
        "bootstrap.servers": "broker1:9096,broker2:9096",
        "sasl.username": "test-user",
        "sasl.password": "test-password",
        "security.protocol": "SASL_SSL",
        "group.id": "kafka-python-getting-started",
    }
    for key, wanted in expected.items():
        assert config[key] == wanted
    fake_consumer.subscribe.assert_called_once_with(["test.topic"])
    fake_consumer.close.assert_called_once()


def test_consumer_main_requires_kafka_topic(consumer_argv):
    """Consumer main() exits with the KAFKA_TOPIC message when the variable is not set.

    The exit message is matched explicitly: a bare ``SystemExit`` check would
    also pass if main() bailed out earlier for an unrelated reason (argument
    parsing, unreadable credential file).
    """
    # patch.dict snapshots os.environ and restores it on exit, so the pop()
    # below cannot leak into other tests.
    with patch.dict(os.environ), patch.object(sys, "argv", consumer_argv):
        os.environ.pop("KAFKA_TOPIC", None)
        with pytest.raises(SystemExit, match="KAFKA_TOPIC"):
            consumer_module.main()


def test_consumer_main_missing_required_arg():
    """main() exits when none of the required file options are passed."""
    env_patch = patch.dict(os.environ, {"KAFKA_TOPIC": "t"})
    argv_patch = patch.object(sys, "argv", ["consumer.py"])
    with env_patch, argv_patch, pytest.raises(SystemExit):
        consumer_module.main()
Loading