Skip to content

Commit 7947528

Browse files
authored
TODOs in storage/iceberg/impl.py in create_table_version (#497)
* resolving TODOs in create_table_version (storage/iceberg/impl.py) * resolved TODOs + cleaned; ready to merge * ran make lint
1 parent 22b2f40 commit 7947528

File tree

1 file changed

+24
-11
lines changed

1 file changed

+24
-11
lines changed

deltacat/storage/iceberg/impl.py

+24-11
Original file line numberDiff line numberDiff line change
@@ -393,8 +393,6 @@ def create_table_version(
393393
)
394394
raise TypeError(err_msg)
395395

396-
# TODO: ensure catalog.create_table() is idempotent
397-
# TODO: get table and commit new metadata if table already exists?
398396
identifier = _to_identifier(namespace, table_name)
399397
iceberg_schema = SchemaMapper.unmap(schema)
400398
sort_order = SortSchemeMapper.unmap(
@@ -407,15 +405,30 @@ def create_table_version(
407405
schema=iceberg_schema,
408406
case_sensitive=case_sensitive_col_names,
409407
)
410-
table = catalog.create_table(
411-
identifier=identifier,
412-
schema=iceberg_schema,
413-
location=location,
414-
partition_spec=partition_spec or UNPARTITIONED_PARTITION_SPEC,
415-
sort_order=sort_order or UNSORTED_SORT_ORDER,
416-
properties=table_properties or EMPTY_DICT,
417-
)
418-
logger.info(f"Created table: {table}")
408+
409+
existing_table = _try_load_iceberg_table(catalog, namespace, table_name)
410+
if existing_table is not None:
411+
table = existing_table
412+
logger.info(f"Table already exists: {table}")
413+
414+
if table_properties:
415+
try:
416+
with table.transaction() as transaction:
417+
transaction.set_properties(table_properties)
418+
logger.info(f"Updated table properties for {namespace}.{table_name}")
419+
except Exception as e:
420+
logger.warning(f"Failed to update table properties: {e}")
421+
else:
422+
table = catalog.create_table(
423+
identifier=identifier,
424+
schema=iceberg_schema,
425+
location=location,
426+
partition_spec=partition_spec or UNPARTITIONED_PARTITION_SPEC,
427+
sort_order=sort_order or UNSORTED_SORT_ORDER,
428+
properties=table_properties or EMPTY_DICT,
429+
)
430+
logger.info(f"Created table: {table}")
431+
419432
# no snapshot is committed on table creation, so return an undefined stream
420433
return Stream.of(locator=None, partition_scheme=None)
421434

0 commit comments

Comments
 (0)