 def convert(convert_input: ConvertInput):
     files_for_each_bucket = convert_input.files_for_each_bucket
     convert_task_index = convert_input.convert_task_index
-    iceberg_warehouse_bucket_name = convert_input.iceberg_warehouse_bucket_name
+    iceberg_table_warehouse_prefix = convert_input.iceberg_table_warehouse_prefix
     identifier_fields = convert_input.identifier_fields
     compact_small_files = convert_input.compact_small_files
     position_delete_for_multiple_data_files = (
         convert_input.position_delete_for_multiple_data_files
     )
     max_parallel_data_file_download = convert_input.max_parallel_data_file_download
-
+    s3_file_system = convert_input.s3_file_system
     if not position_delete_for_multiple_data_files:
         raise NotImplementedError(
             f"Distributed file level position delete compute is not supported yet"
@@ -34,16 +34,24 @@ def convert(convert_input: ConvertInput):

     logger.info(f"Starting convert task index: {convert_task_index}")
     data_files, equality_delete_files, position_delete_files = files_for_each_bucket[1]
+    # Get string representation of partition value out of Record[partition_value]
+    partition_value_str = (
+        files_for_each_bucket[0].__repr__().split("[", 1)[1].split("]")[0]
+    )
     partition_value = files_for_each_bucket[0]
+    iceberg_table_warehouse_prefix_with_partition = (
+        f"{iceberg_table_warehouse_prefix}/{partition_value_str}"
+    )
     (
         to_be_deleted_files_list,
         to_be_added_files_list,
     ) = compute_pos_delete_with_limited_parallelism(
         data_files_list=data_files,
         identifier_columns=identifier_fields,
         equality_delete_files_list=equality_delete_files,
-        iceberg_warehouse_bucket_name=iceberg_warehouse_bucket_name,
+        iceberg_table_warehouse_prefix_with_partition=iceberg_table_warehouse_prefix_with_partition,
         max_parallel_data_file_download=max_parallel_data_file_download,
+        s3_file_system=s3_file_system,
     )
     to_be_delete_files_dict = defaultdict()
     to_be_delete_files_dict[partition_value] = to_be_deleted_files_list
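
The partition value arrives as a PyIceberg Record, and the new code recovers its string form by slicing the repr between the first "[" and the first "]". A minimal sketch of that parsing, using a stand-in class whose repr mimics the assumed Record[...] shape (the real Record repr may differ):

# Stand-in for the partition Record; assumes a repr of the form "Record[<value>]".
class FakeRecord:
    def __init__(self, value):
        self.value = value

    def __repr__(self):
        return f"Record[{self.value}]"


record = FakeRecord("region=us-east-1")
# Same slicing as the diff: everything between the first "[" and the first "]".
partition_value_str = repr(record).split("[", 1)[1].split("]")[0]
assert partition_value_str == "region=us-east-1"

# The partition-scoped warehouse prefix is then a plain string join.
iceberg_table_warehouse_prefix = "s3://bucket/warehouse/db/table"  # hypothetical prefix
iceberg_table_warehouse_prefix_with_partition = (
    f"{iceberg_table_warehouse_prefix}/{partition_value_str}"
)
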
@@ -68,7 +79,9 @@ def filter_rows_to_be_deleted(
         f"length_pos_delete_table, {len(positional_delete_table)}, length_data_table:{len(data_file_table)}"
     )
     if positional_delete_table:
-        positional_delete_table = positional_delete_table.drop(["primarykey"])
+        # TODO: Add support for multiple identifier columns
+        identifier_column = identifier_columns[0]
+        positional_delete_table = positional_delete_table.drop(identifier_column)
     if len(positional_delete_table) == len(data_file_table):
         return True, None
     return False, positional_delete_table
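
The rewritten check drops the identifier column so only the positional columns remain, then treats "every row deleted" as a whole-file delete. A minimal sketch of that decision with pyarrow, using hypothetical column names:

import pyarrow as pa

data_file_table = pa.table({"file_path": ["f.parquet"] * 3, "pos": [0, 1, 2]})
positional_delete_table = pa.table(
    {"file_path": ["f.parquet"] * 3, "pos": [0, 1, 2], "pk": [1, 2, 3]}
)

identifier_columns = ["pk"]
# Drop the identifier column, mirroring the diff's drop of identifier_columns[0].
positional_delete_table = positional_delete_table.drop_columns(identifier_columns[0])

# If every row of the data file is marked for deletion, the file itself can be
# deleted instead of writing a positional delete file for it.
delete_whole_file = len(positional_delete_table) == len(data_file_table)
assert delete_whole_file is True
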
@@ -78,7 +91,8 @@ def compute_pos_delete(
     equality_delete_table,
     data_file_table,
     identifier_columns,
-    iceberg_warehouse_bucket_name,
+    iceberg_table_warehouse_prefix_with_partition,
+    s3_file_system,
 ):
     delete_whole_file, new_position_delete_table = filter_rows_to_be_deleted(
         data_file_table=data_file_table,
@@ -89,7 +103,10 @@ def compute_pos_delete(
     logger.info(f"compute_pos_delete_table:{new_position_delete_table.to_pydict()}")
     if new_position_delete_table:
         new_pos_delete_s3_link = upload_table_with_retry(
-            new_position_delete_table, iceberg_warehouse_bucket_name, {}
+            table=new_position_delete_table,
+            s3_url_prefix=iceberg_table_warehouse_prefix_with_partition,
+            s3_table_writer_kwargs={},
+            s3_file_system=s3_file_system,
         )
     return delete_whole_file, new_pos_delete_s3_link

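upload_table_with_retry is now called with explicit keyword arguments: the table, the partition-scoped prefix, writer kwargs, and an externally supplied S3 filesystem. A hypothetical retry wrapper with the same keyword surface (a sketch only, not DeltaCAT's actual implementation):

import time
import uuid

import pyarrow.parquet as pq


def upload_table_with_retry(
    table, s3_url_prefix, s3_table_writer_kwargs, s3_file_system, max_attempts=3
):
    # Write the table under a unique key below the partition prefix.
    s3_url = f"{s3_url_prefix}/{uuid.uuid4()}.parquet"
    for attempt in range(1, max_attempts + 1):
        try:
            pq.write_table(
                table, s3_url, filesystem=s3_file_system, **s3_table_writer_kwargs
            )
            return s3_url
        except OSError:
            if attempt == max_attempts:
                raise
            time.sleep(2**attempt)  # simple exponential backoff between attempts
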
@@ -126,8 +143,9 @@ def compute_pos_delete_with_limited_parallelism(
     data_files_list,
     identifier_columns,
     equality_delete_files_list,
-    iceberg_warehouse_bucket_name,
+    iceberg_table_warehouse_prefix_with_partition,
     max_parallel_data_file_download,
+    s3_file_system,
 ):
     to_be_deleted_file_list = []
     to_be_added_pos_delete_file_list = []
@@ -144,8 +162,9 @@ def compute_pos_delete_with_limited_parallelism(
         delete_whole_file, new_pos_delete_s3_link = compute_pos_delete(
             equality_delete_table=equality_delete_table,
             data_file_table=data_table,
-            iceberg_warehouse_bucket_name=iceberg_warehouse_bucket_name,
+            iceberg_table_warehouse_prefix_with_partition=iceberg_table_warehouse_prefix_with_partition,
             identifier_columns=identifier_columns,
+            s3_file_system=s3_file_system,
         )
         if delete_whole_file:
             to_be_deleted_file_list.extend(data_files)
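
compute_pos_delete_with_limited_parallelism caps how many data files are pulled down at once via max_parallel_data_file_download. A minimal sketch of one way to bound that parallelism with a thread pool; download_file and the paths are hypothetical stand-ins for the actual download step:

from concurrent.futures import ThreadPoolExecutor

max_parallel_data_file_download = 4
data_files_list = [f"s3://bucket/data/file_{i}.parquet" for i in range(10)]  # placeholder paths


def download_file(path):
    # Stand-in for the real per-file download; would return an Arrow table.
    return path


# At most max_parallel_data_file_download downloads are in flight at any time.
with ThreadPoolExecutor(max_workers=max_parallel_data_file_download) as pool:
    downloaded = list(pool.map(download_file, data_files_list))
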
@@ -182,7 +201,6 @@ def download_parquet_with_daft_hash_applied(
         io_config=io_config,
         coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
     )
-    logger.info(f"debug_identify_columns:{identify_columns}")
     df = df.select(daft.col(identify_columns[0]).hash())
     arrow_table = df.to_arrow()
     return arrow_table
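
download_parquet_with_daft_hash_applied reads the parquet file through Daft, hashes the first identifier column, and converts the result to Arrow. A hedged usage sketch; the bucket, path, column name, and IOConfig fields are placeholders and should be checked against the Daft version in use:

import daft
from daft.io import IOConfig, S3Config

# Placeholder S3 settings; swap in real region/credentials.
io_config = IOConfig(s3=S3Config(region_name="us-east-1", anonymous=True))

df = daft.read_parquet(
    "s3://example-bucket/path/to/data.parquet",  # hypothetical file
    io_config=io_config,
    coerce_int96_timestamp_unit="ms",
)
# Hash the first identifier column, as the function does before to_arrow().
df = df.select(daft.col("pk").hash())
arrow_table = df.to_arrow()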