Skip to content

Commit fe86387

Browse files
authored
Update packaging and documentation (#8)
* [Docs] Update ReadMe Signed-off-by: Austin Liu <[email protected]> * [Chores] Update packagings Signed-off-by: Austin Liu <[email protected]> Format Signed-off-by: Austin Liu <[email protected]> * Fix tense Signed-off-by: Austin Liu <[email protected]> --------- Signed-off-by: Austin Liu <[email protected]>
1 parent 78defa4 commit fe86387

12 files changed

+58
-57
lines changed

Cargo.lock

+19-19
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+5-5
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
# under the License.
1717

1818
[package]
19-
name = "raysql"
20-
description = "RaySQL: DataFusion on Ray"
19+
name = "datafusion_ray"
20+
description = "DataFusion on Ray"
2121
homepage = "https://github.com/datafusion-contrib/ray-sql"
2222
repository = "https://github.com/datafusion-contrib/ray-sql"
2323
authors = ["Andy Grove <[email protected]>", "Frank Luan <[email protected]>"]
@@ -38,19 +38,19 @@ log = "0.4"
3838
prost = "0.12"
3939
prost-types = "0.12"
4040
pyo3 = { version = "0.21", features = ["extension-module", "abi3", "abi3-py38"] }
41-
tokio = { version = "1.24", features = ["macros", "rt", "rt-multi-thread", "sync"] }
41+
tokio = { version = "1.40", features = ["macros", "rt", "rt-multi-thread", "sync"] }
4242
uuid = "1.2"
4343

4444
[build-dependencies]
4545
rustc_version = "0.4.0"
4646
tonic-build = { version = "0.8", default-features = false, features = ["transport", "prost"] }
4747

4848
[lib]
49-
name = "raysql"
49+
name = "datafusion_ray"
5050
crate-type = ["cdylib", "rlib"]
5151

5252
[package.metadata.maturin]
53-
name = "raysql._raysql_internal"
53+
name = "datafusion_ray._datafusion_ray_internal"
5454

5555
[profile.release]
5656
codegen-units = 1

README.md

+15-11
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717
under the License.
1818
-->
1919

20-
# datafusion-ray: DataFusion on Ray
20+
# DataFusion on Ray
2121

22-
This is a research project to evaluate performing distributed SQL queries from Python, using
22+
> This was originally a research project donated from [ray-sql](https://github.com/datafusion-contrib/ray-sql) to evaluate performing distributed SQL queries from Python, using
2323
[Ray](https://www.ray.io/) and [DataFusion](https://github.com/apache/arrow-datafusion).
2424

25+
DataFusion Ray is a distributed SQL query engine powered by the Rust implementation of [Apache Arrow](https://arrow.apache.org/), [Apache DataFusion](https://datafusion.apache.org/) and [Ray](https://www.ray.io/).
26+
2527
## Goals
2628

2729
- Demonstrate how easily new systems can be built on top of DataFusion. See the [design documentation](./docs/README.md)
@@ -31,7 +33,9 @@ This is a research project to evaluate performing distributed SQL queries from P
3133

3234
## Non Goals
3335

34-
- Build and support a production system.
36+
- Rebuild cluster scheduling systems in the way [Ballista](https://datafusion.apache.org/ballista/) did.
37+
- Ballista is extremely complex and utilizing Ray feels like it abstracts some of that complexity away.
38+
- DataFusion Ray delegates cluster management to Ray.
3539

3640
## Example
3741

@@ -42,7 +46,7 @@ import os
4246
import pandas as pd
4347
import ray
4448

45-
from raysql import RaySqlContext
49+
from datafusion_ray import RaySqlContext
4650

4751
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
4852

@@ -64,7 +68,7 @@ for record_batch in result_set:
6468

6569
## Status
6670

67-
- RaySQL can run all queries in the TPC-H benchmark
71+
- DataFusion Ray can run all queries in the TPC-H benchmark
6872

6973
## Features
7074

@@ -73,29 +77,29 @@ for record_batch in result_set:
7377

7478
## Limitations
7579

76-
- Requires a shared file system currently
80+
- Requires a shared file system currently. Check details [here](./docs/README.md#distributed-shuffle).
7781

7882
## Performance
7983

80-
This chart shows the performance of RaySQL compared to Apache Spark for
84+
This chart shows the performance of DataFusion Ray compared to Apache Spark for
8185
[SQLBench-H](https://sqlbenchmarks.io/sqlbench-h/) at a very small data set (10GB), running on a desktop (Threadripper
82-
with 24 physical cores). Both RaySQL and Spark are configured with 24 executors.
86+
with 24 physical cores). Both DataFusion Ray and Spark are configured with 24 executors.
8387

8488
### Overall Time
8589

86-
RaySQL is ~1.9x faster overall for this scale factor and environment with disk-based shuffle.
90+
DataFusion Ray is ~1.9x faster overall for this scale factor and environment with disk-based shuffle.
8791

8892
![SQLBench-H Total](./docs/sqlbench-h-total.png)
8993

9094
### Per Query Time
9195

92-
Spark is much faster on some queries, likely due to broadcast exchanges, which RaySQL hasn't implemented yet.
96+
Spark is much faster on some queries, likely due to broadcast exchanges, which DataFusion Ray hasn't implemented yet.
9397

9498
![SQLBench-H Per Query](./docs/sqlbench-h-per-query.png)
9599

96100
### Performance Plan
97101

98-
I'm planning on experimenting with the following changes to improve performance:
102+
There are plans to experiment with the following changes to improve performance:
99103

100104
- Make better use of Ray futures to run more tasks in parallel
101105
- Use Ray object store for shuffle data transfer to reduce disk I/O cost

raysql/__init__.py datafusion_ray/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
except ImportError:
2121
import importlib_metadata
2222

23-
from ._raysql_internal import (
23+
from ._datafusion_ray_internal import (
2424
Context,
2525
ExecutionGraph,
2626
QueryStage,

raysql/context.py datafusion_ray/context.py

+7-7
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
import pyarrow as pa
2424
import ray
2525

26-
import raysql
27-
from raysql import Context, ExecutionGraph, QueryStage
26+
import datafusion_ray
27+
from datafusion_ray import Context, ExecutionGraph, QueryStage
2828
from typing import List
2929

3030
def schedule_execution(
@@ -73,7 +73,7 @@ def _get_worker_inputs(
7373
return ids, futures
7474

7575
# schedule the actual execution workers
76-
plan_bytes = raysql.serialize_execution_plan(stage.get_execution_plan())
76+
plan_bytes = datafusion_ray.serialize_execution_plan(stage.get_execution_plan())
7777
futures = []
7878
opt = {}
7979
opt["resources"] = {"worker": 1e-3}
@@ -153,7 +153,7 @@ def _get_worker_inputs(
153153
ray.get([f for _, lst in child_outputs for f in lst])
154154

155155
# schedule the actual execution workers
156-
plan_bytes = raysql.serialize_execution_plan(stage.get_execution_plan())
156+
plan_bytes = datafusion_ray.serialize_execution_plan(stage.get_execution_plan())
157157
futures = []
158158
opt = {}
159159
opt["resources"] = {"worker": 1e-3}
@@ -179,7 +179,7 @@ def execute_query_partition(
179179
*input_partitions: list[pa.RecordBatch],
180180
) -> Iterable[pa.RecordBatch]:
181181
start_time = time.time()
182-
plan = raysql.deserialize_execution_plan(plan_bytes)
182+
plan = datafusion_ray.deserialize_execution_plan(plan_bytes)
183183
# print(
184184
# "Worker executing plan {} partition #{} with shuffle inputs {}".format(
185185
# plan.display(),
@@ -193,7 +193,7 @@ def execute_query_partition(
193193
# This is delegating to DataFusion for execution, but this would be a good place
194194
# to plug in other execution engines by translating the plan into another engine's plan
195195
# (perhaps via Substrait, once DataFusion supports converting a physical plan to Substrait)
196-
ret = raysql.execute_partition(plan, part, partitions)
196+
ret = datafusion_ray.execute_partition(plan, part, partitions)
197197
duration = time.time() - start_time
198198
event = {
199199
"cat": f"{stage_id}-{part}",
@@ -238,7 +238,7 @@ def sql(self, sql: str) -> pa.RecordBatch:
238238
else:
239239
# serialize the query stages and store in Ray object store
240240
query_stages = [
241-
raysql.serialize_execution_plan(
241+
datafusion_ray.serialize_execution_plan(
242242
graph.get_query_stage(i).get_execution_plan()
243243
)
244244
for i in range(final_stage_id + 1)

raysql/main.py datafusion_ray/main.py

+3-6
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,15 @@
2020

2121
from pyarrow import csv as pacsv
2222
import ray
23-
from raysql import RaySqlContext
23+
from datafusion_ray import RaySqlContext
2424

2525
NUM_CPUS_PER_WORKER = 8
2626

27-
SF = 10
27+
SF = 1
2828
DATA_DIR = f"/mnt/data0/tpch/sf{SF}-parquet"
2929
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
3030
QUERIES_DIR = os.path.join(SCRIPT_DIR, f"../sqlbench-h/queries/sf={SF}")
3131
RESULTS_DIR = f"results-sf{SF}"
32-
TRUTH_DIR = (
33-
"/home/ubuntu/raysort/ray-sql/sqlbench-runners/spark/{RESULTS_DIR}/{RESULTS_DIR}"
34-
)
3532

3633

3734
def setup_context(use_ray_shuffle: bool, num_workers: int = 2) -> RaySqlContext:
@@ -104,7 +101,7 @@ def compare(q: int):
104101

105102

106103
def tpch_bench():
107-
ray.init("auto")
104+
ray.init(resources={"worker": 1})
108105
num_workers = int(ray.cluster_resources().get("worker", 1)) * NUM_CPUS_PER_WORKER
109106
use_ray_shuffle = False
110107
ctx = setup_context(use_ray_shuffle, num_workers)
File renamed without changes.

raysql/tests/test_context.py datafusion_ray/tests/test_context.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
# under the License.
1717

1818
import pytest
19-
from raysql import Context
19+
from datafusion_ray import Context
2020

2121
def test():
2222
ctx = Context(1, False)

pyproject.toml

+3-3
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616
# under the License.
1717

1818
[build-system]
19-
requires = ["maturin>=0.14,<0.15"]
19+
requires = ["maturin>=0.14,<1.7.4"]
2020
build-backend = "maturin"
2121

2222
[project]
23-
name = "raysql"
23+
name = "datafusion-ray"
2424
requires-python = ">=3.7"
2525
classifiers = [
2626
"Programming Language :: Rust",
@@ -30,4 +30,4 @@ classifiers = [
3030

3131

3232
[tool.maturin]
33-
module-name = "raysql._raysql_internal"
33+
module-name = "datafusion_ray._datafusion_ray_internal"

requirements-in.txt

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
black
22
flake8
33
isort
4-
maturin[patchelf]
4+
maturin
55
mypy
66
numpy
77
pyarrow
88
pytest
9-
ray==2.3.0
9+
ray==2.37.0
1010
toml
1111
importlib_metadata; python_version < "3.8"

src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ pub mod utils;
3131

3232
/// A Python module implemented in Rust.
3333
#[pymodule]
34-
fn _raysql_internal(_py: Python, m: &PyModule) -> PyResult<()> {
34+
fn _datafusion_ray_internal(_py: Python, m: &PyModule) -> PyResult<()> {
3535
// register classes that can be created directly from Python code
3636
m.add_class::<context::PyContext>()?;
3737
m.add_class::<planner::PyExecutionGraph>()?;

src/query_stage.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use datafusion_python::physical_plan::PyExecutionPlan;
2424
use pyo3::prelude::*;
2525
use std::sync::Arc;
2626

27-
#[pyclass(name = "QueryStage", module = "raysql", subclass)]
27+
#[pyclass(name = "QueryStage", module = "datafusion_ray", subclass)]
2828
pub struct PyQueryStage {
2929
stage: Arc<QueryStage>,
3030
}

0 commit comments

Comments
 (0)