Note: Run this paragraph before any Spark command in the notebook. Restart the interpreter if necessary!
%dep
// Load extra packages into the Spark interpreter; this has to run before any Spark command.
z.reset()
// dl.bintray.com (the old Spark Packages host) has been shut down;
// Spark Packages are now served from repos.spark-packages.org.
z.addRepo("Spark Packages Repo").url("https://repos.spark-packages.org/")
// CSV data source used later via format='com.databricks.spark.csv'
z.load("com.databricks:spark-csv_2.10:1.3.0")
// Kafka integration for Spark Streaming
z.load("org.apache.spark:spark-streaming-kafka_2.10:1.6.0")
Calling `sc` initializes the executors (org.apache.spark.executor.CoarseGrainedExecutorBackend) via YARN if Zeppelin's Spark interpreter is configured as "yarn-client".
%pyspark
print(sc.version)A little convenience function to
- collect random samples of RDDs and DataFrames
- print RDDs and DataFrames in table form, without switching to %sql, by leveraging Zeppelin's display capabilities
%pyspark
def pprint(data, num=8, asTable=False, columns=None, sampleRatio=None, seed=42):
# If a sampleRatio is given, a random sample with given seed is selected
subset = data.sample(False, fraction=sampleRatio, seed=seed) if sampleRatio else data
# If it is a DataFrame, convert rows to arras and extract headers
if "rdd" in dir(data):
columns = subset.columns
subset = subset.map(lambda row: row.asDict().values())
# If num is -1 all records should be collected - avoid for big data ...
array = subset.collect() if num == -1 else subset.take(num)
# If asTable is True, sql format with columns c0, c1, ... as output
# If columns is array of column names, sql format with given columns as output
if asTable or columns:
output = ""
for d in array:
l = len(d)
output += "\t".join([str(x) for x in d]) + "\n"
if columns:
header = "\t".join([h for h in columns]) + "\n"
else:
header = "\t".join(["c%0d" %i for i in range(l) ]) + "\n"
print "%table " + header + output
else:
for d in array:
print d%pyspark
import random
data = [ [random.randint(10,99) for col in range(4)] for row in range(10)]
rdd = sc.parallelize(data, 4)
pprint(rdd, -1)%pyspark
m = rdd.map(lambda x: sum(x))
print(m.collect())
s = m.reduce(lambda x,y: x + y)
print "total = ", sAttribute Information:
[0] sepal length in cm
[1] sepal width in cm
[2] petal length in cm
[3] petal width in cm
[4] class: Iris Setosa, Iris Versicolour, Iris Virginica
%pyspark
def split(row):
parts = row.split(",")
return [float(v) for v in parts[:4]] + [parts[4]]
file = sc.textFile("/tmp/iris.data")
# remove empty lines and split each line
iris = file.filter(lambda row: len(row)>0)\
.map(split)
print iris.count()%pyspark
pprint(iris, sampleRatio=0.1, num=-1, columns=["sepL", "sepW", "petL", "petW", "species"])Calculate average sepal length per class
%pyspark
tuples = iris.map(lambda row: [row[4], row[0]])
result = tuples.groupByKey().mapValues(lambda row: sum(row)/len(row))
pprint(result)%pyspark
from pyspark.sql.types import *
schema = StructType([ \
StructField("sepalLength", DoubleType(), True), \
StructField("sepalWidth", DoubleType(), True), \
StructField("PetalLength", DoubleType(), True), \
StructField("PetalWidth", DoubleType(), True), \
StructField("Class", StringType(), True)
])
irisDf = sqlContext.createDataFrame(iris, schema=schema)
sqlContext.registerDataFrameAsTable(irisDf, "Iris")
irisDf.show()%pyspark
irisDf.select(["Class", "sepalLength"]).groupBy("Class").avg("sepalLength").show()%sql
SELECT Class,
       AVG(sepalLength) AS avgSL
FROM Iris
GROUP BY Class
Changes: Reduced to a subset with countries of the European Union only (AUT, BEL, BUL, CYP, CZE, DEU, DNK, ESP, EST, FIN, FRA, GBR, GRC, HRV, HUN, IRL, ITA, LTU, LUX, LVA, MLT, NLD, POL, PRT, ROM, SVK, SVN, SWE)
%sh
# Check that the input file is present in HDFS
hdfs dfs -ls "/tmp/europe-indicators.csv"
%pyspark
from pyspark.sql.types import *
# Columns of the CSV file, in file order; every column is nullable
csv_fields = [
    ("CountryName", StringType()),
    ("CountryCode", StringType()),
    ("IndicatorName", StringType()),
    ("IndicatorCode", StringType()),
    ("Year", IntegerType()),
    ("Value", DoubleType()),
]
schema = StructType([StructField(name, dtype, True) for name, dtype in csv_fields])
reader = sqlContext.read
indicators_csv = reader.load('/tmp/europe-indicators.csv',
                             format='com.databricks.spark.csv',
                             header='true',
                             schema=schema).cache()
