@@ -108,49 +108,59 @@ def ensure_connection(self):
108108 time .sleep (DELAY )
109109 self ._connection = self .connect_working_db ()
110110
111- def execute_select_query (self , cursor , query , show_duration ):
112- if TIMEOUT :
113- cursor .execute ("SET statement_timeout = %s" , (1000 * TIMEOUT ,)) # timeout in milliseconds
114-
115- if show_duration :
116- self .logger .info ("SQL: {}" .format (query ))
117- st_execute = time .time ()
118- cursor .execute (query )
119- et_execute = time .time ()
120- self .logger .info ("Execution ended. Elapsed time : %s seconds." % (et_execute - st_execute ))
121- else :
122- cursor .execute (query )
123-
124- @database_retry_decorator
111+ # IMPORTANT:
112+ # Streaming SELECTs must NOT use retry logic.
113+ # If the connection drops, the cursor state is unrecoverable.
125114 def execute_select_fetch_multiple (self , query , batchsize = 1 , show_duration = False ):
126- with self ._connection .cursor (cursor_factory = DictCursor ) as cursor :
127- self .execute_select_query (cursor , query , show_duration )
128- rows = cursor .fetchmany (batchsize )
115+ """
116+ Streaming SELECT using a named server-side cursor.
117+ No retry. No reconnect. No commit.
118+ Fail fast if the connection drops (old behavior).
119+ """
120+ self .ensure_connection ()
121+ cursor_name = f"cursor_{ int (time .time () * 1000 )} "
122+ with self ._connection .cursor (cursor_factory = DictCursor , name = cursor_name ) as cursor :
123+ if TIMEOUT :
124+ cursor .execute ("SET statement_timeout = %s" , (1000 * TIMEOUT ,))
125+ if show_duration :
126+ self .logger .info (f"SQL: { query } " )
127+ st = time .time ()
128+ cursor .execute (query )
129+ self .logger .info (
130+ "Execution ended. Elapsed time : %s seconds." ,
131+ time .time () - st
132+ )
133+ else :
134+ cursor .execute (query )
135+
129136 count = cursor .rowcount
130- while rows :
131- if batchsize == 1 :
132- rows = rows .pop ()
133- yield rows , count
137+
138+ while True :
134139 rows = cursor .fetchmany (batchsize )
135- self ._connection .commit ()
136- return
140+ if not rows :
141+ break
142+
143+ yield rows , count
137144
138145 # the method below should be used as a generator function otherwise use execute_update
139146 @database_retry_decorator
140147 def execute_update_query (self , query , params = None , isolation_level = None , show_duration = False ):
148+ self .ensure_connection ()
141149 if show_duration :
142150 self .logger .info ("SQL: {}" .format (query ))
143151 st_execute = time .time ()
144- with self ._connection .cursor (cursor_factory = DictCursor ) as cursor :
152+ with self ._connection .cursor (cursor_factory = DictCursor , name = "cursor_out" ) as cursor :
145153 old_isolation_level = self ._connection .isolation_level
146- if isolation_level is not None :
147- self ._connection .set_isolation_level (isolation_level )
148- cursor .execute (query , params )
149- self ._connection .commit ()
154+ try :
155+ if isolation_level is not None :
156+ self ._connection .set_isolation_level (isolation_level )
157+ cursor .execute (query , params )
158+ self ._connection .commit ()
159+ finally :
160+ self ._connection .set_isolation_level (old_isolation_level )
150161 if show_duration :
151162 et_execute = time .time ()
152163 self .logger .info ("Execution ended. Elapsed time : %s seconds." % (et_execute - st_execute ))
153- self ._connection .set_isolation_level (old_isolation_level )
154164 yield # the decorator database_retry_decorator only supports generators
155165 return
156166
0 commit comments