Skip to content

Commit 20bb892

Browse files
committed
feat(database): ISSUE-85 Add timeout and retry mechanism to individual queries to deal with server connection loss
1 parent 9984650 commit 20bb892

File tree

7 files changed

+256
-198
lines changed

7 files changed

+256
-198
lines changed

.gitignore

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
11
.coverage
22
coverage.xml
3-
junit
3+
junit
4+
build
5+
dist
6+
.idea
7+
*.egg-info

docs/development/testing.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ docker build -t r2gg-debian -f docker/debian/Dockerfile .
1414
## Initialisation de la base de données de tests
1515

1616
```sh
17-
```
1817
docker-compose -f tests/dev/docker-compose.dev.yml up -d
1918
```
2019

r2gg/_database.py

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
import time
2+
from os import getenv
3+
4+
import psycopg2
5+
from psycopg2 import OperationalError, DatabaseError, InterfaceError
6+
from psycopg2.extras import DictCursor
7+
import logging
8+
9+
# Per-statement timeout in SECONDS (converted to milliseconds when applied
# via "SET statement_timeout"); 0 disables the timeout.
TIMEOUT = int(getenv("SQL_STATEMENT_TIMEOUT", 0))
# Maximum number of attempts for a query guarded by database_retry_decorator.
RETRY = int(getenv("SQL_STATEMENT_RETRY_ATTEMPTS", 3))
# Seconds to wait between retry attempts (also used before reconnecting).
DELAY = int(getenv("SQL_DELAY_BETWEEN_STATEMENTS", 30))
12+
13+
14+
def database_retry_decorator(func):
    """
    Retry decorator for generator methods of DatabaseManager.

    Re-runs the wrapped GENERATOR method when a psycopg2 connection or
    database error is raised, up to RETRY attempts, sleeping DELAY seconds
    between attempts. Before each attempt the connection is validated (and
    re-established if needed) via ``self.ensure_connection()``; after a
    failed attempt the current transaction is rolled back.

    NOTE: the wrapped function must be a generator — it is consumed with
    ``yield from`` (see execute_update_query, which yields once for this
    reason). NOTE(review): if a failure happens after some rows were
    already yielded, a retry re-runs the query from the start, so the
    consumer may see those rows again — confirm callers tolerate this.

    Parameters
    ----------
    func: callable
        generator method taking ``self`` as first argument

    Returns
    -------
    callable
        generator wrapper with the same signature as ``func``
    """
    # Local import keeps the module's import header untouched.
    from functools import wraps

    @wraps(func)  # preserve the wrapped method's name/docstring for logging & introspection
    def wrapper(self, *args, **kwargs):
        attempt = 1
        while attempt <= RETRY:
            try:
                # Make sure the connection is usable before (re)trying.
                self.ensure_connection()
                yield from func(self, *args, **kwargs)
                return

            except (OperationalError, DatabaseError, InterfaceError) as e:
                if attempt >= RETRY:
                    # Retries exhausted: log and give up without re-raising
                    # (deliberate best-effort behavior).
                    self.logger.error(f"Query failed after {RETRY} attempts: {str(e).rstrip()}")
                    return

                self.logger.error(
                    f"Attempt {attempt}/{RETRY} failed ({str(e).rstrip()}), retrying in {DELAY} seconds"
                )
                time.sleep(DELAY)
                attempt += 1
                try:
                    # Drop any half-done transaction before the next attempt.
                    if self._connection:
                        self._connection.rollback()
                except Exception as e:
                    # Rollback itself failed: the connection is beyond local
                    # repair, so stop retrying.
                    self.logger.error(f"Connection rollback failed {str(e).rstrip()}")
                    return

    return wrapper
41+
42+
43+
class DatabaseManager:
    """
    Owns a psycopg2 connection to the work database and runs queries on it
    with an optional per-statement timeout (TIMEOUT) and automatic retries
    on connection loss (database_retry_decorator).
    """

    def __init__(self, db_configs, logger):
        # db_configs: dict holding the work-database parameters read by
        # connect_working_db ("host", "database", "user", "password", "port").
        # logger: logging.Logger used for all connection/query messages.
        self.logger = logger
        self._work_db_config = db_configs
        self._connection = self.connect_working_db()

    def connect_working_db(self):
        """
        Open a new connection to the work database.

        Connection parameters are taken from ``self._work_db_config``
        (keys "host", "database", "user", "password", "port").

        Returns
        -------
        connection: psycopg2.connection
            new connection to the work database, client encoding UTF8
        """
        # Read the database parameters from the stored configuration.
        host = self._work_db_config.get("host")
        dbname = self._work_db_config.get("database")
        user = self._work_db_config.get("user")
        password = self._work_db_config.get("password")
        port = self._work_db_config.get("port")
        # NOTE(review): values are interpolated into a libpq DSN string;
        # a password containing spaces or quotes would break it — confirm.
        connect_args = "host=%s dbname=%s user=%s password=%s port=%s" % (host, dbname, user, password, port)

        self.logger.info("Connecting to work database")
        connection = psycopg2.connect(connect_args)
        connection.set_client_encoding("UTF8")

        return connection

    def disconnect_working_db(self):
        """
        Close the connection to the work database, if one is open.
        """
        if self._connection:
            self._connection.close()
        self.logger.info("Connection to work database closed")

    def ensure_connection(self):
        """
        Ensure the connection is alive; reconnect if needed.

        If the connection is missing/closed it is reopened immediately;
        if the "SELECT 1" probe fails, the connection is closed, DELAY
        seconds are waited, then a new connection is opened.
        """
        try:
            # closed == 0 means open in psycopg2; default 1 treats a
            # connection object without the attribute as closed.
            if self._connection is None or getattr(self._connection, "closed", 1) != 0:
                self.logger.info("Connection is closed or missing; reconnecting")
                self._connection = self.connect_working_db()
            else:
                # Cheap liveness probe.
                with self._connection.cursor() as cur:
                    cur.execute("SELECT 1")
        except Exception as e:
            self.logger.error(
                f"Something is wrong with the connection: {str(e).rstrip()}; reconnecting in {DELAY} seconds")
            self.disconnect_working_db()
            time.sleep(DELAY)
            self._connection = self.connect_working_db()

    def execute_select_query(self, cursor, query, show_duration):
        """
        Execute ``query`` on ``cursor``, applying the TIMEOUT statement
        timeout when configured and optionally logging the elapsed time.
        """
        if TIMEOUT:
            cursor.execute("SET statement_timeout = %s", (1000 * TIMEOUT,))  # timeout in milliseconds

        if show_duration:
            self.logger.info("SQL: {}".format(query))
            st_execute = time.time()
            cursor.execute(query)
            et_execute = time.time()
            self.logger.info("Execution ended. Elapsed time : %s seconds." % (et_execute - st_execute))
        else:
            cursor.execute(query)

    @database_retry_decorator
    def execute_select_fetch_multiple(self, query, batchsize=1, show_duration=False):
        """
        Generator yielding ``(rows, count)`` batches for a SELECT query.

        With batchsize == 1 each yielded ``rows`` is a single DictRow
        (the one-element batch is unwrapped with ``pop()``); otherwise it
        is a list of up to ``batchsize`` DictRows. ``count`` is
        ``cursor.rowcount`` after the first fetch. Commits when exhausted.
        """
        with self._connection.cursor(cursor_factory=DictCursor) as cursor:
            self.execute_select_query(cursor, query, show_duration)
            rows = cursor.fetchmany(batchsize)
            count = cursor.rowcount
            while rows:
                if batchsize == 1:
                    rows = rows.pop()
                yield rows, count
                rows = cursor.fetchmany(batchsize)
        self._connection.commit()
        return

    # the method below should be used as a generator function otherwise use execute_update
    @database_retry_decorator
    def execute_update_query(self, query, params=None, isolation_level=None):
        """
        Execute a write query and commit, optionally under a specific
        psycopg2 isolation level (restored afterwards).

        NOTE(review): psycopg2 only allows changing the isolation level
        outside an active transaction; restoring it right after
        ``execute`` (before commit) relies on that being accepted here —
        confirm against psycopg2's connection documentation.
        """
        self.logger.info("SQL: {}".format(query))
        st_execute = time.time()
        with self._connection.cursor(cursor_factory=DictCursor) as cursor:
            old_isolation_level = self._connection.isolation_level
            if isolation_level is not None:
                self._connection.set_isolation_level(isolation_level)
            cursor.execute(query, params)
            et_execute = time.time()
            self.logger.info("Execution ended. Elapsed time : %s seconds." % (et_execute - st_execute))
            self._connection.set_isolation_level(old_isolation_level)
        self._connection.commit()
        yield  # the decorator database_retry_decorator only supports generators
        return

    def execute_update(self, query, params=None, isolation_level=None):
        """
        Non-generator convenience wrapper: run execute_update_query to
        completion (the single ``yield`` is consumed by ``next``).
        """
        next(self.execute_update_query(query, params=params, isolation_level=isolation_level), None)

    def execute_select_fetch_one(self, query, show_duration=False):
        """
        Run a SELECT and return ``(row, count)`` for its first row, or
        ``(None, None)`` when the query returns no rows.
        """
        gen = self.execute_select_fetch_multiple(query, 1, show_duration)
        row, count = next(gen, (None, None))
        return row, count

r2gg/_main.py

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from r2gg._file_copier import copy_file_locally
1919
from r2gg._valhalla_lua_builder import build_valhalla_lua
2020
from r2gg._osm_to_pbf import osm_to_pbf
21+
from r2gg._database import DatabaseManager
2122

2223

2324
def sql_convert(config, resource, db_configs, connection, logger):
@@ -124,7 +125,7 @@ def sql_convert(config, resource, db_configs, connection, logger):
124125

125126
logger.info("Conversion from BDD to pivot ended. Elapsed time : %s seconds." %(et_sql_conversion - st_sql_conversion))
126127

127-
def pgr_convert(config, resource, db_configs, connection, logger):
128+
def pgr_convert(config, resource, db_configs, database, logger):
128129
"""
129130
Fonction de conversion depuis la bdd pivot vers la bdd pgrouting
130131
@@ -136,7 +137,7 @@ def pgr_convert(config, resource, db_configs, connection, logger):
136137
dictionnaire correspondant à la resource décrite dans le fichier passé en argument
137138
db_configs: dict
138139
dictionnaire correspondant aux configurations des bdd
139-
connection: psycopg2.connection
140+
database: psycopg2.connection and execution Manager
140141
connection à la bdd de travail
141142
logger: logging.Logger
142143
"""
@@ -155,14 +156,8 @@ def pgr_convert(config, resource, db_configs, connection, logger):
155156

156157
# Configuration et connection à la base de sortie
157158
out_db_config = db_configs[ source['storage']['base']['baseId'] ]
158-
host = out_db_config.get('host')
159-
dbname = out_db_config.get('database')
160-
user = out_db_config.get('user')
161-
password = out_db_config.get('password')
162-
port = out_db_config.get('port')
163-
connect_args = 'host=%s dbname=%s user=%s password=%s port=%s' %(host, dbname, user, password, port)
164159
logger.info("Connecting to output database")
165-
connection_out = psycopg2.connect(connect_args)
160+
database_out = DatabaseManager(out_db_config, logger)
166161

167162
schema_out = out_db_config.get('schema')
168163

@@ -172,14 +167,14 @@ def pgr_convert(config, resource, db_configs, connection, logger):
172167
cost_calculation_files_paths = {cost["compute"]["configuration"]["storage"]["file"] for cost in source["costs"]}
173168

174169
for cost_calculation_file_path in cost_calculation_files_paths:
175-
pivot_to_pgr(source, cost_calculation_file_path, connection, connection_out, schema_out, input_schema, logger)
176-
connection_out.close()
170+
pivot_to_pgr(source, cost_calculation_file_path, database, database_out, schema_out, input_schema, logger)
171+
database_out.disconnect_working_db()
177172

178173
et_pivot_to_pgr = time.time()
179174
logger.info("Conversion from pivot to PGR ended. Elapsed time : %s seconds." %(et_pivot_to_pgr - st_pivot_to_pgr))
180175

181176

182-
def osm_convert(config, resource, db_configs, connection, logger):
177+
def osm_convert(config, resource, db_configs, database, logger):
183178
"""
184179
Fonction de conversion depuis la bdd pivot vers un fichier osm
185180
@@ -191,7 +186,7 @@ def osm_convert(config, resource, db_configs, connection, logger):
191186
dictionnaire correspondant à la resource décrite dans le fichier passé en argument
192187
db_configs: dict
193188
dictionnaire correspondant aux configurations des bdd
194-
connection: psycopg2.connection
189+
database: psycopg2.connection and execution Manager
195190
connection à la bdd de travail
196191
logger: logging.Logger
197192
"""
@@ -251,7 +246,7 @@ def osm_convert(config, resource, db_configs, connection, logger):
251246

252247
else:
253248
logger.info("Mapping not already done")
254-
pivot_to_osm(config, source, db_configs, connection, logger, convert_osm_to_pbf)
249+
pivot_to_osm(config, source, db_configs, database, logger, convert_osm_to_pbf)
255250

256251
used_bases[ source['id'] ] = source['mapping']['source']['baseId']
257252

0 commit comments

Comments
 (0)