Copyright (c) 2019 Aiven Ltd
See LICENSE for details
"""
7+ from contextlib import closing , ExitStack
78from kafka import KafkaConsumer
89from kafka .admin import KafkaAdminClient , NewTopic
9- from kafka .errors import NoBrokersAvailable , NodeNotReadyError , TopicAlreadyExistsError
10+ from kafka .errors import KafkaConfigurationError , NoBrokersAvailable , NodeNotReadyError , TopicAlreadyExistsError
1011from karapace import constants
1112from karapace .config import Config
1213from karapace .master_coordinator import MasterCoordinator
1718from typing import Any , Dict , Optional
1819
1920import logging
20- import time
2121import ujson
2222
2323Offset = int
3232OFFSET_EMPTY = - 1
3333LOG = logging .getLogger (__name__ )
3434
# Back-off between retries when creating a Kafka client (admin client or
# consumer) fails, e.g. because no broker is reachable yet.
KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS = 2.0
# Back-off between retries when creating the schemas topic fails.
SCHEMA_TOPIC_CREATION_TIMEOUT_SECONDS = 5.0
37+
38+
def _create_consumer_from_config(config: Config) -> KafkaConsumer:
    """Build the KafkaConsumer used to read the schemas topic.

    The consumer group is intentionally left unset: every reader instance
    consumes the same data independently.
    """
    session_timeout_ms = config["session_timeout_ms"]
    # The request timeout must be at least the session timeout, otherwise
    # kafka-python rejects the combination.
    request_timeout_ms = max(session_timeout_ms, KafkaConsumer.DEFAULT_CONFIG["request_timeout_ms"])
    consumer_settings = {
        "enable_auto_commit": False,
        "api_version": (1, 0, 0),
        "bootstrap_servers": config["bootstrap_uri"],
        "client_id": config["client_id"],
        "security_protocol": config["security_protocol"],
        "ssl_cafile": config["ssl_cafile"],
        "ssl_certfile": config["ssl_certfile"],
        "ssl_keyfile": config["ssl_keyfile"],
        "sasl_mechanism": config["sasl_mechanism"],
        "sasl_plain_username": config["sasl_plain_username"],
        "sasl_plain_password": config["sasl_plain_password"],
        "auto_offset_reset": "earliest",
        "session_timeout_ms": session_timeout_ms,
        "request_timeout_ms": request_timeout_ms,
        "kafka_client": KarapaceKafkaClient,
        "metadata_max_age_ms": config["metadata_max_age_ms"],
    }
    return KafkaConsumer(config["topic_name"], **consumer_settings)
62+
63+
def _create_admin_client_from_config(config: Config) -> KafkaAdminClient:
    """Build the KafkaAdminClient used to manage the schemas topic."""
    admin_settings = {
        "api_version_auto_timeout_ms": constants.API_VERSION_AUTO_TIMEOUT_MS,
        "bootstrap_servers": config["bootstrap_uri"],
        "client_id": config["client_id"],
        "security_protocol": config["security_protocol"],
        "ssl_cafile": config["ssl_cafile"],
        "ssl_certfile": config["ssl_certfile"],
        "ssl_keyfile": config["ssl_keyfile"],
        "sasl_mechanism": config["sasl_mechanism"],
        "sasl_plain_username": config["sasl_plain_username"],
        "sasl_plain_password": config["sasl_plain_password"],
    }
    return KafkaAdminClient(**admin_settings)
77+
78+
def new_schema_topic_from_config(config: Config) -> NewTopic:
    """Describe the schemas topic that should exist for this configuration."""
    topic_name = config["topic_name"]
    replication_factor = config["replication_factor"]
    # Compaction keeps only the newest message per key, which is the
    # retention model the schema registry relies on.
    return NewTopic(
        name=topic_name,
        num_partitions=constants.SCHEMA_TOPIC_NUM_PARTITIONS,
        replication_factor=replication_factor,
        topic_configs={"cleanup.policy": "compact"},
    )
86+
3587
3688class OffsetsWatcher :
3789 """Synchronization container for threads to wait until an offset is seen.
@@ -89,11 +141,9 @@ def __init__(
89141 self .schemas : Dict [int , TypedSchema ] = {}
90142 self .global_schema_id = 0
91143 self .admin_client : Optional [KafkaAdminClient ] = None
92- self .schema_topic = None
93144 self .topic_replication_factor = self .config ["replication_factor" ]
94145 self .consumer : Optional [KafkaConsumer ] = None
95146 self .offset_watcher = OffsetsWatcher ()
96- self .running = True
97147 self .id_lock = Lock ()
98148 self .stats = StatsClient (
99149 sentry_config = config ["sentry" ], # type: ignore[arg-type]
@@ -113,80 +163,9 @@ def __init__(
113163 self .offset = OFFSET_EMPTY
114164 self .ready = False
115165
116- def init_consumer (self ) -> None :
117- # Group not set on purpose, all consumers read the same data
118- session_timeout_ms = self .config ["session_timeout_ms" ]
119- request_timeout_ms = max (session_timeout_ms , KafkaConsumer .DEFAULT_CONFIG ["request_timeout_ms" ])
120- self .consumer = KafkaConsumer (
121- self .config ["topic_name" ],
122- enable_auto_commit = False ,
123- api_version = (1 , 0 , 0 ),
124- bootstrap_servers = self .config ["bootstrap_uri" ],
125- client_id = self .config ["client_id" ],
126- security_protocol = self .config ["security_protocol" ],
127- ssl_cafile = self .config ["ssl_cafile" ],
128- ssl_certfile = self .config ["ssl_certfile" ],
129- ssl_keyfile = self .config ["ssl_keyfile" ],
130- sasl_mechanism = self .config ["sasl_mechanism" ],
131- sasl_plain_username = self .config ["sasl_plain_username" ],
132- sasl_plain_password = self .config ["sasl_plain_password" ],
133- auto_offset_reset = "earliest" ,
134- session_timeout_ms = session_timeout_ms ,
135- request_timeout_ms = request_timeout_ms ,
136- kafka_client = KarapaceKafkaClient ,
137- metadata_max_age_ms = self .config ["metadata_max_age_ms" ],
138- )
139-
140- def init_admin_client (self ) -> bool :
141- try :
142- self .admin_client = KafkaAdminClient (
143- api_version_auto_timeout_ms = constants .API_VERSION_AUTO_TIMEOUT_MS ,
144- bootstrap_servers = self .config ["bootstrap_uri" ],
145- client_id = self .config ["client_id" ],
146- security_protocol = self .config ["security_protocol" ],
147- ssl_cafile = self .config ["ssl_cafile" ],
148- ssl_certfile = self .config ["ssl_certfile" ],
149- ssl_keyfile = self .config ["ssl_keyfile" ],
150- sasl_mechanism = self .config ["sasl_mechanism" ],
151- sasl_plain_username = self .config ["sasl_plain_username" ],
152- sasl_plain_password = self .config ["sasl_plain_password" ],
153- )
154- return True
155- except (NodeNotReadyError , NoBrokersAvailable , AssertionError ):
156- LOG .warning ("No Brokers available yet, retrying init_admin_client()" )
157- time .sleep (2.0 )
158- except : # pylint: disable=bare-except
159- LOG .exception ("Failed to initialize admin client, retrying init_admin_client()" )
160- time .sleep (2.0 )
161- return False
162-
163- @staticmethod
164- def get_new_schema_topic (config : dict ) -> NewTopic :
165- return NewTopic (
166- name = config ["topic_name" ],
167- num_partitions = constants .SCHEMA_TOPIC_NUM_PARTITIONS ,
168- replication_factor = config ["replication_factor" ],
169- topic_configs = {"cleanup.policy" : "compact" },
170- )
171-
172- def create_schema_topic (self ) -> bool :
173- assert self .admin_client is not None , "Thread must be started"
174-
175- schema_topic = self .get_new_schema_topic (self .config )
176- try :
177- LOG .info ("Creating topic: %r" , schema_topic )
178- self .admin_client .create_topics ([schema_topic ], timeout_ms = constants .TOPIC_CREATION_TIMEOUT_MS )
179- LOG .info ("Topic: %r created successfully" , self .config ["topic_name" ])
180- self .schema_topic = schema_topic
181- return True
182- except TopicAlreadyExistsError :
183- LOG .warning ("Topic: %r already exists" , self .config ["topic_name" ])
184- self .schema_topic = schema_topic
185- return True
186- except : # pylint: disable=bare-except
187- LOG .exception ("Failed to create topic: %r, retrying create_schema_topic()" , self .config ["topic_name" ])
188- time .sleep (5 )
189- return False
166+ # This event controls when the Reader should stop running, it will be
167+ # set by another thread (e.g. `KarapaceSchemaRegistry`)
168+ self ._stop = Event ()
190169
191170 def get_schema_id (self , new_schema : TypedSchema ) -> int :
192171 with self .id_lock :
@@ -198,34 +177,62 @@ def get_schema_id(self, new_schema: TypedSchema) -> int:
198177
199178 def close (self ) -> None :
200179 LOG .info ("Closing schema_reader" )
201- self .running = False
180+ self ._stop . set ()
202181
203182 def run (self ) -> None :
204- while self .running :
205- try :
206- if not self .admin_client :
207- if self .init_admin_client () is False :
208- continue
209- if not self .schema_topic :
210- if self .create_schema_topic () is False :
211- continue
212- if not self .consumer :
213- self .init_consumer ()
214- self .handle_messages ()
215- LOG .info ("Status: offset: %r, ready: %r" , self .offset , self .ready )
216- except Exception as e : # pylint: disable=broad-except
217- if self .stats :
183+ with ExitStack () as stack :
184+ while not self ._stop .is_set () and self .admin_client is None :
185+ try :
186+ self .admin_client = _create_admin_client_from_config (self .config )
187+ stack .enter_context (closing (self .admin_client ))
188+ except (NodeNotReadyError , NoBrokersAvailable , AssertionError ):
189+ LOG .warning ("[Admin Client] No Brokers available yet. Retrying" )
190+ self ._stop .wait (timeout = KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS )
191+ except KafkaConfigurationError :
192+ LOG .exception ("[Admin Client] Invalid configuration. Bailing" )
193+ raise
194+ except Exception as e : # pylint: disable=broad-except
195+ LOG .exception ("[Admin Client] Unexpected exception. Retrying" )
196+ self .stats .unexpected_exception (ex = e , where = "admin_client_instantiation" )
197+ self ._stop .wait (timeout = 2.0 )
198+
199+ while not self ._stop .is_set () and self .consumer is None :
200+ try :
201+ self .consumer = _create_consumer_from_config (self .config )
202+ stack .enter_context (closing (self .consumer ))
203+ except (NodeNotReadyError , NoBrokersAvailable , AssertionError ):
204+ LOG .warning ("[Consumer] No Brokers available yet. Retrying" )
205+ self ._stop .wait (timeout = 2.0 )
206+ except KafkaConfigurationError :
207+ LOG .exception ("[Consumer] Invalid configuration. Bailing" )
208+ raise
209+ except Exception as e : # pylint: disable=broad-except
210+ LOG .exception ("[Consumer] Unexpected exception. Retrying" )
211+ self .stats .unexpected_exception (ex = e , where = "consumer_instantiation" )
212+ self ._stop .wait (timeout = KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS )
213+
214+ schema_topic_exists = False
215+ schema_topic = new_schema_topic_from_config (self .config )
216+ schema_topic_create = [schema_topic ]
217+ while not self ._stop .is_set () and not schema_topic_exists :
218+ try :
219+ LOG .info ("[Schema Topic] Creating %r" , schema_topic )
220+ self .admin_client .create_topics (schema_topic_create , timeout_ms = constants .TOPIC_CREATION_TIMEOUT_MS )
221+ LOG .info ("[Schema Topic] Successfully created %r" , schema_topic .name )
222+ schema_topic_exists = True
223+ except TopicAlreadyExistsError :
224+ LOG .warning ("[Schema Topic] Already exists %r" , schema_topic .name )
225+ schema_topic_exists = True
226+ except : # pylint: disable=bare-except
227+ LOG .exception ("[Schema Topic] Failed to create %r, retrying" , schema_topic .name )
228+ self ._stop .wait (timeout = SCHEMA_TOPIC_CREATION_TIMEOUT_SECONDS )
229+
230+ while not self ._stop .is_set ():
231+ try :
232+ self .handle_messages ()
233+ except Exception as e : # pylint: disable=broad-except
218234 self .stats .unexpected_exception (ex = e , where = "schema_reader_loop" )
219- LOG .exception ("Unexpected exception in schema reader loop" )
220- try :
221- if self .admin_client :
222- self .admin_client .close ()
223- if self .consumer :
224- self .consumer .close ()
225- except Exception as e : # pylint: disable=broad-except
226- if self .stats :
227- self .stats .unexpected_exception (ex = e , where = "schema_reader_exit" )
228- LOG .exception ("Unexpected exception closing schema reader" )
235+ LOG .exception ("Unexpected exception in schema reader loop" )
229236
230237 def handle_messages (self ) -> None :
231238 assert self .consumer is not None , "Thread must be started"
0 commit comments