# postgres_dbms.py -- PostgreSQL database connector
import logging
import re
import psycopg2
from ..database_connector import DatabaseConnector
class PostgresDatabaseConnector(DatabaseConnector):
    """Database connector that talks to PostgreSQL through psycopg2.

    The helpers ``exec_only``, ``exec_fetch``, ``commit``, ``close`` and
    ``_prepare_query`` are used here but not defined in this class; they are
    presumably provided by ``DatabaseConnector`` (verify against the base
    class).
    """

    def __init__(self, db_name, autocommit=False):
        """Open a connection to ``db_name`` and set the random seed.

        Args:
            db_name: Name of the database to connect to. When the resulting
                ``self.db_name`` is falsy, ``"postgres"`` is used instead.
            autocommit: Forwarded to the base class initializer and applied
                to the psycopg2 connection.
        """
        DatabaseConnector.__init__(self, db_name, autocommit=autocommit)
        self.db_system = "postgres"
        self._connection = None

        if not self.db_name:
            self.db_name = "postgres"
        self.create_connection()

        self.set_random_seed()

        logging.debug("Postgres connector created: {}".format(db_name))

    def create_connection(self):
        """(Re)create the psycopg2 connection and cursor for ``self.db_name``.

        An already open connection is closed first via ``self.close()``.
        """
        if self._connection:
            self.close()
        self._connection = psycopg2.connect("dbname={}".format(self.db_name))
        self._connection.autocommit = self.autocommit
        self._cursor = self._connection.cursor()

    def enable_simulation(self):
        """Create the ``hypopg`` extension and commit."""
        self.exec_only("create extension hypopg")
        self.commit()

    def database_names(self):
        """Return the list of database names found in ``pg_database``."""
        result = self.exec_fetch("select datname from pg_database", False)
        return [row[0] for row in result]

    def update_query_text(self, text):
        """Update query syntax so that it works in PostgreSQL.

        Moves a trailing ``limit`` clause back into the statement, removes
        ``limit -1``, rewrites `` N days)`` to `` interval 'N days')`` and
        adds aliases to subqueries that lack one.

        Args:
            text: The query text to adapt.

        Returns:
            The adapted query text.
        """
        text = text.replace(";\nlimit ", " limit ").replace("limit -1", "")
        text = re.sub(r" ([0-9]+) days\)", r" interval '\1 days')", text)
        text = self._add_alias_subquery(text)
        return text

    def _add_alias_subquery(self, query_text):
        """Insert `` as alias123 `` after subqueries that have no alias.

        PostgreSQL requires an alias for subqueries. A parenthesized
        expression following ``from`` or ``,`` is treated as un-aliased when
        it is followed by ``)``, ``,``, one of the keywords ``limit``,
        ``group``, ``order``, ``where`` (in any letter case), or by nothing
        at all.

        Args:
            query_text: The query text to scan.

        Returns:
            ``query_text`` with the aliases inserted; the original casing of
            the query is preserved.
        """
        # Matching is done on a lowercased copy; offsets are then applied to
        # the original text (this assumes lowercasing keeps the length, which
        # holds for ASCII SQL).
        text = query_text.lower()
        text_length = len(text)
        positions = []
        for match in re.finditer(r"((from)|,)[ \n]*\(", text):
            depth = 1
            pos = match.end()
            # Walk forward to the parenthesis closing the one just matched.
            while depth > 0 and pos < text_length:
                char = text[pos]
                if char == "(":
                    depth += 1
                elif char == ")":
                    depth -= 1
                pos += 1
            if depth > 0:
                # Unbalanced parentheses: there is no closing position where
                # an alias could be placed, so leave this match alone.
                continue
            next_word = text[pos:].lstrip().split(" ")[0].split("\n")[0]
            if (
                not next_word  # subquery closes at the very end of the text
                or next_word[0] in (")", ",")
                or next_word in ("limit", "group", "order", "where")
            ):
                positions.append(pos)
        # Insert from the back so earlier offsets remain valid.
        for pos in sorted(positions, reverse=True):
            query_text = query_text[:pos] + " as alias123 " + query_text[pos:]
        return query_text

    def create_database(self, database_name):
        """Create the database ``database_name`` and log the creation."""
        self.exec_only("create database {}".format(database_name))
        logging.info("Database {} created".format(database_name))

    def import_data(self, table, path, delimiter="|", encoding=None):
        """Load the file at ``path`` into ``table`` using ``COPY``.

        Args:
            table: Name of the target table.
            path: Path of the file to read.
            delimiter: Column delimiter used in the file.
            encoding: Optional file encoding. When given, the file is loaded
                as CSV with ``NULL`` as the null marker and ``"`` as the quote
                character; otherwise ``copy_from`` is used with the empty
                string as the null marker.
        """
        with open(path, encoding=encoding) as file:
            if encoding:
                self._cursor.copy_expert(
                    (
                        f"COPY {table} FROM STDIN WITH DELIMITER AS '{delimiter}' NULL "
                        f"AS 'NULL' CSV QUOTE AS '\"' ENCODING '{encoding}'"
                    ),
                    file,
                )
            else:
                self._cursor.copy_from(file, table, sep=delimiter, null="")

    def indexes_size(self):
        """Return the summed index size (in bytes) of all ``public`` tables."""
        statement = (
            "select sum(pg_indexes_size(table_name::text)) from "
            "(select table_name from information_schema.tables "
            "where table_schema='public') as all_tables"
        )
        result = self.exec_fetch(statement)
        return result[0]

    def drop_database(self, database_name):
        """Drop the database ``database_name`` and log the removal."""
        statement = f"DROP DATABASE {database_name};"
        self.exec_only(statement)

        logging.info(f"Database {database_name} dropped")

    def create_statistics(self):
        """Run ``analyze`` with autocommit temporarily enabled.

        Pending work is committed first. The connection's autocommit setting
        is restored to ``self.autocommit`` afterwards, even when ``analyze``
        raises.
        """
        logging.info("Postgres: Run `analyze`")
        self.commit()
        self._connection.autocommit = True
        try:
            self.exec_only("analyze")
        finally:
            self._connection.autocommit = self.autocommit

    def set_random_seed(self, value=0.17):
        """Issue ``SELECT setseed(value)`` on the connection.

        Args:
            value: Seed passed to ``setseed``.
        """
        logging.info(f"Postgres: Set random seed `SELECT setseed({value})`")
        self.exec_only(f"SELECT setseed({value})")

    def supports_index_simulation(self):
        """Return ``True`` when ``self.db_system`` is ``"postgres"``."""
        return self.db_system == "postgres"

    def _simulate_index(self, index):
        """Create a hypothetical index through ``hypopg_create_index``.

        Args:
            index: Object providing ``table()`` and ``joined_column_names()``.

        Returns:
            The row fetched from ``hypopg_create_index``.
        """
        table_name = index.table()
        statement = (
            "select * from hypopg_create_index( "
            f"'create index on {table_name} "
            f"({index.joined_column_names()})')"
        )
        result = self.exec_fetch(statement)
        return result

    def _drop_simulated_index(self, oid):
        """Drop the hypothetical index ``oid`` through ``hypopg_drop_index``.

        Raises:
            AssertionError: If the first fetched value is not ``True``.
        """
        statement = f"select * from hypopg_drop_index({oid})"
        result = self.exec_fetch(statement)

        assert result[0] is True, f"Could not drop simulated index with oid = {oid}."

    def create_index(self, index):
        """Create ``index`` and store its size in ``index.estimated_size``.

        Args:
            index: Object providing ``table()``, ``index_idx()`` and
                ``joined_column_names()``; its ``estimated_size`` attribute
                is set.
        """
        table_name = index.table()
        statement = (
            f"create index {index.index_idx()} "
            f"on {table_name} ({index.joined_column_names()})"
        )
        self.exec_only(statement)
        size = self.exec_fetch(
            f"select relpages from pg_class c " f"where c.relname = '{index.index_idx()}'"
        )
        size = size[0]
        # relpages counts pages; the factor presumably reflects an 8 KiB page
        # size -- confirm against the server's block size.
        index.estimated_size = size * 8 * 1024

    def drop_indexes(self):
        """Drop every index that ``pg_indexes`` lists for schema ``public``."""
        logging.info("Dropping indexes")
        stmt = "select indexname from pg_indexes where schemaname='public'"
        indexes = self.exec_fetch(stmt, one=False)
        for index in indexes:
            index_name = index[0]
            drop_stmt = "drop index {}".format(index_name)
            logging.debug("Dropping index {}".format(index_name))
            self.exec_only(drop_stmt)

    def exec_query(self, query, timeout=None, cost_evaluation=False):
        """Run ``query`` under ``explain (analyze, ...)`` and return timing and plan.

        Args:
            query: Query object; ``query.nr`` and ``query.text`` are read
                here, and it is passed to ``_prepare_query``.
            timeout: Optional statement timeout. PostgreSQL expects the
                timeout in milliseconds.
            cost_evaluation: When falsy, the connection is committed before
                the query is run.

        Returns:
            A tuple ``(actual_total_time, plan)``. If execution fails, the
            error is logged, the transaction is rolled back and
            ``(None, plan)`` is returned with the plan from ``_get_plan``.
        """
        # Committing to not lose indexes after timeout
        if not cost_evaluation:
            self._connection.commit()
        query_text = self._prepare_query(query)
        if timeout:
            set_timeout = f"set statement_timeout={timeout}"
            self.exec_only(set_timeout)
        statement = f"explain (analyze, buffers, format json) {query_text}"
        try:
            plan = self.exec_fetch(statement, one=True)[0][0]["Plan"]
            result = plan["Actual Total Time"], plan
        except Exception as e:
            logging.error(f"{query.nr}, {e}")
            self._connection.rollback()
            result = None, self._get_plan(query)
        # Disable timeout
        self._cursor.execute("set statement_timeout = 0")
        self._cleanup_query(query)
        return result

    def _cleanup_query(self, query):
        """Execute the ``drop view`` statements of ``query.text`` and commit."""
        for query_statement in query.text.split(";"):
            if "drop view" in query_statement:
                self.exec_only(query_statement)
                self.commit()

    def _get_cost(self, query):
        """Return the ``Total Cost`` entry of the plan for ``query``."""
        query_plan = self._get_plan(query)
        total_cost = query_plan["Total Cost"]
        return total_cost

    def _get_plan(self, query):
        """Return the ``Plan`` entry of ``explain (format json)`` for ``query``."""
        query_text = self._prepare_query(query)
        statement = f"explain (format json) {query_text}"
        query_plan = self.exec_fetch(statement)[0][0]["Plan"]
        self._cleanup_query(query)
        return query_plan

    def number_of_indexes(self):
        """Return the number of indexes in schema ``public``."""
        statement = """select count(*) from pg_indexes
                       where schemaname = 'public'"""
        result = self.exec_fetch(statement)
        return result[0]

    def table_exists(self, table_name):
        """Return whether ``pg_tables`` contains a table named ``table_name``."""
        statement = f"""SELECT EXISTS (
            SELECT 1
            FROM pg_tables
            WHERE tablename = '{table_name}');"""
        result = self.exec_fetch(statement)
        return result[0]

    def database_exists(self, database_name):
        """Return whether ``pg_database`` contains ``database_name``."""
        statement = f"""SELECT EXISTS (
            SELECT 1
            FROM pg_database
            WHERE datname = '{database_name}');"""
        result = self.exec_fetch(statement)
        return result[0]