Skip to content

Commit 7f1c329

Browse files
author
XiaoHongbo
authored
[python] ray version compatible (apache#6937)
1 parent e1cbeed commit 7f1c329

File tree

2 files changed

+138
-16
lines changed

2 files changed

+138
-16
lines changed

.github/workflows/paimon-python-checks.yml

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,3 +101,107 @@ jobs:
101101
run: |
102102
chmod +x paimon-python/dev/lint-python.sh
103103
./paimon-python/dev/lint-python.sh
104+
105+
requirement_version_compatible_test:
106+
runs-on: ubuntu-latest
107+
container: "python:3.10-slim"
108+
109+
steps:
110+
- name: Checkout code
111+
uses: actions/checkout@v2
112+
113+
- name: Set up JDK ${{ env.JDK_VERSION }}
114+
uses: actions/setup-java@v4
115+
with:
116+
java-version: ${{ env.JDK_VERSION }}
117+
distribution: 'temurin'
118+
119+
- name: Set up Maven
120+
uses: stCarolas/[email protected]
121+
with:
122+
maven-version: 3.8.8
123+
124+
- name: Install system dependencies
125+
shell: bash
126+
run: |
127+
apt-get update && apt-get install -y \
128+
build-essential \
129+
git \
130+
curl \
131+
&& rm -rf /var/lib/apt/lists/*
132+
133+
- name: Verify Java and Maven installation
134+
run: |
135+
java -version
136+
mvn -version
137+
138+
- name: Verify Python version
139+
run: python --version
140+
141+
- name: Build Java
142+
run: |
143+
echo "Start compiling modules"
144+
mvn -T 2C -B clean install -DskipTests
145+
146+
- name: Install base Python dependencies
147+
shell: bash
148+
run: |
149+
python -m pip install --upgrade pip
150+
python -m pip install -q \
151+
pyroaring \
152+
readerwriterlock==1.0.9 \
153+
fsspec==2024.3.1 \
154+
cachetools==5.3.3 \
155+
ossfs==2023.12.0 \
156+
fastavro==1.11.1 \
157+
pyarrow==16.0.0 \
158+
zstandard==0.24.0 \
159+
polars==1.32.0 \
160+
duckdb==1.3.2 \
161+
numpy==1.24.3 \
162+
pandas==2.0.3 \
163+
pytest~=7.0 \
164+
py4j==0.10.9.9 \
165+
requests \
166+
parameterized==0.9.0 \
167+
packaging
168+
169+
- name: Test requirement version compatibility
170+
shell: bash
171+
run: |
172+
cd paimon-python
173+
174+
# Test Ray version compatibility
175+
echo "=========================================="
176+
echo "Testing Ray version compatibility"
177+
echo "=========================================="
178+
for ray_version in 2.44.0 2.48.0 2.53.0; do
179+
echo "Testing Ray version: $ray_version"
180+
181+
# Install specific Ray version
182+
python -m pip install -q ray==$ray_version
183+
184+
# Verify Ray version
185+
python -c "import ray; print(f'Ray version: {ray.__version__}')"
186+
python -c "from packaging.version import parse; import ray; assert parse(ray.__version__) == parse('$ray_version'), f'Expected Ray $ray_version, got {ray.__version__}'"
187+
188+
# Run tests
189+
python -m pytest pypaimon/tests/ray_data_test.py::RayDataTest -v --tb=short || {
190+
echo "Tests failed for Ray $ray_version"
191+
exit 1
192+
}
193+
194+
# Uninstall Ray to avoid conflicts
195+
python -m pip uninstall -y ray
196+
done
197+
198+
# Add other dependency version tests here in the future
199+
# Example:
200+
# echo "=========================================="
201+
# echo "Testing PyArrow version compatibility"
202+
# echo "=========================================="
203+
# for pyarrow_version in 16.0.0 17.0.0 18.0.0; do
204+
# ...
205+
# done
206+
env:
207+
PYTHONPATH: ${{ github.workspace }}/paimon-python

paimon-python/pypaimon/read/ray_datasource.py

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,19 @@
2525
from typing import List, Optional, Iterable
2626

2727
import pyarrow
28+
from packaging.version import parse
29+
import ray
2830

2931
from pypaimon.read.split import Split
3032
from pypaimon.read.table_read import TableRead
3133
from pypaimon.schema.data_types import PyarrowFieldParser
3234

3335
logger = logging.getLogger(__name__)
3436

37+
# Ray version constants for compatibility
38+
RAY_VERSION_SCHEMA_IN_READ_TASK = "2.48.0" # Schema moved from BlockMetadata to ReadTask
39+
RAY_VERSION_PER_TASK_ROW_LIMIT = "2.52.0" # per_task_row_limit parameter introduced
40+
3541
from ray.data.datasource import Datasource
3642

3743

@@ -94,11 +100,13 @@ def _distribute_splits_into_equal_chunks(
94100

95101
return chunks
96102

97-
def get_read_tasks(self, parallelism: int) -> List:
103+
def get_read_tasks(self, parallelism: int, **kwargs) -> List:
98104
"""Return a list of read tasks that can be executed in parallel."""
99105
from ray.data.datasource import ReadTask
100106
from ray.data.block import BlockMetadata
101107

108+
per_task_row_limit = kwargs.get('per_task_row_limit', None)
109+
102110
# Validate parallelism parameter
103111
if parallelism < 1:
104112
raise ValueError(f"parallelism must be at least 1, got {parallelism}")
@@ -191,20 +199,30 @@ def _get_read_task(
191199
num_rows = total_rows if total_rows > 0 else None
192200
size_bytes = total_size if total_size > 0 else None
193201

194-
metadata = BlockMetadata(
195-
num_rows=num_rows,
196-
size_bytes=size_bytes,
197-
input_files=input_files if input_files else None,
198-
exec_stats=None, # Will be populated by Ray during execution
199-
)
200-
201-
# TODO: per_task_row_limit is not supported in Ray 2.48.0, will be added in future versions
202-
read_tasks.append(
203-
ReadTask(
204-
read_fn=lambda splits=chunk_splits: get_read_task(splits),
205-
metadata=metadata,
206-
schema=schema,
207-
)
208-
)
202+
metadata_kwargs = {
203+
'num_rows': num_rows,
204+
'size_bytes': size_bytes,
205+
'input_files': input_files if input_files else None,
206+
'exec_stats': None, # Will be populated by Ray during execution
207+
}
208+
209+
if parse(ray.__version__) < parse(RAY_VERSION_SCHEMA_IN_READ_TASK):
210+
metadata_kwargs['schema'] = schema
211+
212+
metadata = BlockMetadata(**metadata_kwargs)
213+
214+
read_fn = partial(get_read_task, chunk_splits)
215+
read_task_kwargs = {
216+
'read_fn': read_fn,
217+
'metadata': metadata,
218+
}
219+
220+
if parse(ray.__version__) >= parse(RAY_VERSION_SCHEMA_IN_READ_TASK):
221+
read_task_kwargs['schema'] = schema
222+
223+
if parse(ray.__version__) >= parse(RAY_VERSION_PER_TASK_ROW_LIMIT) and per_task_row_limit is not None:
224+
read_task_kwargs['per_task_row_limit'] = per_task_row_limit
225+
226+
read_tasks.append(ReadTask(**read_task_kwargs))
209227

210228
return read_tasks

0 commit comments

Comments
 (0)