
Commit 87fecf3

[DELTA-OSS-EXTERNAL] Added SQL examples and integration tests for python/scala
- Modified old tests to work with Spark 3.0 by adding the right spark conf
- Added a new test to test SQL on metastore tables and paths
- Updated integration testing script to run the examples in a fine-grained manner

Originally done by @rahulsmahadev in #426
Closes #451

Co-authored-by: Rahul Mahadev <[email protected]>
Signed-off-by: Tathagata Das <[email protected]>
Author: Tathagata Das <[email protected]>

#10659 is resolved by tdas/qfwc2lq2.

GitOrigin-RevId: 4247f17fc78e741b01f5ee47cc8b96f776f4b250
1 parent a0db1e7 commit 87fecf3

13 files changed (+447, -105 lines)
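The change repeated across the Python examples below is the pair of Spark confs that register Delta's SQL extension and catalog on Spark 3.0. Pulled out of the diffs for reference, the session setup looks like this (app name and master are placeholders):

from pyspark.sql import SparkSession

# DeltaSparkSessionExtension adds Delta's SQL/DML commands (MERGE, UPDATE, DELETE, VACUUM, ...)
# to the parser; DeltaCatalog lets CREATE TABLE ... USING delta work against the metastore.
spark = SparkSession.builder \
    .appName("delta-example") \
    .master("local[*]") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()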

examples/README.md (+4, -4)
@@ -2,9 +2,9 @@
 In this folder there are examples taken from the delta.io quickstart guide and docs. They are available in both Scala and Python and can be run if the prerequisites are satisfied.
 
 ### Prerequisites
-* Apache Spark version 2.4.2 or above
-* PySpark is required for running python examples
+* For Python examples, PySpark 3.0.0 or above needs to be installed.
+* For Scala examples, Spark does not need to be installed because the build depends on the Spark Maven artifacts.
 
 ### Instructions
-* To run an example in Python run `spark-submit --packages io.delta:delta-core_2.11:0.5.0 PATH/TO/EXAMPLE`
-* To run the Scala examples, `cd examples/scala` and run `./build/sbt "runMain example.{Example class name}"` e.g. `./build/sbt "runMain example.Quickstart"`
+* To run an example in Python, run `spark-submit --packages io.delta:delta-core_2.12:0.7.0 PATH/TO/EXAMPLE`
+* To run the Scala examples, `cd examples/scala` and run `./build/sbt "runMain example.{Example class name}"` e.g. `./build/sbt "runMain example.Quickstart"`
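As an alternative to the spark-submit invocation above, the Delta package can also be resolved from inside the script via spark.jars.packages, so a Python example can be launched with a plain python interpreter. This is a sketch under the assumption that PySpark 3.0.0 is installed and Maven Central is reachable; the README itself only documents the spark-submit route.

from pyspark.sql import SparkSession

# Resolve io.delta:delta-core_2.12:0.7.0 when the session starts instead of passing --packages.
spark = SparkSession.builder \
    .appName("quickstart") \
    .master("local[*]") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Quick smoke test: write and read back a small Delta table.
spark.range(5).write.format("delta").mode("overwrite").save("/tmp/delta-table")
spark.read.format("delta").load("/tmp/delta-table").show()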

examples/python/quickstart.py (+6, -8)
@@ -14,8 +14,7 @@
 # limitations under the License.
 #
 
-from pyspark import SparkContext
-from pyspark.sql import Column, DataFrame, SparkSession, SQLContext, functions
+from pyspark.sql import Column, DataFrame, SparkSession, functions
 from pyspark.sql.functions import *
 from py4j.java_collections import MapConverter
 from delta.tables import *
@@ -28,14 +27,13 @@
 except:
     pass
 
-# Create SparkContext
-sc = SparkContext()
-sqlContext = SQLContext(sc)
-
-spark = SparkSession \
-    .builder \
+# Enable SQL commands and Update/Delete/Merge for the current spark session.
+# we need to set the following configs
+spark = SparkSession.builder \
     .appName("quickstart") \
     .master("local[*]") \
+    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
+    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
     .getOrCreate()
 
 # Create a table

examples/python/quickstart_sql.py (+73)
@@ -0,0 +1,73 @@
+
+from pyspark.sql import Column, DataFrame, SparkSession, functions
+from pyspark.sql.functions import *
+from py4j.java_collections import MapConverter
+from delta.tables import *
+import shutil
+import threading
+
+tableName = "tbltestpython"
+
+# Enable SQL/DML commands and Metastore tables for the current spark session.
+# We need to set the following configs
+
+spark = SparkSession.builder \
+    .appName("quickstart_sql") \
+    .master("local[*]") \
+    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
+    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
+    .getOrCreate()
+
+# Clear any previous runs
+spark.sql("DROP TABLE IF EXISTS " + tableName)
+spark.sql("DROP TABLE IF EXISTS newData")
+
+try:
+    # Create a table
+    print("############# Creating a table ###############")
+    spark.sql("CREATE TABLE %s(id LONG) USING delta" % tableName)
+    spark.sql("INSERT INTO %s VALUES 0, 1, 2, 3, 4" % tableName)
+
+    # Read the table
+    print("############ Reading the table ###############")
+    spark.sql("SELECT * FROM %s" % tableName).show()
+
+    # Upsert (merge) new data
+    print("########### Upsert new data #############")
+    spark.sql("CREATE TABLE newData(id LONG) USING parquet")
+    spark.sql("INSERT INTO newData VALUES 3, 4, 5, 6")
+
+    spark.sql('''MERGE INTO {0} USING newData
+        ON {0}.id = newData.id
+        WHEN MATCHED THEN
+          UPDATE SET {0}.id = newData.id
+        WHEN NOT MATCHED THEN INSERT *
+    '''.format(tableName))
+
+    spark.sql("SELECT * FROM %s" % tableName).show()
+
+    # Update table data
+    print("########## Overwrite the table ###########")
+    spark.sql("INSERT OVERWRITE %s select * FROM (VALUES 5, 6, 7, 8, 9) x (id)" % tableName)
+    spark.sql("SELECT * FROM %s" % tableName).show()
+
+    # Update every even value by adding 100 to it
+    print("########### Update to the table(add 100 to every even value) ##############")
+    spark.sql("UPDATE {0} SET id = (id + 100) WHERE (id % 2 == 0)".format(tableName))
+    spark.sql("SELECT * FROM %s" % tableName).show()
+
+    # Delete every even value
+    print("######### Delete every even value ##############")
+    spark.sql("DELETE FROM {0} WHERE (id % 2 == 0)".format(tableName))
+    spark.sql("SELECT * FROM %s" % tableName).show()
+
+    # Read old version of data using time travel
+    print("######## Read old data using time travel ############")
+    df = spark.read.format("delta").option("versionAsOf", 0).table(tableName)
+    df.show()
+
+finally:
+    # cleanup
+    spark.sql("DROP TABLE " + tableName)
+    spark.sql("DROP TABLE IF EXISTS newData")
+    spark.stop()
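The new script exercises the SQL surface only; the same upsert, update, and delete steps can also be written with the DeltaTable Python API from the same release. A rough, hypothetical equivalent (table name kept from the script above; this is not code from this commit):

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
from delta.tables import DeltaTable

spark = SparkSession.builder \
    .appName("quickstart_sql_api_equivalent") \
    .master("local[*]") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

spark.sql("CREATE TABLE IF NOT EXISTS tbltestpython (id LONG) USING delta")
spark.sql("INSERT INTO tbltestpython VALUES 0, 1, 2, 3, 4")

# Metastore tables can be addressed by name with DeltaTable.forName (added in Delta 0.7.0).
deltaTable = DeltaTable.forName(spark, "tbltestpython")
newData = spark.range(3, 7)  # source rows with a single LONG column named "id"

# Equivalent of the MERGE INTO statement above.
deltaTable.alias("t").merge(newData.alias("s"), "t.id = s.id") \
    .whenMatchedUpdate(set={"id": "s.id"}) \
    .whenNotMatchedInsertAll() \
    .execute()

# Equivalent of UPDATE ... SET id = id + 100 WHERE id % 2 == 0.
deltaTable.update(condition=expr("id % 2 == 0"), set={"id": expr("id + 100")})

# Equivalent of DELETE FROM ... WHERE id % 2 == 0.
deltaTable.delete("id % 2 == 0")

deltaTable.toDF().show()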
examples/python/quickstart_sql_on_paths.py (+73)
@@ -0,0 +1,73 @@
+
+from pyspark.sql import Column, DataFrame, SparkSession, functions
+from pyspark.sql.functions import *
+from py4j.java_collections import MapConverter
+from delta.tables import *
+import shutil
+import threading
+import tempfile
+import os
+
+table_dir = "/tmp/delta-table"
+# Clear any previous runs
+try:
+    shutil.rmtree(table_dir)
+except:
+    pass
+
+# Enable SQL/DML commands and Metastore tables for the current spark session.
+# We need to set the following configs
+
+spark = SparkSession.builder \
+    .appName("quickstart_sql_on_paths") \
+    .master("local[*]") \
+    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
+    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
+    .getOrCreate()
+
+# Clear any previous runs
+spark.sql("DROP TABLE IF EXISTS newData")
+
+try:
+    # Create a table
+    print("############# Creating a table ###############")
+    spark.sql("CREATE TABLE delta.`%s`(id LONG) USING delta" % table_dir)
+    spark.sql("INSERT INTO delta.`%s` VALUES 0, 1, 2, 3, 4" % table_dir)
+
+    # Read the table
+    print("############ Reading the table ###############")
+    spark.sql("SELECT * FROM delta.`%s`" % table_dir).show()
+
+    # Upsert (merge) new data
+    print("########### Upsert new data #############")
+    spark.sql("CREATE TABLE newData(id LONG) USING parquet")
+    spark.sql("INSERT INTO newData VALUES 3, 4, 5, 6")
+
+    spark.sql('''MERGE INTO delta.`{0}` AS data USING newData
+        ON data.id = newData.id
+        WHEN MATCHED THEN
+          UPDATE SET data.id = newData.id
+        WHEN NOT MATCHED THEN INSERT *
+    '''.format(table_dir))
+
+    spark.sql("SELECT * FROM delta.`%s`" % table_dir).show()
+
+    # Update table data
+    print("########## Overwrite the table ###########")
+    spark.sql("INSERT OVERWRITE delta.`%s` select * FROM (VALUES 5, 6, 7, 8, 9) x (id)" % table_dir)
+    spark.sql("SELECT * FROM delta.`%s`" % table_dir).show()
+
+    # Update every even value by adding 100 to it
+    print("########### Update to the table(add 100 to every even value) ##############")
+    spark.sql("UPDATE delta.`{0}` SET id = (id + 100) WHERE (id % 2 == 0)".format(table_dir))
+    spark.sql("SELECT * FROM delta.`%s`" % table_dir).show()
+
+    # Delete every even value
+    print("######### Delete every even value ##############")
+    spark.sql("DELETE FROM delta.`{0}` WHERE (id % 2 == 0)".format(table_dir))
+    spark.sql("SELECT * FROM delta.`%s`" % table_dir).show()
+
+finally:
+    # cleanup
+    spark.sql("DROP TABLE IF EXISTS newData")
+    spark.stop()

examples/python/streaming.py (+24, -30)
@@ -14,52 +14,47 @@
 # limitations under the License.
 #
 
-from pyspark import SparkContext
-from pyspark.sql import Column, DataFrame, SparkSession, SQLContext, functions
+from pyspark.sql import Column, DataFrame, SparkSession, functions
 from pyspark.sql.functions import *
 from py4j.java_collections import MapConverter
 from delta.tables import *
 import shutil
 import random
 import threading
 
-# Clear previous run delta-tables
-files = ["/tmp/delta-table", "/tmp/delta-table2", "/tmp/delta-table3", "/tmp/delta-table4",
-         "/tmp/delta-table5", "/tmp/checkpoint/tbl1"]
-for i in files:
-    try:
-        shutil.rmtree(i)
-    except:
-        pass
-
-# Create SparkContext
-sc = SparkContext()
-sqlContext = SQLContext(sc)
-
-spark = SparkSession \
-    .builder \
+
+# Enable SQL commands and Update/Delete/Merge for the current spark session.
+# we need to set the following configs
+spark = SparkSession.builder \
     .appName("streaming") \
     .master("local[*]") \
+    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
+    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()
 
+try:
+    shutil.rmtree("/tmp/delta-streaming/")
+except:
+    pass
+
 # Create a table(key, value) of some data
 data = spark.range(8)
 data = data.withColumn("value", data.id + random.randint(0, 5000))
-data.write.format("delta").save("/tmp/delta-table")
+data.write.format("delta").save("/tmp/delta-streaming/delta-table")
 
 # Stream writes to the table
 print("####### Streaming write ######")
 streamingDf = spark.readStream.format("rate").load()
 stream = streamingDf.selectExpr("value as id").writeStream\
     .format("delta")\
-    .option("checkpointLocation", "/tmp/checkpoint")\
-    .start("/tmp/delta-table2")
+    .option("checkpointLocation", "/tmp/delta-streaming/checkpoint")\
+    .start("/tmp/delta-streaming/delta-table2")
 stream.awaitTermination(10)
 stream.stop()
 
 # Stream reads from a table
 print("##### Reading from stream ######")
-stream2 = spark.readStream.format("delta").load("/tmp/delta-table2")\
+stream2 = spark.readStream.format("delta").load("/tmp/delta-streaming/delta-table2")\
     .writeStream\
     .format("console")\
     .start()
@@ -82,7 +77,7 @@ def upsertToDelta(microBatchOutputDF, batchId):
     .withColumn("id", col("value") % 10)\
     .drop("timestamp")
 # Write the output of a streaming aggregation query into Delta Lake table
-deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")
+deltaTable = DeltaTable.forPath(spark, "/tmp/delta-streaming/delta-table")
 print("############# Original Delta Table ###############")
 deltaTable.toDF().show()
 stream3 = streamingAggregatesDF.writeStream\
@@ -98,8 +93,8 @@ def upsertToDelta(microBatchOutputDF, batchId):
 # Streaming append and concurrent repartition using data change = false
 # tbl1 is the sink and tbl2 is the source
 print("############ Streaming appends with concurrent table repartition ##########")
-tbl1 = "/tmp/delta-table4"
-tbl2 = "/tmp/delta-table5"
+tbl1 = "/tmp/delta-streaming/delta-table4"
+tbl2 = "/tmp/delta-streaming/delta-table5"
 numRows = 10
 spark.range(numRows).write.mode("overwrite").format("delta").save(tbl1)
 spark.read.format("delta").load(tbl1).show()
@@ -110,7 +105,7 @@ def upsertToDelta(microBatchOutputDF, batchId):
 # Prior to Delta 0.5.0 this would throw StreamingQueryException: Detected a data update in the
 # source table. This is currently not supported.
 stream4 = spark.readStream.format("delta").load(tbl2).writeStream.format("delta")\
-    .option("checkpointLocation", "/tmp/checkpoint/tbl1") \
+    .option("checkpointLocation", "/tmp/delta-streaming/checkpoint/tbl1") \
     .outputMode("append") \
     .start(tbl1)
 
@@ -126,8 +121,7 @@ def upsertToDelta(microBatchOutputDF, batchId):
 print("######### After streaming write #########")
 spark.read.format("delta").load(tbl1).show()
 # cleanup
-for i in files:
-    try:
-        shutil.rmtree(i)
-    except:
-        pass
+try:
+    shutil.rmtree("/tmp/delta-streaming/")
+except:
+    pass
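The hunk headers above mention a foreachBatch callback, def upsertToDelta(microBatchOutputDF, batchId), whose body sits outside the changed lines and therefore does not appear in this diff. A self-contained sketch of that pattern, following the usual Delta merge-per-micro-batch recipe (the path, table contents, and join condition here are illustrative, not this file's exact code):

from pyspark.sql import SparkSession
from delta.tables import DeltaTable

spark = SparkSession.builder \
    .appName("foreachbatch_upsert_sketch") \
    .master("local[*]") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Seed a small Delta table to act as the merge target.
spark.range(8).write.format("delta").mode("overwrite").save("/tmp/delta-upsert-sketch")
deltaTable = DeltaTable.forPath(spark, "/tmp/delta-upsert-sketch")

def upsertToDelta(microBatchOutputDF, batchId):
    # Merge each micro-batch into the target table: update matching ids, insert new ones.
    deltaTable.alias("t") \
        .merge(microBatchOutputDF.alias("s"), "t.id = s.id") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()

# Feed ids derived from the rate source through the callback for a few seconds.
stream = spark.readStream.format("rate").load() \
    .selectExpr("value % 10 AS id") \
    .writeStream \
    .foreachBatch(upsertToDelta) \
    .outputMode("update") \
    .start()
stream.awaitTermination(10)
stream.stop()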

examples/python/utilities.py (+9, -25)
@@ -14,42 +14,26 @@
 # limitations under the License.
 #
 
-from pyspark import SparkContext
-from pyspark.sql import Column, DataFrame, SparkSession, SQLContext, functions
+from pyspark.sql import Column, DataFrame, SparkSession, functions
 from pyspark.sql.functions import *
 from py4j.java_collections import MapConverter
 from delta.tables import *
 import shutil
 import threading
 
-# Clear previous run's delta-tables
-try:
-    shutil.rmtree("/tmp/delta-table")
-except:
-    pass
-
-# Create SparkContext
-sc = SparkContext()
-sqlContext = SQLContext(sc)
-
-# Enable SQL for the current spark session. we need to set the following configs to enable SQL
-# Commands
-# config io.delta.sql.DeltaSparkSessionExtension -- to enable custom Delta-specific SQL commands
-# config parallelPartitionDiscovery.parallelism -- control the parallelism for vacuum
-spark = SparkSession \
-    .builder \
+spark = SparkSession.builder \
    .appName("utilities") \
    .master("local[*]") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
-    .config("spark.sql.sources.parallelPartitionDiscovery.parallelism", "8") \
+    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
+    .config("spark.sql.sources.parallelPartitionDiscovery.parallelism", "4") \
    .getOrCreate()
 
-# Apache Spark 2.4.x has a known issue (SPARK-25003) that requires explicit activation
-# of the extension and cloning of the session. This will unnecessary in Apache Spark 3.x.
-if spark.sparkContext.version < "3.":
-    spark.sparkContext._jvm.io.delta.sql.DeltaSparkSessionExtension() \
-        .apply(spark._jsparkSession.extensions())
-    spark = SparkSession(spark.sparkContext, spark._jsparkSession.cloneSession())
+# Clear previous run's delta-tables
+try:
+    shutil.rmtree("/tmp/delta-table")
+except:
+    pass
 
 # Create a table
 print("########### Create a Parquet table ##############")

examples/scala/build.sbt (+3, -3)
@@ -17,7 +17,7 @@
 name := "example"
 organization := "com.example"
 organizationName := "example"
-scalaVersion := "2.11.12"
+scalaVersion := "2.12.10"
 version := "0.1.0"
 
 def getDeltaVersion(): String = {
@@ -27,13 +27,13 @@ def getDeltaVersion(): String = {
     println("Using Delta version " + version)
     version
   } else {
-    "0.5.0"
+    "0.7.0"
   }
 }
 
 lazy val root = (project in file("."))
   .settings(
     name := "hello-world",
     libraryDependencies += "io.delta" %% "delta-core" % getDeltaVersion(),
-    libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.3",
+    libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0",
     resolvers += "Delta" at "https://dl.bintray.com/delta-io/delta/")
