Skip to content

Commit 6c66ac4

Browse files
authored
Merge pull request #15 from SciCatProject/kafka-configuration
Kafka configuration.
2 parents 79d48af + 1dccbb0 commit 6c66ac4

6 files changed

+119
-2
lines changed

resources/config.sample.json

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
"bootstrap_servers": [
66
"HOST:9092"
77
],
8+
"individual_message_commit": false,
89
"enable_auto_commit": true,
910
"auto_offset_reset": "earliest"
1011
},

src/scicat_configuration.py

+33-1
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,13 @@ def build_main_arg_parser() -> argparse.ArgumentParser:
8989

9090
@dataclass
9191
class RunOptions:
92+
"""RunOptions dataclass to store the configuration options.
93+
94+
Most of options don't have default values because they are expected
95+
to be set by the user either in the configuration file or through
96+
command line arguments.
97+
"""
98+
9299
config_file: str
93100
verbose: bool
94101
file_log: bool
@@ -102,12 +109,36 @@ class RunOptions:
102109
pyscicat: Optional[str] = None
103110

104111

112+
@dataclass
class kafkaOptions:
    """Kafka consumer settings read from the ``kafka`` section of the config.

    Every field has a default value because these options are not
    expected to be set by command line arguments.
    """

    topics: list[str] | str = "KAFKA_TOPIC_1,KAFKA_TOPIC_2"
    """Kafka topics, given as a list or as one comma-separated string."""
    group_id: str = "GROUP_ID"
    """Kafka consumer group ID."""
    bootstrap_servers: list[str] | str = "localhost:9092"
    """Kafka bootstrap servers, given as a list or a comma-separated string."""
    individual_message_commit: bool = False
    """Commit each message individually.

    When enabled, Kafka auto commit is switched off in the consumer
    configuration regardless of ``enable_auto_commit``.
    """
    enable_auto_commit: bool = True
    """Enable Kafka auto commit."""
    auto_offset_reset: str = "earliest"
    """Kafka auto offset reset policy."""
132+
133+
105134
@dataclass
106135
class ScicatConfig:
107136
original_dict: Mapping
108137
"""Original configuration dictionary in the json file."""
109138
run_options: RunOptions
110139
"""Merged configuration dictionary with command line arguments."""
140+
kafka_options: kafkaOptions
141+
"""Kafka configuration options read from files."""
111142

112143
def to_dict(self) -> dict:
113144
"""Return the configuration as a dictionary."""
@@ -119,7 +150,7 @@ def to_dict(self) -> dict:
119150
if isinstance(value, Mapping):
120151
original_dict[key] = dict(value)
121152

122-
copied = ScicatConfig(original_dict, self.run_options)
153+
copied = ScicatConfig(original_dict, self.run_options, self.kafka_options)
123154
return asdict(copied)
124155

125156

@@ -153,4 +184,5 @@ def build_scicat_config(input_args: argparse.Namespace) -> ScicatConfig:
153184
return ScicatConfig(
154185
original_dict=MappingProxyType(config_dict),
155186
run_options=RunOptions(**run_option_dict),
187+
kafka_options=kafkaOptions(**config_dict.setdefault('kafka', dict())),
156188
)

src/scicat_ingestor.py

+15
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,20 @@
11
# SPDX-License-Identifier: BSD-3-Clause
22
# Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject)
3+
import logging
4+
35
from scicat_configuration import build_main_arg_parser, build_scicat_config
6+
from scicat_kafka import build_consumer
47
from scicat_logging import build_logger
58

69

10+
def quit(logger: logging.Logger) -> None:
    """Announce the shutdown on ``logger`` and terminate the program.

    This function never returns normally: it always raises ``SystemExit``
    without an exit code, exactly like a bare ``sys.exit()`` call.
    """
    logger.info("Exiting ingestor")
    # Raising ``SystemExit`` directly is what ``sys.exit()`` does internally.
    raise SystemExit
16+
17+
718
def main() -> None:
819
"""Main entry point of the app."""
920
arg_parser = build_main_arg_parser()
@@ -14,3 +25,7 @@ def main() -> None:
1425
# Log the configuration as dictionary so that it is easier to read from the logs
1526
logger.info('Starting the Scicat Ingestor with the following configuration:')
1627
logger.info(config.to_dict())
28+
29+
# Kafka consumer
30+
if build_consumer(config.kafka_options, logger) is None:
31+
quit(logger)

src/scicat_kafka.py

