Skip to content

Commit 071328a

Browse files
committed
address comments
1 parent b396fde commit 071328a

File tree

3 files changed

+23
-9
lines changed

3 files changed

+23
-9
lines changed

deltacat/compute/converter/steps/convert.py

+6-7
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
import logging
99
from deltacat.compute.converter.model.convert_input import ConvertInput
1010
from deltacat.compute.converter.utils.s3u import upload_table_with_retry
11+
from deltacat.compute.converter.utils.converter_session_utils import (
12+
partition_value_record_to_partition_value_string,
13+
)
1114
from deltacat import logs
1215

1316
logger = logs.configure_deltacat_logger(logging.getLogger(__name__))
@@ -34,12 +37,8 @@ def convert(convert_input: ConvertInput):
3437

3538
logger.info(f"Starting convert task index: {convert_task_index}")
3639
data_files, equality_delete_files, position_delete_files = files_for_each_bucket[1]
37-
# Get string representation of partition value out of Record[partition_value]
38-
partition_value_str = (
39-
files_for_each_bucket[0].__repr__().split("[", 1)[1].split("]")[0]
40-
)
41-
partition_value_str = (
42-
files_for_each_bucket[0].__repr__().split("[", 1)[1].split("]")[0]
40+
partition_value_str = partition_value_record_to_partition_value_string(
41+
files_for_each_bucket[0]
4342
)
4443
partition_value = files_for_each_bucket[0]
4544
iceberg_table_warehouse_prefix_with_partition = (
@@ -81,7 +80,7 @@ def filter_rows_to_be_deleted(
8180
if positional_delete_table:
8281
# TODO: Add support for multiple identifier columns
8382
identifier_column = identifier_columns[0]
84-
positional_delete_table = positional_delete_table.drop(identifier_column)
83+
positional_delete_table = positional_delete_table.drop([identifier_column])
8584
if len(positional_delete_table) == len(data_file_table):
8685
return True, None
8786
return False, positional_delete_table

deltacat/compute/converter/utils/converter_session_utils.py

+6
Original file line numberDiff line numberDiff line change
@@ -54,3 +54,9 @@ def construct_iceberg_table_prefix(
5454
iceberg_warehouse_bucket_name, table_name, iceberg_namespace
5555
):
5656
return f"{iceberg_warehouse_bucket_name}/{iceberg_namespace}/{table_name}/data"
57+
58+
59+
def partition_value_record_to_partition_value_string(partition):
60+
# Extract the partition value string from the repr of Record[partition_value], i.e. the text between the first "[" and the next "]"
61+
partition_value_str = partition.__repr__().split("[", 1)[1].split("]")[0]
62+
return partition_value_str

deltacat/compute/converter/utils/iceberg_columns.py

+11-2
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,21 @@
11
import pyarrow as pa
22
from typing import Union
33

4+
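# Field IDs reserved by the Iceberg spec for the "file_path" and "pos" columns of position delete files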
5+
ICEBERG_RESERVED_FIELD_ID_FOR_FILE_PATH_COLUMN = 2147483546
6+
7+
ICEBERG_RESERVED_FIELD_ID_FOR_POS_COLUMN = 2147483545
8+
49

510
def _get_iceberg_col_name(suffix):
611
return suffix
712

813

914
_ORDERED_RECORD_IDX_COLUMN_NAME = _get_iceberg_col_name("pos")
1015
_ORDERED_RECORD_IDX_COLUMN_TYPE = pa.int64()
11-
_ORDERED_RECORD_IDX_FIELD_METADATA = {b"PARQUET:field_id": "2147483545"}
16+
_ORDERED_RECORD_IDX_FIELD_METADATA = {
17+
b"PARQUET:field_id": f"{ICEBERG_RESERVED_FIELD_ID_FOR_POS_COLUMN}"
18+
}
1219
_ORDERED_RECORD_IDX_COLUMN_FIELD = pa.field(
1320
_ORDERED_RECORD_IDX_COLUMN_NAME,
1421
_ORDERED_RECORD_IDX_COLUMN_TYPE,
@@ -35,7 +42,9 @@ def append_record_idx_col(table: pa.Table, ordered_record_indices) -> pa.Table:
3542

3643
_FILE_PATH_COLUMN_NAME = _get_iceberg_col_name("file_path")
3744
_FILE_PATH_COLUMN_TYPE = pa.string()
38-
_FILE_PATH_FIELD_METADATA = {b"PARQUET:field_id": "2147483546"}
45+
_FILE_PATH_FIELD_METADATA = {
46+
b"PARQUET:field_id": f"{ICEBERG_RESERVED_FIELD_ID_FOR_FILE_PATH_COLUMN}"
47+
}
3948
_FILE_PATH_COLUMN_FIELD = pa.field(
4049
_FILE_PATH_COLUMN_NAME,
4150
_FILE_PATH_COLUMN_TYPE,

0 commit comments

Comments
 (0)