@@ -393,8 +393,6 @@ def create_table_version(
393
393
)
394
394
raise TypeError (err_msg )
395
395
396
- # TODO: ensure catalog.create_table() is idempotent
397
- # TODO: get table and commit new metadata if table already exists?
398
396
identifier = _to_identifier (namespace , table_name )
399
397
iceberg_schema = SchemaMapper .unmap (schema )
400
398
sort_order = SortSchemeMapper .unmap (
@@ -407,15 +405,30 @@ def create_table_version(
407
405
schema = iceberg_schema ,
408
406
case_sensitive = case_sensitive_col_names ,
409
407
)
410
- table = catalog .create_table (
411
- identifier = identifier ,
412
- schema = iceberg_schema ,
413
- location = location ,
414
- partition_spec = partition_spec or UNPARTITIONED_PARTITION_SPEC ,
415
- sort_order = sort_order or UNSORTED_SORT_ORDER ,
416
- properties = table_properties or EMPTY_DICT ,
417
- )
418
- logger .info (f"Created table: { table } " )
408
+
409
+ existing_table = _try_load_iceberg_table (catalog , namespace , table_name )
410
+ if existing_table is not None :
411
+ table = existing_table
412
+ logger .info (f"Table already exists: { table } " )
413
+
414
+ if table_properties :
415
+ try :
416
+ with table .transaction () as transaction :
417
+ transaction .set_properties (table_properties )
418
+ logger .info (f"Updated table properties for { namespace } .{ table_name } " )
419
+ except Exception as e :
420
+ logger .warning (f"Failed to update table properties: { e } " )
421
+ else :
422
+ table = catalog .create_table (
423
+ identifier = identifier ,
424
+ schema = iceberg_schema ,
425
+ location = location ,
426
+ partition_spec = partition_spec or UNPARTITIONED_PARTITION_SPEC ,
427
+ sort_order = sort_order or UNSORTED_SORT_ORDER ,
428
+ properties = table_properties or EMPTY_DICT ,
429
+ )
430
+ logger .info (f"Created table: { table } " )
431
+
419
432
# no snapshot is committed on table creation, so return an undefined stream
420
433
return Stream .of (locator = None , partition_scheme = None )
421
434
0 commit comments