diff --git a/.gitmodules b/.gitmodules index f117e24..2151142 100644 --- a/.gitmodules +++ b/.gitmodules @@ -4,6 +4,9 @@ [submodule "tpch-kit"] path = tpch-kit url = https://github.com/marcelja/tpch-kit.git +[submodule "join-order-benchmark"] + path = join-order-benchmark + url = https://github.com/gregrahn/join-order-benchmark.git [submodule "hypopg"] path = hypopg url = https://github.com/HypoPG/hypopg.git diff --git a/benchmark_results/job/config.json b/benchmark_results/job/config.json index f4bbff1..f3690ee 100644 --- a/benchmark_results/job/config.json +++ b/benchmark_results/job/config.json @@ -1,6 +1,6 @@ { "database_system": "postgres", - "benchmark_name": "JOB", + "benchmark_name": "job", "scale_factor": 1, "algorithms": [ { diff --git a/benchmark_results/notebooks/Graphs.ipynb b/benchmark_results/notebooks/Graphs.ipynb index 9ecf2e6..e868f7b 100644 --- a/benchmark_results/notebooks/Graphs.ipynb +++ b/benchmark_results/notebooks/Graphs.ipynb @@ -3,7 +3,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "scrolled": false + }, "outputs": [], "source": [ "import pandas as pd\n", @@ -37,7 +39,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "scrolled": false + }, "outputs": [], "source": [ "def get_costs(df):\n", @@ -56,7 +60,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "scrolled": false + }, "outputs": [], "source": [ "def draw_cost_graph(cophy_costs=None, cophy_memory_consumption=None, legend=True):\n", @@ -238,7 +244,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "scrolled": false + }, "outputs": [], "source": [ "def draw_legend():\n", @@ -275,7 +283,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "scrolled": false + }, "outputs": [], "source": [ "CSV_PATH = '../tpch_wo_2_17_20'\n", @@ -290,7 +300,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "scrolled": false + }, "outputs": [], "source": [ "# old\n", @@ -313,7 +325,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "scrolled": false + }, "outputs": [], "source": [ "# Cophy What-If time: 151.91098499298096 - cost_requests: 82676 - cache_hits: 45776 - Gurobi Times:\n", @@ -333,7 +347,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "scrolled": false + }, "outputs": [], "source": [ "draw_what_if_graph()\n" @@ -349,7 +365,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "scrolled": false + }, "outputs": [], "source": [ "CSV_PATH = '../tpch_wo_2_17_20/all_queries'\n", @@ -527,7 +545,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "scrolled": false + }, "outputs": [], "source": [ "CSV_PATH = '../tpcds_wo_4_6_9_10_11_32_35_41_95'\n", @@ -540,7 +560,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "scrolled": false + }, "outputs": [], "source": [ "cophy_memory_consumptions_mb = [250,500,1000,1500,2000,2500,3000,3500,4250,5000,5750,6500,8000,10000,12500,15000]\n", @@ -561,7 +583,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "scrolled": false + }, "outputs": [], "source": [ "# Cophy What-If time: 579.6870040893555 - cost_requests: 394317 - cache_hits: 342140 - Gurobi Times:\n", @@ -581,7 +605,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": 
{}, + "metadata": { + "scrolled": false + }, "outputs": [], "source": [ "# draw_what_if_graph(million=True)" @@ -597,11 +623,13 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "scrolled": false + }, "outputs": [], "source": [ - "CSV_PATH = '../job/'\n", - "BENCHMARK = 'JOB'\n", + "CSV_PATH = '../job'\n", + "BENCHMARK = 'job'\n", "SCALE_FACTOR = None\n", "QUERIES = range(0, 113)\n", "XLIM = 12" @@ -610,7 +638,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "scrolled": false + }, "outputs": [], "source": [ "# draw_cost_graph()\n", @@ -630,7 +660,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "scrolled": false + }, "outputs": [], "source": [ "# Cophy What-If time: 822.8340845108032 - cost_requests: 305326 - cache_hits: 267996 - Gurobi Times:\n", @@ -650,7 +682,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "scrolled": false + }, "outputs": [], "source": [ "# GRAPH_SIZE = (5,2.2)\n", @@ -660,14 +694,18 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "scrolled": false + }, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "scrolled": false + }, "outputs": [], "source": [ "\n" @@ -676,7 +714,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "scrolled": false + }, "outputs": [], "source": [ "CSV_PATH = '../tpch_mssql'\n", @@ -690,7 +730,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "scrolled": false + }, "outputs": [], "source": [ "draw_cost_graph()" @@ -699,7 +741,9 @@ { "cell_type": "code", "execution_count": null, - "metadata": {}, + "metadata": { + "scrolled": false + }, "outputs": [], "source": [] } diff --git a/join-order-benchmark b/join-order-benchmark new file mode 160000 index 0000000..e6d4aba --- /dev/null +++ b/join-order-benchmark @@ -0,0 +1 @@ +Subproject commit e6d4aba307a0f8849b681ccbd2ca04381d3d1731 diff --git a/selection/dbms/postgres_dbms.py b/selection/dbms/postgres_dbms.py index 1ba01eb..ec051f3 100644 --- a/selection/dbms/postgres_dbms.py +++ b/selection/dbms/postgres_dbms.py @@ -72,9 +72,18 @@ def create_database(self, database_name): self.exec_only("create database {}".format(database_name)) logging.info("Database {} created".format(database_name)) - def import_data(self, table, path, delimiter="|"): - with open(path, "r") as file: - self._cursor.copy_from(file, table, sep=delimiter, null="") + def import_data(self, table, path, delimiter="|", encoding=None): + with open(path, encoding=encoding) as file: + if encoding: + self._cursor.copy_expert( + ( + f"COPY {table} FROM STDIN WITH DELIMITER AS '{delimiter}' NULL " + f"AS 'NULL' CSV QUOTE AS '\"' ENCODING '{encoding}'" + ), + file, + ) + else: + self._cursor.copy_from(file, table, sep=delimiter, null="") def indexes_size(self): # Returns size in bytes diff --git a/selection/index_selection_evaluation.py b/selection/index_selection_evaluation.py index a50c5e4..c2f0f62 100644 --- a/selection/index_selection_evaluation.py +++ b/selection/index_selection_evaluation.py @@ -61,7 +61,9 @@ def _setup_config(self, config): dbms_class = DBMSYSTEMS[config["database_system"]] generating_connector = dbms_class(None, autocommit=True) table_generator = TableGenerator( - config["benchmark_name"], config["scale_factor"], generating_connector + config["benchmark_name"], + config["scale_factor"] 
if "scale_factor" in config else 1, + generating_connector, ) self.database_name = table_generator.database_name() self.database_system = config["database_system"] @@ -69,6 +71,7 @@ def _setup_config(self, config): if "queries" not in config: config["queries"] = None + query_generator = QueryGenerator( config["benchmark_name"], config["scale_factor"], diff --git a/selection/query_generator.py b/selection/query_generator.py index 08a8207..0fc3ea3 100644 --- a/selection/query_generator.py +++ b/selection/query_generator.py @@ -136,6 +136,43 @@ def _run_command(self, command, return_output=False, shell=False): def _files(self): return os.listdir(self.directory) + def _generate_job(self): + logging.info("Generating JOB Queries") + for filename in os.listdir(self.directory): + if ".sql" not in filename or "fkindexes" in filename or "schema" in filename: + continue + query_id = filename.replace(".sql", "") + + with open(f"{self.directory}/{filename}") as query_file: + query_text = query_file.read() + query_text = query_text.replace("\t", "") + query = Query(query_id, query_text) + + assert "WHERE" in query_text, "Query without WHERE clause encountered" + + split = query_text.split("WHERE") + assert len(split) == 2, "Query split for JOB query contains subquery" + query_text_before_where = split[0] + query_text_after_where = split[1] + + # Adds indexable columns to query. Parsing JOB queries is more complex + # than TPC-* queries as column names (in the part of the query before + # WHERE) are not always distinct. Also, the implementation below might + # appear to theoretically overlook indexable columns. However, we + # verified this manually and did not encounter any problems. Still, + # this should be improved and a proper solution should be able to + # handle queries of all supported workloads (#44). + for column in self.columns: + if ( + column.name in query_text_after_where + and f"{column.table.name} " in query_text_before_where + ): + query.columns.append(column) + self.queries.append(query) + self._validate_query(query) + + logging.info("Queries generated") + def generate(self): if self.benchmark_name == "tpch": self.directory = "./tpch-kit/dbgen" @@ -154,5 +191,17 @@ def generate(self): self.make_command.append("OS=MACOS") self._generate_tpcds() + elif self.benchmark_name == "job": + assert self.scale_factor == 1, ( + "Can only handle JOB with a scale factor of 1" + ", i.e., no specific scaling" + ) + assert self.query_ids is None, ( + "Query filtering, i.e., providing query_ids to JOB QueryGenerator " + "is not supported."
+ ) + + self.directory = "./join-order-benchmark" + self._generate_job() else: - raise NotImplementedError("only tpch/tpcds implemented.") + raise NotImplementedError("Only TPC-H/-DS and JOB implemented.") diff --git a/selection/table_generator.py b/selection/table_generator.py index 70189e5..37fa9c6 100644 --- a/selection/table_generator.py +++ b/selection/table_generator.py @@ -3,8 +3,9 @@ import platform import re import subprocess +import sys -from .utils import b_to_mb +from .utils import IMDB_TABLE_DIR, b_to_mb, download_and_uncompress_imdb_data from .workload import Column, Table @@ -24,40 +25,60 @@ def __init__( self.database_names = self.db_connector.database_names() self.tables = [] self.columns = [] + self._prepare() if self.database_name() not in self.database_names: - self._generate() + if self.benchmark_name == "job": + self._prepare_imdb_data() + else: + self._generate() self.create_database() else: - logging.debug("Database with given scale factor already " "existing") + logging.debug("Database with given scale factor already exists") self._read_column_names() def database_name(self): if self.explicit_database_name: return self.explicit_database_name - name = "indexselection_" + self.benchmark_name + "___" - name += str(self.scale_factor).replace(".", "_") + scale_factor = str(self.scale_factor).replace(".", "_") + name = f"indexselection_{self.benchmark_name}___{scale_factor}" return name + def _prepare_imdb_data(self): + success = download_and_uncompress_imdb_data() + if not success: + logging.critical("Something went wrong during the IMDB download. Aborting.") + sys.exit(1) + + self.table_files = [ + filename + for filename in os.listdir(IMDB_TABLE_DIR) + if ".csv" in filename and ".json" not in filename + ] + def _read_column_names(self): # Read table and column names from 'create table' statements - filename = self.directory + "/" + self.create_table_statements_file - with open(filename, "r") as file: + schema_file = f"{self.directory}/{self.create_table_statements_file}" + with open(schema_file) as file: data = file.read().lower() - create_tables = data.split("create table ")[1:] - for create_table in create_tables: - splitted = create_table.split("(", 1) - table = Table(splitted[0].strip()) + create_table_statements = data.split("create table ")[1:] + for create_table_statement in create_table_statements: + split = create_table_statement.split("(", 1) + table = Table(split[0].strip()) self.tables.append(table) - # TODO regex split?
,[whitespace]\n - for column in splitted[1].split(",\n"): - name = column.lstrip().split(" ", 1)[0] - if name == "primary": + + for column_declaration in split[1].split(",\n"): + column_name = column_declaration.lstrip().split(" ", 1)[0] + + # Skip lines that start with primary and, thereby, declare previously + # declared columns as primary key + if column_name == "primary": continue - column_object = Column(name) - table.add_column(column_object) - self.columns.append(column_object) + + column = Column(column_name) + table.add_column(column) + self.columns.append(column) def _generate(self): logging.info("Generating {} data".format(self.benchmark_name)) @@ -72,11 +93,13 @@ def _generate(self): def create_database(self): self.db_connector.create_database(self.database_name()) - filename = self.directory + "/" + self.create_table_statements_file - with open(filename, "r") as file: + schema_file = f"{self.directory}/{self.create_table_statements_file}" + with open(schema_file) as file: create_statements = file.read() + # Do not create primary keys create_statements = re.sub(r",\s*primary key (.*)", "", create_statements) + create_statements = create_statements.replace("PRIMARY KEY", "") self.db_connector.db_name = self.database_name() self.db_connector.create_connection() self.create_tables(create_statements) @@ -91,16 +114,29 @@ def create_tables(self, create_statements): def _load_table_data(self, database_connector): logging.info("Loading data into the tables") + for filename in self.table_files: - logging.debug(" Loading file {}".format(filename)) + logging.debug(f" Loading file {filename}") + + table = filename.replace(".tbl", "").replace(".dat", "").replace(".csv", "") + + if self.benchmark_name == "job": + path = f"{IMDB_TABLE_DIR}/{filename}" + else: + path = f"{self.directory}/{filename}" - table = filename.replace(".tbl", "").replace(".dat", "") - path = self.directory + "/" + filename size = os.path.getsize(path) size_string = f"{b_to_mb(size):,.4f} MB" logging.debug(f" Import data of size {size_string}") - database_connector.import_data(table, path) - os.remove(os.path.join(self.directory, filename)) + + if self.benchmark_name == "job": + database_connector.import_data( + table, path, delimiter=",", encoding="Latin-1" + ) + else: + database_connector.import_data(table, path) + # Remove files only if they can be easily regenerated + os.remove(os.path.join(self.directory, filename)) database_connector.commit() def _run_make(self): @@ -150,5 +186,12 @@ def _prepare(self): and self.scale_factor != 0.001 ): raise Exception("Wrong TPCDS scale factor") + elif self.benchmark_name == "job": + assert self.scale_factor == 1, ( + "Can only handle JOB with a scale factor of 1" + ", i.e., no specific scaling" + ) + self.directory = "./join-order-benchmark" + self.create_table_statements_file = "schema.sql" else: - raise NotImplementedError("only tpch/ds implemented.") + raise NotImplementedError("Only TPC-H/-DS and JOB implemented.") diff --git a/selection/utils.py b/selection/utils.py index 10134b4..77a59f3 100644 --- a/selection/utils.py +++ b/selection/utils.py @@ -1,3 +1,9 @@ +import hashlib +import logging +import os +import urllib.request +import zipfile + from .workload import Workload @@ -17,8 +23,6 @@ def s_to_ms(s): # --- Index selection utilities --- - - def indexes_by_table(indexes): indexes_by_table = {} for index in indexes: @@ -55,3 +59,122 @@ def get_utilized_indexes( } return utilized_indexes_workload, query_details + + +# --- Join Order Benchmark utilities --- + 
+IMDB_LOCATION = "https://archive.org/download/imdb_20200624/imdb.zip" +IMDB_FILE_NAME = "imdb.zip" +IMDB_TABLE_DIR = "imdb_data" +IMDB_TABLE_NAMES = [ + "aka_name", + "aka_title", + "cast_info", + "char_name", + "company_name", + "company_type", + "comp_cast_type", + "complete_cast", + "info_type", + "keyword", + "kind_type", + "link_type", + "movie_companies", + "movie_info", + "movie_info_idx", + "movie_keyword", + "movie_link", + "name", + "person_info", + "role_type", + "title", +] + + +def _clean_up(including_table_dir=False): + if os.path.exists(IMDB_FILE_NAME): + os.remove(IMDB_FILE_NAME) + + if including_table_dir and os.path.exists(IMDB_TABLE_DIR): + for file in os.listdir(IMDB_TABLE_DIR): + os.remove("./%s/%s" % (IMDB_TABLE_DIR, file)) + os.rmdir(IMDB_TABLE_DIR) + + +def _files_exist(): + for table_name in IMDB_TABLE_NAMES: + if not os.path.exists(os.path.join(IMDB_TABLE_DIR, table_name + ".csv")): + return False + + return True + + +def download_and_uncompress_imdb_data(): + if _files_exist(): + logging.info("IMDB already present.") + return True + + logging.critical("Retrieving the IMDB dataset - this may take a while.") + + # We are going to calculate the md5 hash later, on-the-fly while downloading + hash_md5 = hashlib.md5() + + url = urllib.request.urlopen(IMDB_LOCATION) + meta = url.info() + file_size = int(meta["Content-Length"]) + + file = open(IMDB_FILE_NAME, "wb") + + logging.info(f"Downloading: {IMDB_FILE_NAME} ({b_to_mb(file_size):.3f} MB)") + + already_retrieved = 0 + block_size = 8192 + try: + while True: + buffer = url.read(block_size) + if not buffer: + break + + hash_md5.update(buffer) + + already_retrieved += len(buffer) + file.write(buffer) + status = ( + f"Retrieved {already_retrieved * 100.0 / file_size:3.2f}% of the data" + ) + # chr(8) refers to a backspace. In conjunction with end="\r", this overwrites + # the previous status value and achieves the right padding. + status = f"{status}{chr(8) * (len(status) + 1)}" + print(status, end="\r") + except Exception: + logging.critical( + "Aborting. Something went wrong during the download. Cleaning up." + ) + _clean_up() + return False + + file.close() + logging.critical("Validating integrity...") + + hash_dl = hash_md5.hexdigest() + + if hash_dl != "1b5cf1e8ca7f7cb35235a3c23f89d8e9": + logging.critical("Aborting. MD5 checksum mismatch. Cleaning up.") + _clean_up() + return False + + logging.critical("Downloaded file is valid.") + logging.critical("Unzipping the file...") + + try: + zip = zipfile.ZipFile(IMDB_FILE_NAME, "r") + zip.extractall(IMDB_TABLE_DIR) + zip.close() + except Exception: + logging.critical("Aborting. Something went wrong during unzipping. 
Cleaning up.") + _clean_up(including_table_dir=True) + return False + + logging.critical("Deleting the archive file.") + _clean_up() + return True diff --git a/tests/test_drop_heuristic.py b/tests/test_drop_heuristic.py index 042dd79..6b7e230 100644 --- a/tests/test_drop_heuristic.py +++ b/tests/test_drop_heuristic.py @@ -2,6 +2,7 @@ from unittest.mock import MagicMock import utils + from selection.algorithms.drop_heuristic_algorithm import DropHeuristicAlgorithm from selection.index import Index from selection.workload import Column, Query, Table, Workload diff --git a/tests/test_query_generator.py b/tests/test_query_generator.py index 38e3f9a..fd984f2 100644 --- a/tests/test_query_generator.py +++ b/tests/test_query_generator.py @@ -14,7 +14,11 @@ def tearDown(self): self.generating_connector.close() connector = PostgresDatabaseConnector(None, autocommit=True) - if self.db_name is not None and connector.database_exists(self.db_name): + if ( + self.db_name is not None + and self.db_name != "indexselection_job___1" + and connector.database_exists(self.db_name) + ): connector.drop_database(self.db_name) def test_generate_tpch(self): @@ -46,6 +50,36 @@ def test_generate_tpcds(self): self.assertEqual(len(queries), 99) db_connector.close() + def test_generate_job(self): + self.db_name = "indexselection_job___1" + + # Loading the JOB tables takes some time, + # we skip this test if the dataset is not already loaded. + if self.db_name not in self.generating_connector.database_names(): + return + + TableGenerator( + "job", 1, self.generating_connector, explicit_database_name=self.db_name, + ) + + db_connector = PostgresDatabaseConnector(self.db_name, autocommit=True) + + # JOB supports only a scale factor of 1, i.e., no scaling + with self.assertRaises(AssertionError): + query_generator = QueryGenerator("job", 0.001, db_connector, None, []) + + # JOB does not support query filtering + with self.assertRaises(AssertionError): + query_generator = QueryGenerator( + "job", 1, db_connector, query_ids=[17], columns=[] + ) + + query_generator = QueryGenerator("job", 1, db_connector, None, []) + + queries = query_generator.queries + self.assertEqual(len(queries), 113) + db_connector.close() + def test_wrong_benchmark(self): with self.assertRaises(NotImplementedError): QueryGenerator("tpc-hallo", 1, self.generating_connector, None, [])
diff --git a/tests/test_table_generator.py b/tests/test_table_generator.py index 2b3d36e..1c876a6 100644 --- a/tests/test_table_generator.py +++ b/tests/test_table_generator.py @@ -41,6 +41,9 @@ def test_database_name(self): def test_generate_tpch(self): table_generator = TableGenerator("tpch", 0.001, self.generating_connector) + # Check that the correct number of columns was extracted + self.assertEqual(61, len(table_generator.columns)) + # Check that lineitem table exists in TableGenerator lineitem_table = None for table in table_generator.tables: @@ -51,9 +54,9 @@ def test_generate_tpch(self): # Check that l_receiptdate column exists in TableGenerator and Table object l_receiptdate = Column("l_receiptdate") - lineitem_table.add_column(l_receiptdate) + l_receiptdate.table = lineitem_table self.assertIn(l_receiptdate, table_generator.columns) - self.assertIn(l_receiptdate, table.columns) + self.assertIn(l_receiptdate, lineitem_table.columns) database_connect = PostgresDatabaseConnector( table_generator.database_name(), autocommit=True @@ -75,10 +78,13 @@ def test_generate_tpch(self): self.generating_connector.close() database_connect.close() - def test_generate_tpds(self): + def test_generate_tpcds(self): table_generator = TableGenerator("tpcds", 0.001, self.generating_connector) - # Check that lineitem table exists in TableGenerator + # Check that the correct number of columns was extracted + self.assertEqual(429, len(table_generator.columns)) + + # Check that item table exists in TableGenerator item_table = None for table in table_generator.tables: if table.name == "item": @@ -88,9 +94,9 @@ def test_generate_tpds(self): # Check that i_item_sk column exists in TableGenerator and Table object i_item_sk = Column("i_item_sk") - item_table.add_column(i_item_sk) + i_item_sk.table = item_table self.assertIn(i_item_sk, table_generator.columns) - self.assertIn(i_item_sk, table.columns) + self.assertIn(i_item_sk, item_table.columns) database_connect = PostgresDatabaseConnector( table_generator.database_name(), autocommit=True @@ -128,6 +134,68 @@ def test_generate_tpds(self): self.generating_connector.close() database_connect.close() + def test_generate_job(self): + # Loading the JOB tables takes some time, + # we skip this test if the dataset is not already loaded. + if "indexselection_job___1" not in self.generating_connector.database_names(): + return + + # JOB supports only a scale factor of 1, i.e., no scaling + with self.assertRaises(AssertionError): + table_generator = TableGenerator("job", 0.001, self.generating_connector) + + table_generator = TableGenerator("job", 1, self.generating_connector) + + # Check that the correct number of columns was extracted + self.assertEqual(108, len(table_generator.columns)) + + # Check that title table exists in TableGenerator + title_table = None + for table in table_generator.tables: + if table.name == "title": + title_table = table + break + self.assertIsNotNone(title_table) + + # Check that imdb_index column exists in TableGenerator and Table object + imdb_index = Column("imdb_index") + imdb_index.table = title_table + self.assertIn(imdb_index, table_generator.columns) + self.assertIn(imdb_index, title_table.columns) + + database_connect = PostgresDatabaseConnector( + table_generator.database_name(), autocommit=True + ) + + job_tables = [ + "aka_name", + "aka_title", + "cast_info", + "char_name", + "comp_cast_type", + "company_name", + "company_type", + "complete_cast", + "info_type", + "keyword", + "kind_type", + "link_type", + "movie_companies", + "movie_info", + "movie_info_idx", + "movie_keyword", + "movie_link", + "name", + "person_info", + "role_type", + "title", + ] + for job_table in job_tables: + self.assertTrue(database_connect.table_exists(job_table)) + + self.generating_connector.close() + database_connect.close() + def test_not_implemented(self): with self.assertRaises(NotImplementedError): TableGenerator("not_tpch", 0.001, self.generating_connector)
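Note: a minimal sketch of how the JOB data path added above could be exercised on its own, assuming this patch is applied and the snippet is run from the repository root; download_and_uncompress_imdb_data, IMDB_TABLE_DIR, and IMDB_TABLE_NAMES are the helpers introduced in selection/utils.py.

import os

from selection.utils import (
    IMDB_TABLE_DIR,
    IMDB_TABLE_NAMES,
    download_and_uncompress_imdb_data,
)

# Fetches imdb.zip from archive.org, verifies the MD5 checksum, and unpacks the
# CSV files into imdb_data/; returns True right away if the files already exist.
if download_and_uncompress_imdb_data():
    missing = [
        table
        for table in IMDB_TABLE_NAMES
        if not os.path.exists(os.path.join(IMDB_TABLE_DIR, f"{table}.csv"))
    ]
    assert not missing, f"IMDB data incomplete, missing tables: {missing}"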