# NOTE: despite its name, "IndicatorsRDD" is backed by a DataFrame
sqlContext.registerDataFrameAsTable(indicators_csv, "IndicatorsRDD")
print(indicators_csv.count())
Let's look at the schema of the Indicators table
%pyspark
indicators_csv.printSchema()
indicators_csv.sample(False, 0.1).show()Code/value encoding is not that optimal ... Let's transform the data set and store the result os ORC
Spark 1.5 does not provide a pivot method for DataFrames (GroupedData.pivot was only added in Spark 1.6), hence we need to write our own pivot via RDDs and aggregateByKey
Some caveats for this step:
- Return a `Row` from `merge`; creating DataFrames from Python dictionaries is deprecated. `**value` is a nice trick to convert a dictionary into a keyword parameter list (Rows are immutable).
- Initialize each record with all indicators set to `None`.
- `.` is not allowed in column names, so replace it with `_`.
%pyspark
columns = indicators_csv.map(lambda row: row.IndicatorCode.replace(".", "_")).distinct().collect()
bc = sc.broadcast(columns)
def seq(u, v):
if u == None:
u = {ind: None for ind in bc.value} # Use value of broadcast variable to initialize the dictionary and ensure all rows have all indicators
u[v.IndicatorCode.replace(".","_")] = v.Value # Set this indicators value converted to float
return u
def comb(u1, u2):
u1.update(u2)
return u1
def merge(keys, value):
value["Country"] = keys[0]
value["Year"] = int(keys[1])
return Row(**value)
data = indicators_csv.select(["CountryCode", "IndicatorCode", "Year", "Value"])\
.rdd\
.keyBy(lambda row: row.CountryCode + "|" + str(row.Year))\
.aggregateByKey(None, seq, comb)\
.map(lambda tuple: merge(tuple[0].split("|"), tuple[1]))\
.cache()
Finally, transform the RDD back into a DataFrame and register it as a table with the HiveContext (required for ORC).
Notes:
- The StructType schema has to be sorted by field name! `Row(**kwargs)` orders its fields alphabetically, and Spark does not match schema names against Row field names; it applies the types purely by position.
- Also, due to the many null values, automatic schema inference only works properly when `createDataFrame` inspects every row (e.g. `samplingRatio=1.0`). However, I wouldn't rely on it ...
%pyspark
from pyspark.sql.types import *
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
fields = [StructField(ind, DoubleType(), True) for ind in columns ] + \
[StructField("Year", IntegerType(), False), StructField("Country", StringType(), False)]
sortedFields = sorted(fields, key=lambda x: x.name)
sortedSchema = StructType(fields=sortedFields)
indicators = sqlContext.createDataFrame(data, schema = sortedSchema)
sqlContext.registerDataFrameAsTable(indicators, "Indicators")%sh
# -f: do not fail when the directory does not exist yet (e.g. on the first run)
hdfs dfs -rm -r -f /tmp/europe-indicators_transformed_orc
%pyspark
indicators.write.orc("/tmp/europe-indicators_transformed_orc")Load ORC data again to benefit from predicate pushdow, etc
%pyspark
indicators_t = sqlContext.read.orc("/tmp/europe-indicators_transformed_orc")
sqlContext.registerDataFrameAsTable(indicators_t, "Indicators_t")
sqlContext.cacheTable("Indicators_t")Execute some queries
%sql
-- SP.DYN.CBRT.IN: Birth rate, crude (per 1,000 people)
SELECT Country, Year, SP_DYN_CBRT_IN
FROM Indicators_t
WHERE Country IN ('AUT', 'FRA', 'DEU', 'GRC', 'IRL', 'ITA', 'NLD', 'PRT', 'ESP', 'GBR')
  AND Year > 1990
ORDER BY Country, Year
%sql
-- SL.UEM.1524.NE.ZS: Unemployment, youth total (% of total labor force ages 15-24) (national estimate)
SELECT Country, Year, SL_UEM_1524_NE_ZS
FROM Indicators_t
WHERE Country IN ('AUT', 'FRA', 'DEU', 'GRC', 'IRL', 'ITA', 'NLD', 'PRT', 'ESP', 'GBR')
  AND Year > 1990
ORDER BY Country, Year
%sql
-- SL.UEM.1524.NE.ZS: Unemployment, youth total (% of total labor force ages 15-24) (national estimate)
-- SL.UEM.TOTL.NE.ZS: Unemployment, total (% of total labor force) (national estimate)
-- SP.DYN.CBRT.IN: Birth rate, crude (per 1,000 people)
SELECT Country, Year, SL_UEM_1524_NE_ZS, SP_DYN_CBRT_IN
FROM Indicators_t
WHERE Country IN ('AUT', 'FRA', 'DEU', 'GRC', 'IRL', 'ITA', 'NLD', 'PRT', 'ESP', 'GBR')
  AND Year BETWEEN 1991 AND 2014
