# Sparkling Water

Table of Contents:
- Compiling examples
- Running examples
- Configuring variables
- Step-by-step weather example
- Running Sparkling Water on Hadoop
- Importing data from HDFS
Sparkling Water provides the following demos:

- `CraigslistJobTitlesStreamingApp` - streaming application that predicts the job category based on incoming job descriptions
- `CraigslistJobTitlesApp` - predicts the job category based on a posted job description
- `ChicagoCrimeAppSmall` - builds a model predicting the probability of arrest for a given crime in Chicago, using data in the `smalldata` directory
- `ChicagoCrimeApp` - implementation of the Chicago Crime demo with setup for data stored on HDFS
- `CitiBikeSharingDemo` - predicts occupancy of Citi Bike stations in NYC
- `HamOrSpamDemo` - shows a spam detector built with Spark and H2O's DeepLearning
- `ProstateDemo` - runs K-means on the prostate dataset (see `smalldata/prostate.csv`)
- `DeepLearningDemo` - runs DeepLearning on a subset of the airlines dataset (see `smalldata/allyears2k_headers.csv.gz`)
- `AirlinesWithWeatherDemo` - joins flight data with weather data and runs Deep Learning
- `AirlinesWithWeatherDemo2` - new iteration of `AirlinesWithWeatherDemo`
## Running examples

Run each demo by typing

```shell
bin/run-example.sh <name of demo>
```

or follow the text below.
The following example scripts are also available:

- `chicagoCrimeSmallShell.script.scala` - demo showing the full source code for predicting the arrest probability of a given crime; it covers the whole machine learning process, from loading and transforming data to building models and scoring incoming events
- `chicagoCrimeSmall.script.scala` - example of using `ChicagoCrimeApp` - creating the application and using it to score individual crime events
- `mlconf_2015_hamSpam.script.scala` - the HamOrSpam application, which detects spam messages; presented at MLConf 2015 NYC
- `strata2015_demo.scala` - NYC Citi Bike demo presented at Strata 2015 in San Jose
- `StrataAirlines.scala` - example of using flight and weather data to predict flight delays
Run these scripts by typing

```shell
bin/sparkling-shell -i <path to file with demo script>
```
## Compiling examples

To compile, use the top-level `gradlew`:

```shell
./gradlew assemble
```
Run a given example on a local cluster. The cluster is defined by the MASTER address `local-cluster[3,2,3072]`, which means the cluster contains 3 worker nodes, each with 2 CPUs and 3 GB of memory:

```shell
bin/run-example.sh <name of demo>
```
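Assuming `run-example.sh` picks up the `MASTER` environment variable (as the standalone-cluster steps below suggest), a local-cluster run might look like this; `ProstateDemo` is one of the demo names listed above:

```shell
# Use an embedded 3-node local cluster: 3 workers, 2 CPUs and 3 GB of memory each
export MASTER="local-cluster[3,2,3072]"
bin/run-example.sh ProstateDemo
```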
Alternatively, run a given example on a standalone Spark cluster:

- Run the Spark cluster, for example via:

  ```shell
  bin/launch-spark-cloud.sh
  ```

- Verify that Spark is running: the Spark UI on http://localhost:8080/ should show 3 worker nodes.
- Export the `MASTER` address of the Spark master: `export MASTER="spark://localhost:7077"`
- Run `bin/run-example.sh <name of demo>`.
- Observe the status of the application via the Spark UI on http://localhost:8080/.
## Configuring variables

You can configure Sparkling Water using the following variables:

- `spark.h2o.cloud.timeout` - number of milliseconds to wait for cloud formation
- `spark.h2o.workers` - number of expected H2O workers; it should be the same as the number of Spark workers
- `spark.h2o.preserve.executors` - do not kill executors when `sc.stop()` is called
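As a sketch, these properties can presumably be supplied through the standard Spark `--conf` mechanism when launching the shell; the values below are illustrative only, not recommended defaults:

```shell
# Illustrative values; spark.h2o.* properties are passed like any other Spark property
bin/sparkling-shell \
  --conf spark.h2o.cloud.timeout=60000 \
  --conf spark.h2o.workers=3
```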
## Step-by-step weather example

- Run the Sparkling shell with an embedded cluster:

  ```shell
  export SPARK_HOME="/path/to/spark/installation"
  export MASTER="local[*]"
  bin/sparkling-shell
  ```
- To see the Sparkling shell (i.e., the Spark driver) status, go to http://localhost:4040/.

- Initialize H2O services on top of the Spark cluster:

  ```scala
  import org.apache.spark.h2o._
  val h2oContext = H2OContext.getOrCreate(sc)
  import h2oContext._
  import h2oContext.implicits._
  ```

- Load weather data for Chicago O'Hare International Airport (ORD) with help from the RDD API:
  ```scala
  import org.apache.spark.examples.h2o._
  val weatherDataFile = "examples/smalldata/Chicago_Ohare_International_Airport.csv"
  val wrawdata = sc.textFile(weatherDataFile, 3).cache()
  val weatherTable = wrawdata.map(_.split(",")).map(row => WeatherParse(row)).filter(!_.isWrongRow())
  ```

- Load airlines data using the H2O parser:
  ```scala
  import java.io.File
  val dataFile = "examples/smalldata/allyears2k_headers.csv.gz"
  val airlinesData = new H2OFrame(new File(dataFile))
  ```

- Select flights destined for Chicago (ORD):
  ```scala
  val airlinesTable: RDD[Airlines] = asRDD[Airlines](airlinesData)
  val flightsToORD = airlinesTable.filter(f => f.Dest == Some("ORD"))
  ```

- Compute the number of these flights:

  ```scala
  flightsToORD.count
  ```

- Use Spark SQL to join the flight data with the weather data:
  ```scala
  import sqlContext.implicits._
  flightsToORD.toDF.registerTempTable("FlightsToORD")
  weatherTable.toDF.registerTempTable("WeatherORD")
  ```

- Perform the SQL JOIN on both tables:
  ```scala
  val bigTable = sqlContext.sql(
    """SELECT
      |f.Year,f.Month,f.DayofMonth,
      |f.CRSDepTime,f.CRSArrTime,f.CRSElapsedTime,
      |f.UniqueCarrier,f.FlightNum,f.TailNum,
      |f.Origin,f.Distance,
      |w.TmaxF,w.TminF,w.TmeanF,w.PrcpIn,w.SnowIn,w.CDD,w.HDD,w.GDD,
      |f.ArrDelay
      |FROM FlightsToORD f
      |JOIN WeatherORD w
      |ON f.Year=w.Year AND f.Month=w.Month AND f.DayofMonth=w.Day""".stripMargin)
  ```

- Transform the first 3 columns containing date information into categorical (enum) columns:
  ```scala
  val bigDataFrame: H2OFrame = h2oContext.asH2OFrame(bigTable)
  for (i <- 0 to 2) bigDataFrame.replace(i, bigDataFrame.vec(i).toCategoricalVec)
  bigDataFrame.update()
  ```

- Run Deep Learning to produce a model estimating arrival delay:
  ```scala
  import _root_.hex.deeplearning.DeepLearning
  import _root_.hex.deeplearning.DeepLearningModel.DeepLearningParameters
  import _root_.hex.deeplearning.DeepLearningModel.DeepLearningParameters.Activation

  val dlParams = new DeepLearningParameters()
  dlParams._train = bigDataFrame
  dlParams._response_column = 'ArrDelay
  dlParams._epochs = 5
  dlParams._activation = Activation.RectifierWithDropout
  dlParams._hidden = Array[Int](100, 100)

  // Create a job
  val dl = new DeepLearning(dlParams)
  val dlModel = dl.trainModel.get
  ```

- Use the model to estimate the delay on the training data:
  ```scala
  val predictionH2OFrame = dlModel.score(bigTable)('predict)
  val predictionsFromModel = asDataFrame(predictionH2OFrame)(sqlContext).collect.map(row => if (row.isNullAt(0)) Double.NaN else row(0))
  ```

- Generate R code producing a residual plot:
  ```scala
  import org.apache.spark.examples.h2o.DemoUtils.residualPlotRCode
  residualPlotRCode(predictionH2OFrame, 'predict, bigTable, 'ArrDelay)
  ```

- Execute the generated R code in RStudio:
  ```r
  #
  # R script for residual plot
  #

  # Import H2O library
  library(h2o)
  # Initialize H2O R-client
  h2o.init()

  # Fetch prediction and actual data, use remembered keys
  pred = h2o.getFrame("dframe_b5f449d0c04ee75fda1b9bc865b14a69")
  act = h2o.getFrame("frame_rdd_14_b429e8b43d2d8c02899ccb61b72c4e57")

  # Select the right columns
  predDelay = pred$predict
  actDelay = act$ArrDelay

  # Make sure that the number of rows is the same
  nrow(actDelay) == nrow(predDelay)

  # Compute residuals
  residuals = predDelay - actDelay

  # Plot residuals
  compare = cbind(as.data.frame(actDelay$ArrDelay), as.data.frame(residuals$predict))
  nrow(compare)
  plot(compare[, 1:2])
  ```