@@ -13,37 +13,43 @@ def collect_consumer_options(options: kafkaOptions) -> dict:
13
13
14
14
# Build logger and formatter
15
15
config_dict = {
16
- key .replace ('_' , '.' ): value
16
+ key .replace ("_" , "." ): value
17
17
for key , value in asdict (options ).items ()
18
- if key not in (' topics' , ' individual_message_commit' )
18
+ if key not in (" topics" , " individual_message_commit" )
19
19
}
20
- config_dict [' enable.auto.commit' ] = (
20
+ config_dict [" enable.auto.commit" ] = (
21
21
not options .individual_message_commit
22
22
) and options .enable_auto_commit
23
+ if isinstance (bootstrap_servers := options .bootstrap_servers , list ):
24
+ # Convert the list to a comma-separated string
25
+ config_dict ["bootstrap.servers" ] = "," .join (bootstrap_servers )
26
+ else :
27
+ config_dict ["bootstrap.servers" ] = bootstrap_servers
28
+
23
29
return config_dict
24
30
25
31
26
32
def collect_kafka_topics (options : kafkaOptions ) -> list [str ]:
27
33
"""Return the Kafka topics as a list."""
28
34
if isinstance (options .topics , str ):
29
- return options .topics .split (',' )
35
+ return options .topics .split ("," )
30
36
elif isinstance (options .topics , list ):
31
37
return options .topics
32
38
else :
33
- raise TypeError (' The topics must be a list or a comma-separated string.' )
39
+ raise TypeError (" The topics must be a list or a comma-separated string." )
34
40
35
41
36
42
def build_consumer (kafka_options : kafkaOptions , logger : logging .Logger ) -> Consumer :
37
43
"""Build a Kafka consumer and configure it according to the ``options``."""
38
44
consumer_options = collect_consumer_options (kafka_options )
39
- logger .info (' Connecting to Kafka with the following parameters:' )
45
+ logger .info (" Connecting to Kafka with the following parameters:" )
40
46
logger .info (consumer_options )
41
47
consumer = Consumer (consumer_options )
42
48
if not validate_consumer (consumer , logger ):
43
49
return None
44
50
45
51
kafka_topics = collect_kafka_topics (kafka_options )
46
- logger .info (f' Subscribing to the following Kafka topics: { kafka_topics } ' )
52
+ logger .info (f" Subscribing to the following Kafka topics: { kafka_topics } " )
47
53
consumer .subscribe (kafka_topics )
48
54
return Consumer (consumer_options )
49
55
@@ -58,5 +64,5 @@ def validate_consumer(consumer: Consumer, logger: logging.Logger) -> bool:
58
64
)
59
65
return False
60
66
else :
61
- logger .info (' Kafka consumer successfully instantiated' )
67
+ logger .info (" Kafka consumer successfully instantiated" )
62
68
return True
0 commit comments