8
8
import logging
9
9
from deltacat .compute .converter .model .convert_input import ConvertInput
10
10
from deltacat .compute .converter .utils .s3u import upload_table_with_retry
11
+ from deltacat .compute .converter .utils .converter_session_utils import (
12
+ partition_value_record_to_partition_value_string ,
13
+ )
11
14
from deltacat import logs
12
15
13
16
logger = logs .configure_deltacat_logger (logging .getLogger (__name__ ))
@@ -34,12 +37,8 @@ def convert(convert_input: ConvertInput):
34
37
35
38
logger .info (f"Starting convert task index: { convert_task_index } " )
36
39
data_files , equality_delete_files , position_delete_files = files_for_each_bucket [1 ]
37
- # Get string representation of partition value out of Record[partition_value]
38
- partition_value_str = (
39
- files_for_each_bucket [0 ].__repr__ ().split ("[" , 1 )[1 ].split ("]" )[0 ]
40
- )
41
- partition_value_str = (
42
- files_for_each_bucket [0 ].__repr__ ().split ("[" , 1 )[1 ].split ("]" )[0 ]
40
+ partition_value_str = partition_value_record_to_partition_value_string (
41
+ files_for_each_bucket [0 ]
43
42
)
44
43
partition_value = files_for_each_bucket [0 ]
45
44
iceberg_table_warehouse_prefix_with_partition = (
@@ -81,7 +80,7 @@ def filter_rows_to_be_deleted(
81
80
if positional_delete_table :
82
81
# TODO: Add support for multiple identify columns
83
82
identifier_column = identifier_columns [0 ]
84
- positional_delete_table = positional_delete_table .drop (identifier_column )
83
+ positional_delete_table = positional_delete_table .drop ([ identifier_column ] )
85
84
if len (positional_delete_table ) == len (data_file_table ):
86
85
return True , None
87
86
return False , positional_delete_table
0 commit comments