-
Notifications
You must be signed in to change notification settings - Fork 28
/
Copy pathquery_generator.py
201 lines (177 loc) · 7.45 KB
/
query_generator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
import logging
import os
import platform
import re
import subprocess
from .workload import Query
class QueryGenerator:
    """Generates and validates benchmark queries (TPC-H, TPC-DS, or JOB).

    Depending on ``benchmark_name``, this either runs the benchmark kit's
    external generator binaries (``qgen``/``dsqgen``, built via ``make``) or
    reads query files from disk (JOB). Each generated query is validated by
    requesting a plan from the database, and the indexable columns that
    appear in its text are recorded on the query object.
    """

    def __init__(self, benchmark_name, scale_factor, db_connector, query_ids, columns):
        # benchmark_name: one of "tpch", "tpcds", "job" (see generate()).
        self.scale_factor = scale_factor
        self.benchmark_name = benchmark_name
        self.db_connector = db_connector
        self.queries = []
        # Optional collection of query ids; when truthy, only these ids
        # are generated (not supported for JOB, see generate()).
        self.query_ids = query_ids
        # All columns in current database/schema
        self.columns = columns
        self.generate()

    def filter_queries(self, query_ids):
        """Keep only the already-generated queries whose number is in query_ids."""
        self.queries = [query for query in self.queries if query.nr in query_ids]

    def add_new_query(self, query_id, query_text):
        """Normalize, validate, and register a query under query_id.

        Raises:
            Exception: if no database connector is available to validate
                the query.
        """
        if not self.db_connector:
            logging.info("{}:".format(self))
            logging.error("No database connector to validate queries")
            raise Exception("database connector missing")
        query_text = self.db_connector.update_query_text(query_text)
        query = Query(query_id, query_text)
        self._validate_query(query)
        self._store_indexable_columns(query)
        self.queries.append(query)

    def _validate_query(self, query):
        """Request a plan for the query; log failures instead of re-raising."""
        try:
            self.db_connector.get_plan(query)
        except Exception as e:
            # Roll back so the connection remains usable for later queries.
            self.db_connector.rollback()
            logging.error("{}: {}".format(self, e))

    def _store_indexable_columns(self, query):
        """Attach every schema column whose name occurs in the query text.

        NOTE(review): plain substring match — may over-approximate when one
        column name is a substring of another identifier.
        """
        for column in self.columns:
            if column.name in query.text:
                query.columns.append(column)

    def _generate_tpch(self):
        """Generate TPC-H queries by running the tpch-kit ``qgen`` binary."""
        logging.info("Generating TPC-H Queries")
        self._run_make()
        # Using default parameters (`-d`)
        queries_string = self._run_command(
            ["./qgen", "-c", "-d", "-s", str(self.scale_factor)], return_output=True
        )
        # qgen's output interleaves queries with "Query (Q<id>)" headers.
        for query in queries_string.split("Query (Q"):
            query_id_and_text = query.split(")\n", 1)
            if len(query_id_and_text) == 2:
                query_id, text = query_id_and_text
                query_id = int(query_id)
                if self.query_ids and query_id not in self.query_ids:
                    continue
                text = text.replace("\t", "")
                self.add_new_query(query_id, text)
        logging.info("Queries generated")

    def _generate_tpcds(self):
        """Generate TPC-DS queries by running the tpcds-kit ``dsqgen`` binary."""
        logging.info("Generating TPC-DS Queries")
        self._run_make()
        # dialects: ansi, db2, netezza, oracle, sqlserver
        command = [
            "./dsqgen",
            "-DIRECTORY",
            "../query_templates",
            "-INPUT",
            "../query_templates/templates.lst",
            "-DIALECT",
            "netezza",
            "-QUALIFY",
            "Y",
            "-OUTPUT_DIR",
            "../..",
        ]
        self._run_command(command)
        # dsqgen writes all queries into a single file, separated by
        # "-- start query ... using template query<id>.tpl" markers.
        with open("query_0.sql", "r") as file:
            queries_string = file.read()
        for query_string in queries_string.split("-- start query"):
            id_and_text = query_string.split(".tpl\n", 1)
            if len(id_and_text) != 2:
                continue
            query_id = int(id_and_text[0].split("using template query")[-1])
            if self.query_ids and query_id not in self.query_ids:
                continue
            query_text = id_and_text[1]
            query_text = self._update_tpcds_query_text(query_text)
            self.add_new_query(query_id, query_text)

    # This manipulates TPC-DS specific queries to work in more DBMSs
    def _update_tpcds_query_text(self, query_text):
        """Rewrite TPC-DS (netezza dialect) constructs for broader DBMS support.

        Two rewrites: quote the reserved word ``returns`` via an alias, and
        expand the ``lochierarchy`` alias inside ``case when`` because some
        DBMSs disallow referencing a select-list alias there.
        """
        query_text = query_text.replace(") returns", ") as returns")
        replaced_string = "case when lochierarchy = 0"
        if replaced_string in query_text:
            # Recover the original grouping expression that was aliased as
            # "lochierarchy" and inline it into the case condition.
            new_string = re.search(
                r"grouping\(.*\)\+" r"grouping\(.*\) " r"as lochierarchy", query_text
            ).group(0)
            new_string = new_string.replace(" as lochierarchy", "")
            new_string = "case when " + new_string + " = 0"
            query_text = query_text.replace(replaced_string, new_string)
        return query_text

    def _run_make(self):
        """Build the benchmark kit's generator binary unless it already exists."""
        if "qgen" not in self._files() and "dsqgen" not in self._files():
            logging.info("Running make in {}".format(self.directory))
            self._run_command(self.make_command)
        else:
            logging.debug("No need to run make")

    def _run_command(self, command, return_output=False, shell=False):
        """Run ``command`` in ``self.directory``; optionally return its stdout.

        stderr is merged into stdout. DSS_QUERY tells qgen where to find its
        query templates.
        """
        env = os.environ.copy()
        env["DSS_QUERY"] = "queries"
        p = subprocess.Popen(
            command,
            cwd=self.directory,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=shell,
            env=env,
        )
        with p.stdout:
            output_string = p.stdout.read().decode("utf-8")
        p.wait()
        if return_output:
            return output_string
        else:
            logging.debug("[SUBPROCESS OUTPUT] " + output_string)

    def _files(self):
        """Return the filenames present in the benchmark kit directory."""
        return os.listdir(self.directory)

    def _generate_job(self):
        """Load Join Order Benchmark queries from .sql files on disk."""
        logging.info("Generating JOB Queries")
        for filename in os.listdir(self.directory):
            # Skip non-query files shipped with the benchmark.
            if ".sql" not in filename or "fkindexes" in filename or "schema" in filename:
                continue
            query_id = filename.replace(".sql", "")
            # Bug fix: open the query file itself (the path previously
            # contained a literal "(unknown)" instead of the filename).
            with open(f"{self.directory}/{filename}") as query_file:
                query_text = query_file.read()
            query_text = query_text.replace("\t", "")
            query = Query(query_id, query_text)
            assert "WHERE" in query_text, "Query without WHERE clause encountered"
            split = query_text.split("WHERE")
            assert len(split) == 2, "Query split for JOB query contains subquery"
            query_text_before_where = split[0]
            query_text_after_where = split[1]
            # Add indexable columns to query: the column must appear after
            # WHERE and its table must be referenced before WHERE.
            for column in self.columns:
                if (
                    column.name in query_text_after_where
                    and f"{column.table.name} " in query_text_before_where
                ):
                    query.columns.append(column)
            self.queries.append(query)
            self._validate_query(query)
        logging.info("Queries generated")

    def generate(self):
        """Dispatch to the benchmark-specific generation routine.

        Raises:
            NotImplementedError: for unknown benchmark names.
        """
        if self.benchmark_name == "tpch":
            self.directory = "./tpch-kit/dbgen"
            # DBMS in tpch-kit dbgen Makefile:
            # INFORMIX, DB2, TDAT (Teradata),
            # SQLSERVER, SYBASE, ORACLE, VECTORWISE, POSTGRESQL
            self.make_command = ["make", "DATABASE=POSTGRESQL"]
            if platform.system() == "Darwin":
                self.make_command.append("OS=MACOS")
            self._generate_tpch()
        elif self.benchmark_name == "tpcds":
            self.directory = "./tpcds-kit/tools"
            self.make_command = ["make"]
            if platform.system() == "Darwin":
                self.make_command.append("OS=MACOS")
            self._generate_tpcds()
        elif self.benchmark_name == "job":
            assert self.scale_factor == 1, (
                "Can only handle JOB with a scale factor of 1"
                ", i.e., no specific scaling"
            )
            assert self.query_ids is None, (
                "Query filtering, i.e., providing query_ids to JOB QueryGenerator "
                "is not supported."
            )
            self.directory = "./join-order-benchmark"
            self._generate_job()
        else:
            raise NotImplementedError("Only TPC-H/-DS and JOB implemented.")