3232OFFSET_EMPTY = - 1
3333LOG = logging .getLogger (__name__ )
3434
35+ KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS = 2.0
36+ SCHEMA_TOPIC_CREATION_TIMEOUT_SECONDS = 5.0
37+
3538
3639def _create_consumer_from_config (config : Config ) -> KafkaConsumer :
3740 # Group not set on purpose, all consumers read the same data
@@ -184,7 +187,7 @@ def run(self) -> None:
184187 stack .enter_context (closing (self .admin_client ))
185188 except (NodeNotReadyError , NoBrokersAvailable , AssertionError ):
186189 LOG .warning ("[Admin Client] No Brokers available yet. Retrying" )
187- self ._stop .wait (timeout = 2.0 )
190+ self ._stop .wait (timeout = KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS )
188191 except KafkaConfigurationError :
189192 LOG .exception ("[Admin Client] Invalid configuration. Bailing" )
190193 raise
@@ -206,7 +209,7 @@ def run(self) -> None:
206209 except Exception as e : # pylint: disable=broad-except
207210 LOG .exception ("[Consumer] Unexpected exception. Retrying" )
208211 self .stats .unexpected_exception (ex = e , where = "consumer_instantiation" )
209- self ._stop .wait (timeout = 2.0 )
212+ self ._stop .wait (timeout = KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS )
210213
211214 schema_topic_exists = False
212215 schema_topic = new_schema_topic_from_config (self .config )
@@ -222,7 +225,7 @@ def run(self) -> None:
222225 schema_topic_exists = True
223226 except : # pylint: disable=bare-except
224227 LOG .exception ("[Schema Topic] Failed to create %r, retrying" , schema_topic .name )
225- self ._stop .wait (timeout = 5.0 )
228+ self ._stop .wait (timeout = SCHEMA_TOPIC_CREATION_TIMEOUT_SECONDS )
226229
227230 while not self ._stop .is_set ():
228231 try :
0 commit comments