Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
.coverage
coverage.xml
junit
junit
build
dist
.idea
*.egg-info
1 change: 0 additions & 1 deletion docs/development/testing.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ docker build -t r2gg-debian -f docker/debian/Dockerfile .
## Initialisation de la base de données de tests

```sh
docker-compose -f tests/dev/docker-compose.dev.yml up -d
```

Expand Down
72 changes: 11 additions & 61 deletions r2gg/_configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@

# Définition des niveaux de log
LEVELS = {
'CRITICAL' : logging.CRITICAL,
'ERROR' : logging.ERROR,
'WARNING' : logging.WARNING,
'INFO' : logging.INFO,
'DEBUG' : logging.DEBUG
'CRITICAL': logging.CRITICAL,
'ERROR': logging.ERROR,
'WARNING': logging.WARNING,
'INFO': logging.INFO,
'DEBUG': logging.DEBUG
}


def configure():
"""
Fonction de lecture du fichier de configuration passé en argument
Expand All @@ -27,8 +28,6 @@ def configure():
dictionnaire correspondant à la resource décrite dans le fichier passé en argument
db_configs: dict
dictionnaire correspondant aux configurations des bdd
connection: psycopg2.connection
connection à la bdd de travail
logger: logging.Logger
"""
parser = argparse.ArgumentParser()
Expand All @@ -40,7 +39,7 @@ def configure():
config = config_from_path(config_path)['generation']

# Récupération de la configuration du log
logs_config = config_from_path( config['general']['logs']['configFile'] )
logs_config = config_from_path(config['general']['logs']['configFile'])

