-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path1_5.py
More file actions
42 lines (27 loc) · 914 Bytes
/
1_5.py
File metadata and controls
42 lines (27 loc) · 914 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from pyspark.sql import SparkSession
import sys
import time
# Command-line flag controlling Spark's automatic broadcast-join selection:
#   "Y" -> disable it (autoBroadcastJoinThreshold = -1)
#   "N" -> keep Spark's default behaviour
if len(sys.argv) < 2:
    # Fail with a usage message instead of a bare IndexError.
    raise SystemExit(f"usage: {sys.argv[0]} Y|N  (Y = disable broadcast joins, N = keep default)")
disabled = sys.argv[1]
if disabled not in ("Y", "N"):
    # Validate before starting a SparkSession so a bad flag fails fast.
    raise ValueError("This setting is not available.")
spark = SparkSession.builder.appName('query1-sql').getOrCreate()
if disabled == "Y":
    # Per the Spark SQL docs, a threshold of -1 disables automatic broadcasting,
    # so Spark will not pick a broadcast hash join on its own.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
# Load both Parquet inputs from HDFS and register them as temporary views
# under the table names used by this script's SQL query.
df = spark.read.format("parquet")
df1 = df.load("hdfs://master:9000/data/yellow_tripdata_1m.parquet")
df2 = df.load("hdfs://master:9000/data/yellow_tripvendors_1m.parquet")
# createOrReplaceTempView is the non-deprecated replacement for
# registerTempTable (deprecated since Spark 2.0); semantics are the same.
df1.createOrReplaceTempView("tripdata")
df2.createOrReplaceTempView("tripvendors")
# Join 100 rows of the vendor table (LIMIT without ORDER BY, so which rows is
# unspecified) against the full trip table on the shared ID column.
# Adjacent string literals inside the parentheses are joined at compile time.
sqlString = (
    "SELECT * "
    "FROM "
    " (SELECT * FROM tripvendors LIMIT 100) as v, "
    " tripdata as d "
    "WHERE "
    " v.ID = d.ID"
)
# Time the query execution. collect() is the action that forces the job to
# run. perf_counter() is a monotonic clock meant for measuring intervals,
# unlike time.time(), which can jump if the system clock is adjusted.
t1 = time.perf_counter()
spark.sql(sqlString).collect()
t2 = time.perf_counter()
# Print the physical plan after timing so explain() is not counted.
spark.sql(sqlString).explain()
join_mode = "enabled" if disabled == 'N' else "disabled"
print(f"Time with choosing join type {join_mode} is {t2 - t1:.4f} sec.")