-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexecute_queries.py
More file actions
100 lines (81 loc) · 3.24 KB
/
Copy pathexecute_queries.py
File metadata and controls
100 lines (81 loc) · 3.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# SPDX-FileCopyrightText: 2024 Grupo de Arquitectura de Computadores (GAC), UDC
#
# SPDX-License-Identifier: Apache-2.0
import os
import threading
from datetime import datetime
import pandas as pd
from tqdm import tqdm
from src.options.queries_options import QueryOptions
from src.bench_factory import BenchFactory
from src.utils.add_index_queries import add_index
from src.utils.fix_queries import fix_queries
from utils.tpcds_schema_utils import get_sql_statements
from utils.util_functions import get_permutation_orders
def execute_queries(bench, queries_options, sql_queries, output_file, order, n_thread, cs):
    """Run one stream's queries ``times`` times and write the timings to a CSV file.

    For every execution round, ``bench.deactivate_cache(cs)`` is called first
    and then every query identifier in ``order`` is executed, in that order,
    with ``bench.run_sql``. The first element of the pair returned by
    ``run_sql`` is recorded as that query's execution time.

    The CSV has a ``"Query number"`` column holding ``order`` followed by one
    ``"Execution <i>"`` column per round (``i`` starting at 1).

    Args:
        bench: Benchmark object providing ``deactivate_cache`` and ``run_sql``.
        queries_options: Mapping with ``"times"`` (number of rounds, anything
            ``int()`` accepts) and ``"queries"`` (only ``"all"`` is supported).
        sql_queries: Mapping from query identifier to SQL text.
        output_file: Path of the CSV file to write.
        order: Query identifiers in execution order.
        n_thread: Stream/thread index; only used in the progress-bar label.
        cs: Connection/cursor handle forwarded to ``bench``.

    Raises:
        NotImplementedError: If ``queries_options["queries"]`` is not ``"all"``.
    """
    times = int(queries_options["times"])
    # Validate once, before any query runs. The previous exit() inside the
    # loop only raised SystemExit when running in a worker thread, which ends
    # the thread silently (no CSV, no traceback) and reports success otherwise.
    if queries_options["queries"] != "all":
        raise NotImplementedError(
            "Not implemented yet: queries="
            f"{queries_options['queries']!r} (only 'all' is supported)"
        )

    # Materialise once so an iterator is not exhausted by the first use.
    order = list(order)
    results = {"Query number": order}
    for i in range(1, times + 1):
        # Presumably disables result caching so each round measures real work.
        bench.deactivate_cache(cs)
        execution_column = []
        for query_number in tqdm(order, desc=f"Execution {i}, Thread {n_thread}"):
            # NOTE(review): .get() yields None for an identifier missing from
            # sql_queries; how run_sql handles None is not visible here.
            execution_time, _ = bench.run_sql(cs, sql_queries.get(query_number))
            execution_column.append(execution_time)
        results[f"Execution {i}"] = execution_column

    pd.DataFrame(results).to_csv(output_file, index=False)
def run_query(bench, cs, queries_options):
    """Prepare the query files and run every query stream in its own thread.

    Steps:
      1. Rewrite ``bench.raw_queries`` into ``bench.processed_queries`` with
         ``fix_queries`` and ``add_index``. Then load the statements as a
         mapping from whitespace-stripped query identifier to SQL text.
      2. Select the warehouse/database/schema on ``cs``.
      3. Read one permutation-order file per stream from
         ``bench.permutation_orders_folder``.
      4. Start one ``execute_queries`` thread per stream and wait for all of
         them.

    Each stream ``i`` writes ``<results_path>/<YYYYmmdd-HHMMSS>_power_<i>.csv``
    when ``bench.streams == 1``. Otherwise it writes
    ``<results_path>/<YYYYmmdd-HHMMSS>_throughput_<streams>_<i>.csv``.

    Args:
        bench: Benchmark object holding paths, connection settings and the
            number of streams.
        cs: Connection/cursor handle used for schema selection and passed to
            every worker thread.
        queries_options: Options forwarded unchanged to ``execute_queries``.
    """
    print("\nFormat queries")
    fix_queries(bench.raw_queries, bench.processed_queries, databricks=bench.databricks)
    add_index(bench.processed_queries, bench.processed_queries)
    sql = {k.strip(): v for k, v in get_sql_statements(bench.processed_queries).items()}

    base_name = "power" if bench.streams == 1 else f"throughput_{bench.streams}"
    dt = datetime.today().strftime("%Y%m%d-%H%M%S")
    # Extension-free prefix; each stream appends "_<i>.csv". This replaces
    # str.split(".csv"), which truncated at any ".csv" inside results_path.
    output_prefix = os.path.join(bench.results_path, f"{dt}_{base_name}")

    bench.use_wh_sb_schema(cs, bench.warehouse, bench.db, bench.schema)
    print("Executing queries...")
    print(f"Results will be saved to {output_prefix}_X.csv files")
    print("Wait until all threads finish executing queries\n")

    # Create the results folder if needed; exist_ok avoids a check-then-create race.
    os.makedirs(bench.results_path, exist_ok=True)

    # The original "00000{i}" gives 000000.dat ... 000009.dat (6 digits) but
    # 7 digits from i = 10 on. Zero-pad to 6 digits for every stream index.
    # TODO(review): confirm the permutation generator keeps 6-digit names.
    orders = [
        get_permutation_orders(
            os.path.join(bench.permutation_orders_folder, f"{i:06d}.dat")
        )
        for i in range(bench.streams)
    ]

    # One thread per stream.
    # NOTE(review): all threads share the same `cs` handle; whether that is
    # safe for concurrent use depends on the database driver -- confirm.
    # Exceptions raised inside a thread are not re-raised here; join() only waits.
    threads = []
    for i, order in enumerate(orders):
        t = threading.Thread(
            target=execute_queries,
            args=(bench, queries_options, sql, f"{output_prefix}_{i}.csv", order, i, cs),
        )
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
if __name__ == "__main__":
    # Script entry point: build the benchmark from the command-line options,
    # open its connection and run the configured query workload.
    options = QueryOptions()
    benchmark = BenchFactory.create_bench(options)
    run_query(benchmark, benchmark.set_up_conn(), options.get_queries_options())