# Gestion du fichiers de logs non spécifié
try:
Expand All @@ -51,7 +50,7 @@ def configure():
# Configuration du module logging
logging.basicConfig(
format='%(asctime)s %(message)s',
level=LEVELS[ logs_config['level'].upper() ],
level=LEVELS[logs_config['level'].upper()],
handlers=[
logging.FileHandler(logs_file),
logging.StreamHandler()
Expand All @@ -66,63 +65,14 @@ def configure():
# Configuration des bases de données précisées dans la config
for base in config['bases']:
if base['type'] == 'bdd':
db_configs[ base['id'] ] = config_from_path(base['configFile'])
db_configs[base['id']].update({"schema":base['schema']})
db_configs[base['id']] = config_from_path(base['configFile'])
db_configs[base['id']].update({"schema": base['schema']})

# Récupération de l'objet permettant de générer la ressource
#  Récupération de l'objet permettant de générer la ressource
resource = config['resource']

# Création de l'espace de travail
if not os.path.exists(config['workingSpace']['directory']):
os.makedirs(config['workingSpace']['directory'])

return config, resource, db_configs, logger

def connect_working_db(config, db_configs, logger):
"""
Fonction de connexion à la BDD de travail

Parameters
----------
config: dict
dictionnaire correspondant à la configuration décrite dans le fichier passé en argument
db_configs: dict
dictionnaire correspondant aux configurations des bdd
logger: logging.Logger
Returns
-------
connection: psycopg2.connection
connection à la bdd de travail

"""

# Configuration de la bdd de travail
work_db_config = db_configs[ config['workingSpace']['baseId'] ]

# Récupération des paramètres de la bdd
host = work_db_config.get('host')
dbname = work_db_config.get('database')
user = work_db_config.get('user')
password = work_db_config.get('password')
port = work_db_config.get('port')
connect_args = 'host=%s dbname=%s user=%s password=%s port=%s' %(host, dbname, user, password, port)

logger.info("Connecting to work database")
connection = psycopg2.connect(connect_args)
connection.set_client_encoding('UTF8')

return connection

def disconnect_working_db(connection, logger):
"""
Fonction de connexion à la BDD de travail

Parameters
----------
connection: psycopg2.connection
connection à la bdd de travail
logger: logging.Logger
"""

connection.close()
logger.info("Connection to work database closed")
161 changes: 161 additions & 0 deletions r2gg/_database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
import functools
import logging
import time
from os import getenv

import psycopg2
from psycopg2 import OperationalError, DatabaseError, InterfaceError
from psycopg2.extras import DictCursor

# Tunables read from the environment (values in seconds).
# TIMEOUT == 0 disables the per-statement timeout; defaults are given as
# strings because environment variables are always strings.
TIMEOUT = int(getenv("SQL_STATEMENT_TIMEOUT", "0"))
RETRY = int(getenv("SQL_STATEMENT_RETRY_ATTEMPTS", "3"))
DELAY = int(getenv("SQL_DELAY_BETWEEN_STATEMENTS", "30"))


def database_retry_decorator(func):
    """Retry a DatabaseManager generator method on transient database errors.

    ``func`` must be a generator function; each attempt first calls
    ``self.ensure_connection()`` and then delegates to it with ``yield from``.
    On psycopg2 OperationalError / DatabaseError / InterfaceError the pending
    transaction is rolled back and the call is retried, up to ``RETRY``
    attempts with ``DELAY`` seconds between them. Failures are logged; no
    exception is propagated to the caller — the generator simply ends.
    """
    @functools.wraps(func)  # preserve the wrapped generator's name/docstring
    def wrapper(self, *args, **kwargs):
        attempt = 1
        while attempt <= RETRY:
            try:
                self.ensure_connection()
                # Delegate to the wrapped generator; if it finishes without
                # a database error, we are done.
                yield from func(self, *args, **kwargs)
                return

            except (OperationalError, DatabaseError, InterfaceError) as e:
                if attempt >= RETRY:
                    self.logger.error(f"Query failed after {RETRY} attempts: {str(e).rstrip()}")
                    return

                self.logger.error(
                    f"Attempt {attempt}/{RETRY} failed ({str(e).rstrip()}), retrying in {DELAY} seconds"
                )
                time.sleep(DELAY)
                attempt += 1
                # Roll back the aborted transaction before retrying; if even
                # the rollback fails, give up entirely.
                try:
                    if self._connection:
                        self._connection.rollback()
                except Exception as rollback_error:  # renamed: do not shadow the outer `e`
                    self.logger.error(f"Connection rollback failed {str(rollback_error).rstrip()}")
                    return

    return wrapper


class DatabaseManager:
    """Owns the connection to the working database and runs SQL through it.

    The SELECT/UPDATE helpers are generator methods wrapped by
    ``database_retry_decorator``, which re-establishes the connection and
    retries on transient psycopg2 failures.
    """

    def __init__(self, db_configs, logger):
        self.logger = logger
        # Connection parameters of the working database (host, database,
        # user, password, port) as read from its configuration file.
        self._work_db_config = db_configs
        self._connection = self.connect_working_db()

    def connect_working_db(self):
        """Open a new connection to the working database.

        Returns
        -------
        connection: psycopg2.connection
            connection to the working database, client encoding forced to UTF8
        """
        # Fetch the connection parameters from the stored configuration
        host = self._work_db_config.get("host")
        dbname = self._work_db_config.get("database")
        user = self._work_db_config.get("user")
        password = self._work_db_config.get("password")
        port = self._work_db_config.get("port")
        connect_args = "host=%s dbname=%s user=%s password=%s port=%s" % (host, dbname, user, password, port)

        self.logger.info("Connecting to work database")
        connection = psycopg2.connect(connect_args)
        connection.set_client_encoding("UTF8")

        return connection

    def disconnect_working_db(self):
        """Close the connection to the working database, if one is open."""
        if self._connection:
            self._connection.close()
            self.logger.info("Connection to work database closed")

    def ensure_connection(self):
        """
        Ensure the connection is alive; reconnect if needed.
        """
        try:
            if self._connection is None or getattr(self._connection, "closed", 1) != 0:
                self.logger.info("Connection is closed or missing; reconnecting")
                self._connection = self.connect_working_db()
            else:
                # Cheap liveness probe: a failing SELECT 1 means the server
                # side of the connection is gone even though it looks open.
                with self._connection.cursor() as cur:
                    cur.execute("SELECT 1")
        except Exception as e:
            self.logger.error(
                f"Something is wrong with the connection: {str(e).rstrip()}; reconnecting in {DELAY} seconds")
            self.disconnect_working_db()
            time.sleep(DELAY)
            self._connection = self.connect_working_db()

    def execute_select_query(self, cursor, query, show_duration):
        """Execute `query` on `cursor`, optionally logging the SQL and its duration."""
        if TIMEOUT:
            cursor.execute("SET statement_timeout = %s", (1000 * TIMEOUT,))  # timeout in milliseconds

        if show_duration:
            self.logger.info("SQL: {}".format(query))
            st_execute = time.time()
            cursor.execute(query)
            et_execute = time.time()
            self.logger.info("Execution ended. Elapsed time : %s seconds." % (et_execute - st_execute))
        else:
            cursor.execute(query)

    @database_retry_decorator
    def execute_select_fetch_multiple(self, query, batchsize=1, show_duration=False):
        """Generator yielding (rows, rowcount) batches of a SELECT result.

        With ``batchsize == 1`` each yielded item is the single row itself
        (not a one-element list). ``rowcount`` is the total number of rows
        of the result set, repeated with every batch.
        """
        with self._connection.cursor(cursor_factory=DictCursor) as cursor:
            self.execute_select_query(cursor, query, show_duration)
            rows = cursor.fetchmany(batchsize)
            count = cursor.rowcount
            while rows:
                if batchsize == 1:
                    rows = rows.pop()  # unwrap the single row
                yield rows, count
                rows = cursor.fetchmany(batchsize)
            self._connection.commit()
        return

    # the method below should be used as a generator function otherwise use execute_update
    @database_retry_decorator
    def execute_update_query(self, query, params=None, isolation_level=None):
        """Execute a data-modifying query and commit it.

        Implemented as a generator (single bare ``yield``) because
        ``database_retry_decorator`` only supports generator functions;
        prefer ``execute_update`` unless you need the generator directly.
        """
        self.logger.info("SQL: {}".format(query))
        st_execute = time.time()
        with self._connection.cursor(cursor_factory=DictCursor) as cursor:
            old_isolation_level = self._connection.isolation_level
            try:
                if isolation_level is not None:
                    self._connection.set_isolation_level(isolation_level)
                cursor.execute(query, params)
                self._connection.commit()
                et_execute = time.time()
                self.logger.info("Execution ended. Elapsed time : %s seconds." % (et_execute - st_execute))
            finally:
                # Restore the caller-visible isolation level even when the
                # query fails — but only if we actually changed it.
                if isolation_level is not None:
                    self._connection.set_isolation_level(old_isolation_level)
        yield  # the decorator database_retry_decorator only supports generators
        return

    def execute_update(self, query, params=None, isolation_level=None):
        """Run `execute_update_query` to completion (plain-call convenience)."""
        next(self.execute_update_query(query, params=params, isolation_level=isolation_level), None)

    def execute_select_fetch_one(self, query, show_duration=False):
        """Return (row, rowcount) for the first row of `query`, or (None, None)."""
        gen = self.execute_select_fetch_multiple(query, 1, show_duration)
        row, count = next(gen, (None, None))
        return row, count
Loading
Loading