@@ -73,6 +73,9 @@ class SchemaGenerator:
7373 # Detect floats inside quotes.
7474 FLOAT_MATCHER = re .compile (r'^[-]?\d+\.\d+$' )
7575
76+ # Matches any character that is not valid in a BigQuery field name
77+ FIELD_NAME_MATCHER = re .compile (r'[^a-zA-Z0-9_]' )
78+
7679 def __init__ (self ,
7780 input_format = 'json' ,
7881 infer_mode = False ,
@@ -114,8 +117,8 @@ def __init__(self,
114117
115118 # This option generally wants to be turned on as any inferred schema
116119 # will not be accepted by `bq load` when it contains illegal characters.
117- # Characters such as #, / or -. Neither will it be accepted if the column name
118- # in the schema is larger than 128 characters.
120+ # Characters such as #, / or -. Neither will it be accepted if the
121+ # column name in the schema is longer than 128 characters.
119122 self .sanitize_names = sanitize_names
120123
121124 def log_error (self , msg ):
@@ -323,7 +326,6 @@ def get_schema_entry(self, key, value):
323326 if not value_mode or not value_type :
324327 return None
325328
326- # yapf: disable
327329 if value_type == 'RECORD' :
328330 # recursively figure out the RECORD
329331 fields = OrderedDict ()
@@ -332,39 +334,48 @@ def get_schema_entry(self, key, value):
332334 else :
333335 for val in value :
334336 self .deduce_schema_for_line (val , fields )
335- schema_entry = OrderedDict ([('status' , 'hard' ),
336- ('filled' , True ),
337- ('info' , OrderedDict ([
338- ('fields' , fields ),
339- ('mode' , value_mode ),
340- ('name' , key ),
341- ('type' , value_type ),
342- ]))])
337+ # yapf: disable
338+ schema_entry = OrderedDict ([
339+ ('status' , 'hard' ),
340+ ('filled' , True ),
341+ ('info' , OrderedDict ([
342+ ('fields' , fields ),
343+ ('mode' , value_mode ),
344+ ('name' , key ),
345+ ('type' , value_type ),
346+ ])),
347+ ])
343348 elif value_type == '__null__' :
344- schema_entry = OrderedDict ([('status' , 'soft' ),
345- ('filled' , False ),
346- ('info' , OrderedDict ([
347- ('mode' , 'NULLABLE' ),
348- ('name' , key ),
349- ('type' , 'STRING' ),
350- ]))])
349+ schema_entry = OrderedDict ([
350+ ('status' , 'soft' ),
351+ ('filled' , False ),
352+ ('info' , OrderedDict ([
353+ ('mode' , 'NULLABLE' ),
354+ ('name' , key ),
355+ ('type' , 'STRING' ),
356+ ])),
357+ ])
351358 elif value_type == '__empty_array__' :
352- schema_entry = OrderedDict ([('status' , 'soft' ),
353- ('filled' , False ),
354- ('info' , OrderedDict ([
355- ('mode' , 'REPEATED' ),
356- ('name' , key ),
357- ('type' , 'STRING' ),
358- ]))])
359+ schema_entry = OrderedDict ([
360+ ('status' , 'soft' ),
361+ ('filled' , False ),
362+ ('info' , OrderedDict ([
363+ ('mode' , 'REPEATED' ),
364+ ('name' , key ),
365+ ('type' , 'STRING' ),
366+ ])),
367+ ])
359368 elif value_type == '__empty_record__' :
360- schema_entry = OrderedDict ([('status' , 'soft' ),
361- ('filled' , False ),
362- ('info' , OrderedDict ([
363- ('fields' , OrderedDict ()),
364- ('mode' , value_mode ),
365- ('name' , key ),
366- ('type' , 'RECORD' ),
367- ]))])
369+ schema_entry = OrderedDict ([
370+ ('status' , 'soft' ),
371+ ('filled' , False ),
372+ ('info' , OrderedDict ([
373+ ('fields' , OrderedDict ()),
374+ ('mode' , value_mode ),
375+ ('name' , key ),
376+ ('type' , 'RECORD' ),
377+ ])),
378+ ])
368379 else :
369380 # Empty fields are returned as empty strings, and must be treated as
370381 # a (soft String) to allow clobbering by subsequent non-empty fields.
@@ -374,13 +385,15 @@ def get_schema_entry(self, key, value):
374385 else :
375386 status = 'hard'
376387 filled = True
377- schema_entry = OrderedDict ([('status' , status ),
378- ('filled' , filled ),
379- ('info' , OrderedDict ([
380- ('mode' , value_mode ),
381- ('name' , key ),
382- ('type' , value_type ),
383- ]))])
388+ schema_entry = OrderedDict ([
389+ ('status' , status ),
390+ ('filled' , filled ),
391+ ('info' , OrderedDict ([
392+ ('mode' , value_mode ),
393+ ('name' , key ),
394+ ('type' , value_type ),
395+ ])),
396+ ])
384397 # yapf: enable
385398 return schema_entry
386399
@@ -435,8 +448,8 @@ def infer_value_type(self, value):
435448 # Implement the same type inference algorithm as 'bq load' for
436449 # quoted values that look like ints, floats or bools.
437450 if self .INTEGER_MATCHER .match (value ):
438- if int (value ) < self .INTEGER_MIN_VALUE or \
439- self .INTEGER_MAX_VALUE < int (value ):
451+ if ( int (value ) < self .INTEGER_MIN_VALUE
452+ or self .INTEGER_MAX_VALUE < int (value ) ):
440453 return 'QFLOAT' # quoted float
441454 else :
442455 return 'QINTEGER' # quoted integer
@@ -618,11 +631,13 @@ def is_string_type(thetype):
618631 ]
619632
620633
621- def flatten_schema_map (schema_map ,
622- keep_nulls = False ,
623- sorted_schema = True ,
624- infer_mode = False ,
625- sanitize_names = False ):
634+ def flatten_schema_map (
635+ schema_map ,
636+ keep_nulls = False ,
637+ sorted_schema = True ,
638+ infer_mode = False ,
639+ sanitize_names = False ,
640+ ):
626641 """Converts the 'schema_map' into a more flatten version which is
627642 compatible with BigQuery schema.
628643
@@ -647,7 +662,8 @@ def flatten_schema_map(schema_map,
647662 else schema_map .items ()
648663 for name , meta in map_items :
649664 # Skip over fields which have been explicitly removed
650- if not meta : continue
665+ if not meta :
666+ continue
651667
652668 status = meta ['status' ]
653669 filled = meta ['filled' ]
@@ -679,16 +695,24 @@ def flatten_schema_map(schema_map,
679695 else :
680696 # Recursively flatten the sub-fields of a RECORD entry.
681697 new_value = flatten_schema_map (
682- value , keep_nulls , sorted_schema , sanitize_names )
698+ schema_map = value ,
699+ keep_nulls = keep_nulls ,
700+ sorted_schema = sorted_schema ,
701+ infer_mode = infer_mode ,
702+ sanitize_names = sanitize_names ,
703+ )
683704 elif key == 'type' and value in ['QINTEGER' , 'QFLOAT' , 'QBOOLEAN' ]:
705+ # Convert QINTEGER -> INTEGER, similarly for QFLOAT and QBOOLEAN.
684706 new_value = value [1 :]
685707 elif key == 'mode' :
686708 if infer_mode and value == 'NULLABLE' and filled :
687709 new_value = 'REQUIRED'
688710 else :
689711 new_value = value
690712 elif key == 'name' and sanitize_names :
691- new_value = re .sub ('[^a-zA-Z0-9_]' , '_' , value )[0 :127 ]
713+ new_value = SchemaGenerator .FIELD_NAME_MATCHER .sub (
714+ '_' , value ,
715+ )[0 :127 ]
692716 else :
693717 new_value = value
694718 new_info [key ] = new_value
0 commit comments