Commit bf9076c

[Converter][Test]Add working end-to-end converter compute test with Minio S3 endpoint (#494)
* [Converter][Test]Add working end-to-end converter compute test with Minio S3 endpoint
* address comments

Co-authored-by: Miranda <[email protected]>
1 parent 618d98f commit bf9076c

13 files changed: +424 −20 lines

Makefile

+2
@@ -44,6 +44,7 @@ test-integration: install
 	docker-compose -f dev/iceberg-integration/docker-compose-integration.yml up -d
 	sleep 3
 	docker-compose -f dev/iceberg-integration/docker-compose-integration.yml exec -T spark-iceberg ipython ./provision.py
+	export SPARK_LOCAL_IP="127.0.0.1"
 	venv/bin/python -m pytest deltacat/tests/integ -v -m integration
 
 test-converter:
@@ -52,6 +53,7 @@ test-converter:
 	docker-compose -f dev/iceberg-integration/docker-compose-integration.yml up -d
 	sleep 3
 	docker-compose -f dev/iceberg-integration/docker-compose-integration.yml exec -T spark-iceberg ipython ./provision.py
+	export SPARK_LOCAL_IP="127.0.0.1"
 	venv/bin/python -m pytest deltacat/tests/compute/converter -vv
 
 test-integration-rebuild:
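
Note: exporting SPARK_LOCAL_IP="127.0.0.1" pins Spark's driver address to loopback so the PySpark sessions spun up by these test targets bind consistently on local machines. A minimal Python sketch of the same idea, assuming a locally built SparkSession; the builder settings below are illustrative and not taken from this repo:

import os
from pyspark.sql import SparkSession

# Mirrors `export SPARK_LOCAL_IP="127.0.0.1"` from the Makefile targets above;
# must be set before the JVM-backed session is created.
os.environ.setdefault("SPARK_LOCAL_IP", "127.0.0.1")

spark = (
    SparkSession.builder.master("local[1]")   # local mode, illustrative
    .appName("converter-local-smoke")         # hypothetical app name
    .getOrCreate()
)
spark.stop()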

deltacat/compute/converter/converter_session.py

+10 −2

@@ -14,6 +14,7 @@
 from deltacat.compute.converter.model.converter_session_params import (
     ConverterSessionParams,
 )
+
 from deltacat.compute.converter.constants import DEFAULT_MAX_PARALLEL_DATA_FILE_DOWNLOAD
 from deltacat.compute.converter.steps.convert import convert
 from deltacat.compute.converter.model.convert_input import ConvertInput
@@ -23,6 +24,7 @@
 )
 from deltacat.compute.converter.utils.converter_session_utils import (
     check_data_files_sequence_number,
+    construct_iceberg_table_prefix,
 )
 from deltacat.compute.converter.pyiceberg.replace_snapshot import (
     commit_overwrite_snapshot,
@@ -70,7 +72,13 @@ def converter_session(params: ConverterSessionParams, **kwargs):
     )
 
     iceberg_warehouse_bucket_name = params.iceberg_warehouse_bucket_name
-    print(f"iceberg_warehouse_bucket_name:{iceberg_warehouse_bucket_name}")
+    iceberg_namespace = params.iceberg_namespace
+    iceberg_table_warehouse_prefix = construct_iceberg_table_prefix(
+        iceberg_warehouse_bucket_name=iceberg_warehouse_bucket_name,
+        table_name=table_name,
+        iceberg_namespace=iceberg_namespace,
+    )
+    logger.info(f"iceberg_warehouse_bucket_name:{iceberg_warehouse_bucket_name}")
     merge_keys = params.merge_keys
     # Using table identifier fields as merge keys if merge keys not provided
     if not merge_keys:
@@ -105,7 +113,7 @@ def convert_input_provider(index, item):
         "convert_input": ConvertInput.of(
             files_for_each_bucket=item,
             convert_task_index=index,
-            iceberg_warehouse_bucket_name=iceberg_warehouse_bucket_name,
+            iceberg_table_warehouse_prefix=iceberg_table_warehouse_prefix,
             identifier_fields=identifier_fields,
             compact_small_files=compact_small_files,
             position_delete_for_multiple_data_files=position_delete_for_multiple_data_files,
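
Note: with iceberg_namespace now a required session parameter, the warehouse prefix handed to each convert task is scoped to <bucket>/<namespace>/<table>/data instead of the bucket root. A minimal calling sketch, assuming only the parameter names visible in this diff; the catalog, table name, and bucket values are placeholders, and any additional required keys defined elsewhere in ConverterSessionParams would also need to be supplied:

from deltacat.compute.converter.converter_session import converter_session
from deltacat.compute.converter.model.converter_session_params import (
    ConverterSessionParams,
)

params = ConverterSessionParams.of(
    {
        "catalog": catalog,                            # assumption: a pyiceberg Catalog loaded elsewhere
        "iceberg_table_name": "default.orders",        # hypothetical table
        "iceberg_warehouse_bucket_name": "warehouse",  # hypothetical bucket
        "iceberg_namespace": "default",
        "merge_keys": ["primary_key"],                 # optional; identifier fields are used if omitted
    }
)
converter_session(params=params)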

deltacat/compute/converter/model/convert_input.py

+11 −4

@@ -7,23 +7,26 @@ class ConvertInput(Dict):
     def of(
         files_for_each_bucket,
         convert_task_index,
-        iceberg_warehouse_bucket_name,
+        iceberg_table_warehouse_prefix,
         identifier_fields,
         compact_small_files,
         position_delete_for_multiple_data_files,
         max_parallel_data_file_download,
+        s3_file_system,
     ) -> ConvertInput:
 
         result = ConvertInput()
         result["files_for_each_bucket"] = files_for_each_bucket
         result["convert_task_index"] = convert_task_index
         result["identifier_fields"] = identifier_fields
-        result["iceberg_warehouse_bucket_name"] = iceberg_warehouse_bucket_name
+        result["iceberg_table_warehouse_prefix"] = iceberg_table_warehouse_prefix
         result["compact_small_files"] = compact_small_files
         result[
             "position_delete_for_multiple_data_files"
         ] = position_delete_for_multiple_data_files
         result["max_parallel_data_file_download"] = max_parallel_data_file_download
+        result["s3_file_system"] = s3_file_system
+
         return result
 
     @property
@@ -39,8 +42,8 @@ def convert_task_index(self) -> int:
         return self["convert_task_index"]
 
     @property
-    def iceberg_warehouse_bucket_name(self) -> str:
-        return self["iceberg_warehouse_bucket_name"]
+    def iceberg_table_warehouse_prefix(self) -> str:
+        return self["iceberg_table_warehouse_prefix"]
 
     @property
     def compact_small_files(self) -> bool:
@@ -53,3 +56,7 @@ def position_delete_for_multiple_data_files(self) -> bool:
     @property
     def max_parallel_data_file_download(self) -> int:
         return self["max_parallel_data_file_download"]
+
+    @property
+    def s3_file_system(self):
+        return self["s3_file_system"]
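
Note: ConvertInput stays a plain dict with a factory and property accessors; this change swaps the bucket name for a fully qualified table prefix and threads an s3_file_system handle through to the upload path. A small construction sketch under those assumptions (all values below are placeholders):

from deltacat.compute.converter.model.convert_input import ConvertInput

partition_value = "pk_bucket=1"                      # placeholder for the partition Record
data_files, equality_deletes, position_deletes = [], [], []

convert_input = ConvertInput.of(
    files_for_each_bucket=(partition_value, (data_files, equality_deletes, position_deletes)),
    convert_task_index=0,
    iceberg_table_warehouse_prefix="warehouse/default/orders/data",  # placeholder prefix
    identifier_fields=["primary_key"],
    compact_small_files=False,
    position_delete_for_multiple_data_files=True,
    max_parallel_data_file_download=10,
    s3_file_system=None,  # None falls back to get_s3_file_system() inside upload_table_with_retry
)
assert convert_input.iceberg_table_warehouse_prefix == "warehouse/default/orders/data"
assert convert_input.s3_file_system is None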

deltacat/compute/converter/model/converter_session_params.py

+7
@@ -18,6 +18,9 @@ def of(params: Optional[Dict]) -> ConverterSessionParams:
         assert (
             params.get("iceberg_warehouse_bucket_name") is not None
         ), "iceberg_warehouse_bucket_name is a required arg"
+        assert (
+            params.get("iceberg_namespace") is not None
+        ), "iceberg_namespace is a required arg"
         result = ConverterSessionParams(params)
 
         result.compact_small_files = params.get("compact_small_files", False)
@@ -44,6 +47,10 @@ def iceberg_table_name(self) -> str:
     def iceberg_warehouse_bucket_name(self) -> str:
         return self["iceberg_warehouse_bucket_name"]
 
+    @property
+    def iceberg_namespace(self) -> str:
+        return self["iceberg_namespace"]
+
     @property
     def compact_small_files(self) -> bool:
         return self["compact_small_files"]
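
Note: iceberg_namespace is validated the same way as iceberg_warehouse_bucket_name, so a missing namespace now fails fast when the params object is built. A sketch of the accessors, assuming any keys asserted earlier in of() (not shown in this hunk) are also supplied in practice:

from deltacat.compute.converter.model.converter_session_params import (
    ConverterSessionParams,
)

raw = {
    # Plus whatever keys of() validates before this hunk (omitted here for brevity).
    "iceberg_warehouse_bucket_name": "warehouse",   # placeholder bucket
    "iceberg_namespace": "default",
}
params = ConverterSessionParams.of(raw)
assert params.iceberg_namespace == "default"
assert params.compact_small_files is False   # default applied in of()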

deltacat/compute/converter/pyiceberg/replace_snapshot.py

+1 −1

@@ -12,7 +12,7 @@
 )
 import itertools
 from pyiceberg.utils.concurrent import ExecutorFactory
-from pyiceberg.table import UpdateSnapshot, _SnapshotProducer
+from pyiceberg.table.update.snapshot import UpdateSnapshot, _SnapshotProducer
 
 
 class _ReplaceFiles(_SnapshotProducer["_ReplaceFiles"]):
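
Note: UpdateSnapshot and _SnapshotProducer moved to pyiceberg.table.update.snapshot in newer pyiceberg releases, which is what this one-line change tracks. If code ever needed to tolerate both layouts, a guarded import is one option; this is a sketch, not what the repo does:

try:
    # Layout used by this commit (newer pyiceberg releases)
    from pyiceberg.table.update.snapshot import UpdateSnapshot, _SnapshotProducer
except ImportError:
    # Older layout where the same names were importable from pyiceberg.table
    from pyiceberg.table import UpdateSnapshot, _SnapshotProducer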

deltacat/compute/converter/steps/convert.py

+26 −9

@@ -8,6 +8,9 @@
 import logging
 from deltacat.compute.converter.model.convert_input import ConvertInput
 from deltacat.compute.converter.utils.s3u import upload_table_with_retry
+from deltacat.compute.converter.utils.converter_session_utils import (
+    partition_value_record_to_partition_value_string,
+)
 from deltacat import logs
 
 logger = logs.configure_deltacat_logger(logging.getLogger(__name__))
@@ -17,14 +20,14 @@
 def convert(convert_input: ConvertInput):
     files_for_each_bucket = convert_input.files_for_each_bucket
     convert_task_index = convert_input.convert_task_index
-    iceberg_warehouse_bucket_name = convert_input.iceberg_warehouse_bucket_name
+    iceberg_table_warehouse_prefix = convert_input.iceberg_table_warehouse_prefix
     identifier_fields = convert_input.identifier_fields
     compact_small_files = convert_input.compact_small_files
     position_delete_for_multiple_data_files = (
         convert_input.position_delete_for_multiple_data_files
     )
     max_parallel_data_file_download = convert_input.max_parallel_data_file_download
-
+    s3_file_system = convert_input.s3_file_system
     if not position_delete_for_multiple_data_files:
         raise NotImplementedError(
             f"Distributed file level position delete compute is not supported yet"
@@ -34,16 +37,23 @@ def convert(convert_input: ConvertInput):
 
     logger.info(f"Starting convert task index: {convert_task_index}")
     data_files, equality_delete_files, position_delete_files = files_for_each_bucket[1]
+    partition_value_str = partition_value_record_to_partition_value_string(
+        files_for_each_bucket[0]
+    )
     partition_value = files_for_each_bucket[0]
+    iceberg_table_warehouse_prefix_with_partition = (
+        f"{iceberg_table_warehouse_prefix}/{partition_value_str}"
+    )
     (
         to_be_deleted_files_list,
         to_be_added_files_list,
     ) = compute_pos_delete_with_limited_parallelism(
         data_files_list=data_files,
         identifier_columns=identifier_fields,
         equality_delete_files_list=equality_delete_files,
-        iceberg_warehouse_bucket_name=iceberg_warehouse_bucket_name,
+        iceberg_table_warehouse_prefix_with_partition=iceberg_table_warehouse_prefix_with_partition,
         max_parallel_data_file_download=max_parallel_data_file_download,
+        s3_file_system=s3_file_system,
     )
     to_be_delete_files_dict = defaultdict()
     to_be_delete_files_dict[partition_value] = to_be_deleted_files_list
@@ -68,7 +78,9 @@ def filter_rows_to_be_deleted(
         f"length_pos_delete_table, {len(positional_delete_table)}, length_data_table:{len(data_file_table)}"
     )
     if positional_delete_table:
-        positional_delete_table = positional_delete_table.drop(["primarykey"])
+        # TODO: Add support for multiple identify columns
+        identifier_column = identifier_columns[0]
+        positional_delete_table = positional_delete_table.drop([identifier_column])
     if len(positional_delete_table) == len(data_file_table):
         return True, None
     return False, positional_delete_table
@@ -78,7 +90,8 @@ def compute_pos_delete(
     equality_delete_table,
     data_file_table,
     identifier_columns,
-    iceberg_warehouse_bucket_name,
+    iceberg_table_warehouse_prefix_with_partition,
+    s3_file_system,
 ):
     delete_whole_file, new_position_delete_table = filter_rows_to_be_deleted(
         data_file_table=data_file_table,
@@ -89,7 +102,10 @@ def compute_pos_delete(
     logger.info(f"compute_pos_delete_table:{new_position_delete_table.to_pydict()}")
     if new_position_delete_table:
         new_pos_delete_s3_link = upload_table_with_retry(
-            new_position_delete_table, iceberg_warehouse_bucket_name, {}
+            table=new_position_delete_table,
+            s3_url_prefix=iceberg_table_warehouse_prefix_with_partition,
+            s3_table_writer_kwargs={},
+            s3_file_system=s3_file_system,
         )
     return delete_whole_file, new_pos_delete_s3_link
 
@@ -126,8 +142,9 @@ def compute_pos_delete_with_limited_parallelism(
     data_files_list,
     identifier_columns,
     equality_delete_files_list,
-    iceberg_warehouse_bucket_name,
+    iceberg_table_warehouse_prefix_with_partition,
     max_parallel_data_file_download,
+    s3_file_system,
 ):
     to_be_deleted_file_list = []
     to_be_added_pos_delete_file_list = []
@@ -144,8 +161,9 @@ def compute_pos_delete_with_limited_parallelism(
         delete_whole_file, new_pos_delete_s3_link = compute_pos_delete(
             equality_delete_table=equality_delete_table,
             data_file_table=data_table,
-            iceberg_warehouse_bucket_name=iceberg_warehouse_bucket_name,
+            iceberg_table_warehouse_prefix_with_partition=iceberg_table_warehouse_prefix_with_partition,
             identifier_columns=identifier_columns,
+            s3_file_system=s3_file_system,
         )
         if delete_whole_file:
             to_be_deleted_file_list.extend(data_files)
@@ -182,7 +200,6 @@ def download_parquet_with_daft_hash_applied(
         io_config=io_config,
         coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
     )
-    logger.info(f"debug_identify_columns:{identify_columns}")
     df = df.select(daft.col(identify_columns[0]).hash())
     arrow_table = df.to_arrow()
     return arrow_table
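
Note: the position-delete table now drops the actual identifier column (the first entry of identifier_fields) rather than a hard-coded "primarykey" column, and the resulting file is uploaded under a partition-scoped prefix. A small pyarrow sketch of the drop step with made-up column names:

import pyarrow as pa

identifier_columns = ["primary_key"]          # hypothetical identifier field
positional_delete_table = pa.table(
    {
        "primary_key": [1, 2, 3],
        "file_path": ["s3://data/a.parquet"] * 3,
        "pos": [0, 5, 9],
    }
)
# Same pattern as filter_rows_to_be_deleted: only the first identifier column is handled for now
identifier_column = identifier_columns[0]
positional_delete_table = positional_delete_table.drop([identifier_column])
print(positional_delete_table.column_names)   # ['file_path', 'pos']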

deltacat/compute/converter/utils/converter_session_utils.py

+12
@@ -48,3 +48,15 @@ def append_larger_sequence_number_data_files(data_files_list):
             sublist_file_list.append(file)
         result.append(sublist_file_list)
     return result
+
+
+def construct_iceberg_table_prefix(
+    iceberg_warehouse_bucket_name, table_name, iceberg_namespace
+):
+    return f"{iceberg_warehouse_bucket_name}/{iceberg_namespace}/{table_name}/data"
+
+
+def partition_value_record_to_partition_value_string(partition):
+    # Get string representation of partition value out of Record[partition_value]
+    partition_value_str = partition.__repr__().split("[", 1)[1].split("]")[0]
+    return partition_value_str
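
Note: construct_iceberg_table_prefix joins the bucket, namespace, and table name under a trailing /data segment, and partition_value_record_to_partition_value_string pulls the text between the first pair of brackets of a Record's repr. A quick illustration; FakeRecord is a stand-in for a pyiceberg partition Record whose repr looks like Record[5]:

from deltacat.compute.converter.utils.converter_session_utils import (
    construct_iceberg_table_prefix,
    partition_value_record_to_partition_value_string,
)

prefix = construct_iceberg_table_prefix(
    iceberg_warehouse_bucket_name="warehouse",
    table_name="orders",
    iceberg_namespace="default",
)
print(prefix)  # warehouse/default/orders/data


class FakeRecord:
    # Assumption: mimics the repr of a pyiceberg partition Record
    def __repr__(self):
        return "Record[5]"


print(partition_value_record_to_partition_value_string(FakeRecord()))  # 5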

deltacat/compute/converter/utils/iceberg_columns.py

+17 −1

@@ -1,16 +1,27 @@
 import pyarrow as pa
 from typing import Union
 
+# Refer to: https://iceberg.apache.org/spec/#reserved-field-ids for reserved field ids
+ICEBERG_RESERVED_FIELD_ID_FOR_FILE_PATH_COLUMN = 2147483546
+
+# Refer to: https://iceberg.apache.org/spec/#reserved-field-ids for reserved field ids
+ICEBERG_RESERVED_FIELD_ID_FOR_POS_COLUMN = 2147483545
+
 
 def _get_iceberg_col_name(suffix):
-    return f"{suffix}"
+    return suffix
 
 
 _ORDERED_RECORD_IDX_COLUMN_NAME = _get_iceberg_col_name("pos")
 _ORDERED_RECORD_IDX_COLUMN_TYPE = pa.int64()
+_ORDERED_RECORD_IDX_FIELD_METADATA = {
+    b"PARQUET:field_id": f"{ICEBERG_RESERVED_FIELD_ID_FOR_POS_COLUMN}"
+}
 _ORDERED_RECORD_IDX_COLUMN_FIELD = pa.field(
     _ORDERED_RECORD_IDX_COLUMN_NAME,
     _ORDERED_RECORD_IDX_COLUMN_TYPE,
+    metadata=_ORDERED_RECORD_IDX_FIELD_METADATA,
+    nullable=False,
 )
 
 
@@ -32,7 +43,12 @@ def append_record_idx_col(table: pa.Table, ordered_record_indices) -> pa.Table:
 
 _FILE_PATH_COLUMN_NAME = _get_iceberg_col_name("file_path")
 _FILE_PATH_COLUMN_TYPE = pa.string()
+_FILE_PATH_FIELD_METADATA = {
+    b"PARQUET:field_id": f"{ICEBERG_RESERVED_FIELD_ID_FOR_FILE_PATH_COLUMN}"
+}
 _FILE_PATH_COLUMN_FIELD = pa.field(
     _FILE_PATH_COLUMN_NAME,
     _FILE_PATH_COLUMN_TYPE,
+    metadata=_FILE_PATH_FIELD_METADATA,
+    nullable=False,
 )
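
Note: tagging the pos and file_path columns with the reserved Iceberg field IDs (2147483545 and 2147483546) through PARQUET:field_id metadata lets the Parquet files written for position deletes carry the schema the Iceberg spec expects, and nullable=False matches the spec's requirement that both columns be required. A standalone pyarrow sketch of the same field construction, independent of the module's private names:

import pyarrow as pa

# Reserved field ids from https://iceberg.apache.org/spec/#reserved-field-ids
file_path_field = pa.field(
    "file_path",
    pa.string(),
    nullable=False,
    metadata={b"PARQUET:field_id": b"2147483546"},
)
pos_field = pa.field(
    "pos",
    pa.int64(),
    nullable=False,
    metadata={b"PARQUET:field_id": b"2147483545"},
)
schema = pa.schema([file_path_field, pos_field])
print(schema.field("pos").metadata)  # {b'PARQUET:field_id': b'2147483545'}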

deltacat/compute/converter/utils/s3u.py

+6 −2

@@ -57,6 +57,7 @@ def upload_table_with_retry(
     s3_table_writer_kwargs: Optional[Dict[str, Any]],
     content_type: ContentType = ContentType.PARQUET,
     max_records_per_file: Optional[int] = 4000000,
+    s3_file_system=None,
     **s3_client_kwargs,
 ) -> List[str]:
     """
@@ -72,9 +73,12 @@ def upload_table_with_retry(
     if s3_table_writer_kwargs is None:
         s3_table_writer_kwargs = {}
 
-    s3_file_system = get_s3_file_system(content_type=content_type)
+    if not s3_file_system:
+        s3_file_system = get_s3_file_system(content_type=content_type)
     capture_object = CapturedBlockWritePaths()
-    block_write_path_provider = UuidBlockWritePathProvider(capture_object)
+    block_write_path_provider = UuidBlockWritePathProvider(
+        capture_object=capture_object
+    )
     s3_table_writer_func = get_table_writer(table)
     table_record_count = get_table_length(table)
     if max_records_per_file is None or not table_record_count:
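
Note: accepting an injected s3_file_system is what lets the MinIO-backed converter test point writes at a local endpoint instead of real S3, while the default path still falls back to get_s3_file_system(). A sketch of building such a filesystem with pyarrow; the endpoint and credentials below are illustrative values for a local MinIO container, not taken from this diff:

import pyarrow.fs as pafs

minio_fs = pafs.S3FileSystem(
    access_key="admin",             # assumption: MinIO test credentials
    secret_key="password",
    endpoint_override="http://localhost:9000",
)

# Then passed through instead of the default filesystem, e.g.:
# upload_table_with_retry(
#     table=arrow_table,
#     s3_url_prefix="warehouse/default/orders/data/pk_bucket=1",
#     s3_table_writer_kwargs={},
#     s3_file_system=minio_fs,
# )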

deltacat/tests/compute/converter/conftest.py

+8
@@ -1,6 +1,7 @@
 import pytest
 from pyspark.sql import SparkSession
 import os
+import ray
 from pyiceberg.catalog import Catalog, load_catalog
 
 
@@ -70,3 +71,10 @@ def session_catalog() -> Catalog:
             "s3.secret-access-key": "password",
         },
     )
+
+
+@pytest.fixture(autouse=True, scope="module")
+def setup_ray_cluster():
+    ray.init(local_mode=True, ignore_reinit_error=True)
+    yield
+    ray.shutdown()
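
Note: the autouse, module-scoped fixture stands up a local-mode Ray runtime before the converter tests run and tears it down afterwards, so Ray tasks launched by converter_session execute in-process without a separate cluster. A minimal sketch of a test relying on such a fixture being in scope via conftest.py (the test body is illustrative):

import ray


@ray.remote
def echo(value):
    return value


def test_ray_task_runs_locally():
    # No ray.init() here: the autouse setup_ray_cluster fixture has already initialized Ray.
    assert ray.get(echo.remote("ok")) == "ok"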
