diff --git a/.gitignore b/.gitignore index de02ef0..c983345 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,7 @@ .coverage coverage.xml -junit \ No newline at end of file +junit +build +dist +.idea +*.egg-info \ No newline at end of file diff --git a/docs/development/testing.md b/docs/development/testing.md index 4486bc3..ad29fff 100644 --- a/docs/development/testing.md +++ b/docs/development/testing.md @@ -14,7 +14,6 @@ docker build -t r2gg-debian -f docker/debian/Dockerfile . ## Initialisation de la base de données de tests ```sh -``` docker-compose -f tests/dev/docker-compose.dev.yml up -d ``` diff --git a/r2gg/_configure.py b/r2gg/_configure.py index 6000c33..11141dc 100644 --- a/r2gg/_configure.py +++ b/r2gg/_configure.py @@ -8,13 +8,14 @@ # Définition des niveaux de log LEVELS = { - 'CRITICAL' : logging.CRITICAL, - 'ERROR' : logging.ERROR, - 'WARNING' : logging.WARNING, - 'INFO' : logging.INFO, - 'DEBUG' : logging.DEBUG + 'CRITICAL': logging.CRITICAL, + 'ERROR': logging.ERROR, + 'WARNING': logging.WARNING, + 'INFO': logging.INFO, + 'DEBUG': logging.DEBUG } + def configure(): """ Fonction de lecture du fichier de configuration passé en argument @@ -27,8 +28,6 @@ def configure(): dictionnaire correspondant à la resource décrite dans le fichier passé en argument db_configs: dict dictionnaire correspondant aux configurations des bdd - connection: psycopg2.connection - connection à la bdd de travail logger: logging.Logger """ parser = argparse.ArgumentParser() @@ -40,7 +39,7 @@ def configure(): config = config_from_path(config_path)['generation'] # Récupération de la configuration du log - logs_config = config_from_path( config['general']['logs']['configFile'] ) + logs_config = config_from_path(config['general']['logs']['configFile']) # Gestion du fichiers de logs non spécifié try: @@ -51,7 +50,7 @@ def configure(): # Configuration du module logging logging.basicConfig( format='%(asctime)s %(message)s', - level=LEVELS[ logs_config['level'].upper() ], + level=LEVELS[logs_config['level'].upper()], handlers=[ logging.FileHandler(logs_file), logging.StreamHandler() @@ -66,10 +65,10 @@ def configure(): # Configuration des bases de données précisées dans la config for base in config['bases']: if base['type'] == 'bdd': - db_configs[ base['id'] ] = config_from_path(base['configFile']) - db_configs[base['id']].update({"schema":base['schema']}) + db_configs[base['id']] = config_from_path(base['configFile']) + db_configs[base['id']].update({"schema": base['schema']}) - # Récupération de l'objet permettant de générer la ressource + #  Récupération de l'objet permettant de générer la ressource resource = config['resource'] # Création de l'espace de travail @@ -77,52 +76,3 @@ def configure(): os.makedirs(config['workingSpace']['directory']) return config, resource, db_configs, logger - -def connect_working_db(config, db_configs, logger): - """ - Fonction de connexion à la BDD de travail - - Parameters - ---------- - config: dict - dictionnaire correspondant à la configuration décrite dans le fichier passé en argument - db_configs: dict - dictionnaire correspondant aux configurations des bdd - logger: logging.Logger - Returns - ------- - connection: psycopg2.connection - connection à la bdd de travail - - """ - - # Configuration de la bdd de travail - work_db_config = db_configs[ config['workingSpace']['baseId'] ] - - # Récupération des paramètres de la bdd - host = work_db_config.get('host') - dbname = work_db_config.get('database') - user = work_db_config.get('user') - password = work_db_config.get('password') - port = work_db_config.get('port') - connect_args = 'host=%s dbname=%s user=%s password=%s port=%s' %(host, dbname, user, password, port) - - logger.info("Connecting to work database") - connection = psycopg2.connect(connect_args) - connection.set_client_encoding('UTF8') - - return connection - -def disconnect_working_db(connection, logger): - """ - Fonction de connexion à la BDD de travail - - Parameters - ---------- - connection: psycopg2.connection - connection à la bdd de travail - logger: logging.Logger - """ - - connection.close() - logger.info("Connection to work database closed") \ No newline at end of file diff --git a/r2gg/_database.py b/r2gg/_database.py new file mode 100644 index 0000000..ea100f3 --- /dev/null +++ b/r2gg/_database.py @@ -0,0 +1,161 @@ +import time +from os import getenv + +import psycopg2 +from psycopg2 import OperationalError, DatabaseError, InterfaceError +from psycopg2.extras import DictCursor +import logging + +TIMEOUT = int(getenv("SQL_STATEMENT_TIMEOUT", 0)) +RETRY = int(getenv("SQL_STATEMENT_RETRY_ATTEMPTS", 3)) +DELAY = int(getenv("SQL_DELAY_BETWEEN_STATEMENTS", 30)) + + +def database_retry_decorator(func): + def wrapper(self, *args, **kwargs): + attempt = 1 + while attempt <= RETRY: + try: + self.ensure_connection() + yield from func(self, *args, **kwargs) + return + + except (OperationalError, DatabaseError, InterfaceError) as e: + if attempt >= RETRY: + self.logger.error(f"Query failed after {RETRY} attempts: {str(e).rstrip()}") + return + + self.logger.error( + f"Attempt {attempt}/{RETRY} failed ({str(e).rstrip()}), retrying in {DELAY} seconds" + ) + time.sleep(DELAY) + attempt += 1 + try: + if self._connection: + self._connection.rollback() + except Exception as e: + self.logger.error(f"Connection rollback failed {str(e).rstrip()}") + return + + return wrapper + + +class DatabaseManager: + def __init__(self, db_configs, logger): + self.logger = logger + self._work_db_config = db_configs + self._connection = self.connect_working_db() + + def connect_working_db(self): + """ + Fonction de connexion à la BDD de travail + + Parameters + ---------- + config: dict + dictionnaire correspondant à la configuration décrite dans le fichier passé en argument + db_configs: dict + dictionnaire correspondant aux configurations des bdd + Returns + ------- + connection: psycopg2.connection + connection à la bdd de travail + + """ + # Récupération des paramètres de la bdd + host = self._work_db_config.get("host") + dbname = self._work_db_config.get("database") + user = self._work_db_config.get("user") + password = self._work_db_config.get("password") + port = self._work_db_config.get("port") + connect_args = "host=%s dbname=%s user=%s password=%s port=%s" % (host, dbname, user, password, port) + + self.logger.info("Connecting to work database") + connection = psycopg2.connect(connect_args) + connection.set_client_encoding("UTF8") + + return connection + + def disconnect_working_db(self): + """ + Fonction de connexion à la BDD de travail + + Parameters + ---------- + connection: psycopg2.connection + connection à la bdd de travail + logger: logging.Logger + """ + if self._connection: + self._connection.close() + self.logger.info("Connection to work database closed") + + def ensure_connection(self): + """ + Ensure the connection is alive; reconnect if needed. + """ + try: + if self._connection is None or getattr(self._connection, "closed", 1) != 0: + self.logger.info("Connection is closed or missing; reconnecting") + self._connection = self.connect_working_db() + else: + with self._connection.cursor() as cur: + cur.execute("SELECT 1") + except Exception as e: + self.logger.error( + f"Something is wrong with the connection: {str(e).rstrip()}; reconnecting in {DELAY} seconds") + self.disconnect_working_db() + time.sleep(DELAY) + self._connection = self.connect_working_db() + + def execute_select_query(self, cursor, query, show_duration): + if TIMEOUT: + cursor.execute("SET statement_timeout = %s", (1000 * TIMEOUT,)) # timeout in milliseconds + + if show_duration: + self.logger.info("SQL: {}".format(query)) + st_execute = time.time() + cursor.execute(query) + et_execute = time.time() + self.logger.info("Execution ended. Elapsed time : %s seconds." % (et_execute - st_execute)) + else: + cursor.execute(query) + + @database_retry_decorator + def execute_select_fetch_multiple(self, query, batchsize=1, show_duration=False): + with self._connection.cursor(cursor_factory=DictCursor) as cursor: + self.execute_select_query(cursor, query, show_duration) + rows = cursor.fetchmany(batchsize) + count = cursor.rowcount + while rows: + if batchsize == 1: + rows = rows.pop() + yield rows, count + rows = cursor.fetchmany(batchsize) + self._connection.commit() + return + + # the method below should be used as a generator function otherwise use execute_update + @database_retry_decorator + def execute_update_query(self, query, params=None, isolation_level=None): + self.logger.info("SQL: {}".format(query)) + st_execute = time.time() + with self._connection.cursor(cursor_factory=DictCursor) as cursor: + old_isolation_level = self._connection.isolation_level + if isolation_level is not None: + self._connection.set_isolation_level(isolation_level) + cursor.execute(query, params) + self._connection.commit() + et_execute = time.time() + self.logger.info("Execution ended. Elapsed time : %s seconds." % (et_execute - st_execute)) + self._connection.set_isolation_level(old_isolation_level) + yield # the decorator database_retry_decorator only supports generators + return + + def execute_update(self, query, params=None, isolation_level=None): + next(self.execute_update_query(query, params=params, isolation_level=isolation_level), None) + + def execute_select_fetch_one(self, query, show_duration=False): + gen = self.execute_select_fetch_multiple(query, 1, show_duration) + row, count = next(gen, (None, None)) + return row, count diff --git a/r2gg/_main.py b/r2gg/_main.py index d242d41..131d21d 100644 --- a/r2gg/_main.py +++ b/r2gg/_main.py @@ -1,26 +1,25 @@ import json import multiprocessing import os -import json import time from datetime import datetime -import psycopg2 # https://github.com/andialbrecht/sqlparse import sqlparse +from r2gg._database import DatabaseManager +from r2gg._file_copier import copy_file_locally from r2gg._lua_builder import build_lua +from r2gg._osm_to_pbf import osm_to_pbf +from r2gg._path_converter import convert_path from r2gg._pivot_to_osm import pivot_to_osm from r2gg._pivot_to_pgr import pivot_to_pgr from r2gg._read_config import config_from_path from r2gg._subprocess_execution import subprocess_execution -from r2gg._path_converter import convert_path -from r2gg._file_copier import copy_file_locally from r2gg._valhalla_lua_builder import build_valhalla_lua -from r2gg._osm_to_pbf import osm_to_pbf -def sql_convert(config, resource, db_configs, connection, logger): +def sql_convert(config, resource, db_configs, database: DatabaseManager, logger): """ Fonction de conversion depuis la bdd source vers la bdd pivot @@ -32,8 +31,8 @@ def sql_convert(config, resource, db_configs, connection, logger): dictionnaire correspondant à la resource décrite dans le fichier passé en argument db_configs: dict dictionnaire correspondant aux configurations des bdd - connection: psycopg2.connection - connection à la bdd de travail + database: r2gg.DatabaseManager + gestionnaire de connexion et d'exécution de la base de la bdd logger: logging.Logger """ @@ -57,7 +56,7 @@ def sql_convert(config, resource, db_configs, connection, logger): used_bases = [] # Il y a potentiellement une conversion par source indiquée dans la ressource - for source in resource[ 'sources' ]: + for source in resource['sources']: logger.info("Create pivot of source: " + source['id']) @@ -77,12 +76,12 @@ def sql_convert(config, resource, db_configs, connection, logger): else: logger.info("Mapping not done") - # Configuration de la bdd source - source_db_config = db_configs[ source['mapping']['source']['baseId'] ] + #  Configuration de la bdd source + source_db_config = db_configs[source['mapping']['source']['baseId']] used_bases.append(source['mapping']['source']['baseId']) # Configuration de la bdd de travail utilisée pour ce pivot - work_db_config = db_configs[ config['workingSpace']['baseId'] ] + work_db_config = db_configs[config['workingSpace']['baseId']] # Récupération de la bbox bbox = [float(coord) for coord in source["bbox"].split(",")] @@ -94,9 +93,7 @@ def sql_convert(config, resource, db_configs, connection, logger): logger.info("Create source on bbox: " + source["bbox"]) # Lancement du script SQL de conversion source --> pivot - connection.autocommit = True - with open( source['mapping']['conversion']['file'] ) as sql_script: - cur = connection.cursor() + with open(source['mapping']['conversion']['file']) as sql_script: logger.info("Executing SQL conversion script") instructions = sqlparse.split(sql_script.read().format(user=work_db_config.get('user'), input_schema=source_db_config.get('schema'), @@ -107,37 +104,39 @@ def sql_convert(config, resource, db_configs, connection, logger): for instruction in instructions: if instruction == '': continue - logger.debug("SQL:\n{}\n".format(instruction) ) + logger.debug("SQL:\n{}\n".format(instruction)) st_instruction = time.time() - cur.execute(instruction, - { - 'bdpwd': source_db_config.get('password'), 'bdport': source_db_config.get('port'), - 'bdhost': source_db_config.get('host'), 'bduser': source_db_config.get('user'), - 'dbname': source_db_config.get('database'), - 'xmin': xmin, 'ymin': ymin, 'xmax': xmax, 'ymax': ymax - } - ) + database.execute_update(instruction, + { + 'bdpwd': source_db_config.get('password'), + 'bdport': source_db_config.get('port'), + 'bdhost': source_db_config.get('host'), + 'bduser': source_db_config.get('user'), + 'dbname': source_db_config.get('database'), + 'xmin': xmin, 'ymin': ymin, 'xmax': xmax, 'ymax': ymax + } + ) et_instruction = time.time() - logger.info("Execution ended. Elapsed time : %s seconds." %(et_instruction - st_instruction)) + logger.info("Execution ended. Elapsed time : %s seconds." % (et_instruction - st_instruction)) et_sql_conversion = time.time() - logger.info("Conversion from BDD to pivot ended. Elapsed time : %s seconds." %(et_sql_conversion - st_sql_conversion)) + logger.info( + "Conversion from BDD to pivot ended. Elapsed time : %s seconds." % (et_sql_conversion - st_sql_conversion)) + -def pgr_convert(config, resource, db_configs, connection, logger): +def pgr_convert(resource, db_configs, database: DatabaseManager, logger): """ Fonction de conversion depuis la bdd pivot vers la bdd pgrouting Parameters ---------- - config: dict - dictionnaire correspondant à la configuration décrite dans le fichier passé en argument resource: dict dictionnaire correspondant à la resource décrite dans le fichier passé en argument db_configs: dict dictionnaire correspondant aux configurations des bdd - connection: psycopg2.connection - connection à la bdd de travail + database: r2gg.DatabaseManager + gestionnaire de connexion et d'exécution de la base de la bdd logger: logging.Logger """ @@ -150,19 +149,13 @@ def pgr_convert(config, resource, db_configs, connection, logger): i = 0 for source in resource["sources"]: - logger.info("Source {} of {}...".format(i+1, len(resource["sources"]))) + logger.info("Source {} of {}...".format(i + 1, len(resource["sources"]))) logger.info("Source id : " + source["id"]) # Configuration et connection à la base de sortie - out_db_config = db_configs[ source['storage']['base']['baseId'] ] - host = out_db_config.get('host') - dbname = out_db_config.get('database') - user = out_db_config.get('user') - password = out_db_config.get('password') - port = out_db_config.get('port') - connect_args = 'host=%s dbname=%s user=%s password=%s port=%s' %(host, dbname, user, password, port) + out_db_config = db_configs[source['storage']['base']['baseId']] logger.info("Connecting to output database") - connection_out = psycopg2.connect(connect_args) + database_out = DatabaseManager(out_db_config, logger) schema_out = out_db_config.get('schema') @@ -172,14 +165,14 @@ def pgr_convert(config, resource, db_configs, connection, logger): cost_calculation_files_paths = {cost["compute"]["configuration"]["storage"]["file"] for cost in source["costs"]} for cost_calculation_file_path in cost_calculation_files_paths: - pivot_to_pgr(source, cost_calculation_file_path, connection, connection_out, schema_out, input_schema, logger) - connection_out.close() + pivot_to_pgr(source, cost_calculation_file_path, database, database_out, schema_out, input_schema, logger) + database_out.disconnect_working_db() et_pivot_to_pgr = time.time() - logger.info("Conversion from pivot to PGR ended. Elapsed time : %s seconds." %(et_pivot_to_pgr - st_pivot_to_pgr)) + logger.info("Conversion from pivot to PGR ended. Elapsed time : %s seconds." % (et_pivot_to_pgr - st_pivot_to_pgr)) -def osm_convert(config, resource, db_configs, connection, logger): +def osm_convert(config, resource, db_configs, database: DatabaseManager, logger): """ Fonction de conversion depuis la bdd pivot vers un fichier osm @@ -191,8 +184,8 @@ def osm_convert(config, resource, db_configs, connection, logger): dictionnaire correspondant à la resource décrite dans le fichier passé en argument db_configs: dict dictionnaire correspondant aux configurations des bdd - connection: psycopg2.connection - connection à la bdd de travail + database: r2gg.DatabaseManager + gestionnaire de connexion et d'exécution de la base de la bdd logger: logging.Logger """ @@ -224,7 +217,7 @@ def osm_convert(config, resource, db_configs, connection, logger): # Plusieurs sources peuvent référencer le même mapping mais changer plus tard dans la génération found_base = False found_id = '' - for sid,sub in used_bases.items(): + for sid, sub in used_bases.items(): if sub == source['mapping']['source']['baseId']: found_base = True found_id = sid @@ -251,11 +244,12 @@ def osm_convert(config, resource, db_configs, connection, logger): else: logger.info("Mapping not already done") - pivot_to_osm(config, source, db_configs, connection, logger, convert_osm_to_pbf) + pivot_to_osm(config, source, db_configs, database, logger, convert_osm_to_pbf) + + used_bases[source['id']] = source['mapping']['source']['baseId'] - used_bases[ source['id'] ] = source['mapping']['source']['baseId'] -def osrm_convert(config, resource, logger, build_lua_from_cost_config = True): +def osrm_convert(config, resource, logger, build_lua_from_cost_config=True): """ Fonction de conversion depuis le fichier osm vers les fichiers osrm @@ -282,7 +276,7 @@ def osrm_convert(config, resource, logger, build_lua_from_cost_config = True): i = 0 for source in resource["sources"]: - logger.info("Source {} of {}...".format(i+1, len(resource["sources"]))) + logger.info("Source {} of {}...".format(i + 1, len(resource["sources"]))) logger.info('LUA part') lua_file = source["cost"]["compute"]["storage"]["file"] @@ -293,7 +287,7 @@ def osrm_convert(config, resource, logger, build_lua_from_cost_config = True): costs_config = config_from_path(config_file) cost_name = source["cost"]["compute"]["configuration"]["name"] - if cost_name not in [ output["name"] for output in costs_config["outputs"] ]: + if cost_name not in [output["name"] for output in costs_config["outputs"]]: raise ValueError("cost_name must be in cost configuration") with open(lua_file, "w") as lua_f: @@ -332,15 +326,15 @@ def osrm_convert(config, resource, logger, build_lua_from_cost_config = True): start_command = time.time() subprocess_execution(osrm_extract_args, logger) end_command = time.time() - logger.info("OSRM extract ended. Elapsed time : %s seconds." %(end_command - start_command)) + logger.info("OSRM extract ended. Elapsed time : %s seconds." % (end_command - start_command)) subprocess_execution(osrm_contract_args, logger) final_command = time.time() - logger.info("OSRM contract ended. Elapsed time : %s seconds." %(final_command - end_command)) + logger.info("OSRM contract ended. Elapsed time : %s seconds." % (final_command - end_command)) subprocess_execution(rm_args, logger) i += 1 -def valhalla_convert(config, resource, logger, build_lua_from_cost_config = True): +def valhalla_convert(config, resource, logger, build_lua_from_cost_config=True): """ Fonction de conversion depuis le fichier .osm.pbf vers les fichiers valhalla @@ -367,7 +361,7 @@ def valhalla_convert(config, resource, logger, build_lua_from_cost_config = True i = 0 for source in resource["sources"]: - logger.info("Source {} of {}...".format(i+1, len(resource["sources"]))) + logger.info("Source {} of {}...".format(i + 1, len(resource["sources"]))) logger.info('Looking for OSM PBF file') @@ -406,15 +400,15 @@ def valhalla_convert(config, resource, logger, build_lua_from_cost_config = True start_command = time.time() valhalla_build_config_args = ["valhalla_build_config", - "--mjolnir-tile-dir", source["storage"]["dir"], - "--mjolnir-tile-extract", source["storage"]["tar"], - # Modification des limites par défaut du service : 10h pour isochrone et 1000km pour iso distance - # contre 2h et 200km par défaut - "--service-limits-isochrone-max-time-contour", "600", - "--service-limits-isochrone-max-distance-contour", "1000", - # Ajout de l'autorisation à exclure les ponts/tunnels/péages - "--service-limits-allow-hard-exclusions", "True"] - subprocess_execution(valhalla_build_config_args, logger, outfile = source["storage"]["config"]) + "--mjolnir-tile-dir", source["storage"]["dir"], + "--mjolnir-tile-extract", source["storage"]["tar"], + # Modification des limites par défaut du service : 10h pour isochrone et 1000km pour iso distance + # contre 2h et 200km par défaut + "--service-limits-isochrone-max-time-contour", "600", + "--service-limits-isochrone-max-distance-contour", "1000", + # Ajout de l'autorisation à exclure les ponts/tunnels/péages + "--service-limits-allow-hard-exclusions", "True"] + subprocess_execution(valhalla_build_config_args, logger, outfile=source["storage"]["config"]) # Nécessaire le temps que le fichier s'écrive... time.sleep(1) # Ajout du graph custom dans la config valhalla (impossible via les paramètres du build_config) @@ -432,10 +426,10 @@ def valhalla_convert(config, resource, logger, build_lua_from_cost_config = True subprocess_execution(valhalla_build_extract_args, logger) final_command = time.time() - logger.info("Valhalla tiles built. Elapsed time : %s seconds." %(final_command - start_command)) + logger.info("Valhalla tiles built. Elapsed time : %s seconds." % (final_command - start_command)) -def write_road2_config(config, resource, logger, convert_file_paths = True): +def write_road2_config(config, resource, logger, convert_file_paths=True): """ Fonction pour l'écriture du fichier de ressource @@ -456,7 +450,8 @@ def write_road2_config(config, resource, logger, convert_file_paths = True): for source in resource["sources"]: - source_file = os.path.join(config["outputs"]["configurations"]["sources"]["storage"]["directory"], source['id'] + ".source") + source_file = os.path.join(config["outputs"]["configurations"]["sources"]["storage"]["directory"], + source['id'] + ".source") logger.info("Writing source file : " + source_file) # On modifie la source en fonction de son type @@ -474,10 +469,11 @@ def write_road2_config(config, resource, logger, convert_file_paths = True): bid_tmp = source["storage"]["base"]["baseId"] for base in config["bases"]: if base["id"] == bid_tmp: - db_file_out = convert_path(base["configFile"], config["outputs"]["configurations"]["databases"]["storage"]["directory"]) + db_file_out = convert_path(base["configFile"], + config["outputs"]["configurations"]["databases"]["storage"]["directory"]) copy_file_locally(base["configFile"], db_file_out) - source["storage"]["base"].update({"dbConfig":db_file_out}) - source["storage"]["base"].update({"schema":base["schema"]}) + source["storage"]["base"].update({"dbConfig": db_file_out}) + source["storage"]["base"].update({"schema": base["schema"]}) source["storage"]["base"].pop("baseId", None) for cost in source["costs"]: cost.pop("compute", None) @@ -492,7 +488,8 @@ def write_road2_config(config, resource, logger, convert_file_paths = True): source_ids.append(source['id']) # On passe à la ressource - resource_file = os.path.join(config["outputs"]["configurations"]["resource"]["storage"]["directory"], resource['id'] + ".resource") + resource_file = os.path.join(config["outputs"]["configurations"]["resource"]["storage"]["directory"], + resource['id'] + ".resource") logger.info("Writing resource file: " + resource_file) # Récupération de la date d'extraction @@ -500,7 +497,7 @@ def write_road2_config(config, resource, logger, convert_file_paths = True): date_file = os.path.join(work_dir_config, "r2gg.date") f = open(date_file, "r") extraction_date = f.read() - logger.info("extraction date to add in resource (from "+ date_file +"): " + extraction_date) + logger.info("extraction date to add in resource (from " + date_file + "): " + extraction_date) f.close() # On fait le dossier s'il n'existe pas diff --git a/r2gg/_pivot_to_osm.py b/r2gg/_pivot_to_osm.py index 9e1b6c4..43bf8f8 100644 --- a/r2gg/_pivot_to_osm.py +++ b/r2gg/_pivot_to_osm.py @@ -1,17 +1,17 @@ -from datetime import date -from math import ceil import os import time +from datetime import date +from math import ceil from lxml import etree -from psycopg2.extras import DictCursor from r2gg._osm_building import writeNode, writeWay, writeWayNds, writeRes, writeWayTags -from r2gg._sql_building import getQueryByTableAndBoundingBox from r2gg._osm_to_pbf import osm_to_pbf +from r2gg._sql_building import getQueryByTableAndBoundingBox +from r2gg._database import DatabaseManager -def pivot_to_osm(config, source, db_configs, connection, logger, output_is_pbf = False): +def pivot_to_osm(config, source, db_configs, database: DatabaseManager, logger, output_is_pbf=False): """ Fonction de conversion depuis la bdd pivot vers le fichier osm puis pbf le cas échéant @@ -22,11 +22,10 @@ def pivot_to_osm(config, source, db_configs, connection, logger, output_is_pbf = source: dict db_configs: dict dictionnaire correspondant aux configurations des bdd - connection: psycopg2.connection - connection à la bdd de travail + database: r2gg.DatabaseManager + gestionnaire de connexion et d'exécution de la base de la bdd logger: logging.Logger """ - logger.info("Convert pivot to OSM format for a source") # Récupération de la date d'extraction @@ -44,16 +43,14 @@ def pivot_to_osm(config, source, db_configs, connection, logger, output_is_pbf = source_db_config = db_configs[source['mapping']['source']['baseId']] input_schema = source_db_config.get('schema') - cursor = connection.cursor(cursor_factory=DictCursor) - - logger.info(f"SQL: select last_value from {input_schema}.nodes_id_seq") - cursor.execute(f"select last_value from {input_schema}.nodes_id_seq") - vertexSequence = cursor.fetchone()[0] + last_value_nodes_query = f"select last_value from {input_schema}.nodes_id_seq" + vertexSequence, _ = database.execute_select_fetch_one(last_value_nodes_query, show_duration=True) + vertexSequence = vertexSequence[0] logger.info(vertexSequence) - logger.info(f"SQL: select last_value from {input_schema}.edges_id_seq") - cursor.execute(f"select last_value from {input_schema}.edges_id_seq") - edgeSequence = cursor.fetchone()[0] + last_value_edges_query = f"select last_value from {input_schema}.edges_id_seq" + edgeSequence, _ = database.execute_select_fetch_one(last_value_edges_query, show_duration=True) + edgeSequence = edgeSequence[0] logger.info(edgeSequence) logger.info("Starting conversion from pivot to OSM") @@ -70,13 +67,8 @@ def pivot_to_osm(config, source, db_configs, connection, logger, output_is_pbf = with xf.element("osm", attribs): # Récupération du nombre de nodes - sql_query = f"SELECT COUNT(*) as cnt FROM {input_schema}.nodes" - logger.info("SQL: {}".format(sql_query)) - st_execute = time.time() - cursor.execute(sql_query) - et_execute = time.time() - logger.info("Execution ended. Elapsed time : %s seconds." %(et_execute - st_execute)) - row = cursor.fetchone() + number_of_nodes_query = f"SELECT COUNT(*) as cnt FROM {input_schema}.nodes" + row, _ = database.execute_select_fetch_one(number_of_nodes_query, show_duration=True) nodesize = row["cnt"] # Ecriture des nodes @@ -85,34 +77,21 @@ def pivot_to_osm(config, source, db_configs, connection, logger, output_is_pbf = logger.info(f"Writing nodes: {nodesize} ways to write") st_nodes = time.time() while offset < nodesize: - sql_query = getQueryByTableAndBoundingBox(f'{input_schema}.nodes', source['bbox']) - sql_query += " LIMIT {} OFFSET {}".format(batchsize, offset) - logger.info("SQL: {}".format(sql_query)) - st_execute = time.time() - cursor.execute(sql_query) - et_execute = time.time() - offset += batchsize - logger.info("Execution ended. Elapsed time : %s seconds." %(et_execute - st_execute)) - row = cursor.fetchone() - logger.info("Writing nodes") - st_execute = time.time() - i = 1 - while row: - nodeEl = writeNode(row, extraction_date) - xf.write(nodeEl, pretty_print=True) - row = cursor.fetchone() - logger.info("%s / %s nodes ajoutés" %(offset, nodesize)) + sql_query_nodes = getQueryByTableAndBoundingBox(f'{input_schema}.nodes', source['bbox']) + sql_query_nodes += " LIMIT {} OFFSET {}".format(batchsize, offset) + offset += batchsize + logger.info("Writing nodes") + for row, count in database.execute_select_fetch_multiple(sql_query_nodes, show_duration=True): + nodeEl = writeNode(row, extraction_date) + xf.write(nodeEl, pretty_print=True) + + logger.info("%s / %s nodes ajoutés" % (offset, nodesize)) et_nodes = time.time() - logger.info("Writing nodes ended. Elapsed time : %s seconds." %(et_nodes - st_nodes)) + logger.info("Writing nodes ended. Elapsed time : %s seconds." % (et_nodes - st_nodes)) # Récupération du nombre de ways - sql_query = f"SELECT COUNT(*) as cnt FROM {input_schema}.edges" - logger.info("SQL: {}".format(sql_query)) - st_execute = time.time() - cursor.execute(sql_query) - et_execute = time.time() - logger.info("Execution ended. Elapsed time : %s seconds." %(et_execute - st_execute)) - row = cursor.fetchone() + sql_query_edges_count = f"SELECT COUNT(*) as cnt FROM {input_schema}.edges" + row, _ = database.execute_select_fetch_one(sql_query_edges_count, show_duration=True) edgesize = row["cnt"] # Ecriture des ways @@ -121,64 +100,49 @@ def pivot_to_osm(config, source, db_configs, connection, logger, output_is_pbf = logger.info(f"Writing ways: {edgesize} ways to write") st_edges = time.time() while offset < edgesize: - sql_query2 = getQueryByTableAndBoundingBox(f'{input_schema}.edges', source['bbox'], ['*', f'{input_schema}.inter_nodes(geom) as internodes']) - sql_query2 += " LIMIT {} OFFSET {}".format(batchsize, offset) - logger.info("SQL: {}".format(sql_query2)) - st_execute = time.time() - cursor.execute(sql_query2) - et_execute = time.time() - offset += batchsize - logger.info("Execution ended. Elapsed time : %s seconds." %(et_execute - st_execute)) - row = cursor.fetchone() - st_execute = time.time() - i = 1 - while row: - wayEl = writeWay(row, extraction_date) - for node in row['internodes']: - vertexSequence = vertexSequence + 1 - node['id'] = vertexSequence - nodeEl = writeNode(node, extraction_date) - xf.write(nodeEl, pretty_print=True) - wayEl = writeWayNds(wayEl, row, row['internodes']) - wayEl = writeWayTags(wayEl, row) - xf.write(wayEl, pretty_print=True) - row = cursor.fetchone() - logger.info("%s / %s ways ajoutés" %(offset, edgesize)) + sql_query_edges = getQueryByTableAndBoundingBox(f'{input_schema}.edges', source['bbox'], ['*', + f'{input_schema}.inter_nodes(geom) as internodes']) + sql_query_edges += " LIMIT {} OFFSET {}".format(batchsize, offset) + offset += batchsize + for row, count in database.execute_select_fetch_multiple(sql_query_edges, show_duration=True): + wayEl = writeWay(row, extraction_date) + for node in row['internodes']: + vertexSequence = vertexSequence + 1 + node['id'] = vertexSequence + nodeEl = writeNode(node, extraction_date) + xf.write(nodeEl, pretty_print=True) + wayEl = writeWayNds(wayEl, row, row['internodes']) + wayEl = writeWayTags(wayEl, row) + xf.write(wayEl, pretty_print=True) + + logger.info("%s / %s ways ajoutés" % (offset, edgesize)) et_edges = time.time() - logger.info("Writing ways ended. Elapsed time : %s seconds." %(et_edges - st_edges)) + logger.info("Writing ways ended. Elapsed time : %s seconds." % (et_edges - st_edges)) # Ecriture des restrictions - sql_query3 = f"select * from {input_schema}.non_comm" - logger.info("SQL: {}".format(sql_query3)) - st_execute = time.time() - cursor.execute(sql_query3) - et_execute = time.time() - logger.info("Execution ended. Elapsed time : %s seconds." %(et_execute - st_execute)) - row = cursor.fetchone() + sql_query_non_comm = f"select * from {input_schema}.non_comm" logger.info("Writing restrictions") st_execute = time.time() i = 1 - while row: + for row, count in database.execute_select_fetch_multiple(sql_query_non_comm, show_duration=True): if row['common_vertex_id'] == -1: - row = cursor.fetchone() i += 1 continue ResEl = writeRes(row, i, extraction_date) xf.write(ResEl, pretty_print=True) - row = cursor.fetchone() - if (i % ceil(cursor.rowcount/10) == 0): - logger.info("%s / %s restrictions ajoutés" %(i, cursor.rowcount)) + if (i % ceil(count / 10) == 0): + logger.info("%s / %s restrictions ajoutés" % (i, count)) i += 1 + et_execute = time.time() - logger.info("Writing restrictions ended. Elapsed time : %s seconds." %(et_execute - st_execute)) + logger.info("Writing restrictions ended. Elapsed time : %s seconds." % (et_execute - st_execute)) except etree.SerialisationError: - logger.warn("WARNING: XML file not closed properly (lxml.etree.SerialisationError)") + logger.warning("WARNING: XML file not closed properly (lxml.etree.SerialisationError)") - cursor.close() end_time = time.time() - logger.info("Conversion from pivot to OSM ended. Elapsed time : %s seconds." %(end_time - start_time)) + logger.info("Conversion from pivot to OSM ended. Elapsed time : %s seconds." % (end_time - start_time)) # osm2pbf : Gestion du format osm.pbf if output_is_pbf: - osm_to_pbf(filename, filename+'.pbf', logger) + osm_to_pbf(filename, filename + '.pbf', logger) diff --git a/r2gg/_pivot_to_pgr.py b/r2gg/_pivot_to_pgr.py index 34d4aa2..70ff18a 100644 --- a/r2gg/_pivot_to_pgr.py +++ b/r2gg/_pivot_to_pgr.py @@ -7,8 +7,9 @@ from r2gg._output_costs_from_costs_config import output_costs_from_costs_config from r2gg._read_config import config_from_path from r2gg._sql_building import getQueryByTableAndBoundingBox +from r2gg._database import DatabaseManager -def pivot_to_pgr(source, cost_calculation_file_path, connection_work, connection_out, schema, input_schema, logger): +def pivot_to_pgr(source, cost_calculation_file_path, database_work: DatabaseManager, database_out: DatabaseManager, schema, input_schema, logger): """ Fonction de conversion depuis la bdd pivot vers la base pgr @@ -17,9 +18,9 @@ def pivot_to_pgr(source, cost_calculation_file_path, connection_work, connection source: dict cost_calculation_file_path: str chemin vers le fichier json de configuration des coûts - connection_work: psycopg2.connection + database_work: DatabaseManager connection à la bdd de travail - connection_out: psycopg2.connection + database_out: DatabaseManager connection à la bdd pgrouting de sortie schema: str nom du schéma dans la base de sortie @@ -28,12 +29,10 @@ def pivot_to_pgr(source, cost_calculation_file_path, connection_work, connection logger: logging.Logger """ - cursor_in = connection_work.cursor(cursor_factory=DictCursor, name="cursor_in") ways_table_name = schema + '.ways' # Récupération des coûts à calculer costs = config_from_path(cost_calculation_file_path) - cursor_out = connection_out.cursor() # Création de la edge_table pgrouting create_table = """ DROP TABLE IF EXISTS {0}; @@ -85,8 +84,7 @@ def pivot_to_pgr(source, cost_calculation_file_path, connection_work, connection vehicule_leger_interdit boolean, cout_vehicule_prioritaire numeric );""".format(ways_table_name) - logger.debug("SQL: {}".format(create_table)) - cursor_out.execute(create_table) + database_out.execute_update(create_table) # Ajout des colonnes de coûts add_columns = "ALTER TABLE {} ".format(ways_table_name) @@ -95,7 +93,7 @@ def pivot_to_pgr(source, cost_calculation_file_path, connection_work, connection add_columns += "ADD COLUMN IF NOT EXISTS {} double precision,".format("reverse_" + output["name"]) add_columns = add_columns[:-1] logger.debug("SQL: adding costs columns \n {}".format(add_columns)) - cursor_out.execute(add_columns) + database_out.execute_update(add_columns) logger.info("Starting conversion") start_time = time.time() @@ -109,34 +107,30 @@ def pivot_to_pgr(source, cost_calculation_file_path, connection_work, connection id_from bigint, id_to bigint );""".format(schema) - logger.debug("SQL: {}".format(create_non_comm)) - cursor_out.execute(create_non_comm) + database_out.execute_update(create_non_comm) logger.info("Populating turn restrictions") tr_query = f"SELECT id_from, id_to FROM {input_schema}.non_comm;" - logger.debug("SQL: {}".format(tr_query)) - st_execute = time.time() - cursor_in.execute(tr_query) - et_execute = time.time() - logger.info("Execution ended. Elapsed time : %s seconds." %(et_execute - st_execute)) - # Insertion petit à petit -> plus performant - logger.info("SQL: Inserting or updating {} values in out db".format(cursor_in.rowcount)) - st_execute = time.time() index = 0 batchsize = 10000 - rows = cursor_in.fetchmany(batchsize) + generator = database_work.execute_select_fetch_multiple(tr_query, show_duration=True, batchsize=batchsize) + rows, count = next(generator,(None, None)) + # Insertion petit à petit -> plus performant + + logger.info("SQL: Inserting or updating {} values in out db".format(count)) + + st_execute = time.time() + while rows: values_str = "" - for row in rows: - values_str += "(%s, %s, %s)," - values_str = values_str[:-1] - # Tuple des valuers à insérer values_tuple = () for row in rows: + values_str += "(%s, %s, %s)," values_tuple += (index, row['id_from'], row['id_to']) index += 1 + values_str = values_str[:-1] set_on_conflict = ( "id_from = excluded.id_from,id_to = excluded.id_to" @@ -148,17 +142,15 @@ def pivot_to_pgr(source, cost_calculation_file_path, connection_work, connection ON CONFLICT (id) DO UPDATE SET {}; """.format(schema, values_str, set_on_conflict) - cursor_out.execute(sql_insert, values_tuple) - connection_out.commit() - rows = cursor_in.fetchmany(batchsize) + database_out.execute_update(sql_insert, values_tuple) + + rows, _ = next(generator,(None, None)) et_execute = time.time() - cursor_in.close() logger.info("Writing turn restrinctions Done. Elapsed time : %s seconds." %(et_execute - st_execute)) # Noeuds --------------------------------------------------------------------------------------- logger.info("Writing vertices...") - cursor_in = connection_work.cursor(cursor_factory=DictCursor, name="cursor_in") create_nodes = """ DROP TABLE IF EXISTS {0}_vertices_pgr; CREATE TABLE {0}_vertices_pgr( @@ -169,34 +161,26 @@ def pivot_to_pgr(source, cost_calculation_file_path, connection_work, connection eout int, the_geom geometry(Point,4326) );""".format(ways_table_name) - logger.debug("SQL: {}".format(create_nodes)) - cursor_out.execute(create_nodes) + database_out.execute_update(create_nodes) logger.info("Populating vertices") nd_query = f"SELECT id, geom FROM {input_schema}.nodes;" - - logger.debug("SQL: {}".format(nd_query)) - st_execute = time.time() - cursor_in.execute(nd_query) - et_execute = time.time() - logger.info("Execution ended. Elapsed time : %s seconds." %(et_execute - st_execute)) # Insertion petit à petit -> plus performant # logger.info("SQL: Inserting or updating {} values in out db".format(cursor_in.rowcount)) st_execute = time.time() index = 0 batchsize = 10000 - rows = cursor_in.fetchmany(batchsize) + generator = database_work.execute_select_fetch_multiple(nd_query, show_duration=True, batchsize=batchsize) + rows, count = next(generator, (None, None)) while rows: values_str = "" - for row in rows: - values_str += "(%s, %s)," - values_str = values_str[:-1] - # Tuple des valeurs à insérer values_tuple = () for row in rows: + values_str += "(%s, %s)," values_tuple += (row['id'], row['geom']) index += 1 + values_str = values_str[:-1] set_on_conflict = ( "the_geom = excluded.the_geom" @@ -208,18 +192,15 @@ def pivot_to_pgr(source, cost_calculation_file_path, connection_work, connection ON CONFLICT (id) DO UPDATE SET {}; """.format(ways_table_name, values_str, set_on_conflict) - cursor_out.execute(sql_insert, values_tuple) - connection_out.commit() - rows = cursor_in.fetchmany(batchsize) + database_out.execute_update(sql_insert, values_tuple) + rows, _ = next(generator,(None, None)) et_execute = time.time() - cursor_in.close() logger.info("Writing vertices Done. Elapsed time : %s seconds." %(et_execute - st_execute)) # Ways ----------------------------------------------------------------------------------------- # Colonnes à lire dans la base source (champs classiques + champs servant aux coûts) - cursor_in = connection_work.cursor(cursor_factory=DictCursor, name="cursor_in") attribute_columns = [ 'id', 'geom as the_geom', @@ -274,11 +255,8 @@ def pivot_to_pgr(source, cost_calculation_file_path, connection_work, connection # Ecriture des ways sql_query = getQueryByTableAndBoundingBox(f'{input_schema}.edges', source['bbox'], in_columns) - logger.info("SQL: {}".format(sql_query)) - st_execute = time.time() - cursor_in.execute(sql_query) - et_execute = time.time() - logger.info("Execution ended. Elapsed time : %s seconds." %(et_execute - st_execute)) + batchsize = 10000 + generator = database_work.execute_select_fetch_multiple(sql_query, show_duration=True, batchsize=batchsize) # Chaîne de n %s, pour l'insertion de données via psycopg single_value_str = "%s," * (len(attribute_columns) + 2 * len(costs["outputs"])) @@ -287,24 +265,21 @@ def pivot_to_pgr(source, cost_calculation_file_path, connection_work, connection # Insertion petit à petit -> plus performant # logger.info("SQL: Inserting or updating {} values in out db".format(cursor_in.rowcount)) st_execute = time.time() - batchsize = 10000 percent = 0 - rows = cursor_in.fetchmany(batchsize) + rows, count = next(generator, (None, None)) while rows: - percent += 1000000 / cursor_in.rowcount + percent += 1000000 / count # Chaîne permettant l'insertion de valeurs via psycopg values_str = "" - for row in rows: - values_str += "(" + single_value_str + ")," - values_str = values_str[:-1] - # Tuple des valuers à insérer values_tuple = () for row in rows: + values_str += "(" + single_value_str + ")," output_costs = output_costs_from_costs_config(costs, row) values_tuple += tuple( row[ output_columns_name ] for output_columns_name in output_columns_names ) + output_costs + values_str = values_str[:-1] output_columns = "(" for output_columns_name in output_columns_names: @@ -328,12 +303,10 @@ def pivot_to_pgr(source, cost_calculation_file_path, connection_work, connection ON CONFLICT (id) DO UPDATE SET {}; """.format(ways_table_name, output_columns, values_str, set_on_conflict) - cursor_out.execute(sql_insert, values_tuple) - connection_out.commit() - rows = cursor_in.fetchmany(batchsize) + database_out.execute_update(sql_insert, values_tuple) + rows, _ = next(generator,(None, None)) et_execute = time.time() - cursor_in.close(); logger.info("Writing ways ended. Elapsed time : %s seconds." %(et_execute - st_execute)) spacial_indices_query = """ @@ -343,60 +316,31 @@ def pivot_to_pgr(source, cost_calculation_file_path, connection_work, connection CLUSTER {0}_vertices_pgr USING ways_vertices_geom_gist ; CREATE INDEX IF NOT EXISTS ways_importance_idx ON {0} USING btree (importance); """.format(ways_table_name) - logger.info("SQL: {}".format(spacial_indices_query)) - st_execute = time.time() - cursor_out.execute(spacial_indices_query) - et_execute = time.time() - logger.info("Execution ended. Elapsed time : %s seconds." %(et_execute - st_execute)) - connection_out.commit() + database_out.execute_update(spacial_indices_query) turn_restrictions_indices_query = """ CREATE INDEX IF NOT EXISTS turn_restrictions_id_key ON {0}.turn_restrictions USING btree (id); CREATE INDEX IF NOT EXISTS ways_id_key ON {1} USING btree (id); CREATE INDEX IF NOT EXISTS ways_vertices_pgr_id_key ON {1}_vertices_pgr USING btree (id); """.format(schema, ways_table_name) - logger.info("SQL: {}".format(turn_restrictions_indices_query)) - st_execute = time.time() - cursor_out.execute(turn_restrictions_indices_query) - et_execute = time.time() - logger.info("Execution ended. Elapsed time : %s seconds." %(et_execute - st_execute)) - connection_out.commit() + database_out.execute_update(turn_restrictions_indices_query) - old_isolation_level = connection_out.isolation_level - connection_out.set_isolation_level(0) # VACCUM ANALYZE for ways vacuum_query = f"VACUUM ANALYZE {ways_table_name};" - logger.info("SQL: {}".format(vacuum_query)) - st_execute = time.time() - cursor_out.execute(vacuum_query) - et_execute = time.time() - logger.info("Execution ended. Elapsed time : %s seconds." %(et_execute - st_execute)) + database_out.execute_update(vacuum_query, isolation_level=0) # VACCUM ANALYZE for ways_vertices_pgr vacuum_query = f"VACUUM ANALYZE {ways_table_name}_vertices_pgr;" - logger.info("SQL: {}".format(vacuum_query)) - st_execute = time.time() - cursor_out.execute(vacuum_query) - et_execute = time.time() - logger.info("Execution ended. Elapsed time : %s seconds." %(et_execute - st_execute)) + database_out.execute_update(vacuum_query, isolation_level=0) # VACCUM ANALYZE for turn_restrictions vacuum_query = f"VACUUM ANALYZE {schema}.turn_restrictions;" - logger.info("SQL: {}".format(vacuum_query)) - st_execute = time.time() - cursor_out.execute(vacuum_query) - et_execute = time.time() - logger.info("Execution ended. Elapsed time : %s seconds." %(et_execute - st_execute)) - - connection_out.set_isolation_level(old_isolation_level) - connection_out.commit() + database_out.execute_update(vacuum_query, isolation_level=0) - cursor_out.close() # Nettoyage du graphe logger.info("Cleaning isolated clusters of less than 10 edges...") - cursor_isolated = connection_out.cursor() profile_names = set([ cost['profile'] for cost in source["costs"]]) st_execute = time.time() @@ -435,12 +379,10 @@ def pivot_to_pgr(source, cost_calculation_file_path, connection_work, connection WHERE {0}.ways.target = ANY(SELECT * from remove_nodes) OR {0}.ways.source = ANY(SELECT * from remove_nodes); """.format(schema, profile_name) logger.info("SQL: {}".format(clean_graph_query)) - cursor_isolated.execute(clean_graph_query) - connection_out.commit() + database_out.execute_update(clean_graph_query) et_execute = time.time() logger.info("Execution ended. Elapsed time : %s seconds." %(et_execute - st_execute)) - cursor_isolated.close() end_time = time.time() logger.info("Conversion from pivot to PGR ended. Elapsed time : %s seconds." %(end_time - start_time)) diff --git a/r2gg/cli.py b/r2gg/cli.py index f663ef3..1979cf3 100644 --- a/r2gg/cli.py +++ b/r2gg/cli.py @@ -3,38 +3,30 @@ """Main CLI entrypoint.""" # Package -from r2gg.__about__ import ( - __author__, - __cli_usage__, - __summary__, - __title__, - __title_clean__, - __uri_homepage__, - __version__, -) -from r2gg._configure import configure, connect_working_db, disconnect_working_db +from r2gg._configure import configure from r2gg._main import sql_convert, pgr_convert, osm_convert, osrm_convert, valhalla_convert, write_road2_config +from r2gg._database import DatabaseManager # ############################################################################ # ########## MAIN ################ # ################################ def sql2pivot(): config, resource, db_configs, logger = configure() - connection = connect_working_db(config, db_configs, logger) - sql_convert(config, resource, db_configs, connection, logger) - disconnect_working_db(connection, logger) + database = DatabaseManager(db_configs[config["workingSpace"]["baseId"]], logger) + sql_convert(config, resource, db_configs, database, logger) + database.disconnect_working_db() def pivot2pgrouting(): config, resource, db_configs, logger = configure() - connection = connect_working_db(config, db_configs, logger) - pgr_convert(config, resource, db_configs, connection, logger) - disconnect_working_db(connection, logger) + database = DatabaseManager(db_configs[config["workingSpace"]["baseId"]], logger) + pgr_convert(resource, db_configs, database, logger) + database.disconnect_working_db() def pivot2osm(): config, resource, db_configs, logger = configure() - connection = connect_working_db(config, db_configs, logger) - osm_convert(config, resource, db_configs, connection, logger) - disconnect_working_db(connection, logger) + database = DatabaseManager(db_configs[config["workingSpace"]["baseId"]], logger) + osm_convert(config, resource, db_configs, database, logger) + database.disconnect_working_db() def osm2osrm(): config, resource, _, logger = configure() @@ -52,20 +44,14 @@ def main(): """Main CLI entrypoint. """ config, resource, db_configs, logger = configure() - connection = connect_working_db(config, db_configs, logger) - sql_convert(config, resource, db_configs, connection, logger) - if (resource['type'] in ['pgr', 'smartpgr']): - config, resource, db_configs, connection, logger = configure() - pgr_convert(config, resource, db_configs, connection, logger) - disconnect_working_db(connection, logger) - elif (resource['type'] == 'osrm'): - config, resource, db_configs, connection, logger = configure() - osm_convert(config, resource, db_configs, connection, logger) - disconnect_working_db(connection, logger) + sql2pivot() + if resource['type'] in ['pgr', 'smartpgr']: + pivot2pgrouting() + elif resource['type'] == 'osrm': + pivot2osm() osrm_convert(config, resource, logger) - elif (resource['type'] == 'valhalla'): - config, resource, db_configs, connection, logger = configure() - osm_convert(config, resource, db_configs, connection, logger, True) + elif resource['type'] == 'valhalla': + pivot2osm() valhalla_convert(config, resource, logger) else: raise ValueError("Wrong resource type, should be in ['pgr',osrm','valhalla','smartpgr']")