Commit 693f2f0

Add enforce_primary_key_uniqueness support to converter
1 parent bf9076c commit 693f2f0

12 files changed, +501 -220 lines

deltacat/compute/converter/converter_session.py (+26 -33)
@@ -10,11 +10,11 @@
 )
 import logging
 from deltacat import logs
-from collections import defaultdict
 from deltacat.compute.converter.model.converter_session_params import (
     ConverterSessionParams,
 )
 
+
 from deltacat.compute.converter.constants import DEFAULT_MAX_PARALLEL_DATA_FILE_DOWNLOAD
 from deltacat.compute.converter.steps.convert import convert
 from deltacat.compute.converter.model.convert_input import ConvertInput
@@ -23,13 +23,16 @@
     parquet_files_dict_to_iceberg_data_files,
 )
 from deltacat.compute.converter.utils.converter_session_utils import (
-    check_data_files_sequence_number,
     construct_iceberg_table_prefix,
 )
 from deltacat.compute.converter.pyiceberg.replace_snapshot import (
     commit_overwrite_snapshot,
+    commit_append_snapshot,
 )
 from deltacat.compute.converter.pyiceberg.catalog import load_table
+from deltacat.compute.converter.utils.converter_session_utils import (
+    group_all_files_to_each_bucket,
+)
 
 logger = logs.configure_deltacat_logger(logging.getLogger(__name__))
 
@@ -44,33 +47,15 @@ def converter_session(params: ConverterSessionParams, **kwargs):
     catalog = params.catalog
     table_name = params.iceberg_table_name
     iceberg_table = load_table(catalog, table_name)
+    enforce_primary_key_uniqueness = params.enforce_primary_key_uniqueness
     data_file_dict, equality_delete_dict, pos_delete_dict = fetch_all_bucket_files(
         iceberg_table
     )
-
-    # files_for_each_bucket contains the following files list:
-    # {partition_value: [(equality_delete_files_list, data_files_list, pos_delete_files_list)]
-    files_for_each_bucket = defaultdict(tuple)
-    for k, v in data_file_dict.items():
-        logger.info(f"data_file: k, v:{k, v}")
-    for k, v in equality_delete_dict.items():
-        logger.info(f"equality_delete_file: k, v:{k, v}")
-    for partition_value, equality_delete_file_list in equality_delete_dict.items():
-        (
-            result_equality_delete_file,
-            result_data_file,
-        ) = check_data_files_sequence_number(
-            data_files_list=data_file_dict[partition_value],
-            equality_delete_files_list=equality_delete_dict[partition_value],
-        )
-        logger.info(f"result_data_file:{result_data_file}")
-        logger.info(f"result_equality_delete_file:{result_equality_delete_file}")
-        files_for_each_bucket[partition_value] = (
-            result_data_file,
-            result_equality_delete_file,
-            [],
-        )
-
+    convert_input_files_for_all_buckets = group_all_files_to_each_bucket(
+        data_file_dict=data_file_dict,
+        equality_delete_dict=equality_delete_dict,
+        pos_delete_dict=pos_delete_dict,
+    )
     iceberg_warehouse_bucket_name = params.iceberg_warehouse_bucket_name
     iceberg_namespace = params.iceberg_namespace
     iceberg_table_warehouse_prefix = construct_iceberg_table_prefix(
@@ -116,6 +101,7 @@ def convert_input_provider(index, item):
                 iceberg_table_warehouse_prefix=iceberg_table_warehouse_prefix,
                 identifier_fields=identifier_fields,
                 compact_small_files=compact_small_files,
+                enforce_primary_key_uniqueness=enforce_primary_key_uniqueness,
                 position_delete_for_multiple_data_files=position_delete_for_multiple_data_files,
                 max_parallel_data_file_download=max_parallel_data_file_download,
             )
@@ -125,7 +111,7 @@ def convert_input_provider(index, item):
     # Assuming that memory consume by each bucket doesn't exceed one node's memory limit.
     # TODO: Add split mechanism to split large buckets
     convert_tasks_pending = invoke_parallel(
-        items=files_for_each_bucket.items(),
+        items=convert_input_files_for_all_buckets.items(),
        ray_task=convert,
        max_parallelism=task_max_parallelism,
        options_provider=convert_options_provider,
@@ -143,9 +129,16 @@ def convert_input_provider(index, item):
         table_metadata=iceberg_table.metadata,
         files_dict_list=to_be_added_files_dict_list,
     )
-    commit_overwrite_snapshot(
-        iceberg_table=iceberg_table,
-        # equality_delete_files + data file that all rows are deleted
-        to_be_deleted_files_list=to_be_deleted_files_list[0],
-        new_position_delete_files=new_position_delete_files,
-    )
+    print(f"debug_to_be_deleted_files:{to_be_deleted_files_list}")
+    if not to_be_deleted_files_list:
+        commit_append_snapshot(
+            iceberg_table=iceberg_table,
+            new_position_delete_files=new_position_delete_files,
+        )
+    else:
+        commit_overwrite_snapshot(
+            iceberg_table=iceberg_table,
+            # equality_delete_files + data file that all rows are deleted
+            to_be_deleted_files_list=to_be_deleted_files_list,
+            new_position_delete_files=new_position_delete_files,
+        )

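For reference, a minimal usage sketch of the changed entry point with the new flag enabled. This is an illustration only: the catalog configuration, table identifier, and bucket values are placeholders, and it assumes the parameter keys visible in these diffs (catalog, iceberg_table_name, iceberg_warehouse_bucket_name, iceberg_namespace) are the ones required.

from pyiceberg.catalog import load_catalog

from deltacat.compute.converter.converter_session import converter_session
from deltacat.compute.converter.model.converter_session_params import (
    ConverterSessionParams,
)

catalog = load_catalog("default")  # placeholder catalog configuration
params = ConverterSessionParams.of(
    {
        "catalog": catalog,
        "iceberg_table_name": "db.events",  # placeholder table identifier
        "iceberg_warehouse_bucket_name": "my-warehouse-bucket",  # placeholder
        "iceberg_namespace": "db",  # placeholder
        # New in this commit; defaults to False when omitted.
        "enforce_primary_key_uniqueness": True,
    }
)
converter_session(params=params)
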
deltacat/compute/converter/model/convert_input.py (+11 -4)
@@ -1,26 +1,29 @@
 from __future__ import annotations
 from typing import Dict, List
+from deltacat.compute.converter.model.convert_input_files import ConvertInputFiles
 
 
 class ConvertInput(Dict):
     @staticmethod
     def of(
-        files_for_each_bucket,
+        convert_input_files,
         convert_task_index,
         iceberg_table_warehouse_prefix,
         identifier_fields,
         compact_small_files,
+        enforce_primary_key_uniqueness,
         position_delete_for_multiple_data_files,
         max_parallel_data_file_download,
         s3_file_system,
     ) -> ConvertInput:
 
         result = ConvertInput()
-        result["files_for_each_bucket"] = files_for_each_bucket
+        result["convert_input_files"] = convert_input_files
         result["convert_task_index"] = convert_task_index
         result["identifier_fields"] = identifier_fields
         result["iceberg_table_warehouse_prefix"] = iceberg_table_warehouse_prefix
         result["compact_small_files"] = compact_small_files
+        result["enforce_primary_key_uniqueness"] = enforce_primary_key_uniqueness
         result[
             "position_delete_for_multiple_data_files"
         ] = position_delete_for_multiple_data_files
@@ -30,8 +33,8 @@ def of(
         return result
 
     @property
-    def files_for_each_bucket(self) -> tuple:
-        return self["files_for_each_bucket"]
+    def convert_input_files(self) -> ConvertInputFiles:
+        return self["convert_input_files"]
 
     @property
     def identifier_fields(self) -> List[str]:
@@ -49,6 +52,10 @@ def iceberg_table_warehouse_prefix(self) -> str:
     def compact_small_files(self) -> bool:
         return self["compact_small_files"]
 
+    @property
+    def enforce_primary_key_uniqueness(self) -> bool:
+        return self["enforce_primary_key_uniqueness"]
+
     @property
     def position_delete_for_multiple_data_files(self) -> bool:
         return self["position_delete_for_multiple_data_files"]
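
The convert step itself is among the 12 changed files but is not shown in this excerpt. The sketch below is only a plausible illustration of how a task might branch on the new flag and the ConvertInputFiles fields; it is not the actual implementation.

from deltacat.compute.converter.model.convert_input import ConvertInput


def pick_data_files(convert_input: ConvertInput):
    # ConvertInput is a dict subclass, so each property simply reads a stored key.
    files = convert_input.convert_input_files
    if convert_input.enforce_primary_key_uniqueness:
        # Assumed behavior: dedupe primary keys across every data file in the bucket.
        return files.all_data_files_for_dedupe
    # Otherwise only the files matched against equality deletes need rewriting.
    return files.applicable_data_files
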
deltacat/compute/converter/model/convert_input_files.py (+61)

@@ -0,0 +1,61 @@
+from __future__ import annotations
+from typing import Dict
+
+
+class ConvertInputFiles(Dict):
+    @staticmethod
+    def of(
+        partition_value,
+        all_data_files_for_dedupe=None,
+        applicable_data_files=None,
+        applicable_equality_delete_files=None,
+        existing_position_delete_files=None,
+    ) -> ConvertInputFiles:
+
+        result = ConvertInputFiles()
+        result["partition_value"] = partition_value
+        result["all_data_files_for_dedupe"] = all_data_files_for_dedupe
+        result["applicable_data_files"] = applicable_data_files
+        result["applicable_equality_delete_files"] = applicable_equality_delete_files
+        result["existing_position_delete_files"] = existing_position_delete_files
+        return result
+
+    @property
+    def partition_value(self):
+        return self["partition_value"]
+
+    @property
+    def all_data_files_for_dedupe(self):
+        return self["all_data_files_for_dedupe"]
+
+    @property
+    def applicable_data_files(self):
+        return self["applicable_data_files"]
+
+    @property
+    def applicable_equality_delete_files(self):
+        return self["applicable_equality_delete_files"]
+
+    @property
+    def existing_position_delete_files(self):
+        return self["existing_position_delete_files"]
+
+    @partition_value.setter
+    def partition_value(self, partition_value):
+        self["partition_value"] = partition_value
+
+    @all_data_files_for_dedupe.setter
+    def all_data_files_for_dedupe(self, all_data_files_for_dedupe):
+        self["all_data_files_for_dedupe"] = all_data_files_for_dedupe
+
+    @applicable_data_files.setter
+    def applicable_data_files(self, applicable_data_files):
+        self["applicable_data_files"] = applicable_data_files
+
+    @applicable_equality_delete_files.setter
+    def applicable_equality_delete_files(self, applicable_equality_delete_files):
+        self["applicable_equality_delete_files"] = applicable_equality_delete_files
+
+    @existing_position_delete_files.setter
+    def existing_position_delete_files(self, existing_position_delete_files):
+        self["existing_position_delete_files"] = existing_position_delete_files

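A short construction example for the new container. The module path is inferred from the import added in convert_input.py above, and the file entries are placeholder strings rather than real Iceberg data file objects.

from deltacat.compute.converter.model.convert_input_files import ConvertInputFiles

bucket_files = ConvertInputFiles.of(
    partition_value="2024-01-01",
    applicable_data_files=["s3://bucket/data/a.parquet"],
    applicable_equality_delete_files=["s3://bucket/deletes/d.parquet"],
)
# Fields left unset default to None and can be filled in later via the setters.
bucket_files.all_data_files_for_dedupe = ["s3://bucket/data/a.parquet"]
print(bucket_files.partition_value)  # "2024-01-01"
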
deltacat/compute/converter/model/converter_session_params.py (+11)
@@ -23,6 +23,9 @@ def of(params: Optional[Dict]) -> ConverterSessionParams:
         ), "iceberg_namespace is a required arg"
         result = ConverterSessionParams(params)
 
+        result.enforce_primary_key_uniqueness = params.get(
+            "enforce_primary_key_uniqueness", False
+        )
         result.compact_small_files = params.get("compact_small_files", False)
 
         # For Iceberg v3 spec, option to produce delete vector that can establish 1:1 mapping with data files.
@@ -51,6 +54,14 @@ def iceberg_warehouse_bucket_name(self) -> str:
     def iceberg_namespace(self) -> str:
         return self["iceberg_namespace"]
 
+    @property
+    def enforce_primary_key_uniqueness(self) -> bool:
+        return self["enforce_primary_key_uniqueness"]
+
+    @enforce_primary_key_uniqueness.setter
+    def enforce_primary_key_uniqueness(self, enforce_primary_key_uniqueness) -> None:
+        self["enforce_primary_key_uniqueness"] = enforce_primary_key_uniqueness
+
     @property
     def compact_small_files(self) -> bool:
         return self["compact_small_files"]

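A small check of the new parameter's default and round-trip behavior. The base keys below reflect the args visible in these diffs and are otherwise placeholders.

from deltacat.compute.converter.model.converter_session_params import (
    ConverterSessionParams,
)

base = {
    "catalog": object(),  # placeholder; a real PyIceberg catalog in practice
    "iceberg_table_name": "db.events",
    "iceberg_warehouse_bucket_name": "my-warehouse-bucket",
    "iceberg_namespace": "db",
}
params = ConverterSessionParams.of(dict(base))
assert params.enforce_primary_key_uniqueness is False  # defaults off

params = ConverterSessionParams.of(dict(base, enforce_primary_key_uniqueness=True))
assert params.enforce_primary_key_uniqueness is True
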
deltacat/compute/converter/pyiceberg/replace_snapshot.py (+31 -6)
@@ -159,16 +159,41 @@ def commit_overwrite_snapshot(
 ):
     commit_uuid = uuid.uuid4()
     with iceberg_table.transaction() as tx:
+        print(
+            f"iceberg_table.metadata.name_mapping:{iceberg_table.metadata.name_mapping()}"
+        )
         if iceberg_table.metadata.name_mapping() is None:
-            iceberg_table.set_properties(
+            tx.set_properties(
                 **{
-                    "schema.name-mapping.default": iceberg_table.table_metadata.schema().name_mapping.model_dump_json()
+                    "schema.name-mapping.default": tx.table_metadata.schema().name_mapping.model_dump_json()
                 }
             )
         with tx.update_snapshot().overwrite(
             commit_uuid=commit_uuid
         ) as overwrite_snapshot:
-            for data_file in new_position_delete_files:
-                overwrite_snapshot.append_data_file(data_file)
-            for original_data_file in to_be_deleted_files_list:
-                overwrite_snapshot.delete_data_file(original_data_file)
+            if new_position_delete_files:
+                for data_file in new_position_delete_files:
+                    overwrite_snapshot.append_data_file(data_file)
+            if to_be_deleted_files_list:
+                for original_data_file in to_be_deleted_files_list:
+                    overwrite_snapshot.delete_data_file(original_data_file)
+
+
+def commit_append_snapshot(iceberg_table, new_position_delete_files):
+    commit_uuid = uuid.uuid4()
+    with iceberg_table.transaction() as tx:
+        print(
+            f"iceberg_table.metadata.name_mapping:{iceberg_table.metadata.name_mapping()}"
+        )
+        if iceberg_table.metadata.name_mapping() is None:
+            tx.set_properties(
+                **{
+                    "schema.name-mapping.default": tx.table_metadata.schema().name_mapping.model_dump_json()
+                }
+            )
+        with tx.update_snapshot().fast_append(
+            commit_uuid=commit_uuid
+        ) as append_snapshot:
+            if new_position_delete_files:
+                for data_file in new_position_delete_files:
+                    append_snapshot.append_data_file(data_file)

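Finally, a standalone usage sketch of the new append helper. The catalog configuration and table name are placeholders; new_position_delete_files would normally be the Iceberg data file entries returned by parquet_files_dict_to_iceberg_data_files, and an empty list simply results in nothing being appended thanks to the new guards.

from pyiceberg.catalog import load_catalog

from deltacat.compute.converter.pyiceberg.catalog import load_table
from deltacat.compute.converter.pyiceberg.replace_snapshot import (
    commit_append_snapshot,
)

catalog = load_catalog("default")  # placeholder catalog configuration
iceberg_table = load_table(catalog, "db.events")  # placeholder table name
commit_append_snapshot(
    iceberg_table=iceberg_table,
    new_position_delete_files=[],  # placeholder; normally the newly written position delete files
)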