ORDER BY Country, Year
Of course, this result could have been calculated without pivoting the table
%pyspark
sqlContext.registerDataFrameAsTable(indicators_csv, "Indicators")%sql
-- Pivot the two indicators with conditional aggregation: one row per (Year, CountryCode).
-- CASE without ELSE yields NULL, which MAX ignores.
SELECT Year, CountryCode,
       MAX(CASE WHEN IndicatorCode = 'SL.UEM.1524.NE.ZS' THEN Value END) AS UNEM,
       MAX(CASE WHEN IndicatorCode = 'SP.DYN.CBRT.IN' THEN Value END) AS CBRT
FROM Indicators
WHERE IndicatorCode IN ('SP.DYN.CBRT.IN', 'SL.UEM.1524.NE.ZS')
  AND CountryCode IN ('AUT', 'FRA', 'DEU', 'GRC', 'IRL', 'ITA', 'NLD', 'PRT', 'ESP', 'GBR')
  AND Year > 1990
GROUP BY Year, CountryCode
-- Ordering has to happen on the final result to be guaranteed
ORDER BY Year, CountryCode
%pyspark
def cvtCodes(code):
    """Turn an indicator code like "SL.UEM.1524.NE.ZS" into the column name "sl_uem_1524_ne_zs".

    The pivoted table's columns are upper case (e.g. SL_UEM_1524_NE_ZS); the lower-case
    names rely on Spark's case-insensitive column resolution (its default setting).
    """
    return code.lower().replace(".", "_")
features = [
    cvtCodes(c) for c in [
        "SL.UEM.1524.NE.ZS", # Unemployment, youth total (% of total labor force ages 15-24) (national estimate)
        "GC.BAL.CASH.GD.ZS", # Cash surplus/deficit (% of GDP)
        "FP.CPI.TOTL.ZG"     # Inflation, consumer prices (annual %)
    ]
]
years = [2007, 2008, 2009, 2010, 2011, 2012]
# VectorAssembler (next paragraph) cannot assemble null values and the pivoted
# table is sparse, so keep only country/years where all features are present
eu = indicators_t[indicators_t.Year.isin(years)]\
    .select(["country", "year"] + features)\
    .dropna(subset=features)
sqlContext.registerDataFrameAsTable(eu, "eu")
Note: The input columns (features) need to be in Vector format; pyspark.ml.feature.VectorAssembler does this conversion as a pipeline stage.
%pyspark
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans, KMeansModel
from pyspark.ml import Pipeline
assembler = VectorAssembler(inputCols=features, outputCol="features")
kmeans = KMeans(k=3, seed=42)
pipeline = Pipeline(stages=[assembler, kmeans])
model = pipeline.fit(eu)
transformed = model.transform(eu).select("country", "year", "prediction").sort(["country", "year"])
sqlContext.registerDataFrameAsTable(transformed, "Classes")Caveat: The classes are not to interprete as an ordered list, they are complete random!
%sql
-- Show the 0-based cluster ids as 1-based class numbers
SELECT country, year, prediction + 1 AS class
FROM Classes
ORDER BY country, year
Note: GroupedData has no function to collect values into a list, so we go back to the RDD API and use aggregateByKey.
%pyspark
def seq(u, v):
if u == None: u = []
u.append(v.country)
return u
def comb(u1, u2):
return u1 + u2
data = transformed.select(["year", "country", "prediction"])\
.rdd\
.keyBy(lambda row: str(row.year) + ":" + str(row.prediction))\
.aggregateByKey(None, seq, comb)\
.sortByKey()\
.map(lambda tuple: (tuple[0], ", ".join(sorted(tuple[1]))))
year = ""
for c in data.collect():
y, cl = c[0].split(":")
if y != year:
print "\nYear:", y
year = y
print cl, "=", c[1]
%sql
-- Indicator codes whose name mentions unemployment. LIKE is case-sensitive, so match
-- against the lower-cased name to also find names starting with "Unemployment".
SELECT DISTINCT IndicatorCode, IndicatorName
FROM IndicatorsRDD
WHERE lower(IndicatorName) LIKE '%unem%'
ORDER BY IndicatorCode
Steps:
1. Open an ssh terminal to your cluster.
2. Get the Python code:
   wget https://raw.githubusercontent.com/anset/SparkDemos/master/SimpleKafkaStreaming/direct_kafka_wordcount.py
3. Look at SimpleKafkaStreaming and follow steps 2 and 3.