Skip to content

Commit 57bdd85

Browse files
authored
fix(db-manager): broken select/fetch logic (#92)
* fix(db-manager): broken select/fetch logic * fix tests * fix tests * fix tests * fix tests * fix tests * fix tests * fix tests * fix tests * fix tests * fix tests * fix tests
1 parent 95d8401 commit 57bdd85

File tree

8 files changed

+182
-144
lines changed

8 files changed

+182
-144
lines changed

changelog.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# CHANGELOG
22

3+
## 3.1.6
4+
FIXED :
5+
- wrong database manager logic for select + batch fetch
6+
37
## 3.1.5
48
FIXED :
59
- database manager does not raise error when retries have failed
@@ -18,7 +22,7 @@ CHANGED:
1822

1923
## 3.1.0
2024
ADDED:
21-
- fields vla_par_defaut, cout_penalites, vehicule_leger_interdit, cout_vehicule_prioritaire
25+
- fields vla_par_defaut, cout_penalites, vehicule_leger_interdit, cout_vehicule_prioritaire
2226

2327
## 3.0.0
2428
REMOVED:

r2gg/__about__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
__uri_tracker__ = f"{__uri_repository__}issues/"
3535
__uri__ = __uri_repository__
3636

37-
__version__ = "3.1.5"
37+
__version__ = "3.1.6"
3838
__version_info__ = tuple(
3939
[
4040
int(num) if num.isdigit() else num

r2gg/_database.py

Lines changed: 43 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -108,56 +108,70 @@ def ensure_connection(self):
108108
time.sleep(DELAY)
109109
self._connection = self.connect_working_db()
110110

111-
def execute_select_query(self, cursor, query, show_duration):
112-
if TIMEOUT:
113-
cursor.execute("SET statement_timeout = %s", (1000 * TIMEOUT,)) # timeout in milliseconds
114-
115-
if show_duration:
116-
self.logger.info("SQL: {}".format(query))
117-
st_execute = time.time()
118-
cursor.execute(query)
119-
et_execute = time.time()
120-
self.logger.info("Execution ended. Elapsed time : %s seconds." % (et_execute - st_execute))
121-
else:
122-
cursor.execute(query)
123-
124-
@database_retry_decorator
111+
# IMPORTANT:
112+
# Streaming SELECTs must NOT use retry logic.
113+
# If the connection drops, the cursor state is unrecoverable.
125114
def execute_select_fetch_multiple(self, query, batchsize=1, show_duration=False):
126-
with self._connection.cursor(cursor_factory=DictCursor) as cursor:
127-
self.execute_select_query(cursor, query, show_duration)
128-
rows = cursor.fetchmany(batchsize)
115+
"""
116+
Streaming SELECT using a named server-side cursor.
117+
No retry. No reconnect. No commit.
118+
Fail fast if the connection drops (old behavior).
119+
"""
120+
self.ensure_connection()
121+
cursor_name = f"cursor_{int(time.time() * 1000)}"
122+
with self._connection.cursor(cursor_factory=DictCursor, name=cursor_name) as cursor:
123+
if TIMEOUT:
124+
cursor.execute("SET statement_timeout = %s", (1000 * TIMEOUT,))
125+
if show_duration:
126+
self.logger.info(f"SQL: {query}")
127+
st = time.time()
128+
cursor.execute(query)
129+
self.logger.info(
130+
"Execution ended. Elapsed time : %s seconds.",
131+
time.time() - st
132+
)
133+
else:
134+
cursor.execute(query)
135+
129136
count = cursor.rowcount
130-
while rows:
137+
138+
while True:
139+
rows = cursor.fetchmany(batchsize)
140+
if not rows:
141+
break
131142
if batchsize == 1:
132143
rows = rows.pop()
133144
yield rows, count
134-
rows = cursor.fetchmany(batchsize)
135-
self._connection.commit()
136-
return
137145

138146
# the method below should be used as a generator function otherwise use execute_update
139147
@database_retry_decorator
140148
def execute_update_query(self, query, params=None, isolation_level=None, show_duration=False):
149+
self.ensure_connection()
141150
if show_duration :
142151
self.logger.info("SQL: {}".format(query))
143152
st_execute = time.time()
144153
with self._connection.cursor(cursor_factory=DictCursor) as cursor:
145154
old_isolation_level = self._connection.isolation_level
146-
if isolation_level is not None:
147-
self._connection.set_isolation_level(isolation_level)
148-
cursor.execute(query, params)
149-
self._connection.commit()
155+
try:
156+
if isolation_level is not None:
157+
self._connection.set_isolation_level(isolation_level)
158+
cursor.execute(query, params)
159+
self._connection.commit()
160+
finally:
161+
self._connection.set_isolation_level(old_isolation_level)
150162
if show_duration:
151163
et_execute = time.time()
152164
self.logger.info("Execution ended. Elapsed time : %s seconds." % (et_execute - st_execute))
153-
self._connection.set_isolation_level(old_isolation_level)
154165
yield # the decorator database_retry_decorator only supports generators
155166
return
156167

157168
def execute_update(self, query, params=None, isolation_level=None):
158169
next(self.execute_update_query(query, params=params, isolation_level=isolation_level), None)
159170

160171
def execute_select_fetch_one(self, query, show_duration=False):
161-
gen = self.execute_select_fetch_multiple(query, 1, show_duration)
162-
row, count = next(gen, (None, None))
163-
return row, count
172+
try:
173+
gen = self.execute_select_fetch_multiple(query, 1, show_duration)
174+
row, count = next(gen, (None, None))
175+
return row, count
176+
finally:
177+
gen.close() # Ensure the generator is closed to free resources

r2gg/_pivot_to_osm.py

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,13 @@ def pivot_to_osm(config, source, db_configs, database: DatabaseManager, logger,
8181
sql_query_nodes += " LIMIT {} OFFSET {}".format(batchsize, offset)
8282
offset += batchsize
8383
logger.info("Writing nodes")
84-
for row, count in database.execute_select_fetch_multiple(sql_query_nodes, show_duration=True):
85-
nodeEl = writeNode(row, extraction_date)
86-
xf.write(nodeEl, pretty_print=True)
87-
84+
gen = database.execute_select_fetch_multiple(sql_query_nodes, show_duration=True)
85+
try:
86+
for row, count in gen:
87+
nodeEl = writeNode(row, extraction_date)
88+
xf.write(nodeEl, pretty_print=True)
89+
finally:
90+
gen.close()
8891
logger.info("%s / %s nodes ajoutés" % (offset, nodesize))
8992
et_nodes = time.time()
9093
logger.info("Writing nodes ended. Elapsed time : %s seconds." % (et_nodes - st_nodes))
@@ -124,16 +127,19 @@ def pivot_to_osm(config, source, db_configs, database: DatabaseManager, logger,
124127
logger.info("Writing restrictions")
125128
st_execute = time.time()
126129
i = 1
127-
for row, count in database.execute_select_fetch_multiple(sql_query_non_comm, show_duration=True):
128-
if row['common_vertex_id'] == -1:
130+
gen = database.execute_select_fetch_multiple(sql_query_non_comm, show_duration=True)
131+
try:
132+
for row, count in gen:
133+
if row['common_vertex_id'] == -1:
134+
i += 1
135+
continue
136+
ResEl = writeRes(row, i, extraction_date)
137+
xf.write(ResEl, pretty_print=True)
138+
if count > 0 and (i % ceil(count / 10) == 0):
139+
logger.info("%s / %s restrictions ajoutés" % (i, count))
129140
i += 1
130-
continue
131-
ResEl = writeRes(row, i, extraction_date)
132-
xf.write(ResEl, pretty_print=True)
133-
if (i % ceil(count / 10) == 0):
134-
logger.info("%s / %s restrictions ajoutés" % (i, count))
135-
i += 1
136-
141+
finally:
142+
gen.close()
137143
et_execute = time.time()
138144
logger.info("Writing restrictions ended. Elapsed time : %s seconds." % (et_execute - st_execute))
139145

r2gg/_pivot_to_pgr.py

Lines changed: 99 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -115,38 +115,41 @@ def pivot_to_pgr(source, cost_calculation_file_path, database_work: DatabaseMana
115115
index = 0
116116
batchsize = 10000
117117
generator = database_work.execute_select_fetch_multiple(tr_query, show_duration=True, batchsize=batchsize)
118-
rows, count = next(generator,(None, None))
119-
# Insertion petit à petit -> plus performant
120-
121-
logger.info("SQL: Inserting or updating {} values in out db".format(count))
122-
123-
st_execute = time.time()
124-
125-
while rows:
126-
values_str = ""
127-
# Tuple des valuers à insérer
128-
values_tuple = ()
129-
for row in rows:
130-
values_str += "(%s, %s, %s),"
131-
values_tuple += (index, row['id_from'], row['id_to'])
132-
index += 1
133-
values_str = values_str[:-1]
134-
135-
set_on_conflict = (
136-
"id_from = excluded.id_from,id_to = excluded.id_to"
137-
)
118+
try:
119+
rows, count = next(generator,(None, None))
120+
# Insertion petit à petit -> plus performant
121+
122+
logger.info("SQL: Inserting or updating {} values in out db".format(count))
123+
124+
st_execute = time.time()
125+
126+
while rows:
127+
values_str = ""
128+
# Tuple des valeurs à insérer
129+
values_tuple = ()
130+
for row in rows:
131+
values_str += "(%s, %s, %s),"
132+
values_tuple += (index, row['id_from'], row['id_to'])
133+
index += 1
134+
values_str = values_str[:-1]
135+
136+
set_on_conflict = (
137+
"id_from = excluded.id_from,id_to = excluded.id_to"
138+
)
138139

139-
sql_insert = """
140-
INSERT INTO {}.turn_restrictions (id, id_from, id_to)
141-
VALUES {}
142-
ON CONFLICT (id) DO UPDATE
143-
SET {};
144-
""".format(schema, values_str, set_on_conflict)
145-
database_out.execute_update(sql_insert, values_tuple)
140+
sql_insert = """
141+
INSERT INTO {}.turn_restrictions (id, id_from, id_to)
142+
VALUES {}
143+
ON CONFLICT (id) DO UPDATE
144+
SET {};
145+
""".format(schema, values_str, set_on_conflict)
146+
database_out.execute_update(sql_insert, values_tuple)
146147

147-
rows, _ = next(generator,(None, None))
148+
rows, _ = next(generator,(None, None))
148149

149-
et_execute = time.time()
150+
et_execute = time.time()
151+
finally:
152+
generator.close() # Ensure the generator is closed to free resources
150153
logger.info("Writing turn restrinctions Done. Elapsed time : %s seconds." %(et_execute - st_execute))
151154

152155
# Noeuds ---------------------------------------------------------------------------------------
@@ -171,30 +174,32 @@ def pivot_to_pgr(source, cost_calculation_file_path, database_work: DatabaseMana
171174
index = 0
172175
batchsize = 10000
173176
generator = database_work.execute_select_fetch_multiple(nd_query, show_duration=True, batchsize=batchsize)
174-
rows, count = next(generator, (None, None))
175-
while rows:
176-
values_str = ""
177-
# Tuple des valeurs à insérer
178-
values_tuple = ()
179-
for row in rows:
180-
values_str += "(%s, %s),"
181-
values_tuple += (row['id'], row['geom'])
182-
index += 1
183-
values_str = values_str[:-1]
184-
185-
set_on_conflict = (
186-
"the_geom = excluded.the_geom"
187-
)
188-
189-
sql_insert = """
190-
INSERT INTO {}_vertices_pgr (id, the_geom)
191-
VALUES {}
192-
ON CONFLICT (id) DO UPDATE
193-
SET {};
194-
""".format(ways_table_name, values_str, set_on_conflict)
195-
database_out.execute_update(sql_insert, values_tuple)
196-
rows, _ = next(generator,(None, None))
177+
try:
178+
rows, count = next(generator, (None, None))
179+
while rows:
180+
values_str = ""
181+
# Tuple des valeurs à insérer
182+
values_tuple = ()
183+
for row in rows:
184+
values_str += "(%s, %s),"
185+
values_tuple += (row['id'], row['geom'])
186+
index += 1
187+
values_str = values_str[:-1]
188+
189+
set_on_conflict = (
190+
"the_geom = excluded.the_geom"
191+
)
197192

193+
sql_insert = """
194+
INSERT INTO {}_vertices_pgr (id, the_geom)
195+
VALUES {}
196+
ON CONFLICT (id) DO UPDATE
197+
SET {};
198+
""".format(ways_table_name, values_str, set_on_conflict)
199+
database_out.execute_update(sql_insert, values_tuple)
200+
rows, _ = next(generator,(None, None))
201+
finally:
202+
generator.close() # Ensure the generator is closed to free resources
198203

199204
et_execute = time.time()
200205
logger.info("Writing vertices Done. Elapsed time : %s seconds." %(et_execute - st_execute))
@@ -266,45 +271,48 @@ def pivot_to_pgr(source, cost_calculation_file_path, database_work: DatabaseMana
266271
# logger.info("SQL: Inserting or updating {} values in out db".format(cursor_in.rowcount))
267272
st_execute = time.time()
268273
percent = 0
269-
rows, count = next(generator, (None, None))
270-
while rows:
271-
percent += 1000000 / count
272-
# Chaîne permettant l'insertion de valeurs via psycopg
273-
values_str = ""
274-
# Tuple des valeurs à insérer
275-
values_tuple = ()
276-
for row in rows:
277-
values_str += "(" + single_value_str + "),"
278-
output_costs = output_costs_from_costs_config(costs, row)
279-
values_tuple += tuple(
280-
row[ output_columns_name ] for output_columns_name in output_columns_names
281-
) + output_costs
282-
values_str = values_str[:-1]
283-
284-
output_columns = "("
285-
for output_columns_name in output_columns_names:
286-
output_columns += output_columns_name + ','
287-
output_columns = output_columns[:-1]
288-
289-
set_on_conflict = ''
290-
for output_columns_name in output_columns_names:
291-
set_on_conflict += "{0} = excluded.{0},".format(output_columns_name)
292-
set_on_conflict = set_on_conflict[:-1]
293-
294-
for output in costs["outputs"]:
295-
output_columns += "," + output["name"] + ",reverse_" + output["name"]
296-
set_on_conflict += ",{0} = excluded.{0}".format(output["name"])
297-
set_on_conflict += ",{0} = excluded.{0}".format("reverse_" + output["name"])
298-
299-
output_columns += ")"
300-
sql_insert = """
301-
INSERT INTO {} {}
302-
VALUES {}
303-
ON CONFLICT (id) DO UPDATE
304-
SET {};
305-
""".format(ways_table_name, output_columns, values_str, set_on_conflict)
306-
database_out.execute_update(sql_insert, values_tuple)
307-
rows, _ = next(generator,(None, None))
274+
try:
275+
rows, count = next(generator, (None, None))
276+
while rows:
277+
percent += 1000000 / count
278+
# Chaîne permettant l'insertion de valeurs via psycopg
279+
values_str = ""
280+
# Tuple des valeurs à insérer
281+
values_tuple = ()
282+
for row in rows:
283+
values_str += "(" + single_value_str + "),"
284+
output_costs = output_costs_from_costs_config(costs, row)
285+
values_tuple += tuple(
286+
row[ output_columns_name ] for output_columns_name in output_columns_names
287+
) + output_costs
288+
values_str = values_str[:-1]
289+
290+
output_columns = "("
291+
for output_columns_name in output_columns_names:
292+
output_columns += output_columns_name + ','
293+
output_columns = output_columns[:-1]
294+
295+
set_on_conflict = ''
296+
for output_columns_name in output_columns_names:
297+
set_on_conflict += "{0} = excluded.{0},".format(output_columns_name)
298+
set_on_conflict = set_on_conflict[:-1]
299+
300+
for output in costs["outputs"]:
301+
output_columns += "," + output["name"] + ",reverse_" + output["name"]
302+
set_on_conflict += ",{0} = excluded.{0}".format(output["name"])
303+
set_on_conflict += ",{0} = excluded.{0}".format("reverse_" + output["name"])
304+
305+
output_columns += ")"
306+
sql_insert = """
307+
INSERT INTO {} {}
308+
VALUES {}
309+
ON CONFLICT (id) DO UPDATE
310+
SET {};
311+
""".format(ways_table_name, output_columns, values_str, set_on_conflict)
312+
database_out.execute_update(sql_insert, values_tuple)
313+
rows, _ = next(generator,(None, None))
314+
finally:
315+
generator.close() # Ensure the generator is closed to free resources
308316

309317
et_execute = time.time()
310318
logger.info("Writing ways ended. Elapsed time : %s seconds." %(et_execute - st_execute))

0 commit comments

Comments
 (0)