88from kafka .admin import KafkaAdminClient
99from kafka .errors import NoBrokersAvailable , NodeNotReadyError , TopicAlreadyExistsError
1010from karapace import constants
11+ from karapace .anonymize_schemas import anonymize_avro
1112from karapace .config import Config , read_config
1213from karapace .schema_reader import KafkaSchemaReader
1314from karapace .utils import json_encode , KarapaceKafkaClient
14- from typing import Optional
15+ from typing import Dict , List , Optional , Tuple
1516
1617import argparse
1718import logging
@@ -138,33 +139,7 @@ def close(self):
138139 self .admin_client = None
139140
140141 def request_backup (self ):
141- if not self .consumer :
142- self .init_consumer ()
143- self .log .info ("Starting schema backup read for topic: %r" , self .topic_name )
144-
145- values = []
146- topic_fully_consumed = False
147-
148- while not topic_fully_consumed :
149-
150- raw_msg = self .consumer .poll (timeout_ms = self .timeout_ms )
151- topic_fully_consumed = len (raw_msg ) == 0
152-
153- for _ , messages in raw_msg .items ():
154- for message in messages :
155- key = message .key .decode ("utf8" )
156- try :
157- key = ujson .loads (key )
158- except ValueError :
159- self .log .debug ("Invalid JSON in message.key: %r, value: %r" , message .key , message .value )
160- value = None
161- if message .value :
162- value = message .value .decode ("utf8" )
163- try :
164- value = ujson .loads (value )
165- except ValueError :
166- self .log .debug ("Invalid JSON in message.value: %r, key: %r" , message .value , message .key )
167- values .append ((key , value ))
142+ values = self ._export ()
168143
169144 ser = ujson .dumps (values )
170145 if self .backup_location :
@@ -203,6 +178,65 @@ def restore_backup(self):
203178 self .log .debug ("Sent kafka msg key: %r, value: %r, offset: %r" , key , value , msg .offset )
204179 self .close ()
205180
def export_anonymized_avro_schemas(self):
    """Export anonymized Avro schemas to the backup location or stdout.

    Reads every (key, value) pair from the schemas topic via ``_export`` and
    keeps only entries whose value carries a ``schema`` field of Avro type
    (a missing ``schemaType`` key means Avro).  Subject names and schema
    contents are anonymized in place before serialization.
    """
    exported = self._export()
    anonymized = []

    for key, value in exported:
        # The schemas topic contains every change to schema metadata; skip
        # messages that carry no schema or a non-Avro one.  Avro schemas may
        # omit the "schemaType" key, so its absence defaults to "AVRO".
        if not value or "schema" not in value:
            continue
        if value.get("schemaType", "AVRO") != "AVRO":
            continue
        scrubbed = anonymize_avro.anonymize(ujson.loads(value.get("schema")))
        if not scrubbed:
            continue
        if "subject" in key:
            key["subject"] = anonymize_avro.anonymize_name(key["subject"])
        if "subject" in value:
            value["subject"] = anonymize_avro.anonymize_name(value["subject"])
        value["schema"] = scrubbed
        anonymized.append((key, value))

    serialized = ujson.dumps(anonymized)
    if self.backup_location:
        with open(self.backup_location, mode="w", encoding="utf8") as fp:
            fp.write(serialized)
        self.log.info("Anonymized Avro schema export written to %r", self.backup_location)
    else:
        print(serialized)
        self.log.info("Anonymized Avro schema export written to stdout")
    self.close()
208+
def _export(self) -> List[Tuple[str, Dict[str, str]]]:
    """Consume the schemas topic to its end and return its (key, value) pairs.

    Keys and values are UTF-8 decoded and JSON-parsed when possible; on a
    JSON parse failure the decoded string is kept as-is (logged at debug
    level), and an empty message body is recorded as a value of None.
    """
    if not self.consumer:
        self.init_consumer()
    self.log.info("Starting schema backup read for topic: %r", self.topic_name)

    records = []
    while True:
        batch = self.consumer.poll(timeout_ms=self.timeout_ms)
        if not batch:
            # An empty poll means the topic has been fully consumed.
            break
        for messages in batch.values():
            for message in messages:
                key = message.key.decode("utf8")
                try:
                    key = ujson.loads(key)
                except ValueError:
                    self.log.debug("Invalid JSON in message.key: %r, value: %r", message.key, message.value)
                value = None
                if message.value:
                    value = message.value.decode("utf8")
                    try:
                        value = ujson.loads(value)
                    except ValueError:
                        self.log.debug("Invalid JSON in message.value: %r, key: %r", message.value, message.key)
                records.append((key, value))

    return records
239+
206240
207241def encode_value (value ):
208242 if value == "null" :
@@ -218,7 +252,10 @@ def parse_args():
218252
219253 parser_get = subparsers .add_parser ("get" , help = "Store the schema backup into a file" )
220254 parser_restore = subparsers .add_parser ("restore" , help = "Restore the schema backup from a file" )
221- for p in [parser_get , parser_restore ]:
255+ parser_export_anonymized_avro_schemas = subparsers .add_parser (
256+ "export-anonymized-avro-schemas" , help = "Export anonymized Avro schemas into a file"
257+ )
258+ for p in [parser_get , parser_restore , parser_export_anonymized_avro_schemas ]:
222259 p .add_argument ("--config" , help = "Configuration file path" , required = True )
223260 p .add_argument ("--location" , default = "" , help = "File path for the backup file" )
224261 p .add_argument ("--topic" , help = "Kafka topic name to be used" , required = False )
@@ -240,6 +277,9 @@ def main() -> int:
240277 if args .command == "restore" :
241278 sb .restore_backup ()
242279 return 0
280+ if args .command == "export-anonymized-avro-schemas" :
281+ sb .export_anonymized_avro_schemas ()
282+ return 0
243283 return 1
244284
245285
0 commit comments