Skip to content

Commit cdf3db4

Browse files
authored
feat(database): ISSUE-85 Add timeout and retry mechanism on individual queries to deal with server connection loss (#86)
1 parent b3926fd commit cdf3db4

File tree

8 files changed

+359
-356
lines changed

8 files changed

+359
-356
lines changed

.gitignore

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
11
.coverage
22
coverage.xml
3-
junit
3+
junit
4+
build
5+
dist
6+
.idea
7+
*.egg-info

docs/development/testing.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ docker build -t r2gg-debian -f docker/debian/Dockerfile .
1414
## Initialisation de la base de données de tests
1515

1616
```sh
17-
```
1817
docker-compose -f tests/dev/docker-compose.dev.yml up -d
1918
```
2019

r2gg/_configure.py

Lines changed: 11 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,14 @@
88

99
# Définition des niveaux de log
1010
LEVELS = {
11-
'CRITICAL' : logging.CRITICAL,
12-
'ERROR' : logging.ERROR,
13-
'WARNING' : logging.WARNING,
14-
'INFO' : logging.INFO,
15-
'DEBUG' : logging.DEBUG
11+
'CRITICAL': logging.CRITICAL,
12+
'ERROR': logging.ERROR,
13+
'WARNING': logging.WARNING,
14+
'INFO': logging.INFO,
15+
'DEBUG': logging.DEBUG
1616
}
1717

18+
1819
def configure():
1920
"""
2021
Fonction de lecture du fichier de configuration passé en argument
@@ -27,8 +28,6 @@ def configure():
2728
dictionnaire correspondant à la resource décrite dans le fichier passé en argument
2829
db_configs: dict
2930
dictionnaire correspondant aux configurations des bdd
30-
connection: psycopg2.connection
31-
connection à la bdd de travail
3231
logger: logging.Logger
3332
"""
3433
parser = argparse.ArgumentParser()
@@ -40,7 +39,7 @@ def configure():
4039
config = config_from_path(config_path)['generation']
4140

4241
# Récupération de la configuration du log
43-
logs_config = config_from_path( config['general']['logs']['configFile'] )
42+
logs_config = config_from_path(config['general']['logs']['configFile'])
4443

4544
# Gestion du fichiers de logs non spécifié
4645
try:
@@ -51,7 +50,7 @@ def configure():
5150
# Configuration du module logging
5251
logging.basicConfig(
5352
format='%(asctime)s %(message)s',
54-
level=LEVELS[ logs_config['level'].upper() ],
53+
level=LEVELS[logs_config['level'].upper()],
5554
handlers=[
5655
logging.FileHandler(logs_file),
5756
logging.StreamHandler()
@@ -66,63 +65,14 @@ def configure():
6665
# Configuration des bases de données précisées dans la config
6766
for base in config['bases']:
6867
if base['type'] == 'bdd':
69-
db_configs[ base['id'] ] = config_from_path(base['configFile'])
70-
db_configs[base['id']].update({"schema":base['schema']})
68+
db_configs[base['id']] = config_from_path(base['configFile'])
69+
db_configs[base['id']].update({"schema": base['schema']})
7170

72-
# Récupération de l'objet permettant de générer la ressource
71+
#  Récupération de l'objet permettant de générer la ressource
7372
resource = config['resource']
7473

7574
# Création de l'espace de travail
7675
if not os.path.exists(config['workingSpace']['directory']):
7776
os.makedirs(config['workingSpace']['directory'])
7877

7978
return config, resource, db_configs, logger
80-
81-
def connect_working_db(config, db_configs, logger):
82-
"""
83-
Fonction de connexion à la BDD de travail
84-
85-
Parameters
86-
----------
87-
config: dict
88-
dictionnaire correspondant à la configuration décrite dans le fichier passé en argument
89-
db_configs: dict
90-
dictionnaire correspondant aux configurations des bdd
91-
logger: logging.Logger
92-
Returns
93-
-------
94-
connection: psycopg2.connection
95-
connection à la bdd de travail
96-
97-
"""
98-
99-
# Configuration de la bdd de travail
100-
work_db_config = db_configs[ config['workingSpace']['baseId'] ]
101-
102-
# Récupération des paramètres de la bdd
103-
host = work_db_config.get('host')
104-
dbname = work_db_config.get('database')
105-
user = work_db_config.get('user')
106-
password = work_db_config.get('password')
107-
port = work_db_config.get('port')
108-
connect_args = 'host=%s dbname=%s user=%s password=%s port=%s' %(host, dbname, user, password, port)
109-
110-
logger.info("Connecting to work database")
111-
connection = psycopg2.connect(connect_args)
112-
connection.set_client_encoding('UTF8')
113-
114-
return connection
115-
116-
def disconnect_working_db(connection, logger):
117-
"""
118-
Fonction de connexion à la BDD de travail
119-
120-
Parameters
121-
----------
122-
connection: psycopg2.connection
123-
connection à la bdd de travail
124-
logger: logging.Logger
125-
"""
126-
127-
connection.close()
128-
logger.info("Connection to work database closed")

r2gg/_database.py

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
import time
2+
from os import getenv
3+
4+
import psycopg2
5+
from psycopg2 import OperationalError, DatabaseError, InterfaceError
6+
from psycopg2.extras import DictCursor
7+
import logging
8+
9+
# Tunables, overridable through environment variables:
# - SQL_STATEMENT_TIMEOUT: per-statement timeout in seconds (0 = disabled)
# - SQL_STATEMENT_RETRY_ATTEMPTS: number of attempts for a failing query
# - SQL_DELAY_BETWEEN_STATEMENTS: seconds to wait between retry attempts
TIMEOUT = int(getenv("SQL_STATEMENT_TIMEOUT", 0))
RETRY = int(getenv("SQL_STATEMENT_RETRY_ATTEMPTS", 3))
DELAY = int(getenv("SQL_DELAY_BETWEEN_STATEMENTS", 30))


def database_retry_decorator(func):
    """
    Decorator for DatabaseManager generator methods: retry the wrapped
    generator up to RETRY times on psycopg2 connection/database errors.

    Before each attempt the connection is checked (and re-opened if needed)
    via ``self.ensure_connection()``. After a failed attempt the current
    transaction is rolled back, then the wrapper waits DELAY seconds before
    retrying.

    NOTE: the wrapped function MUST be a generator (the wrapper uses
    ``yield from``). When the last attempt fails, the error is logged and the
    generator simply ends without re-raising (deliberate best-effort
    semantics, kept as-is).
    """
    from functools import wraps  # local import: keeps module-level imports unchanged

    @wraps(func)  # fix: preserve the wrapped generator's name/docstring
    def wrapper(self, *args, **kwargs):
        attempt = 1
        while attempt <= RETRY:
            try:
                self.ensure_connection()
                yield from func(self, *args, **kwargs)
                return

            except (OperationalError, DatabaseError, InterfaceError) as e:
                if attempt >= RETRY:
                    self.logger.error(f"Query failed after {RETRY} attempts: {str(e).rstrip()}")
                    return

                self.logger.error(
                    f"Attempt {attempt}/{RETRY} failed ({str(e).rstrip()}), retrying in {DELAY} seconds"
                )
                # Fix: roll back the aborted transaction BEFORE sleeping, so
                # the connection is not left in an error state for the whole
                # DELAY-second wait. Rollback failures are logged, not fatal.
                try:
                    if self._connection:
                        self._connection.rollback()
                except Exception as e:
                    self.logger.error(f"Connection rollback failed {str(e).rstrip()}")
                time.sleep(DELAY)
                attempt += 1
        return

    return wrapper
41+
42+
43+
class DatabaseManager:
    """
    Manage the connection to the working PostgreSQL database.

    Wraps a psycopg2 connection and runs individual queries with an optional
    statement timeout (TIMEOUT), automatic reconnection (ensure_connection)
    and retries on connection loss (database_retry_decorator).
    """

    def __init__(self, db_configs: dict, logger: logging.Logger) -> None:
        # db_configs holds the working-database parameters
        # (host, database, user, password, port).
        self.logger = logger
        self._work_db_config = db_configs
        # The connection is opened eagerly at construction time.
        self._connection = self.connect_working_db()

    def connect_working_db(self):
        """
        Open a new connection to the working database.

        Connection parameters (host, database, user, password, port) are read
        from the configuration dict supplied at construction time.

        Returns
        -------
        connection: psycopg2.connection
            open connection to the working database, client encoding UTF8
        """
        # Fetch the database parameters from the configuration
        host = self._work_db_config.get("host")
        dbname = self._work_db_config.get("database")
        user = self._work_db_config.get("user")
        password = self._work_db_config.get("password")
        port = self._work_db_config.get("port")
        connect_args = "host=%s dbname=%s user=%s password=%s port=%s" % (host, dbname, user, password, port)

        self.logger.info("Connecting to work database")
        connection = psycopg2.connect(connect_args)
        connection.set_client_encoding("UTF8")

        return connection

    def disconnect_working_db(self) -> None:
        """
        Close the connection to the working database, if one is open.
        """
        if self._connection:
            self._connection.close()
            self.logger.info("Connection to work database closed")

    def ensure_connection(self) -> None:
        """
        Ensure the connection is alive; reconnect if needed.

        A cheap "SELECT 1" is used as a liveness probe. On any failure the
        current connection is closed, the method waits DELAY seconds, then a
        fresh connection is opened — i.e. this call blocks for DELAY seconds
        in the failure case.
        """
        try:
            # connection.closed != 0 means closed/broken; a missing attribute
            # is treated as closed (getattr default 1).
            if self._connection is None or getattr(self._connection, "closed", 1) != 0:
                self.logger.info("Connection is closed or missing; reconnecting")
                self._connection = self.connect_working_db()
            else:
                with self._connection.cursor() as cur:
                    cur.execute("SELECT 1")
        except Exception as e:
            self.logger.error(
                f"Something is wrong with the connection: {str(e).rstrip()}; reconnecting in {DELAY} seconds")
            self.disconnect_working_db()
            time.sleep(DELAY)
            self._connection = self.connect_working_db()

    def execute_select_query(self, cursor, query: str, show_duration: bool) -> None:
        """
        Execute *query* on *cursor*, applying the configured statement timeout
        and optionally logging the query text and its execution time.
        """
        if TIMEOUT:
            cursor.execute("SET statement_timeout = %s", (1000 * TIMEOUT,))  # timeout in milliseconds

        if show_duration:
            self.logger.info("SQL: {}".format(query))
            st_execute = time.time()
            cursor.execute(query)
            et_execute = time.time()
            self.logger.info("Execution ended. Elapsed time : %s seconds." % (et_execute - st_execute))
        else:
            cursor.execute(query)

    @database_retry_decorator
    def execute_select_fetch_multiple(self, query: str, batchsize: int = 1, show_duration: bool = False):
        """
        Generator: run a SELECT and yield ``(rows, count)`` tuples.

        With batchsize == 1 the single row is yielded directly instead of a
        one-element list; *count* is the query's total cursor.rowcount.

        NOTE(review): if the connection drops mid-iteration, the retry
        decorator re-runs the query from scratch, so rows yielded before the
        failure may be yielded again — confirm callers tolerate this.
        """
        with self._connection.cursor(cursor_factory=DictCursor) as cursor:
            self.execute_select_query(cursor, query, show_duration)
            rows = cursor.fetchmany(batchsize)
            count = cursor.rowcount
            while rows:
                if batchsize == 1:
                    # Unwrap the single-row batch for caller convenience.
                    rows = rows.pop()
                yield rows, count
                rows = cursor.fetchmany(batchsize)
            self._connection.commit()
        return

    # the method below should be used as a generator function otherwise use execute_update
    @database_retry_decorator
    def execute_update_query(self, query: str, params=None, isolation_level=None):
        """
        Generator: execute a write query and commit, optionally under a
        specific psycopg2 isolation level (previous level restored after).

        Yields exactly once, with no value: database_retry_decorator only
        supports generators. Use execute_update() for a plain call.
        """
        self.logger.info("SQL: {}".format(query))
        st_execute = time.time()
        with self._connection.cursor(cursor_factory=DictCursor) as cursor:
            old_isolation_level = self._connection.isolation_level
            if isolation_level is not None:
                self._connection.set_isolation_level(isolation_level)
            cursor.execute(query, params)
            self._connection.commit()
            et_execute = time.time()
            self.logger.info("Execution ended. Elapsed time : %s seconds." % (et_execute - st_execute))
            # Restore the previous isolation level (a no-op when unchanged).
            self._connection.set_isolation_level(old_isolation_level)
        yield  # the decorator database_retry_decorator only supports generators
        return

    def execute_update(self, query: str, params=None, isolation_level=None) -> None:
        """
        Plain (non-generator) wrapper around execute_update_query: drives the
        generator one step so the query actually runs.
        """
        next(self.execute_update_query(query, params=params, isolation_level=isolation_level), None)

    def execute_select_fetch_one(self, query: str, show_duration: bool = False):
        """
        Run a SELECT and return ``(row, count)`` for the first row only, or
        ``(None, None)`` when the query returns no rows.
        """
        gen = self.execute_select_fetch_multiple(query, 1, show_duration)
        row, count = next(gen, (None, None))
        return row, count

0 commit comments

Comments
 (0)