Skip to content

Commit 537ccd2

Browse files
committed
[python] Introduce vortex file format integration
1 parent 8a59485 commit 537ccd2

File tree

11 files changed

+290
-16
lines changed

11 files changed

+290
-16
lines changed

.github/workflows/paimon-python-checks.yml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ jobs:
4747
strategy:
4848
fail-fast: false
4949
matrix:
50-
python-version: [ '3.6.15', '3.10' ]
50+
python-version: [ '3.6.15', '3.10', '3.11' ]
5151

5252
steps:
5353
- name: Checkout code
@@ -113,6 +113,9 @@ jobs:
113113
python -m pip install --upgrade pip
114114
pip install torch --index-url https://download.pytorch.org/whl/cpu
115115
python -m pip install pyroaring readerwriterlock==1.0.9 fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 fastavro==1.11.1 pyarrow==16.0.0 zstandard==0.24.0 polars==1.32.0 duckdb==1.3.2 numpy==1.24.3 pandas==2.0.3 pylance==0.39.0 cramjam flake8==4.0.1 pytest~=7.0 py4j==0.10.9.9 requests parameterized==0.9.0
116+
if python -c "import sys; sys.exit(0 if sys.version_info >= (3, 11) else 1)"; then
117+
python -m pip install vortex-data
118+
fi
116119
fi
117120
df -h
118121
- name: Run lint-python.sh

paimon-python/dev/requirements.txt

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,24 @@
1515
# See the License for the specific language governing permissions and
1616
# limitations under the License.
1717
################################################################################
18-
cachetools>=4.2,<6; python_version=="3.6"
19-
cachetools>=5,<6; python_version>"3.6"
18+
cachetools>=4.2; python_version=="3.6"
19+
cachetools>=5; python_version>"3.6"
2020
dataclasses>=0.8; python_version < "3.7"
21-
fastavro>=1.4,<2
22-
fsspec>=2021.10,<2026; python_version<"3.8"
23-
fsspec>=2023,<2026; python_version>="3.8"
24-
packaging>=21,<26
21+
fastavro>=1.4
22+
fsspec>=2021.10; python_version<"3.8"
23+
fsspec>=2023; python_version>="3.8"
24+
packaging>=21
2525
pandas>=1.1,<2; python_version < "3.7"
26-
pandas>=1.3,<3; python_version >= "3.7" and python_version < "3.9"
27-
pandas>=1.5,<3; python_version >= "3.9"
26+
pandas>=1.3; python_version >= "3.7" and python_version < "3.9"
27+
pandas>=1.5; python_version >= "3.9"
2828
polars>=0.9,<1; python_version<"3.8"
29-
polars>=1,<2; python_version>="3.8"
29+
polars>=1; python_version>="3.8"
3030
pyarrow>=6,<7; python_version < "3.8"
31-
pyarrow>=16,<20; python_version >= "3.8"
31+
pyarrow>=16; python_version >= "3.8"
3232
pyroaring<=0.3.3; python_version < "3.7"
3333
pyroaring<=0.4.5; python_version == "3.7"
3434
pyroaring>=1.0.0; python_version >= "3.8"
35-
readerwriterlock>=1,<2
36-
zstandard>=0.19,<1
37-
cramjam>=1.3.0,<3; python_version>="3.7"
38-
pyyaml>=5.4,<7
35+
readerwriterlock>=1
36+
zstandard>=0.19
37+
cramjam>=1.3.0; python_version>="3.7"
38+
pyyaml>=5.4

paimon-python/pypaimon/common/file_io.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,9 @@ def write_blob(self, path: str, data, **kwargs):
254254
"""Write Blob format file."""
255255
raise NotImplementedError("write_blob must be implemented by FileIO subclasses")
256256

257+
def write_vortex(self, path: str, data, **kwargs):
258+
raise NotImplementedError("write_vortex must be implemented by FileIO subclasses")
259+
257260
def close(self):
258261
pass
259262

paimon-python/pypaimon/common/options/core_options.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ class CoreOptions:
6464
FILE_FORMAT_PARQUET: str = "parquet"
6565
FILE_FORMAT_BLOB: str = "blob"
6666
FILE_FORMAT_LANCE: str = "lance"
67+
FILE_FORMAT_VORTEX: str = "vortex"
6768

