@@ -203,17 +203,19 @@ def create_table(self,
             TableInput=table_input)
 
     def add_partitions(self, database, table, partition_paths, file_format,
-                       extra_args):
+                       compression, extra_args):
         if not partition_paths:
             return None
         partitions = list()
         for partition in partition_paths:
             if file_format == "parquet":
                 partition_def = Glue.parquet_partition_definition(
-                    partition=partition)
+                    partition=partition, compression=compression)
             elif file_format == "csv":
                 partition_def = Glue.csv_partition_definition(
-                    partition=partition, extra_args=extra_args)
+                    partition=partition,
+                    compression=compression,
+                    extra_args=extra_args)
             else:
                 raise UnsupportedFileFormat(file_format)
             partitions.append(partition_def)
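
The hunk above threads the new `compression` argument from `add_partitions` into the per-format partition builders. A hypothetical call exercising the new signature; the `(s3_path, values)` tuple shape for `partition_paths` is an assumption inferred from `partition[0]` being read as the Location in the definitions below, and `glue` stands in for an instance of this class:

glue.add_partitions(
    database="my_database",
    table="my_table",
    partition_paths=[("s3://my-bucket/my_table/year=2019/", ["2019"])],
    file_format="parquet",
    compression="snappy",  # any value other than None marks the partition compressed
    extra_args=None)
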
@@ -225,8 +227,12 @@ def add_partitions(self, database, table, partition_paths, file_format,
                 DatabaseName=database,
                 TableName=table,
                 PartitionInputList=page)
-            if len(res["Errors"]) > 0:
-                raise ApiError(f"{res['Errors'][0]}")
+            for error in res["Errors"]:
+                if "ErrorDetail" in error:
+                    if "ErrorCode" in error["ErrorDetail"]:
+                        if error["ErrorDetail"][
+                                "ErrorCode"] != "AlreadyExistsException":
+                            raise ApiError(f"{error}")
 
     def get_connection_details(self, name):
         return self._client_glue.get_connection(
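
The rewritten error handling tolerates partitions that already exist: boto3's `batch_create_partition` reports per-partition failures in an `Errors` list rather than raising, and each entry may carry an `ErrorDetail` with an `ErrorCode`. A sketch of the response shape the loop filters, with illustrative values:

res = {
    "Errors": [{
        "PartitionValues": ["2019"],
        "ErrorDetail": {
            "ErrorCode": "AlreadyExistsException",  # now skipped silently
            "ErrorMessage": "Partition already exists."
        }
    }]
}

Any entry whose `ErrorCode` differs from `"AlreadyExistsException"` still raises `ApiError`.
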
@@ -355,7 +361,7 @@ def csv_table_definition(table, partition_cols_schema, schema, path,
             "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
             "OutputFormat":
             "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
-            "Compressed": True,
+            "Compressed": compressed,
             "NumberOfBuckets": -1,
             "SerdeInfo": {
                 "Parameters": param,
@@ -375,7 +381,8 @@ def csv_table_definition(table, partition_cols_schema, schema, path,
         }
 
     @staticmethod
-    def csv_partition_definition(partition, extra_args):
+    def csv_partition_definition(partition, compression, extra_args):
+        compressed = False if compression is None else True
         sep = extra_args["sep"] if "sep" in extra_args else ","
         serde = extra_args.get("serde")
         if serde == "OpenCSVSerDe":
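
The `False if compression is None else True` expression added here (and in `parquet_partition_definition` below) reduces to a plain identity test; an equivalent, arguably clearer spelling would be:

compressed = compression is not None
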
@@ -394,6 +401,7 @@ def csv_partition_definition(partition, extra_args):
         "StorageDescriptor": {
             "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
             "Location": partition[0],
+            "Compressed": compressed,
             "SerdeInfo": {
                 "Parameters": param,
                 "SerializationLibrary": serde_fullname,
@@ -454,11 +462,13 @@ def parquet_table_definition(table, partition_cols_schema, schema, path,
         }
 
     @staticmethod
-    def parquet_partition_definition(partition):
+    def parquet_partition_definition(partition, compression):
+        compressed = False if compression is None else True
         return {
             "StorageDescriptor": {
                 "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
                 "Location": partition[0],
+                "Compressed": compressed,
                 "SerdeInfo": {
                     "Parameters": {
                         "serialization.format": "1"
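
Taken together, a compressed partition definition now carries the flag in its StorageDescriptor. A minimal sketch, assuming the `OpenCSVSerDe` branch shown earlier and the hypothetical `(s3_path, values)` tuple shape:

partition_def = Glue.csv_partition_definition(
    partition=("s3://my-bucket/my_table/year=2019/", ["2019"]),
    compression="gzip",
    extra_args={"sep": ",", "serde": "OpenCSVSerDe"})
assert partition_def["StorageDescriptor"]["Compressed"] is True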