Nick/week2 sql dataframe actions #54

Open · wants to merge 8 commits into main
3 changes: 1 addition & 2 deletions resources/requirements.txt
@@ -2,5 +2,4 @@ pyspark==3.1.3
 kafka-python #Used by Kafka connection test, simple producer to Kafka
 faker #Used to generate fake customer data
 boto3 #Used to create tables in Glue Data Catalog that can be queried by Athena
-tabulate #Used to print Kafka messages in an aesthetic way
-python-dotenv #Automatically pull from a .env file.
+tabulate #Used to print Kafka messages in an aesthetic way
123 changes: 123 additions & 0 deletions week2/test_week2_lab.py
@@ -0,0 +1,123 @@
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StructField, StringType
import pandas as pd
import unittest

# Welcome to the stretch challenge. The code below has two sections:
#
# 1) The method under test - you will fill out the method body,
# implementing the behavior called out in the docstring.
#
# 2) The tests verifying that the method operates per the documentation -
# you simply execute these and keep iterating on the method under
# test until all of them pass.


################# method under test

def get_most_frequent_product_categories(df: DataFrame) -> DataFrame:
'''
Gets the most frequently occurring values from the string 'product_category'
column of the DataFrame passed in.

Raises:
ValueError: The DataFrame does not contain a 'product_category' column.

Parameters:
df (DataFrame): The DataFrame that contains the 'product_category' column.

Returns:
A DataFrame with one column named 'product_category' containing only the
most frequent 'product_category' values. If no rows are included in the
input DataFrame, zero rows are returned in the output DataFrame.
'''
# TODO: fill in with actual implementation
return None
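# A possible shape of the implementation, left commented out so you can attempt it yourself
# first (this is only a sketch, not necessarily the intended solution):
#
#     from pyspark.sql import functions as F
#     if "product_category" not in df.columns:
#         raise ValueError("DataFrame does not contain a 'product_category' column")
#     counts = df.groupBy("product_category").count()
#     max_count = counts.agg(F.max("count")).collect()[0][0]
#     if max_count is None:                 # empty input -> empty output
#         return counts.select("product_category")
#     return counts.filter(F.col("count") == max_count).select("product_category")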


################# tests

def get_most_frequent_product_categories__one_most_frequent__most_frequent_returned():
### ARRANGE
sample_data = [{"color": "yellow", "product_category": "movies", "title": "Mary Poppins"},
{"color": "blue", "product_category": "board games", "title": "Monopoly"},
{"color": "green", "product_category": "video games", "title": "Halo"},
{"color": "red", "product_category": "board games", "title": "Risk"}]
df = spark.createDataFrame(sample_data)

expected_data = [{"product_category": "board games"}]
expected_df = spark.createDataFrame(data=expected_data)

### ACT
result_df: DataFrame = get_most_frequent_product_categories(df)

### ASSERT
pd.testing.assert_frame_equal(result_df.toPandas(), expected_df.toPandas())


def get_most_frequent_product_categories__two_most_frequent__two_frequent_returned():
### ARRANGE
sample_data = [{"color": "yellow", "product_category": "movies", "title": "Mary Poppins"},
{"color": "blue", "product_category": "board games", "title": "Monopoly"},
{"color": "green", "product_category": "video games", "title": "Halo"},
{"color": "red", "product_category": "board games", "title": "Risk"},
{"color": "purple", "product_category": "movies", "title": "Purple Rain"}]
df = spark.createDataFrame(sample_data)
expected_data = [{"product_category": "movies"}, {"product_category": "board games"}]
expected_df = spark.createDataFrame(data=expected_data)

### ACT
result_df = get_most_frequent_product_categories(df)

### ASSERT
pd.testing.assert_frame_equal(result_df.toPandas(), expected_df.toPandas())


def get_most_frequent_product_categories__no_rows__empty_dataframe_returned():
### ARRANGE
# define the schema, but include no data
schema = StructType([
StructField("color", StringType(), True),
StructField("product_category", StringType(), True),
StructField("title", StringType(), True)
])
df = spark.createDataFrame([], schema)

### ACT
result = get_most_frequent_product_categories(df)

### ASSERT
# Per the docstring, an empty input DataFrame yields an empty output DataFrame.
assert result.count() == 0


def get_most_frequent_product_categories__no_product_category_column__value_error_raised():
### ARRANGE
# define the schema, but include no data
schema = StructType([
StructField("color", StringType(), True),
#StructField("product_category", StringType(), True), # commented out to generate the error
StructField("title", StringType(), True)
])
df = spark.createDataFrame([], schema)
result_ve: ValueError = None

### ACT
try:
result = get_most_frequent_product_categories(df)
except ValueError as ve:
result_ve = ve

### ASSERT
assert result_ve is not None


################# executing tests

spark = SparkSession.builder.appName("Testing get_most_frequent_product_categories").getOrCreate()

get_most_frequent_product_categories__one_most_frequent__most_frequent_returned()
get_most_frequent_product_categories__two_most_frequent__two_frequent_returned()
get_most_frequent_product_categories__no_rows__empty_dataframe_returned()
get_most_frequent_product_categories__no_product_category_column__value_error_raised()

spark.stop()
26 changes: 26 additions & 0 deletions week2/week2_README.md
@@ -0,0 +1,26 @@
# Hours with Experts - Week 2 Overview

This week we will cover the following:
<div class='overview'>

* PySpark has multiple APIs, two of which we are going to use
  * Spark SQL API
  * Spark DataFrame API
* Spark Transforms and Actions (see the short example after this list)

</div>
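Here is a minimal sketch of the same query expressed with both APIs, and of the split between
lazy transforms and eager actions. The example data and column names are made up purely for
illustration; the lab uses a real reviews dataset instead.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("week2-overview-example").getOrCreate()
df = spark.createDataFrame([("books", 3), ("movies", 5)], ["product_category", "star_rating"])

# Spark SQL API: register a temporary view, then query it with SQL
df.createOrReplaceTempView("reviews_view")
sql_result = spark.sql("SELECT product_category FROM reviews_view WHERE star_rating = 5")

# Spark DataFrame API: the same query expressed as method calls
df_result = df.filter(df.star_rating == 5).select("product_category")

# filter/select are transforms (lazy); show/count are actions that trigger execution
sql_result.show()
print(df_result.count())

spark.stop()
```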

## Week 2 Homework and Lab
* Watch the content from the 'Week 2 Content' section below
* Complete the Week 2 Lab at [week2_lab.py](week2_lab.py)

## Week 2 Content
Use this content to learn about the topics and help with the week 2 labs.
- [Instruction and videos](https://where-ever.com)

## Week 2 Stretch Challenge
In the week 2 stretch challenge you will implement a unit-tested method body that requires Spark.
We complete this exercise as a gentle introduction to unit testing. If you want to know why unit
testing is important, [please watch this video](https://www.youtube.com/watch?v=Ezz5nAH1opA).
In later weeks we will do a deeper dive to dissect the components of the unit test, so that by the
end of this course you know how to write your own unit tests from scratch.
111 changes: 111 additions & 0 deletions week2/week2_lab.py
@@ -0,0 +1,111 @@
####### Welcome to the Week 2 Lab!
# Work through the following exercises, where we learn about
# PySpark's SQL and DataFrame APIs. In addition, we will learn
# the difference between Spark Transforms and Actions.

####### import libraries and setup environment
from pyspark.sql import SparkSession, DataFrame

####### perform environment tests
# TODO section

####### setup environment

# Create a SparkSession
spark = None
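# One way to create it (a sketch; the app name is just a placeholder):
# spark = SparkSession.builder.appName("week2_lab").getOrCreate()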

# For Windows users: silence errors about temporary directories that cannot be deleted,
# which otherwise make your logs impossible to read.
logger = spark.sparkContext._jvm.org.apache.log4j
logger.LogManager.getLogger("org.apache.spark.util.ShutdownHookManager").setLevel(logger.Level.OFF)
logger.LogManager.getLogger("org.apache.spark.SparkEnv").setLevel(logger.Level.ERROR)

### Challenges and Questions

# Note - some of these challenges/questions ask you to do the same thing twice, with two
# different PySpark APIs. Please complete those challenges with both PySpark APIs and
# verify you get the same answer. The challenges that ask you to use the two APIs
# will be in the following format:
## Challenge X: <challenge content>
# With SQL
# With DataFrame

## Challenge 1: Read the tab-separated file named "resources/reviews.tsv.gz" into a dataframe and
# name the variable 'reviews'. For this homework it's important that you don't use the schema
# inference option when reading the file.
reviews = None
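# A sketch of one possible approach, left commented out (whether the file has a header row
# is an assumption here - adjust the options to match the actual file):
# reviews = (spark.read
#            .option("sep", "\t")
#            .option("header", "true")
#            .csv("resources/reviews.tsv.gz"))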

# Question 1.1: What type of character separates the content of each row?
# Question 1.2: Why might a data engineer NOT want to infer the schema?

## Challenge 2: Print the first 5 rows of the dataframe.
# Some of the columns are long - print the entire record, regardless of length.
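# For example (a sketch):
# reviews.show(5, truncate=False)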


## Challenge 3: Print the schema and verify that all columns are strings and
# the following columns are found in the DataFrame:
# TODO: fill out with expected columns
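# For example (a sketch):
# reviews.printSchema()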


## Challenge 4: Create a virtual view on top of the reviews dataframe, so that we can query it with Spark SQL.
# Name the virtual view 'reviews_view' for future referencing.
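# A sketch of one way to do it:
# reviews.createOrReplaceTempView("reviews_view")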


## Challenge 5: Determine the number of records in the 'reviews_view' view and the 'reviews' dataframe.
# With SQL - use 'reviews_view'
# With DataFrame - use 'reviews' dataframe

# Question 5.1: How many records are there?
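# A sketch of both approaches:
# spark.sql("SELECT COUNT(*) FROM reviews_view").show()   # SQL
# print(reviews.count())                                  # DataFrame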


## Challenge 6: Create a new dataframe based on "reviews" with exactly one column: the value
# of the 'product_category' field. Determine the 'product_category' that is most common.
# With SQL - use 'reviews_view'
# With DataFrame - use 'reviews' dataframe

# Question 6.1: Which value is the most common?
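# A sketch of both approaches:
# spark.sql("""SELECT product_category, COUNT(*) AS cnt
#              FROM reviews_view
#              GROUP BY product_category
#              ORDER BY cnt DESC""").show(1)
# reviews.select("product_category").groupBy("product_category").count() \
#        .orderBy("count", ascending=False).show(1)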


## Challenge 7: Find how many reviews exist in the dataframe with a 5 star rating.
# With SQL - use 'reviews_view'
# With DataFrame - use 'reviews' dataframe

# Question 7.1: How many reviews exist in the dataframe with a 5 star rating?
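# A sketch (the rating column name, e.g. 'star_rating', is an assumption - check your schema;
# since nothing was cast yet, the values are still strings):
# spark.sql("SELECT COUNT(*) FROM reviews_view WHERE star_rating = '5'").show()
# print(reviews.filter(reviews.star_rating == "5").count())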


## Challenge 8: Find the most helpful review in the dataframe - the one with
# the highest number of helpful votes.
# With SQL - use 'reviews_view'
# With DataFrame - use 'reviews' dataframe

# Question 8.1: What is the product title for that review?
# Question 8.2: How many helpful votes did it have?
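# A sketch (column names such as 'helpful_votes' and 'product_title' are assumptions - check your schema):
# spark.sql("""SELECT product_title, helpful_votes
#              FROM reviews_view
#              ORDER BY CAST(helpful_votes AS INT) DESC""").show(1, truncate=False)
# reviews.orderBy(reviews.helpful_votes.cast("int"), ascending=False) \
#        .select("product_title", "helpful_votes").show(1, truncate=False)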


## Challenge 9: Find the date with the most purchases.
# With SQL - use 'reviews_view'
# With DataFrame - use 'reviews' dataframe

# Question 9.1: What is the date with the most purchases?
# Question 9.2: What is the count of purchases?
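# A sketch (the date column name, e.g. 'purchase_date', is an assumption - check your schema):
# spark.sql("""SELECT purchase_date, COUNT(*) AS cnt
#              FROM reviews_view
#              GROUP BY purchase_date
#              ORDER BY cnt DESC""").show(1)
# reviews.groupBy("purchase_date").count().orderBy("count", ascending=False).show(1)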


## Challenge 10: Currently every field in the data file is interpreted as a string,
# but there are 3 that should really be numbers. Create a new dataframe that keeps
# the original columns but casts the 3 columns that should be integers to actual
# ints.
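# A sketch (which 3 columns are numeric is an assumption - inspect your schema first):
# from pyspark.sql.functions import col
# int_columns = ["star_rating", "helpful_votes", "total_votes"]
# typed_reviews = reviews
# for c in int_columns:
#     typed_reviews = typed_reviews.withColumn(c, col(c).cast("int"))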


## Challenge 11: Write the dataframe from Challenge 10 to your drive in JSON format.
# Feel free to pick any directory on your computer.
# Use overwrite mode.
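# A sketch (the output path is only a placeholder - pick any directory you like):
# typed_reviews.write.mode("overwrite").json("/tmp/week2_reviews_json")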

### Teardown
# Stop the SparkSession




####### Stretch Challenges

## Stretch Challenge:
# Go to 'test_week2_lab.py' for the stretch challenge
3 changes: 3 additions & 0 deletions week6_gold/week6_build_gold_layer.py
@@ -36,6 +36,9 @@
#Define a streaming dataframe using readStream on top of the silver reviews directory on S3
silver_data = None

# TODO: figure out how to apply the review timestamp
# Question 3: Add a column to the dataframe named "review_timestamp", representing the current time on your computer.
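# A sketch of one way to do this, left commented out:
# from pyspark.sql.functions import current_timestamp
# silver_data = silver_data.withColumn("review_timestamp", current_timestamp())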

#Define a watermarked_data dataframe by defining a watermark on the `review_timestamp` column with an interval of 10 seconds
watermarked_data = None