6869
# Basic options
6970
AUTO_CREATE: ConfigOption[bool] = (

paimon-python/pypaimon/filesystem/local_file_io.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,26 @@ def write_lance(self, path: str, data: pyarrow.Table, **kwargs):
395395
except Exception as e:
396396
self.delete_quietly(path)
397397
raise RuntimeError(f"Failed to write Lance file {path}: {e}") from e
398-
398+
399+
def write_vortex(self, path: str, data: pyarrow.Table, **kwargs):
400+
try:
401+
import vortex
402+
from vortex._lib.io import write as vortex_write
403+
os.makedirs(os.path.dirname(path), exist_ok=True)
404+
405+
from pypaimon.read.reader.vortex_utils import to_vortex_specified
406+
_, store_kwargs = to_vortex_specified(self, path)
407+
408+
if store_kwargs:
409+
from vortex import store
410+
vortex_store = store.from_url(path, **store_kwargs)
411+
vortex_store.write(vortex.array(data))
412+
else:
413+
vortex_write(vortex.array(data), path)
414+
except Exception as e:
415+
self.delete_quietly(path)
416+
raise RuntimeError(f"Failed to write Vortex file {path}: {e}") from e
417+
399418
def write_blob(self, path: str, data: pyarrow.Table, **kwargs):
400419
try:
401420
if data.num_columns != 1:

paimon-python/pypaimon/filesystem/pyarrow_file_io.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -494,6 +494,24 @@ def write_lance(self, path: str, data: pyarrow.Table, **kwargs):
494494
self.delete_quietly(path)
495495
raise RuntimeError(f"Failed to write Lance file {path}: {e}") from e
496496

497+
def write_vortex(self, path: str, data: pyarrow.Table, **kwargs):
498+
try:
499+
import vortex
500+
from vortex import store
501+
502+
from pypaimon.read.reader.vortex_utils import to_vortex_specified
503+
file_path_for_vortex, store_kwargs = to_vortex_specified(self, path)
504+
505+
if store_kwargs:
506+
vortex_store = store.from_url(file_path_for_vortex, **store_kwargs)
507+
vortex_store.write(vortex.array(data))
508+
else:
509+
from vortex._lib.io import write as vortex_write
510+
vortex_write(vortex.array(data), file_path_for_vortex)
511+
except Exception as e:
512+
self.delete_quietly(path)
513+
raise RuntimeError(f"Failed to write Vortex file {path}: {e}") from e
514+
497515
def write_blob(self, path: str, data: pyarrow.Table, **kwargs):
498516
try:
499517
if data.num_columns != 1:
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
################################################################################
18+
19+
from typing import List, Optional, Any
20+
21+
import pyarrow as pa
22+
import pyarrow.dataset as ds
23+
from pyarrow import RecordBatch
24+
25+
from pypaimon.common.file_io import FileIO
26+
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
27+
28+
29+
class FormatVortexReader(RecordBatchReader):
30+
"""
31+
A Format Reader that reads record batch from a Vortex file,
32+
and filters it based on the provided predicate and projection.
33+
"""
34+
35+
def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
36+
push_down_predicate: Any, batch_size: int = 1024):
37+
import vortex
38+
39+
from pypaimon.read.reader.vortex_utils import to_vortex_specified
40+
file_path_for_vortex, store_kwargs = to_vortex_specified(file_io, file_path)
41+
42+
if store_kwargs:
43+
from vortex import store
44+
vortex_store = store.from_url(file_path_for_vortex, **store_kwargs)
45+
vortex_file = vortex_store.open()
46+
else:
47+
vortex_file = vortex.open(file_path_for_vortex)
48+
49+
columns_for_vortex = read_fields if read_fields else None
50+
pa_table = vortex_file.to_arrow(columns_for_vortex).read_all()
51+
52+
# Vortex exports string_view which some PyArrow kernels don't support yet.
53+
pa_table = self._cast_string_view_columns(pa_table)
54+
55+
if push_down_predicate is not None:
56+
in_memory_dataset = ds.InMemoryDataset(pa_table)
57+
scanner = in_memory_dataset.scanner(filter=push_down_predicate, batch_size=batch_size)
58+
self.reader = scanner.to_reader()
59+
else:
60+
self.reader = iter(pa_table.to_batches(max_chunksize=batch_size))
61+
62+
@staticmethod
63+
def _cast_string_view_columns(table: pa.Table) -> pa.Table:
64+
new_fields = []
65+
needs_cast = False
66+
for field in table.schema:
67+
if field.type == pa.string_view():
68+
new_fields.append(field.with_type(pa.utf8()))
69+
needs_cast = True
70+
elif field.type == pa.binary_view():
71+
new_fields.append(field.with_type(pa.binary()))
72+
needs_cast = True
73+
else:
74+
new_fields.append(field)
75+
if not needs_cast:
76+
return table
77+
return table.cast(pa.schema(new_fields))
78+
79+
def read_arrow_batch(self) -> Optional[RecordBatch]:
80+
try:
81+
if hasattr(self.reader, 'read_next_batch'):
82+
return self.reader.read_next_batch()
83+
else:
84+
return next(self.reader)
85+
except StopIteration:
86+
return None
87+
88+
def close(self):
89+
if self.reader is not None:
90+
self.reader = None
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
################################################################################
18+
19+
import os
20+
from typing import Dict, Optional, Tuple
21+
from urllib.parse import urlparse
22+
23+
from pypaimon.common.file_io import FileIO
24+
from pypaimon.common.options.config import OssOptions, S3Options
25+
26+
27+
def to_vortex_specified(file_io: FileIO, file_path: str) -> Tuple[str, Optional[Dict[str, str]]]:
28+
"""Convert path and extract storage options for Vortex store.from_url().
29+
30+
Returns (url, store_kwargs) where store_kwargs can be passed as
31+
keyword arguments to ``vortex.store.from_url(url, **store_kwargs)``.
32+
For local paths store_kwargs is None.
33+
"""
34+
if hasattr(file_io, 'file_io'):
35+
file_io = file_io.file_io()
36+
37+
if hasattr(file_io, 'get_merged_properties'):
38+
properties = file_io.get_merged_properties()
39+
else:
40+
properties = file_io.properties if hasattr(file_io, 'properties') and file_io.properties else None
41+
42+
scheme, _, _ = file_io.parse_location(file_path)
43+
file_path_for_vortex = file_io.to_filesystem_path(file_path)
44+
45+
store_kwargs = None
46+
47+
if scheme in {'file', None} or not scheme:
48+
if not os.path.isabs(file_path_for_vortex):
49+
file_path_for_vortex = os.path.abspath(file_path_for_vortex)
50+
return file_path_for_vortex, None
51+
52+
# For remote schemes, keep the original URI so vortex can parse it
53+
file_path_for_vortex = file_path
54+
55+
if scheme in {'s3', 's3a', 's3n'} and properties:
56+
store_kwargs = {}
57+
if properties.contains(S3Options.S3_REGION):
58+
store_kwargs['region'] = properties.get(S3Options.S3_REGION)
59+
if properties.contains(S3Options.S3_ACCESS_KEY_ID):
60+
store_kwargs['access_key_id'] = properties.get(S3Options.S3_ACCESS_KEY_ID)
61+
if properties.contains(S3Options.S3_ACCESS_KEY_SECRET):
62+
store_kwargs['secret_access_key'] = properties.get(S3Options.S3_ACCESS_KEY_SECRET)
63+
if properties.contains(S3Options.S3_SECURITY_TOKEN):
64+
store_kwargs['session_token'] = properties.get(S3Options.S3_SECURITY_TOKEN)
65+
if properties.contains(S3Options.S3_ENDPOINT):
66+
store_kwargs['endpoint'] = properties.get(S3Options.S3_ENDPOINT)
67+
68+
elif scheme == 'oss' and properties:
69+
parsed = urlparse(file_path)
70+
bucket = parsed.netloc
71+
72+
store_kwargs = {}
73+
if properties.contains(OssOptions.OSS_REGION):
74+
store_kwargs['region'] = properties.get(OssOptions.OSS_REGION)
75+
if properties.contains(OssOptions.OSS_ACCESS_KEY_ID):
76+
store_kwargs['access_key_id'] = properties.get(OssOptions.OSS_ACCESS_KEY_ID)
77+
if properties.contains(OssOptions.OSS_ACCESS_KEY_SECRET):
78+
store_kwargs['secret_access_key'] = properties.get(OssOptions.OSS_ACCESS_KEY_SECRET)
79+
if properties.contains(OssOptions.OSS_SECURITY_TOKEN):
80+
store_kwargs['session_token'] = properties.get(OssOptions.OSS_SECURITY_TOKEN)
81+
if properties.contains(OssOptions.OSS_ENDPOINT):
82+
endpoint = properties.get(OssOptions.OSS_ENDPOINT)
83+
endpoint_clean = endpoint.replace('http://', '').replace('https://', '')
84+
store_kwargs['endpoint'] = f"https://{bucket}.{endpoint_clean}"
85+
86+
file_path_for_vortex = file_path_for_vortex.replace('oss://', 's3://')
87+
88+
return file_path_for_vortex, store_kwargs

