@@ -200,7 +200,10 @@ def deduce_schema(self, file):
200200
201201 # Deduce the schema from this given data record.
202202 if isinstance (json_object , dict ):
203- self .deduce_schema_for_line (json_object , schema_map )
203+ self .deduce_schema_for_line (
204+ json_object = json_object ,
205+ schema_map = schema_map ,
206+ )
204207 elif isinstance (json_object , Exception ):
205208 self .log_error (
206209 f'Record could not be parsed: Exception: { json_object } ' )
@@ -230,17 +233,23 @@ def deduce_schema_for_line(self, json_object, schema_map, base_path=None):
230233 """
231234 for key , value in json_object .items ():
232235 schema_entry = schema_map .get (key )
233- new_schema_entry = self .get_schema_entry (key ,
234- value ,
235- base_path = base_path )
236- schema_map [key ] = self .merge_schema_entry (schema_entry ,
237- new_schema_entry ,
238- base_path = base_path )
239-
240- def merge_schema_entry (self ,
241- old_schema_entry ,
242- new_schema_entry ,
243- base_path = None ):
236+ new_schema_entry = self .get_schema_entry (
237+ key = key ,
238+ value = value ,
239+ base_path = base_path ,
240+ )
241+ schema_map [key ] = self .merge_schema_entry (
242+ old_schema_entry = schema_entry ,
243+ new_schema_entry = new_schema_entry ,
244+ base_path = base_path ,
245+ )
246+
247+ def merge_schema_entry (
248+ self ,
249+ old_schema_entry ,
250+ new_schema_entry ,
251+ base_path = None ,
252+ ):
244253 """Merges the 'new_schema_entry' into the 'old_schema_entry' and return
245254 a merged schema entry. Recursively merges in sub-fields as well.
246255
@@ -325,9 +334,10 @@ def merge_schema_entry(self,
325334 old_entry = old_fields .get (key )
326335 new_base_path = json_full_path (base_path , old_name )
327336 old_fields [key ] = self .merge_schema_entry (
328- old_entry ,
329- new_entry ,
330- base_path = new_base_path )
337+ old_schema_entry = old_entry ,
338+ new_schema_entry = new_entry ,
339+ base_path = new_base_path ,
340+ )
331341 return old_schema_entry
332342
333343 full_old_name = json_full_path (base_path , old_name )
@@ -373,15 +383,20 @@ def get_schema_entry(self, key, value, base_path=None):
373383 # recursively figure out the RECORD
374384 fields = OrderedDict ()
375385 if value_mode == 'NULLABLE' :
376- self .deduce_schema_for_line (value ,
377- fields ,
378- base_path = new_base_path )
386+ self .deduce_schema_for_line (
387+ json_object = value ,
388+ schema_map = fields ,
389+ base_path = new_base_path ,
390+ )
379391 else :
380392 for val in value :
381- self .deduce_schema_for_line (val ,
382- fields ,
383- base_path = new_base_path )
384- # yapf: disable
393+ self .deduce_schema_for_line (
394+ json_object = val ,
395+ schema_map = fields ,
396+ base_path = new_base_path ,
397+ )
398+
399+ # yapf: disable
385400 schema_entry = OrderedDict ([
386401 ('status' , 'hard' ),
387402 ('filled' , True ),
@@ -567,7 +582,8 @@ def flatten_schema(self, schema_map):
567582 keep_nulls = self .keep_nulls ,
568583 sorted_schema = self .sorted_schema ,
569584 infer_mode = self .infer_mode ,
570- sanitize_names = self .sanitize_names )
585+ sanitize_names = self .sanitize_names ,
586+ )
571587
572588 def run (self , input_file = sys .stdin , output_file = sys .stdout ):
573589 """Read the data records from the input_file and print out the BigQuery
0 commit comments