
Commit 11ac5ad

[Converter][Test] Add working end-to-end converter compute test with Minio S3 endpoint
1 parent 5a56390 commit 11ac5ad

13 files changed (+409, -20 lines)

Makefile (+2)

@@ -44,6 +44,7 @@ test-integration: install
 docker-compose -f dev/iceberg-integration/docker-compose-integration.yml up -d
 sleep 3
 docker-compose -f dev/iceberg-integration/docker-compose-integration.yml exec -T spark-iceberg ipython ./provision.py
+export SPARK_LOCAL_IP="127.0.0.1"
 venv/bin/python -m pytest deltacat/tests/integ -v -m integration

 test-converter:
@@ -52,6 +53,7 @@ test-converter:
 docker-compose -f dev/iceberg-integration/docker-compose-integration.yml up -d
 sleep 3
 docker-compose -f dev/iceberg-integration/docker-compose-integration.yml exec -T spark-iceberg ipython ./provision.py
+export SPARK_LOCAL_IP="127.0.0.1"
 venv/bin/python -m pytest deltacat/tests/compute/converter -vv

 test-integration-rebuild:
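
The added SPARK_LOCAL_IP export pins the local Spark driver to the loopback address so the PySpark session started by the integration and converter tests does not try to bind to a resolved external hostname. A minimal sketch of the same idea from Python, assuming only that pyspark is installed (the app name is illustrative):

import os

# Pin Spark to loopback before the JVM starts, mirroring the Makefile export.
os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.master("local[1]")
    .appName("converter-local-test")  # illustrative app name
    .getOrCreate()
)
print(spark.version)
spark.stop()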

deltacat/compute/converter/_dev/example_single_merge_key_converter.py (+1, -1)

@@ -27,7 +27,7 @@ def get_s3_path(


 def get_bucket_name():
-    return "metadata-py4j-zyiqin1"
+    return "converter-test"


 def get_credential():

deltacat/compute/converter/converter_session.py (+10, -2)

@@ -14,6 +14,7 @@
 from deltacat.compute.converter.model.converter_session_params import (
     ConverterSessionParams,
 )
+
 from deltacat.compute.converter.constants import DEFAULT_MAX_PARALLEL_DATA_FILE_DOWNLOAD
 from deltacat.compute.converter.steps.convert import convert
 from deltacat.compute.converter.model.convert_input import ConvertInput
@@ -23,6 +24,7 @@
 )
 from deltacat.compute.converter.utils.converter_session_utils import (
     check_data_files_sequence_number,
+    construct_iceberg_table_prefix,
 )
 from deltacat.compute.converter.pyiceberg.replace_snapshot import (
     commit_overwrite_snapshot,
@@ -70,7 +72,13 @@ def converter_session(params: ConverterSessionParams, **kwargs):
     )

     iceberg_warehouse_bucket_name = params.iceberg_warehouse_bucket_name
-    print(f"iceberg_warehouse_bucket_name:{iceberg_warehouse_bucket_name}")
+    iceberg_namespace = params.iceberg_namespace
+    iceberg_table_warehouse_prefix = construct_iceberg_table_prefix(
+        iceberg_warehouse_bucket_name=iceberg_warehouse_bucket_name,
+        table_name=table_name,
+        iceberg_namespace=iceberg_namespace,
+    )
+    logger.info(f"iceberg_warehouse_bucket_name:{iceberg_warehouse_bucket_name}")
     merge_keys = params.merge_keys
     # Using table identifier fields as merge keys if merge keys not provided
     if not merge_keys:
@@ -105,7 +113,7 @@ def convert_input_provider(index, item):
         "convert_input": ConvertInput.of(
             files_for_each_bucket=item,
             convert_task_index=index,
-            iceberg_warehouse_bucket_name=iceberg_warehouse_bucket_name,
+            iceberg_table_warehouse_prefix=iceberg_table_warehouse_prefix,
             identifier_fields=identifier_fields,
             compact_small_files=compact_small_files,
             position_delete_for_multiple_data_files=position_delete_for_multiple_data_files,
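
With this change, converter_session resolves a table-scoped data prefix up front (bucket/namespace/table/data) and threads it, rather than the raw bucket name, into every ConvertInput. A hedged sketch of the parameter dict such a session now expects; only iceberg_warehouse_bucket_name and iceberg_namespace are confirmed required by this commit, and the other keys are shown for context from properties visible elsewhere in the diff:

# Example values only; required keys beyond the two asserted in
# converter_session_params.py are assumptions.
converter_session_params = {
    "iceberg_warehouse_bucket_name": "warehouse",
    "iceberg_namespace": "default",
    "iceberg_table_name": "test_table",  # property exists on ConverterSessionParams
    "merge_keys": None,  # optional: falls back to the table's identifier fields
}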

deltacat/compute/converter/model/convert_input.py (+11, -4)

@@ -7,23 +7,26 @@ class ConvertInput(Dict):
     def of(
         files_for_each_bucket,
         convert_task_index,
-        iceberg_warehouse_bucket_name,
+        iceberg_table_warehouse_prefix,
         identifier_fields,
         compact_small_files,
         position_delete_for_multiple_data_files,
         max_parallel_data_file_download,
+        s3_file_system,
     ) -> ConvertInput:

         result = ConvertInput()
         result["files_for_each_bucket"] = files_for_each_bucket
         result["convert_task_index"] = convert_task_index
         result["identifier_fields"] = identifier_fields
-        result["iceberg_warehouse_bucket_name"] = iceberg_warehouse_bucket_name
+        result["iceberg_table_warehouse_prefix"] = iceberg_table_warehouse_prefix
         result["compact_small_files"] = compact_small_files
         result[
             "position_delete_for_multiple_data_files"
         ] = position_delete_for_multiple_data_files
         result["max_parallel_data_file_download"] = max_parallel_data_file_download
+        result["s3_file_system"] = s3_file_system
+
         return result

     @property
@@ -39,8 +42,8 @@ def convert_task_index(self) -> int:
         return self["convert_task_index"]

     @property
-    def iceberg_warehouse_bucket_name(self) -> str:
-        return self["iceberg_warehouse_bucket_name"]
+    def iceberg_table_warehouse_prefix(self) -> str:
+        return self["iceberg_table_warehouse_prefix"]

     @property
     def compact_small_files(self) -> bool:
@@ -53,3 +56,7 @@ def position_delete_for_multiple_data_files(self) -> bool:
     @property
     def max_parallel_data_file_download(self) -> int:
         return self["max_parallel_data_file_download"]
+
+    @property
+    def s3_file_system(self):
+        return self["s3_file_system"]
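
A short sketch of building a ConvertInput with the two new fields and reading them back through the properties added above. All values are placeholders; the S3 filesystem shown is a pyarrow one pointed at a local endpoint, which is an assumption rather than something this file prescribes:

from pyarrow.fs import S3FileSystem

from deltacat.compute.converter.model.convert_input import ConvertInput

# Placeholder bucket entry: (partition_value, (data_files, equality_deletes, position_deletes))
files_for_bucket = ("number_partitioned=0", ([], [], []))

convert_input = ConvertInput.of(
    files_for_each_bucket=files_for_bucket,
    convert_task_index=0,
    iceberg_table_warehouse_prefix="warehouse/default/test_table/data",
    identifier_fields=["primarykey"],
    compact_small_files=False,
    position_delete_for_multiple_data_files=True,
    max_parallel_data_file_download=10,
    s3_file_system=S3FileSystem(endpoint_override="http://localhost:9000"),
)
assert convert_input.iceberg_table_warehouse_prefix.endswith("/data")
assert convert_input.s3_file_system is not None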

deltacat/compute/converter/model/converter_session_params.py (+7)

@@ -18,6 +18,9 @@ def of(params: Optional[Dict]) -> ConverterSessionParams:
         assert (
             params.get("iceberg_warehouse_bucket_name") is not None
         ), "iceberg_warehouse_bucket_name is a required arg"
+        assert (
+            params.get("iceberg_namespace") is not None
+        ), "iceberg_namespace is a required arg"
         result = ConverterSessionParams(params)

         result.compact_small_files = params.get("compact_small_files", False)
@@ -44,6 +47,10 @@ def iceberg_table_name(self) -> str:
     def iceberg_warehouse_bucket_name(self) -> str:
         return self["iceberg_warehouse_bucket_name"]

+    @property
+    def iceberg_namespace(self) -> str:
+        return self["iceberg_namespace"]
+
     @property
     def compact_small_files(self) -> bool:
         return self["compact_small_files"]
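
A small sketch of the new validation, assuming deltacat and pytest are importable; note that of() may also assert on other required keys before reaching this one, so the test only checks that validation fails:

import pytest

from deltacat.compute.converter.model.converter_session_params import (
    ConverterSessionParams,
)


def test_iceberg_namespace_is_required():
    # A params dict without "iceberg_namespace" can no longer pass validation.
    with pytest.raises(AssertionError):
        ConverterSessionParams.of({"iceberg_warehouse_bucket_name": "warehouse"})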

deltacat/compute/converter/pyiceberg/replace_snapshot.py (+1, -1)

@@ -12,7 +12,7 @@
 )
 import itertools
 from pyiceberg.utils.concurrent import ExecutorFactory
-from pyiceberg.table import UpdateSnapshot, _SnapshotProducer
+from pyiceberg.table.update.snapshot import UpdateSnapshot, _SnapshotProducer


 class _ReplaceFiles(_SnapshotProducer["_ReplaceFiles"]):
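
The import change tracks pyiceberg's module reorganization: the removed line used the old location (pyiceberg.table), while newer releases expose UpdateSnapshot and _SnapshotProducer from pyiceberg.table.update.snapshot. If both pyiceberg generations need to be supported, a guarded import along these lines would work (the exact version boundary is not stated in this commit):

try:
    # Newer pyiceberg layout, as used by this commit.
    from pyiceberg.table.update.snapshot import UpdateSnapshot, _SnapshotProducer
except ImportError:
    # Older pyiceberg layout that the previous import relied on.
    from pyiceberg.table import UpdateSnapshot, _SnapshotProducer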

deltacat/compute/converter/steps/convert.py (+27, -9)

@@ -17,14 +17,14 @@
 def convert(convert_input: ConvertInput):
     files_for_each_bucket = convert_input.files_for_each_bucket
     convert_task_index = convert_input.convert_task_index
-    iceberg_warehouse_bucket_name = convert_input.iceberg_warehouse_bucket_name
+    iceberg_table_warehouse_prefix = convert_input.iceberg_table_warehouse_prefix
     identifier_fields = convert_input.identifier_fields
     compact_small_files = convert_input.compact_small_files
     position_delete_for_multiple_data_files = (
         convert_input.position_delete_for_multiple_data_files
     )
     max_parallel_data_file_download = convert_input.max_parallel_data_file_download
-
+    s3_file_system = convert_input.s3_file_system
     if not position_delete_for_multiple_data_files:
         raise NotImplementedError(
             f"Distributed file level position delete compute is not supported yet"
@@ -34,16 +34,27 @@ def convert(convert_input: ConvertInput):

     logger.info(f"Starting convert task index: {convert_task_index}")
     data_files, equality_delete_files, position_delete_files = files_for_each_bucket[1]
+    # Get string representation of partition value out of Record[partition_value]
+    partition_value_str = (
+        files_for_each_bucket[0].__repr__().split("[", 1)[1].split("]")[0]
+    )
+    partition_value_str = (
+        files_for_each_bucket[0].__repr__().split("[", 1)[1].split("]")[0]
+    )
     partition_value = files_for_each_bucket[0]
+    iceberg_table_warehouse_prefix_with_partition = (
+        f"{iceberg_table_warehouse_prefix}/{partition_value_str}"
+    )
     (
         to_be_deleted_files_list,
         to_be_added_files_list,
     ) = compute_pos_delete_with_limited_parallelism(
         data_files_list=data_files,
         identifier_columns=identifier_fields,
         equality_delete_files_list=equality_delete_files,
-        iceberg_warehouse_bucket_name=iceberg_warehouse_bucket_name,
+        iceberg_table_warehouse_prefix_with_partition=iceberg_table_warehouse_prefix_with_partition,
         max_parallel_data_file_download=max_parallel_data_file_download,
+        s3_file_system=s3_file_system,
     )
     to_be_delete_files_dict = defaultdict()
     to_be_delete_files_dict[partition_value] = to_be_deleted_files_list
@@ -68,7 +79,9 @@ def filter_rows_to_be_deleted(
         f"length_pos_delete_table, {len(positional_delete_table)}, length_data_table:{len(data_file_table)}"
     )
     if positional_delete_table:
-        positional_delete_table = positional_delete_table.drop(["primarykey"])
+        # TODO: Add support for multiple identify columns
+        identifier_column = identifier_columns[0]
+        positional_delete_table = positional_delete_table.drop(identifier_column)
     if len(positional_delete_table) == len(data_file_table):
         return True, None
     return False, positional_delete_table
@@ -78,7 +91,8 @@ def compute_pos_delete(
     equality_delete_table,
     data_file_table,
     identifier_columns,
-    iceberg_warehouse_bucket_name,
+    iceberg_table_warehouse_prefix_with_partition,
+    s3_file_system,
 ):
     delete_whole_file, new_position_delete_table = filter_rows_to_be_deleted(
         data_file_table=data_file_table,
@@ -89,7 +103,10 @@
     logger.info(f"compute_pos_delete_table:{new_position_delete_table.to_pydict()}")
     if new_position_delete_table:
         new_pos_delete_s3_link = upload_table_with_retry(
-            new_position_delete_table, iceberg_warehouse_bucket_name, {}
+            table=new_position_delete_table,
+            s3_url_prefix=iceberg_table_warehouse_prefix_with_partition,
+            s3_table_writer_kwargs={},
+            s3_file_system=s3_file_system,
         )
     return delete_whole_file, new_pos_delete_s3_link

@@ -126,8 +143,9 @@ def compute_pos_delete_with_limited_parallelism(
     data_files_list,
     identifier_columns,
     equality_delete_files_list,
-    iceberg_warehouse_bucket_name,
+    iceberg_table_warehouse_prefix_with_partition,
     max_parallel_data_file_download,
+    s3_file_system,
 ):
     to_be_deleted_file_list = []
     to_be_added_pos_delete_file_list = []
@@ -144,8 +162,9 @@ def compute_pos_delete_with_limited_parallelism(
         delete_whole_file, new_pos_delete_s3_link = compute_pos_delete(
             equality_delete_table=equality_delete_table,
             data_file_table=data_table,
-            iceberg_warehouse_bucket_name=iceberg_warehouse_bucket_name,
+            iceberg_table_warehouse_prefix_with_partition=iceberg_table_warehouse_prefix_with_partition,
             identifier_columns=identifier_columns,
+            s3_file_system=s3_file_system,
        )
        if delete_whole_file:
            to_be_deleted_file_list.extend(data_files)
@@ -182,7 +201,6 @@ def download_parquet_with_daft_hash_applied(
         io_config=io_config,
         coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
     )
-    logger.info(f"debug_identify_columns:{identify_columns}")
     df = df.select(daft.col(identify_columns[0]).hash())
     arrow_table = df.to_arrow()
     return arrow_table
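
The partition-string parsing above leans on the repr of the pyiceberg partition Record, which renders as Record[<field>=<value>]. A self-contained sketch of the same slicing with an example repr string:

# Example repr for a table partitioned on an int column; the field name and
# value are illustrative.
partition_repr = "Record[number_partitioned=0]"
partition_value_str = partition_repr.split("[", 1)[1].split("]")[0]
assert partition_value_str == "number_partitioned=0"
# The convert task then writes under f"{iceberg_table_warehouse_prefix}/{partition_value_str}".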

deltacat/compute/converter/utils/converter_session_utils.py (+6)

@@ -48,3 +48,9 @@ def append_larger_sequence_number_data_files(data_files_list):
             sublist_file_list.append(file)
         result.append(sublist_file_list)
     return result
+
+
+def construct_iceberg_table_prefix(
+    iceberg_warehouse_bucket_name, table_name, iceberg_namespace
+):
+    return f"{iceberg_warehouse_bucket_name}/{iceberg_namespace}/{table_name}/data"
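
A quick check of the helper's output, assuming deltacat is importable; the bucket, namespace, and table names are example values:

from deltacat.compute.converter.utils.converter_session_utils import (
    construct_iceberg_table_prefix,
)

prefix = construct_iceberg_table_prefix(
    iceberg_warehouse_bucket_name="warehouse",
    table_name="test_table",
    iceberg_namespace="default",
)
assert prefix == "warehouse/default/test_table/data"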

deltacat/compute/converter/utils/iceberg_columns.py (+7, -1)

@@ -3,14 +3,17 @@


 def _get_iceberg_col_name(suffix):
-    return f"{suffix}"
+    return suffix


 _ORDERED_RECORD_IDX_COLUMN_NAME = _get_iceberg_col_name("pos")
 _ORDERED_RECORD_IDX_COLUMN_TYPE = pa.int64()
+_ORDERED_RECORD_IDX_FIELD_METADATA = {b"PARQUET:field_id": "2147483545"}
 _ORDERED_RECORD_IDX_COLUMN_FIELD = pa.field(
     _ORDERED_RECORD_IDX_COLUMN_NAME,
     _ORDERED_RECORD_IDX_COLUMN_TYPE,
+    metadata=_ORDERED_RECORD_IDX_FIELD_METADATA,
+    nullable=False,
 )


@@ -32,7 +35,10 @@ def append_record_idx_col(table: pa.Table, ordered_record_indices) -> pa.Table:

 _FILE_PATH_COLUMN_NAME = _get_iceberg_col_name("file_path")
 _FILE_PATH_COLUMN_TYPE = pa.string()
+_FILE_PATH_FIELD_METADATA = {b"PARQUET:field_id": "2147483546"}
 _FILE_PATH_COLUMN_FIELD = pa.field(
     _FILE_PATH_COLUMN_NAME,
     _FILE_PATH_COLUMN_TYPE,
+    metadata=_FILE_PATH_FIELD_METADATA,
+    nullable=False,
 )
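
The added metadata pins the Parquet field ids that the Iceberg spec reserves for position-delete columns (2147483545 for pos, 2147483546 for file_path) and marks both columns non-nullable. A standalone pyarrow sketch of the resulting fields assembled into a position-delete schema (the schema variable name is illustrative):

import pyarrow as pa

pos_field = pa.field(
    "pos",
    pa.int64(),
    metadata={b"PARQUET:field_id": "2147483545"},
    nullable=False,
)
file_path_field = pa.field(
    "file_path",
    pa.string(),
    metadata={b"PARQUET:field_id": "2147483546"},
    nullable=False,
)
position_delete_schema = pa.schema([file_path_field, pos_field])
print(position_delete_schema)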

deltacat/compute/converter/utils/s3u.py (+6, -2)

@@ -57,6 +57,7 @@ def upload_table_with_retry(
     s3_table_writer_kwargs: Optional[Dict[str, Any]],
     content_type: ContentType = ContentType.PARQUET,
     max_records_per_file: Optional[int] = 4000000,
+    s3_file_system=None,
     **s3_client_kwargs,
 ) -> List[str]:
     """
@@ -72,9 +73,12 @@
     if s3_table_writer_kwargs is None:
         s3_table_writer_kwargs = {}

-    s3_file_system = get_s3_file_system(content_type=content_type)
+    if not s3_file_system:
+        s3_file_system = get_s3_file_system(content_type=content_type)
     capture_object = CapturedBlockWritePaths()
-    block_write_path_provider = UuidBlockWritePathProvider(capture_object)
+    block_write_path_provider = UuidBlockWritePathProvider(
+        capture_object=capture_object
+    )
     s3_table_writer_func = get_table_writer(table)
     table_record_count = get_table_length(table)
     if max_records_per_file is None or not table_record_count:
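
The new s3_file_system parameter lets callers inject their own filesystem instead of the default one, which is what makes the MinIO-backed end-to-end test possible. A hedged sketch of calling the writer against a local MinIO endpoint; the endpoint and credentials are the docker-compose defaults assumed by the test setup, not values defined in this file:

import pyarrow as pa
from pyarrow.fs import S3FileSystem

from deltacat.compute.converter.utils.s3u import upload_table_with_retry

minio_fs = S3FileSystem(
    endpoint_override="http://localhost:9000",  # assumed local MinIO endpoint
    access_key="admin",                         # assumed docker-compose credentials
    secret_key="password",
)
table = pa.table({"file_path": ["s3://warehouse/data/a.parquet"], "pos": [0]})
paths = upload_table_with_retry(
    table=table,
    s3_url_prefix="warehouse/default/test_table/data/number_partitioned=0",
    s3_table_writer_kwargs={},
    s3_file_system=minio_fs,
)
print(paths)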

deltacat/tests/compute/converter/conftest.py (+8)

@@ -1,6 +1,7 @@
 import pytest
 from pyspark.sql import SparkSession
 import os
+import ray
 from pyiceberg.catalog import Catalog, load_catalog


@@ -70,3 +71,10 @@ def session_catalog() -> Catalog:
             "s3.secret-access-key": "password",
         },
     )
+
+
+@pytest.fixture(autouse=True, scope="module")
+def setup_ray_cluster():
+    ray.init(local_mode=True, ignore_reinit_error=True)
+    yield
+    ray.shutdown()
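
Because the fixture is autouse and module-scoped, every test module under this conftest gets a local-mode Ray runtime without declaring anything. A small sketch of a test that relies on it (the remote task is illustrative):

import ray


@ray.remote
def echo(value):
    return value


def test_ray_cluster_is_available():
    # Ray was already initialized by the autouse setup_ray_cluster fixture.
    assert ray.is_initialized()
    assert ray.get(echo.remote(42)) == 42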