+62
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
# SPDX-License-Identifier: BSD-3-Clause
2+
# Copyright (c) 2024 ScicatProject contributors (https://github.com/ScicatProject)
3+
import logging
4+
5+
from confluent_kafka import Consumer
6+
7+
from scicat_configuration import kafkaOptions
8+
9+
10+
def collect_consumer_options(options: kafkaOptions) -> dict:
    """Translate ``options`` into a Kafka consumer configuration dictionary.

    Field names are converted to the dotted property names used by the
    consumer (``group_id`` becomes ``group.id``). ``topics`` and
    ``individual_message_commit`` are left out because they are not
    consumer properties.

    ``bootstrap.servers`` is always emitted as a comma-separated string,
    even when the option was given as a list.

    ``enable.auto.commit`` is only true when ``enable_auto_commit`` is set
    and ``individual_message_commit`` is not.
    """
    from dataclasses import asdict

    # ``topics`` is used for the subscription and ``individual_message_commit``
    # is folded into ``enable.auto.commit`` below.
    excluded = ('topics', 'individual_message_commit')
    config_dict = {
        key.replace('_', '.'): value
        for key, value in asdict(options).items()
        if key not in excluded
    }

    # The consumer expects one comma-separated broker string; a Python list
    # would otherwise be passed on as its ``str()`` representation.
    servers = config_dict.get('bootstrap.servers')
    if isinstance(servers, (list, tuple)):
        config_dict['bootstrap.servers'] = ','.join(servers)

    config_dict['enable.auto.commit'] = (
        not options.individual_message_commit
    ) and options.enable_auto_commit
    return config_dict
24+
25+
26+
def collect_kafka_topics(options: kafkaOptions) -> list[str]:
    """Return the Kafka topics of ``options`` as a list of topic names.

    A string is split on commas; whitespace around each name is removed
    and empty entries (from an empty string or a stray comma) are dropped.
    A list is returned as it is.

    Raises
    ------
    TypeError
        If ``options.topics`` is neither a string nor a list.
    """
    topics = options.topics
    if isinstance(topics, str):
        # ``"a, b,"`` must yield ``["a", "b"]``, not ``["a", " b", ""]``:
        # padded or empty names are not valid topics to subscribe to.
        return [name for name in map(str.strip, topics.split(',')) if name]
    if isinstance(topics, list):
        return topics
    raise TypeError('The topics must be a list or a comma-separated string.')
34+
35+
36+
def build_consumer(
    kafka_options: kafkaOptions, logger: logging.Logger
) -> "Consumer | None":
    """Build a Kafka consumer, validate it and subscribe it to the topics.

    The consumer is configured from ``kafka_options``. The configuration
    and the subscribed topics are reported through ``logger``.

    Returns
    -------
    :
        The validated and subscribed consumer, or ``None`` if the
        validation failed.
    """
    consumer_options = collect_consumer_options(kafka_options)
    logger.info('Connecting to Kafka with the following parameters:')
    logger.info(consumer_options)
    consumer = Consumer(consumer_options)
    if not validate_consumer(consumer, logger):
        return None

    kafka_topics = collect_kafka_topics(kafka_options)
    logger.info(f'Subscribing to the following Kafka topics: {kafka_topics}')
    consumer.subscribe(kafka_topics)
    # Hand back the instance that was validated and subscribed above;
    # creating a new ``Consumer`` here would drop the subscription.
    return consumer
49+
50+
51+
def validate_consumer(consumer: Consumer, logger: logging.Logger) -> bool:
    """Check that ``consumer`` can list topics within one second.

    The outcome is reported through ``logger``: an error carrying the
    exception message on failure, an info message on success.

    Returns
    -------
    :
        ``True`` if listing the topics succeeded, ``False`` if it raised.
    """
    try:
        consumer.list_topics(timeout=1)
    except Exception as err:
        # Any failure is reported rather than raised so that the caller
        # can decide how to shut down.
        logger.error(
            "Kafka consumer could not be instantiated. "
            f"Error message from kafka thread: \n{err}"
        )
        return False

    logger.info('Kafka consumer successfully instantiated')
    return True

tests/test_logging.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import pytest
44

5-
from scicat_configuration import RunOptions, ScicatConfig
5+
from scicat_configuration import RunOptions, ScicatConfig, kafkaOptions
66

77

88
@pytest.fixture
@@ -22,6 +22,7 @@ def scicat_config(tmp_path: pathlib.Path) -> ScicatConfig:
2222
check_by_job_id=True,
2323
pyscicat='test',
2424
),
25+
kafka_options=kafkaOptions(),
2526
)
2627

2728

tests/test_scicat_configuration.py

+6
Original file line numberDiff line numberDiff line change
@@ -77,3 +77,9 @@ def test_scicat_config_original_dict_read_only(scicat_config: ScicatConfig) -> N
7777
assert isinstance(scicat_config.original_dict, MappingProxyType)
7878
for sub_option in scicat_config.original_dict.values():
7979
assert isinstance(sub_option, MappingProxyType)
80+
81+
82+
def test_scicat_config_kafka_options(scicat_config: ScicatConfig) -> None:
83+
"""Test if the Kafka options are correctly read."""
84+
assert scicat_config.kafka_options.topics == ["KAFKA_TOPIC_1", "KAFKA_TOPIC_2"]
85+
assert scicat_config.kafka_options.enable_auto_commit

0 commit comments

Comments
 (0)