paimon-python/pypaimon/read/split_read.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
from pypaimon.read.reader.format_blob_reader import FormatBlobReader
4747
from pypaimon.read.reader.format_lance_reader import FormatLanceReader
4848
from pypaimon.read.reader.format_pyarrow_reader import FormatPyArrowReader
49+
from pypaimon.read.reader.format_vortex_reader import FormatVortexReader
4950
from pypaimon.read.reader.iface.record_batch_reader import (RecordBatchReader,
5051
RowPositionReader, EmptyRecordBatchReader)
5152
from pypaimon.read.reader.iface.record_reader import RecordReader
@@ -149,6 +150,9 @@ def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool,
149150
elif file_format == CoreOptions.FILE_FORMAT_LANCE:
150151
format_reader = FormatLanceReader(self.table.file_io, file_path, read_file_fields,
151152
read_arrow_predicate, batch_size=batch_size)
153+
elif file_format == CoreOptions.FILE_FORMAT_VORTEX:
154+
format_reader = FormatVortexReader(self.table.file_io, file_path, read_file_fields,
155+
read_arrow_predicate, batch_size=batch_size)
152156
elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == CoreOptions.FILE_FORMAT_ORC:
153157
name_to_field = {f.name: f for f in self.read_fields}
154158
ordered_read_fields = [name_to_field[n] for n in read_file_fields if n in name_to_field]

