Reintroduce Join Order Benchmark #41

Open
wants to merge 18 commits into refactoring
Download imdb data
Bensk1 committed Jun 26, 2020
commit 656588df438ada46bb2e7d3776a21e9dc32820b3
17 changes: 11 additions & 6 deletions selection/dbms/postgres_dbms.py
@@ -72,14 +72,19 @@ def create_database(self, database_name):
self.exec_only("create database {}".format(database_name))
logging.info("Database {} created".format(database_name))


def import_data(self, table, path, delimiter='|', encoding=None):
def import_data(self, table, path, delimiter="|", encoding=None):
if encoding:
with open(path, 'r', encoding=encoding) as file:
self._cursor.copy_expert(f"COPY {table} FROM STDIN WITH DELIMITER AS '{delimiter}' NULL AS 'NULL' CSV QUOTE AS '\"' ENCODING '{encoding}'", file)
with open(path, "r", encoding=encoding) as file:
self._cursor.copy_expert(
(
f"COPY {table} FROM STDIN WITH DELIMITER AS '{delimiter}' NULL "
f"AS 'NULL' CSV QUOTE AS '\"' ENCODING '{encoding}'"
),
file,
)
else:
with open(path, 'r') as file:
self._cursor.copy_from(file, table, sep=delimiter, null='')
with open(path, "r") as file:
self._cursor.copy_from(file, table, sep=delimiter, null="")

def indexes_size(self):
# Returns size in bytes
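
For orientation, a minimal usage sketch of the reworked import_data follows. It is not part of the diff: the class name PostgresDatabaseConnector, its constructor argument, and the file paths are assumptions for illustration.

# Sketch only. Assumes the connector class in this module is named
# PostgresDatabaseConnector and accepts the target database name.
from selection.dbms.postgres_dbms import PostgresDatabaseConnector

connector = PostgresDatabaseConnector("indexselection_job___1")

# JOB/IMDB tables are comma-separated, Latin-1 encoded CSV files, so the
# COPY ... CSV ... ENCODING branch is taken.
connector.import_data("title", "imdb_data/title.csv", delimiter=",", encoding="Latin-1")

# TPC-H style .tbl files keep the default '|' delimiter and use the plain
# copy_from() branch, with empty strings treated as NULL.
connector.import_data("lineitem", "lineitem.tbl")
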
50 changes: 41 additions & 9 deletions selection/table_generator.py
@@ -3,8 +3,9 @@
import platform
import re
import subprocess
import sys

from .utils import b_to_mb
from .utils import IMDB_TABLE_DIR, b_to_mb, download_and_uncompress_imdb_data
from .workload import Column, Table


@@ -24,9 +25,13 @@ def __init__(
self.database_names = self.db_connector.database_names()
self.tables = []
self.columns = []

self._prepare()
if self.database_name() not in self.database_names:
self._generate()
if self.benchmark_name != "job":
self._generate()
else:
self._prepare_imdb_data()
self.create_database()
else:
logging.debug("Database with given scale factor already " "existing")
@@ -40,6 +45,18 @@ def database_name(self):
name = f"indexselection_{self.benchmark_name}___{scale_factor}"
return name

def _prepare_imdb_data(self):
success = download_and_uncompress_imdb_data()
if not success:
logging.critical("Something went wrong during download IMDB data. Aborting.")
sys.exit(1)

self.table_files = [
filename
for filename in os.listdir(IMDB_TABLE_DIR)
if ".csv" in filename and ".json" not in filename
]

def _read_column_names(self):
# Read table and column names from 'create table' statements
schema_file = f"{self.directory}/{self.create_table_statements_file}"
@@ -76,11 +93,13 @@ def _generate(self):

def create_database(self):
self.db_connector.create_database(self.database_name())
filename = self.directory + "/" + self.create_table_statements_file
with open(filename, "r") as file:
schema_file = f"{self.directory}/{self.create_table_statements_file}"
with open(schema_file, "r") as file:
create_statements = file.read()

# Do not create primary keys
create_statements = re.sub(r",\s*primary key (.*)", "", create_statements)
create_statements = create_statements.replace("PRIMARY KEY", "")
self.db_connector.db_name = self.database_name()
self.db_connector.create_connection()
self.create_tables(create_statements)
@@ -95,16 +114,29 @@ def create_tables(self, create_statements):

def _load_table_data(self, database_connector):
logging.info("Loading data into the tables")

for filename in self.table_files:
logging.debug(" Loading file {}".format(filename))
logging.debug(f" Loading file {filename}")

table = filename.replace(".tbl", "").replace(".dat", "").replace(".csv", "")

if self.benchmark_name == "job":
path = f"{IMDB_TABLE_DIR}/{filename}"
else:
path = f"{self.directory}/{filename}"

table = filename.replace(".tbl", "").replace(".dat", "")
path = self.directory + "/" + filename
size = os.path.getsize(path)
size_string = f"{b_to_mb(size):,.4f} MB"
logging.debug(f" Import data of size {size_string}")
database_connector.import_data(table, path)
os.remove(os.path.join(self.directory, filename))

if self.benchmark_name == "job":
database_connector.import_data(
table, path, delimiter=",", encoding="Latin-1"
)
else:
database_connector.import_data(table, path)
# Remove files only if they can be easily regenerated
os.remove(os.path.join(self.directory, filename))
database_connector.commit()

def _run_make(self):
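
To make the new control flow concrete, here is a hedged sketch of how the generator would be driven for the Join Order Benchmark. The constructor arguments and connector setup are assumptions inferred from the attributes used above, not copied from this diff.

# Sketch only: the constructor arguments below are assumed for illustration.
from selection.dbms.postgres_dbms import PostgresDatabaseConnector
from selection.table_generator import TableGenerator

db_connector = PostgresDatabaseConnector(None, autocommit=True)

# With benchmark_name == "job", _generate() (the dbgen/dsdgen path) is skipped and
# _prepare_imdb_data() downloads and unpacks the IMDB CSV files instead;
# create_database() then loads them via the comma-separated, Latin-1 import path.
table_generator = TableGenerator("job", 1, db_connector)
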
125 changes: 123 additions & 2 deletions selection/utils.py
@@ -1,3 +1,9 @@
import hashlib
import logging
import os
import urllib.request
import zipfile

from .workload import Workload


@@ -17,8 +23,6 @@ def s_to_ms(s):


# --- Index selection utilities ---


def indexes_by_table(indexes):
indexes_by_table = {}
for index in indexes:
@@ -55,3 +59,120 @@ def get_utilized_indexes(
}

return utilized_indexes_workload, query_details


# --- Join Order Benchmark utilities ---

IMDB_LOCATION = "https://archive.org/download/imdb_20200624/imdb.zip"
IMDB_FILE_NAME = "imdb.zip"
IMDB_TABLE_DIR = "imdb_data"
IMDB_TABLE_NAMES = [
"aka_name",
"aka_title",
"cast_info",
"char_name",
"company_name",
"company_type",
"comp_cast_type",
"complete_cast",
"info_type",
"keyword",
"kind_type",
"link_type",
"movie_companies",
"movie_info",
"movie_info_idx",
"movie_keyword",
"movie_link",
"name",
"person_info",
"role_type",
"title",
]


def _clean_up(including_table_dir=False):
if os.path.exists(IMDB_FILE_NAME):
os.remove(IMDB_FILE_NAME)

if including_table_dir and os.path.exists(IMDB_TABLE_DIR):
for file in os.listdir(IMDB_TABLE_DIR):
os.remove("./%s/%s" % (IMDB_TABLE_DIR, file))
os.rmdir(IMDB_TABLE_DIR)


def _files_exist():
for table_name in IMDB_TABLE_NAMES:
if not os.path.exists(os.path.join(IMDB_TABLE_DIR, table_name + ".csv")):
return False

return True


def download_and_uncompress_imdb_data():
if _files_exist():
logging.info("IMDB already present.")
return True

logging.critical("Retrieving the IMDB dataset - this may take a while.")

# We are going to calculate the md5 hash later, on-the-fly while downloading
hash_md5 = hashlib.md5()

url = urllib.request.urlopen(IMDB_LOCATION)
meta = url.info()
file_size = int(meta["Content-Length"])

file = open(IMDB_FILE_NAME, "wb")

logging.info(f"Downloading: {IMDB_FILE_NAME} ({b_to_mb(file_size):.3f} MB)")

already_retrieved = 0
block_size = 8192
try:
while True:
buffer = url.read(block_size)
if not buffer:
break

hash_md5.update(buffer)

already_retrieved += len(buffer)
file.write(buffer)
status = (
f"Retrieved {already_retrieved * 100.0 / file_size:3.2f}% of the data"
)
status = f"{status}{chr(8) * (len(status) + 1)}"
print(status, end="\r")
except Exception:
logging.critical(
"Aborting. Something went wrong during the download. Cleaning up."
)
# Close the partially written file before _clean_up() removes it
file.close()
_clean_up()
return False

file.close()
logging.critical("Validating integrity...")

hash_dl = hash_md5.hexdigest()

if hash_dl != "1b5cf1e8ca7f7cb35235a3c23f89d8e9":
logging.critical("Aborting. MD5 checksum mismatch. Cleaning up.")
_clean_up()
return False

logging.critical("Downloaded file is valid.")
logging.critical("Unzipping the file...")

try:
with zipfile.ZipFile(IMDB_FILE_NAME, "r") as archive:
archive.extractall(IMDB_TABLE_DIR)
except Exception:
logging.critical("Aborting. Something went wrong during unzipping. Cleaning up.")
_clean_up(including_table_dir=True)
return False

logging.critical("Deleting the archive file.")
_clean_up()
return True
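
The download helper can also be exercised on its own, for example to pre-fetch the dataset before a long experiment run. A short sketch follows; the __main__ wrapper is illustrative and not part of the diff.

# Sketch only: pre-fetch and verify the IMDB dump outside of TableGenerator.
import logging
import sys

from selection.utils import IMDB_TABLE_DIR, download_and_uncompress_imdb_data

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    # Idempotent: returns True immediately if all 21 per-table CSV files are
    # already present; otherwise downloads the archive, checks its MD5 digest,
    # and unzips it into IMDB_TABLE_DIR before deleting the zip file.
    if not download_and_uncompress_imdb_data():
        sys.exit("IMDB download or extraction failed; see the log output.")
    print(f"IMDB CSV files available in {IMDB_TABLE_DIR}/")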