@@ -200,7 +200,10 @@ def deduce_schema(self, file):
200200
201201 # Deduce the schema from this given data record.
202202 if isinstance (json_object , dict ):
203- self .deduce_schema_for_line (json_object , schema_map )
203+ self .deduce_schema_for_line (
204+ json_object = json_object ,
205+ schema_map = schema_map ,
206+ )
204207 elif isinstance (json_object , Exception ):
205208 self .log_error (
206209 f'Record could not be parsed: Exception: { json_object } ' )
@@ -218,20 +221,35 @@ def deduce_schema(self, file):
218221
219222 return schema_map , self .error_logs
220223
221- def deduce_schema_for_line (self , json_object , schema_map ):
224+ def deduce_schema_for_line (self , json_object , schema_map , base_path = None ):
222225 """Figures out the BigQuery schema for the given 'json_object' and
223226 updates 'schema_map' with the latest info. A 'schema_map' entry of type
224227 'soft' is a provisional entry that can be overwritten by a subsequent
225228 'soft' or 'hard' entry. If both the old and new have the same type,
226229 then they must be compatible.
230+
231+ 'base_path' is the string representing the current path within the
232+ nested record that leads to this specific entry.
227233 """
228234 for key , value in json_object .items ():
229235 schema_entry = schema_map .get (key )
230- new_schema_entry = self .get_schema_entry (key , value )
231- schema_map [key ] = self .merge_schema_entry (schema_entry ,
232- new_schema_entry )
233-
234- def merge_schema_entry (self , old_schema_entry , new_schema_entry ):
236+ new_schema_entry = self .get_schema_entry (
237+ key = key ,
238+ value = value ,
239+ base_path = base_path ,
240+ )
241+ schema_map [key ] = self .merge_schema_entry (
242+ old_schema_entry = schema_entry ,
243+ new_schema_entry = new_schema_entry ,
244+ base_path = base_path ,
245+ )
246+
247+ def merge_schema_entry (
248+ self ,
249+ old_schema_entry ,
250+ new_schema_entry ,
251+ base_path = None ,
252+ ):
235253 """Merges the 'new_schema_entry' into the 'old_schema_entry' and return
236254 a merged schema entry. Recursively merges in sub-fields as well.
237255
@@ -240,6 +258,10 @@ def merge_schema_entry(self, old_schema_entry, new_schema_entry):
240258 returned as the new schema_entry. Returns None if the field should
241259 be removed from the schema due to internal consistency errors.
242260
261+ 'base_path' is the string representing the current path within the
262+ nested record that leads to this specific entry. This is used during
263+ error logging.
264+
243265 An Exception is thrown if an unexpected programming error is detected.
244266 The calling routine should stop processing the file.
245267 """
@@ -310,50 +332,71 @@ def merge_schema_entry(self, old_schema_entry, new_schema_entry):
310332 new_fields = new_info ['fields' ]
311333 for key , new_entry in new_fields .items ():
312334 old_entry = old_fields .get (key )
313- old_fields [key ] = self .merge_schema_entry (old_entry , new_entry )
335+ new_base_path = json_full_path (base_path , old_name )
336+ old_fields [key ] = self .merge_schema_entry (
337+ old_schema_entry = old_entry ,
338+ new_schema_entry = new_entry ,
339+ base_path = new_base_path ,
340+ )
314341 return old_schema_entry
315342
343+ full_old_name = json_full_path (base_path , old_name )
344+ full_new_name = json_full_path (base_path , new_name )
345+
316346 # For all other types, the old_mode must be the same as the new_mode. It
317347 # might seem reasonable to allow a NULLABLE {primitive_type} to be
318348 # upgraded to a REPEATED {primitive_type}, but currently 'bq load' does
319349 # not support that so we must also follow that rule.
320350 if old_mode != new_mode :
321351 self .log_error (
322352 f'Ignoring non-RECORD field with mismatched mode: '
323- f'old=({ old_status } ,{ old_name } ,{ old_mode } ,{ old_type } ); '
324- f'new=({ new_status } ,{ new_name } ,{ new_mode } ,{ new_type } )' )
353+ f'old=({ old_status } ,{ full_old_name } ,{ old_mode } ,{ old_type } ); '
354+ f'new=({ new_status } ,{ full_new_name } ,{ new_mode } ,{ new_type } )' )
325355 return None
326356
327357 # Check that the converted types are compatible.
328358 candidate_type = convert_type (old_type , new_type )
329359 if not candidate_type :
330360 self .log_error (
331361 f'Ignoring field with mismatched type: '
332- f'old=({ old_status } ,{ old_name } ,{ old_mode } ,{ old_type } ); '
333- f'new=({ new_status } ,{ new_name } ,{ new_mode } ,{ new_type } )' )
362+ f'old=({ old_status } ,{ full_old_name } ,{ old_mode } ,{ old_type } ); '
363+ f'new=({ new_status } ,{ full_new_name } ,{ new_mode } ,{ new_type } )' )
334364 return None
335365
336366 new_info ['type' ] = candidate_type
337367 return new_schema_entry
338368
339- def get_schema_entry (self , key , value ):
369+ def get_schema_entry (self , key , value , base_path = None ):
340370 """Determines the 'schema_entry' of the (key, value) pair. Calls
341371 deduce_schema_for_line() recursively if the value is another object
342372 instead of a primitive (this will happen only for JSON input file).
373+
374+ 'base_path' is the string representing the current path within the
375+ nested record that leads to this specific entry.
343376 """
344377 value_mode , value_type = self .infer_bigquery_type (value )
345378 if not value_mode or not value_type :
346379 return None
347380
348381 if value_type == 'RECORD' :
382+ new_base_path = json_full_path (base_path , key )
349383 # recursively figure out the RECORD
350384 fields = OrderedDict ()
351385 if value_mode == 'NULLABLE' :
352- self .deduce_schema_for_line (value , fields )
386+ self .deduce_schema_for_line (
387+ json_object = value ,
388+ schema_map = fields ,
389+ base_path = new_base_path ,
390+ )
353391 else :
354392 for val in value :
355- self .deduce_schema_for_line (val , fields )
356- # yapf: disable
393+ self .deduce_schema_for_line (
394+ json_object = val ,
395+ schema_map = fields ,
396+ base_path = new_base_path ,
397+ )
398+
399+ # yapf: disable
357400 schema_entry = OrderedDict ([
358401 ('status' , 'hard' ),
359402 ('filled' , True ),
@@ -539,7 +582,8 @@ def flatten_schema(self, schema_map):
539582 keep_nulls = self .keep_nulls ,
540583 sorted_schema = self .sorted_schema ,
541584 infer_mode = self .infer_mode ,
542- sanitize_names = self .sanitize_names )
585+ sanitize_names = self .sanitize_names ,
586+ )
543587
544588 def run (self , input_file = sys .stdin , output_file = sys .stdout ):
545589 """Read the data records from the input_file and print out the BigQuery
@@ -745,6 +789,17 @@ def flatten_schema_map(
745789 return schema
746790
747791
def json_full_path(base_path, key):
    """Return the dot-separated JSON full path to a particular key,
    e.g. 'server.config.port'. Column names in CSV files are never
    nested, so this will always return `key`.
    """
    # An empty or missing base path means 'key' is at the top level.
    if base_path in (None, ''):
        return key
    return f'{base_path}.{key}'
801+
802+
748803def main ():
749804 # Configure command line flags.
750805 parser = argparse .ArgumentParser (
0 commit comments