paimon-python/pypaimon/tests/reader_append_only_test.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import os
2020
import shutil
21+
import sys
2122
import tempfile
2223
import time
2324
import unittest
@@ -102,6 +103,51 @@ def test_lance_ao_reader(self):
102103
actual = self._read_test_table(read_builder).sort_by('user_id')
103104
self.assertEqual(actual, self.expected)
104105

106+
@unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python >= 3.11")
107+
def test_vortex_ao_reader(self):
108+
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'vortex'})
109+
self.catalog.create_table('default.test_append_only_vortex', schema, False)
110+
table = self.catalog.get_table('default.test_append_only_vortex')
111+
self._write_test_table(table)
112+
113+
read_builder = table.new_read_builder()
114+
actual = self._read_test_table(read_builder).sort_by('user_id')
115+
self.assertEqual(actual, self.expected)
116+
117+
@unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python >= 3.11")
118+
def test_vortex_ao_reader_with_filter(self):
119+
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'vortex'})
120+
self.catalog.create_table('default.test_append_only_vortex_filter', schema, False)
121+
table = self.catalog.get_table('default.test_append_only_vortex_filter')
122+
self._write_test_table(table)
123+
124+
predicate_builder = table.new_read_builder().new_predicate_builder()
125+
p1 = predicate_builder.less_than('user_id', 7)
126+
p2 = predicate_builder.greater_or_equal('user_id', 2)
127+
p3 = predicate_builder.between('user_id', 0, 6) # [2/b, 3/c, 4/d, 5/e, 6/f] left
128+
p4 = predicate_builder.is_not_in('behavior', ['b', 'e']) # [3/c, 4/d, 6/f] left
129+
p5 = predicate_builder.is_in('dt', ['p1']) # exclude 3/c
130+
p6 = predicate_builder.is_not_null('behavior') # exclude 4/d
131+
g1 = predicate_builder.and_predicates([p1, p2, p3, p4, p5, p6])
132+
read_builder = table.new_read_builder().with_filter(g1)
133+
actual = self._read_test_table(read_builder)
134+
expected = pa.concat_tables([
135+
self.expected.slice(5, 1) # 6/f
136+
])
137+
self.assertEqual(actual.sort_by('user_id'), expected)
138+
139+
@unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python >= 3.11")
140+
def test_vortex_ao_reader_with_projection(self):
141+
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'vortex'})
142+
self.catalog.create_table('default.test_vortex_append_only_projection', schema, False)
143+
table = self.catalog.get_table('default.test_vortex_append_only_projection')
144+
self._write_test_table(table)
145+
146+
read_builder = table.new_read_builder().with_projection(['dt', 'user_id'])
147+
actual = self._read_test_table(read_builder).sort_by('user_id')
148+
expected = self.expected.select(['dt', 'user_id'])
149+
self.assertEqual(actual, expected)
150+
105151
def test_lance_ao_reader_with_filter(self):
106152
schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'lance'})
107153
self.catalog.create_table('default.test_append_only_lance_filter', schema, False)

0 commit comments

Comments
 (0)