@@ -71,7 +71,10 @@ def connect_working_db(self):
7171 connect_args = "host=%s dbname=%s user=%s password=%s port=%s" % (host , dbname , user , password , port )
7272
7373 self .logger .info ("Connecting to work database" )
74- connection = psycopg2 .connect (connect_args )
74+ if TIMEOUT :
75+ connection = psycopg2 .connect (connect_args , options = f"-c statement_timeout={ TIMEOUT * 1000 } " )
76+ else :
77+ connection = psycopg2 .connect (connect_args )
7578 connection .set_client_encoding ("UTF8" )
7679
7780 return connection
@@ -119,29 +122,30 @@ def execute_select_fetch_multiple(self, query, batchsize=1, show_duration=False)
119122 """
120123 self .ensure_connection ()
121124 cursor_name = f"cursor_{ int (time .time () * 1000 )} "
122- with self ._connection .cursor (cursor_factory = DictCursor , name = cursor_name ) as cursor :
123- if TIMEOUT :
124- cursor .execute ("SET statement_timeout = %s" , (1000 * TIMEOUT ,))
125- if show_duration :
126- self .logger .info (f"SQL: { query } " )
127- st = time .time ()
128- cursor .execute (query )
129- self .logger .info (
130- "Execution ended. Elapsed time : %s seconds." ,
131- time .time () - st
132- )
133- else :
134- cursor .execute (query )
135-
136- count = cursor .rowcount
137-
138- while True :
139- rows = cursor .fetchmany (batchsize )
140- if not rows :
141- break
142- if batchsize == 1 :
143- rows = rows .pop ()
144- yield rows , count
125+ try :
126+ with self ._connection .cursor (cursor_factory = DictCursor , name = cursor_name ) as cursor :
127+ if show_duration :
128+ self .logger .info (f"SQL: { query } " )
129+ st = time .time ()
130+ cursor .execute (query )
131+ self .logger .info (
132+ "Execution ended. Elapsed time : %s seconds." ,
133+ time .time () - st
134+ )
135+ else :
136+ cursor .execute (query )
137+
138+ count = cursor .rowcount
139+
140+ while True :
141+ rows = cursor .fetchmany (batchsize )
142+ if not rows :
143+ break
144+ if batchsize == 1 :
145+ rows = rows .pop ()
146+ yield rows , count
147+ finally :
148+ self ._connection .commit ()
145149
146150 # the method below should be used as a generator function otherwise use execute_update
147151 @database_retry_decorator
0 commit comments