Commit 30ecd04

Update Readme (#188)
* Update README.md
* Update README.md
* Update README.md
* Update README.md
* Create spark_on_ray.md
* Update README.md
* Update README.md
* Update README.md
1 parent fe62fe5 commit 30ecd04

File tree

2 files changed: +65 -29 lines changed

README.md

Lines changed: 45 additions & 29 deletions
@@ -1,17 +1,17 @@
# RayDP

-RayDP is a distributed data processing library that provides simple APIs for running Spark/MPI on [Ray](https://github.com/ray-project/ray) and integrating Spark with distributed deep learning and machine learning frameworks. RayDP makes it simple to build distributed end-to-end data analytics and AI pipelines. Instead of using lots of glue code or an orchestration framework to stitch together multiple distributed programs, RayDP allows you to write Spark, PyTorch, Tensorflow, and XGBoost code in a single Python program with increased productivity and performance. You can build an end-to-end pipeline on a single Ray cluster by using Spark for data preprocessing, RaySGD or Horovod for distributed deep learning, RayTune for hyperparameter tuning, and RayServe for model serving.
+RayDP is a distributed data processing library that provides simple APIs for running Spark on [Ray](https://github.com/ray-project/ray) and integrating Spark with distributed deep learning and machine learning frameworks. RayDP makes it simple to build distributed end-to-end data analytics and AI pipelines. Instead of using lots of glue code or an orchestration framework to stitch together multiple distributed programs, RayDP allows you to write Spark, PyTorch, Tensorflow, and XGBoost code in a single Python program with increased productivity and performance. You can build an end-to-end pipeline on a single Ray cluster by using Spark for data preprocessing, RaySGD or Horovod for distributed deep learning, RayTune for hyperparameter tuning, and RayServe for model serving.

## Installation

-You can install the latest RayDP using pip. RayDP requires Ray (>=1.3.0) and PySpark (>=3.0.0). Please also make sure Java is installed and JAVA_HOME is set properly.
+You can install the latest RayDP using pip. RayDP requires Ray and PySpark. Please also make sure Java is installed and JAVA_HOME is set properly.
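For example, a quick sanity check of both (the JDK path below is only an illustration):

```shell
# Check that Java is installed and JAVA_HOME points at a JDK
# (the path is an example; use your own installation's path)
java -version
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
```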

```shell
pip install raydp
```

-Or you can install our nightly build:
+Or you can install the RayDP nightly build:

```shell
pip install raydp-nightly
@@ -26,11 +26,7 @@ pip install dist/raydp*.whl

## Spark on Ray

-RayDP provides an API for starting a Spark job on Ray in your Python program without a need to set up a Spark cluster manually. RayDP supports Ray as a Spark resource manager and runs Spark executors in Ray actors. RayDP utilizes Ray's in-memory object store to efficiently exchange data between Spark and other Ray libraries. You can use Spark to read the input data, process the data using SQL, Spark DataFrame, or Pandas (via [Koalas](https://github.com/databricks/koalas)) API, extract and transform features using Spark MLlib, and feed the output to deep learning and machine learning frameworks.
-
-### Classic Spark Word Count Example
-
-To start a Spark job on Ray, you can use the `raydp.init_spark` API. After we use RayDP to initialize a Spark cluster, we can use Spark as usual.
+RayDP provides an API for starting a Spark job on Ray without a need to set up a Spark cluster separately. RayDP supports Ray as a Spark resource manager and runs Spark executors in Ray actors. To create a Spark session, call the `raydp.init_spark` API. For example:

```python
import ray
@@ -40,10 +36,10 @@ import raydp
ray.init(address='auto')

# create a Spark cluster with specified resource requirements
-spark = raydp.init_spark('word_count',
+spark = raydp.init_spark(app_name='RayDP Example',
                         num_executors=2,
                         executor_cores=2,
-                        executor_memory='1G')
+                        executor_memory='4GB')

# normal data processing with Spark
df = spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look', ), ('python', )], ['word'])
@@ -55,45 +51,65 @@ word_count.show()
raydp.stop_spark()
```

-### Dynamic Resource Allocation
+Spark features such as dynamic resource allocation and the spark-submit script are also supported. Please refer to [Spark on Ray](./doc/spark_on_ray.md) for more details.
+
+## Machine Learning and Deep Learning With a Spark DataFrame

-RayDP now supports External Shuffle Service. To enable it, you can either set `spark.shuffle.service.enabled` to `true` in `spark-defaults.conf`, or you can provide a config to `raydp.init_spark`, as shown below:
+RayDP provides APIs for converting a Spark DataFrame to a Ray Dataset or Ray MLDataset, which can be consumed by XGBoost, RaySGD, or Horovod on Ray. RayDP also provides high-level scikit-learn style Estimator APIs for distributed training with PyTorch or Tensorflow.

+
+***Spark DataFrame <=> Ray Dataset***
```python
-raydp.init_spark(..., configs={"spark.shuffle.service.enabled": "true"})
-```
+import ray
+import raydp

-The user-provided config will overwrite those specified in `spark-defaults.conf`. By default Spark loads `spark-defaults.conf` from `$SPARK_HOME/conf`; you can also modify this location by setting `SPARK_CONF_DIR`.
+ray.init()
+spark = raydp.init_spark(app_name="RayDP Example",
+                         num_executors=2,
+                         executor_cores=2,
+                         executor_memory="4GB")

-Similarly, you can also enable Dynamic Executor Allocation this way. However, because Ray does not support object ownership transferring now (1.3.0), you must use Dynamic Executor Allocation with data persistence. You can write the Spark dataframe to HDFS as Parquet, as shown below:
+# Spark Dataframe to Ray Dataset
+df1 = spark.range(0, 1000)
+ds1 = ray.data.from_spark(df1)

-```python
-ds = RayMLDataset.from_spark(..., fs_directory="hdfs://host:port/your/directory")
+# Ray Dataset to Spark Dataframe
+ds2 = ray.data.from_items([{"id": i} for i in range(1000)])
+df2 = ds2.to_spark(spark)
```
+Please refer to [Spark+XGBoost on Ray](./examples/xgboost_ray_nyctaxi.py) for a full example.

-### Spark Submit
-
-RayDP provides a substitute for Apache Spark's spark-submit. You can run your Java or Scala application on a RayDP cluster using `bin/raydp-submit`; you can add it to `PATH` for convenience. When using `raydp-submit`, specify the number of executors, the number of cores, and the memory of each executor via Spark properties, such as `--conf spark.executor.cores=1`, `--conf spark.executor.instances=1` and `--conf spark.executor.memory=500m`. `raydp-submit` only supports Ray clusters; Spark standalone, Apache Mesos and Apache YARN are not supported, so please use the traditional `spark-submit` in those cases. For the same reason, you do not need to specify `--master` in the command. In addition, RayDP does not support cluster as the deploy-mode.
+***Spark DataFrame => Ray MLDataset***

-### Integrating Spark with Deep Learning and Machine Learning Frameworks
+RayDP provides an API for creating a Ray MLDataset from a Spark dataframe. MLDataset can be converted to a PyTorch or Tensorflow dataset for distributed training with Horovod on Ray or RaySGD. MLDataset is also supported by XGBoost on Ray as a data source.

-Combined with other Ray components, such as RaySGD and RayServe, we can easily build an end-to-end deep learning pipeline.
+```python
+import ray
+import raydp
+from raydp.spark import RayMLDataset

-***MLDataset API***
+ray.init()
+spark = raydp.init_spark(app_name="RayDP Example",
+                         num_executors=2,
+                         executor_cores=2,
+                         executor_memory="4GB")

-RayDP provides an API for creating a Ray MLDataset from a Spark dataframe. MLDataset represents a distributed dataset stored in Ray's in-memory object store. It supports transformation on each shard and can be converted to a PyTorch or Tensorflow dataset for distributed training. If you prefer using Horovod on Ray or RaySGD for distributed training, you can use MLDataset to seamlessly integrate Spark with them.
+df = spark.range(0, 1000)
+ds = RayMLDataset.from_spark(df, num_shards=10)
+```
+Please refer to [Spark+Horovod on Ray](./examples/horovod_nyctaxi.py) for a full example.

***Estimator API***

-RayDP also provides high-level scikit-learn style Estimator APIs for distributed training. The Estimator APIs allow you to train a deep neural network directly on a Spark DataFrame, leveraging Ray's ability to scale out across the cluster. The Estimator APIs are wrappers of RaySGD and hide the complexity of converting a Spark DataFrame to a PyTorch/Tensorflow dataset and distributing the training.
+The Estimator APIs allow you to train a deep neural network directly on a Spark DataFrame, leveraging Ray's ability to scale out across the cluster. The Estimator APIs are wrappers of RaySGD and hide the complexity of converting a Spark DataFrame to a PyTorch/Tensorflow dataset and distributing the training. RayDP provides `raydp.torch.TorchEstimator` for PyTorch and `raydp.tf.TFEstimator` for Tensorflow. The following is an example of using TorchEstimator.

```python
import ray
import raydp
from raydp.torch import TorchEstimator

ray.init(address="auto")
-spark = raydp.init_spark(app_name="RayDP example",
+spark = raydp.init_spark(app_name="RayDP Example",
                         num_executors=2,
                         executor_cores=2,
                         executor_memory="4GB")
@@ -106,16 +122,16 @@ train_df = df.withColumn(…)
model = torch.nn.Sequential(torch.nn.Linear(2, 1))
optimizer = torch.optim.Adam(model.parameters())

-# You can use the RayDP Estimator API or libraries like RaySGD for distributed training.
estimator = TorchEstimator(model=model, optimizer=optimizer, ...)
estimator.fit_on_spark(train_df)

raydp.stop_spark()
```
+Please refer to [NYC Taxi PyTorch Estimator](./examples/pytorch_nyctaxi.py) and [NYC Taxi Tensorflow Estimator](./examples/tensorflow_nyctaxi.py) for full examples.

## MPI on Ray

-RayDP also provides a simple API for running MPI jobs on top of Ray. Currently, we support three types of MPI: `intel_mpi`, `openmpi` and `MPICH`. You can refer to [doc/mpi.md](./doc/mpi.md) for more details.
+RayDP also provides an API for running MPI jobs on Ray. We support three types of MPI: `intel_mpi`, `openmpi` and `MPICH`. You can refer to [doc/mpi.md](./doc/mpi.md) for more details.
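As a purely hypothetical sketch of what launching an MPI job could look like (the `create_mpi_job` import path, name, and every parameter below are assumptions, not the confirmed API; see [doc/mpi.md](./doc/mpi.md) for the actual usage):

```python
# Hypothetical sketch only: the import path, function name, and all
# parameters are illustrative assumptions; doc/mpi.md documents the real API.
import ray
from raydp.mpi import create_mpi_job  # assumed import path

ray.init(address="auto")
job = create_mpi_job(job_name="mpi_example",   # assumed parameter names
                     world_size=2,
                     num_cpus_per_process=1,
                     mpi_type="openmpi")        # one of the three supported MPI types
```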

## More Examples

Not sure how to use RayDP? Check the `examples` folder. We have added many examples showing how RayDP works together with PyTorch, TensorFlow, XGBoost, Horovod, and so on. If you still cannot find what you want, feel free to post an issue to ask us!

doc/spark_on_ray.md

Lines changed: 20 additions & 0 deletions
@@ -0,0 +1,20 @@
### External Shuffle Service & Dynamic Resource Allocation

RayDP supports External Shuffle Service. To enable it, you can either set `spark.shuffle.service.enabled` to `true` in `spark-defaults.conf`, or you can provide a config to `raydp.init_spark`, as shown below:

```python
raydp.init_spark(..., configs={"spark.shuffle.service.enabled": "true"})
```

The user-provided config will overwrite those specified in `spark-defaults.conf`. By default Spark loads `spark-defaults.conf` from `$SPARK_HOME/conf`; you can modify this location by setting `SPARK_CONF_DIR`.
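For example, a minimal sketch (the directory path below is only an illustration):

```shell
# Point Spark at a custom config directory instead of $SPARK_HOME/conf
# (the path is an example; use your own directory)
export SPARK_CONF_DIR=/path/to/my/conf
# /path/to/my/conf/spark-defaults.conf would then contain, e.g.:
#   spark.shuffle.service.enabled  true
```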

Similarly, you can also enable Dynamic Executor Allocation this way. However, currently you must use Dynamic Executor Allocation together with data persistence. For example, you can write the Spark dataframe to HDFS as Parquet, as shown below:

```python
ds = RayMLDataset.from_spark(..., fs_directory="hdfs://host:port/your/directory")
```

### Spark Submit

RayDP provides a substitute for Apache Spark's spark-submit. You can run your Java or Scala application on a RayDP cluster using `bin/raydp-submit`; you can add it to `PATH` for convenience. When using `raydp-submit`, specify the number of executors, the number of cores, and the memory of each executor via Spark properties, such as `--conf spark.executor.cores=1`, `--conf spark.executor.instances=1` and `--conf spark.executor.memory=500m`. `raydp-submit` only supports Ray clusters; Spark standalone, Apache Mesos and Apache YARN are not supported, so please use the traditional `spark-submit` in those cases. For the same reason, you do not need to specify `--master` in the command. In addition, RayDP does not support cluster as the deploy-mode.
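For example, a sketch of a submission command, assuming `raydp-submit` otherwise takes the usual spark-submit arguments (the class name and jar path below are placeholders):

```shell
# Submit a Java/Scala application to the Ray cluster with raydp-submit.
# The --conf properties come from the text above; --class and the jar are placeholders.
bin/raydp-submit \
  --conf spark.executor.instances=1 \
  --conf spark.executor.cores=1 \
  --conf spark.executor.memory=500m \
  --class com.example.MyApp \
  /path/to/my-app.jar
```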
