
Commit ead1e4e

Fix FFI errors, and add script for running TPC-H (#20)
* Add script for running TPC-H
* use latest df-python
* use latest df-python
* fix
* tpch now works
* remove old performance data
* update result filename
* update expected plans
* formatting
* add note
* revert formatting change
1 parent 880544a commit ead1e4e

30 files changed: +2301 −1411 lines

Cargo.lock

+304-317

Cargo.toml

+9-6
```diff
@@ -29,17 +29,20 @@ rust-version = "1.62"
 build = "build.rs"

 [dependencies]
-datafusion = { version = "41.0.0", features = ["pyarrow", "avro"] }
-datafusion-proto = "41.0.0"
-datafusion-python = "41.0.0"
+datafusion = { version = "42.0.0", features = ["pyarrow", "avro"] }
+datafusion-proto = "42.0.0"
+
+# temporarily point to revision until version 42 is released
+datafusion-python = { git = "https://github.com/apache/datafusion-python" }
 futures = "0.3"
 log = "0.4"
-prost = "0.12"
-pyo3 = { version = "0.21", features = ["extension-module", "abi3", "abi3-py38"] }
+prost = "0.13"
+pyo3 = { version = "0.22", features = ["extension-module", "abi3", "abi3-py38"] }
 tokio = { version = "1.40", features = ["macros", "rt", "rt-multi-thread", "sync"] }

 [build-dependencies]
-prost-types = "0.12"
+prost-types = "0.13"
 rustc_version = "0.4.0"
 tonic-build = { version = "0.8", default-features = false, features = ["transport", "prost"] }
```
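As a side note on the `datafusion-python` change: the commit points at the repository's default branch, which can drift between builds. Cargo also supports pinning a git dependency to an exact commit with `rev`, which would keep builds reproducible until the 42.0.0 release lands on crates.io. A sketch only; the `rev` value below is a placeholder, not an actual datafusion-python commit:

```toml
# Sketch: pin the temporary git dependency to an exact commit.
# "<commit-sha>" is a placeholder, not a real revision.
[dependencies]
datafusion-python = { git = "https://github.com/apache/datafusion-python", rev = "<commit-sha>" }
```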

README.md

+24-44
````diff
@@ -19,31 +19,41 @@

 # DataFusion on Ray

-> This was originally a research project donated from [ray-sql](https://github.com/datafusion-contrib/ray-sql) to evaluate performing distributed SQL queries from Python, using
-> [Ray](https://www.ray.io/) and [DataFusion](https://github.com/apache/arrow-datafusion).
+> This was originally a research project donated from [ray-sql] to evaluate performing distributed SQL queries from
+> Python, using [Ray] and [Apache DataFusion]

-DataFusion Ray is a distributed SQL query engine powered by the Rust implementation of [Apache Arrow](https://arrow.apache.org/), [Apache DataFusion](https://datafusion.apache.org/) and [Ray](https://www.ray.io/).
+[ray-sql]: https://github.com/datafusion-contrib/ray-sql

-## Goals
+DataFusion Ray is a distributed Python DataFrame and SQL query engine powered by the Rust implementation
+of [Apache Arrow], [Apache DataFusion], and [Ray].

-- Demonstrate how easily new systems can be built on top of DataFusion. See the [design documentation](./docs/README.md)
-  to understand how RaySQL works.
-- Drive requirements for DataFusion's [Python bindings](https://github.com/apache/arrow-datafusion-python).
-- Create content for an interesting blog post or conference talk.
+[Ray]: https://www.ray.io/
+[Apache Arrow]: https://arrow.apache.org/
+[Apache DataFusion]: https://datafusion.apache.org/

-## Non Goals
+## Comparison to other DataFusion projects

-- Re-build the cluster scheduling systems like what [Ballista](https://datafusion.apache.org/ballista/) did.
-  - Ballista is extremely complex and utilizing Ray feels like it abstracts some of that complexity away.
-  - Datafusion Ray is delegating cluster management to Ray.
+### Comparison to DataFusion Ballista
+
+- Unlike [DataFusion Ballista], DataFusion Ray does not provide its own distributed scheduler and instead relies on
+  Ray for this functionality. As a result of this design choice, DataFusion Ray is a much smaller and simpler project.
+- DataFusion Ray is Python-first, and DataFusion Ballista is Rust-first
+
+[DataFusion Ballista]: https://github.com/apache/datafusion-ballista
+
+### Comparison to DataFusion Python
+
+- [DataFusion Python] provides a Python DataFrame and SQL API for in-process execution. DataFusion Ray extends
+  DataFusion Python to provide scalability across multiple nodes.
+
+[DataFusion Python]: https://github.com/apache/datafusion-python

 ## Example

 Run the following example live in your browser using a Google Colab [notebook](https://colab.research.google.com/drive/1tmSX0Lu6UFh58_-DBUVoyYx6BoXHOszP?usp=sharing).

 ```python
 import os
-import pandas as pd
 import ray

 from datafusion_ray import DatafusionRayContext
@@ -54,7 +64,7 @@ SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
 ray.init(resources={"worker": 1})

 # Create a context and register a table
-ctx = DatafusionRayContext(2, use_ray_shuffle=True)
+ctx = DatafusionRayContext(2)
 # Register either a CSV or Parquet file
 # ctx.register_csv("tips", f"{SCRIPT_DIR}/tips.csv", True)
 ctx.register_parquet("tips", f"{SCRIPT_DIR}/tips.parquet")
@@ -75,36 +85,6 @@ for record_batch in result_set:
 - Mature SQL support (CTEs, joins, subqueries, etc) thanks to DataFusion
 - Support for CSV and Parquet files

-## Limitations
-
-- Requires a shared file system currently. Check details [here](./docs/README.md#distributed-shuffle).
-
-## Performance
-
-This chart shows the performance of DataFusion Ray compared to Apache Spark for
-[SQLBench-H](https://sqlbenchmarks.io/sqlbench-h/) at a very small data set (10GB), running on a desktop (Threadripper
-with 24 physical cores). Both DataFusion Ray and Spark are configured with 24 executors.
-
-### Overall Time
-
-DataFusion Ray is ~1.9x faster overall for this scale factor and environment with disk-based shuffle.
-
-![SQLBench-H Total](./docs/sqlbench-h-total.png)
-
-### Per Query Time
-
-Spark is much faster on some queries, likely due to broadcast exchanges, which DataFusion Ray hasn't implemented yet.
-
-![SQLBench-H Per Query](./docs/sqlbench-h-per-query.png)
-
-### Performance Plan
-
-Plans on experimenting with the following changes to improve performance:
-
-- Make better use of Ray futures to run more tasks in parallel
-- Use Ray object store for shuffle data transfer to reduce disk I/O cost
-- Keep upgrading to newer versions of DataFusion to pick up the latest optimizations
-
 ## Building

 ```bash
````

src/context.rs

+4-4
```diff
@@ -174,7 +174,7 @@ pub fn deserialize_execution_plan(bytes: Vec<u8>) -> PyResult<PyExecutionPlan> {
 /// Iterate down an ExecutionPlan and set the input objects for RayShuffleReaderExec.
 fn _set_inputs_for_ray_shuffle_reader(
     plan: Arc<dyn ExecutionPlan>,
-    input_partitions: &PyList,
+    input_partitions: &Bound<'_, PyList>,
 ) -> Result<()> {
     if let Some(reader_exec) = plan.as_any().downcast_ref::<RayShuffleReaderExec>() {
         let exec_stage_id = reader_exec.stage_id;
@@ -200,8 +200,8 @@ fn _set_inputs_for_ray_shuffle_reader(
                 .map_err(|e| DataFusionError::Execution(format!("{}", e)))?
                 .extract::<usize>()
                 .map_err(|e| DataFusionError::Execution(format!("{}", e)))?;
-            let batch = RecordBatch::from_pyarrow(
-                pytuple
+            let batch = RecordBatch::from_pyarrow_bound(
+                &pytuple
                     .get_item(2)
                     .map_err(|e| DataFusionError::Execution(format!("{}", e)))?,
             )
@@ -235,7 +235,7 @@ fn _execute_partition(
     ));
     Python::with_gil(|py| {
         let input_partitions = inputs
-            .as_ref(py)
+            .bind(py)
             .downcast::<PyList>()
             .map_err(|e| DataFusionError::Execution(format!("{}", e)))?;
         _set_inputs_for_ray_shuffle_reader(plan.plan.clone(), input_partitions)
```
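The `src/context.rs` changes are the standard migration from pyo3's deprecated GIL-Ref API to the Bound API, which the pyo3 0.22 upgrade requires: GIL-bound references such as `&PyList` become `&Bound<'_, PyList>`, GIL-independent `Py<T>` handles are attached to the GIL with `.bind(py)` instead of `.as_ref(py)`, and arrow's Python FFI conversion moves from `from_pyarrow` to `from_pyarrow_bound`, which takes a `&Bound<'_, PyAny>`. A schematic before/after of the pattern (the `takes_list` function is hypothetical and bodies are elided):

```diff
-fn takes_list(list: &PyList) -> Result<()> { /* ... */ }
+fn takes_list(list: &Bound<'_, PyList>) -> Result<()> { /* ... */ }

-let list = inputs.as_ref(py);    // deprecated GIL-Ref, removed in pyo3 0.22
+let list = inputs.bind(py);      // Bound<'_, PyAny>, tied to this GIL scope

-let batch = RecordBatch::from_pyarrow(obj)?;
+let batch = RecordBatch::from_pyarrow_bound(&obj)?;
```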

0 commit comments