77from enum import Enum
88from kafka import KafkaConsumer , KafkaProducer
99from kafka .admin import KafkaAdminClient
10- from kafka .errors import KafkaError , TopicAlreadyExistsError
10+ from kafka .errors import TopicAlreadyExistsError
1111from kafka .structs import PartitionMetadata
1212from karapace import constants
1313from karapace .anonymize_schemas import anonymize_avro
@@ -292,6 +292,8 @@ def __init__(self, config: Config, backup_path: str, topic_option: Optional[str]
292292 self .timeout_ms = 1000
293293 self .timeout_kafka_producer = 5
294294
295+ self .producer_exception : Optional [Exception ] = None
296+
295297 # Schema key formatter
296298 self .key_formatter = None
297299 if self .topic_name == constants .DEFAULT_SCHEMA_TOPIC or self .config .get ("force_key_correction" , False ):
@@ -311,19 +313,23 @@ def restore_backup(self) -> None:
311313 self ._restore_backup_version_2 (producer , fp )
312314 else :
313315 self ._restore_backup_version_1_single_array (producer , fp )
316+ producer .flush (timeout = self .timeout_kafka_producer )
317+ if self .producer_exception is not None :
318+ raise BackupError ("Error while producing restored messages" ) from self .producer_exception
319+
320+ def producer_error_callback (self , exception : Exception ):
321+ self .producer_exception = exception
314322
315323 def _handle_restore_message (self , producer : KafkaProducer , item : Tuple [str , str ]) -> None :
316324 key = self .encode_key (item [0 ])
317325 value = encode_value (item [1 ])
318- LOG .debug ("Trying to send kafka msg key: %r, value: %r" , key , value )
319- try :
320- msg = producer .send (self .topic_name , key = key , value = value , partition = PARTITION_ZERO )
321- producer .flush (timeout = self .timeout_kafka_producer )
322- metadata = msg .get (timeout = self .timeout_kafka_producer )
323- except KafkaError as ex :
324- raise BackupError ("Error while producing restored message" ) from ex
325- else :
326- LOG .debug ("Sent kafka msg key: %r, value: %r, offset: %r" , key , value , metadata .offset )
326+ LOG .debug ("Sending kafka msg key: %r, value: %r" , key , value )
327+ producer .send (
328+ self .topic_name ,
329+ key = key ,
330+ value = value ,
331+ partition = PARTITION_ZERO ,
332+ ).add_errback (self .producer_error_callback )
327333
328334 def _restore_backup_version_1_single_array (self , producer : KafkaProducer , fp : IO ) -> None :
329335 raw_msg = fp .read ()
0 commit comments