Battery of code examples to demonstrate the capabilities of Pravega as a data stream storage system for Apache Spark.
Install Ubuntu 18.04 LTS. Other operating systems can also be used, but the commands below have only been tested on this version.
apt-get install openjdk-8-jdk
See https://docs.docker.com/install/linux/docker-ce/ubuntu/ and https://docs.docker.com/compose/install/.
This will run a development instance of Pravega locally. Note that the default standalone Pravega used for development is likely insufficient for testing video because it stores all data in memory and quickly runs out of memory. Using the procedure below, all data will be stored in a small HDFS cluster in Docker.
In the command below, replace x.x.x.x with the IP address of a local network interface such as eth0.
cd
git clone https://github.com/pravega/pravega
cd pravega
git checkout r0.8
cd docker/compose
export HOST_IP=x.x.x.x
docker-compose up -d
You can view the Pravega logs with: docker-compose logs --follow
You can view the stream files stored on HDFS with: docker-compose exec hdfs hdfs dfs -ls -h -R /
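If you want to confirm from Python that Pravega is up before running any Spark jobs, a simple reachability check such as the sketch below can help. It assumes the controller's gRPC endpoint is on port 9090 of HOST_IP, which is the usual default for this docker-compose deployment; adjust if your setup differs.
# check_pravega_controller.py - minimal reachability check for the Pravega controller (illustrative only).
# Assumes the controller listens on HOST_IP:9090; override PRAVEGA_CONTROLLER_PORT if needed.
import os
import socket
import sys

host = os.environ.get("HOST_IP", "127.0.0.1")
port = int(os.environ.get("PRAVEGA_CONTROLLER_PORT", "9090"))

try:
    with socket.create_connection((host, port), timeout=5):
        print("Pravega controller is reachable at %s:%d" % (host, port))
except OSError as e:
    print("Cannot reach Pravega controller at %s:%d: %s" % (host, port, e))
    sys.exit(1)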
This will install a development instance of Spark locally.
Download https://www.apache.org/dyn/closer.lua/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz.
mkdir -p ~/spark
cd ~/spark
tar -xzvf ~/Downloads/spark-2.4.6-bin-hadoop2.7.tgz
ln -s spark-2.4.6-bin-hadoop2.7 current
export PATH="$HOME/spark/current/bin:$PATH"
By default, the script run_spark_app.sh will use an in-process Spark mini-cluster that is started with the Spark job (--master local[2]).
This will build the Spark Connector and publish it to your local Maven repository.
cd
git clone https://github.com/pravega/spark-connectors
cd spark-connectors
./gradlew install
ls -lhR ~/.m2/repository/io/pravega/pravega-connectors-spark
Run a PySpark batch job that reads events from the file sample_data.json and writes them to a Pravega stream.
./run_pyspark_app.sh src/main/python/batch_file_to_pravega.py
The file sample_data.json has the following contents:
{"id": 1, "key": "a", "message": "Hello world."}
{"id": 2, "key": "b", "message": "Welcome to Pravega!"}
{"id": 3, "key": "a", "message": "Another message."}
{"id": 4, "key": "c", "message": "Another message."}
{"id": 5, "key": "c", "message": "Another message."}
You should see output similar to the following:
+--------------------------------------------------+-----------+
|event |routing_key|
+--------------------------------------------------+-----------+
|{"id":1,"key":"a","message":"Hello world."} |a |
|{"id":2,"key":"b","message":"Welcome to Pravega!"}|b |
|{"id":3,"key":"a","message":"Another message."} |a |
|{"id":4,"key":"c","message":"Another message."} |c |
|{"id":5,"key":"c","message":"Another message."} |c |
+--------------------------------------------------+-----------+
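For reference, the core of a job like batch_file_to_pravega.py can be sketched as follows. This is only an illustration: the connector option names used here (controller, scope, stream) and the controller address are assumptions to verify against the connector documentation. As the sample output above suggests, the connector writes the event column as the event body and uses routing_key, when present, to choose the segment.
# Sketch of a batch job that writes JSON lines to a Pravega stream (illustrative only).
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, struct, to_json

spark = SparkSession.builder.appName("batch_file_to_pravega_sketch").getOrCreate()

df = spark.read.json("sample_data.json")

# Serialize each row back to JSON as the event body and use "key" as the routing key.
events = df.select(
    to_json(struct(*[col(c) for c in df.columns])).alias("event"),
    col("key").alias("routing_key"))

(events.write
    .format("pravega")
    .option("controller", "tcp://127.0.0.1:9090")  # assumed local controller endpoint
    .option("scope", "examples")
    .option("stream", "batchstream1")
    .save())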
./run_pyspark_app.sh src/main/python/batch_pravega_to_console.py
You should see output similar to the following:
+--------------------------------------------------+--------+------------+----------+------+
|event |scope |stream |segment_id|offset|
+--------------------------------------------------+--------+------------+----------+------+
|{"id":4,"key":"c","message":"Another message."} |examples|batchstream1|1 |0 |
|{"id":5,"key":"c","message":"Another message."} |examples|batchstream1|1 |55 |
|{"id":1,"key":"a","message":"Hello world."} |examples|batchstream1|4 |0 |
|{"id":2,"key":"b","message":"Welcome to Pravega!"}|examples|batchstream1|4 |51 |
|{"id":3,"key":"a","message":"Another message."} |examples|batchstream1|4 |109 |
+--------------------------------------------------+--------+------------+----------+------+
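Reading the stream back as a batch can be sketched in the same way; the scope, stream, segment_id, and offset columns shown above are metadata columns provided by the connector. The option names and controller address are assumptions, as before.
# Sketch of a batch job that reads all events currently in a Pravega stream (illustrative only).
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("batch_pravega_to_console_sketch").getOrCreate()

df = (spark.read
    .format("pravega")
    .option("controller", "tcp://127.0.0.1:9090")  # assumed local controller endpoint
    .option("scope", "examples")
    .option("stream", "batchstream1")
    .load())

df.show(truncate=False)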
./run_pyspark_app.sh src/main/python/stream_generated_data_to_pravega.py
This job will continue to run and write events until stopped.
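Judging by the console output shown further below, the generated events are timestamps, so the job can be sketched as a structured streaming query that reads Spark's built-in rate source and writes to Pravega. The checkpointLocation option is standard Spark Structured Streaming; the Pravega option names and the checkpoint path are assumptions.
# Sketch of a streaming job that writes generated timestamps to a Pravega stream (illustrative only).
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("stream_generated_data_to_pravega_sketch").getOrCreate()

# The rate source emits one row per second with a "timestamp" column.
df = (spark.readStream
    .format("rate")
    .option("rowsPerSecond", 1)
    .load()
    .select(col("timestamp").cast("string").alias("event")))

(df.writeStream
    .format("pravega")
    .option("controller", "tcp://127.0.0.1:9090")  # assumed local controller endpoint
    .option("scope", "examples")
    .option("stream", "streamprocessing1")
    # Hypothetical checkpoint path, following the pattern used by the other jobs.
    .option("checkpointLocation", "/tmp/spark-checkpoints-stream_generated_data_to_pravega")
    .start()
    .awaitTermination())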
rm -rf /tmp/spark-checkpoints-stream_pravega_to_console
./run_pyspark_app.sh src/main/python/stream_pravega_to_console.py
This will read events generated by stream_generated_data_to_pravega.py. It will continue to run until stopped.
You should see output similar to the following every 3 seconds:
+-----------------------+--------+-----------------+----------+------+
|event |scope |stream |segment_id|offset|
+-----------------------+--------+-----------------+----------+------+
|2019-05-23 14:40:08.329|examples|streamprocessing1|0 |181753|
|2019-05-23 14:40:10.329|examples|streamprocessing1|0 |181784|
|2019-05-23 14:40:09.329|examples|streamprocessing1|0 |181815|
+-----------------------+--------+-----------------+----------+------+
rm -rf /tmp/spark-checkpoints-stream_pravega_to_pravega
./run_spark_app.sh src/main/python/stream_pravega_to_pravega.py
This will read events generated by stream_generated_data_to_pravega.py. It will continue to run until stopped.
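The Pravega-to-Pravega job combines the two patterns above: a streaming read from the source stream and a streaming write to another Pravega stream. A compact sketch, with the destination stream name chosen purely for illustration:
# Sketch of a streaming job that copies events from one Pravega stream to another (illustrative only).
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("stream_pravega_to_pravega_sketch").getOrCreate()

controller = "tcp://127.0.0.1:9090"  # assumed local controller endpoint

df = (spark.readStream
    .format("pravega")
    .option("controller", controller)
    .option("scope", "examples")
    .option("stream", "streamprocessing1")
    .load()
    .select("event"))

(df.writeStream
    .format("pravega")
    .option("controller", controller)
    .option("scope", "examples")
    .option("stream", "streamprocessing2")  # hypothetical destination stream name
    .option("checkpointLocation", "/tmp/spark-checkpoints-stream_pravega_to_pravega")
    .start()
    .awaitTermination())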
rm -rf /tmp/spark-checkpoints-StreamPravegaToConsole
./run_java_spark_app.sh io.pravega.example.spark.StreamPravegaToConsole
This will read events generated by stream_generated_data_to_pravega.py. It will continue to run until stopped.
You should see output similar to the following every 3 seconds:
+-----------------------+--------+-----------------+----------+------+
|event |scope |stream |segment_id|offset|
+-----------------------+--------+-----------------+----------+------+
|2019-05-23 14:45:59.329|examples|streamprocessing1|0 |192634|
|2019-05-23 14:46:01.329|examples|streamprocessing1|0 |192665|
|2019-05-23 14:46:00.329|examples|streamprocessing1|0 |192696|
+-----------------------+--------+-----------------+----------+------+
Start a standalone Spark cluster (separate master and worker processes).
~/spark/current/sbin/start-all.sh
Confirm that you can browse to the Spark Master UI at http://localhost:8080/.
Submit the job.
USE_IN_PROCESS_SPARK=0 ./run_pyspark_app.sh src/main/python/stream_generated_data_to_pravega.py
USE_IN_PROCESS_SPARK=0 ./run_java_spark_app.sh io.pravega.example.spark.StreamPravegaToConsole
These steps show how to use Anaconda Python to run PySpark applications. The Anaconda environment is configured with NumPy, Pandas, and TensorFlow.
See https://www.anaconda.com/rpm-and-debian-repositories-for-miniconda/.
source /opt/conda/etc/profile.d/conda.sh
src/main/python/create_conda_env.sh
conda activate pravega-samples
Run the following before executing spark-submit:
export PYSPARK_PYTHON=$HOME/.conda/envs/pravega-samples/bin/python
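PYSPARK_PYTHON tells Spark which Python interpreter to launch for the executors (and for the driver, unless PYSPARK_DRIVER_PYTHON overrides it), so packages installed in the conda environment become importable inside UDFs. The small job below, with purely illustrative names, only succeeds when the interpreter set above can import NumPy:
# Illustrative PySpark job that depends on NumPy from the conda environment.
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.appName("conda_env_check").getOrCreate()

# The UDF body executes in the worker's Python process, which is the
# interpreter given by PYSPARK_PYTHON, so NumPy must be installed there.
@udf(DoubleType())
def log1p_udf(x):
    return float(np.log1p(x))

spark.range(10).withColumn("log1p", log1p_udf("id")).show()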