1212# See the License for the specific language governing permissions and
1313# limitations under the License.
1414#
15+ import contextlib
1516import http .client
1617import queue
1718import socket
1819import ssl
20+ import threading
1921
2022import v3io .dataplane .request
2123import v3io .dataplane .response
@@ -31,7 +33,9 @@ class Transport(abstract.Transport):
3133 def __init__ (self , logger , endpoint = None , max_connections = None , timeout = None , verbosity = None ):
3234 super (Transport , self ).__init__ (logger , endpoint , max_connections , timeout , verbosity )
3335
34- self ._free_connections = queue .Queue ()
36+ self ._free_connections = queue .Queue (self .max_connections )
37+ self ._lock = threading .RLock () # Reentrant lock for thread safety
38+ self ._closed = False
3539
3640 # based on scheme, create a host and context for _create_connection
3741 self ._host , self ._ssl_context = self ._parse_endpoint (self ._endpoint )
@@ -67,39 +71,79 @@ def get_request_max_retries(cls):
6771 global _request_max_retries
6872 return _request_max_retries
6973
74+ def _put_connection (self , connection ):
75+ with self ._lock :
76+ if self ._closed :
77+ with contextlib .suppress (Exception ):
78+ connection .close ()
79+ return
80+ try :
81+ self ._free_connections .put (connection , block = False )
82+ except Exception as conn_error :
83+ self ._logger .error (f"Failed to return connection to the pool: { conn_error } " )
84+ with contextlib .suppress (Exception ):
85+ connection .close ()
86+ raise conn_error
87+
88+ def _get_connection (self ):
89+ # First, check state under lock and decide what to do
90+ while True :
91+ with self ._lock :
92+ if self ._closed :
93+ raise RuntimeError ("Cannot send request on a closed client" )
94+
95+ # Try non-blocking get first
96+ if not self ._free_connections .empty ():
97+ with contextlib .suppress (queue .Empty ):
98+ return self ._free_connections .get_nowait ()
99+
100+ # Wait outside the lock
101+ try :
102+ connection = self ._free_connections .get (block = True , timeout = 0.01 )
103+ except queue .Empty :
104+ continue # Go back to the start of the loop
105+ except Exception as e :
106+ raise RuntimeError (f"Cannot get connection , { e } " ) from e
107+
108+ # We got a connection, verify client is still open
109+ if self ._closed :
110+ with contextlib .suppress (Exception ):
111+ connection .close ()
112+ raise RuntimeError ("Cannot send request on a closed client" )
113+ return connection
114+
70115 def close (self ):
71- # Ignore redundant calls to close
72- if not self ._free_connections :
73- return
74-
75- connections = []
76- while not self ._free_connections .empty ():
77- conn = self ._free_connections .get ()
78- connections .append (conn )
79- # In case anyone tries to reuse this object, we want them to get an error and not hang
80- self ._free_connections = None
116+ with self ._lock :
117+ if self ._closed :
118+ return
119+ # Mark as closed before draining the queue to prevent race conditions
120+ self ._closed = True
121+ connections = []
122+ # Move free connections to local variable to release the lock as soon as possible
123+ with contextlib .suppress (queue .Empty ):
124+ while not self ._free_connections .empty ():
125+ conn = self ._free_connections .get_nowait ()
126+ connections .append (conn )
127+
81128 self ._logger .debug (f"Closing all { len (connections )} v3io transport connections" )
82129 for conn in connections :
83- conn .close ()
130+ try :
131+ conn .close ()
132+ except Exception as e :
133+ self ._logger .debug (f"Error closing connection: { e } " )
84134
85135 def requires_access_key (self ):
86136 return True
87137
88138 def send_request (self , request ):
89- if not self ._free_connections :
90- raise RuntimeError ("Cannot send request on a closed client" )
91-
92- # TODO: consider getting param of whether we should block or
93- # not (wait for connection to be free or raise exception)
94- connection = self ._free_connections .get (block = True , timeout = None )
95-
139+ connection = self ._get_connection ()
96140 try :
97141 return self ._send_request_on_connection (request , connection )
98- except BaseException as e :
99- request . transport . connection_used . close ( )
100- connection = self ._create_connection ( self . _host , self . _ssl_context )
101- self . _free_connections . put ( connection , block = True )
102- raise e
142+ except BaseException :
143+ new_connection = self . _create_connection ( self . _host , self . _ssl_context )
144+ self ._put_connection ( new_connection )
145+ with contextlib . suppress ( Exception ):
146+ connection . close ()
103147
104148 def wait_response (self , request , raise_for_status = None , num_retries = 1 ):
105149 connection = request .transport .connection_used
@@ -118,24 +162,25 @@ def wait_response(self, request, raise_for_status=None, num_retries=1):
118162 response_body = response .read ()
119163
120164 status_code , headers = self ._get_status_and_headers (response )
165+ self .log (
166+ "Rx" ,
167+ connection = connection ,
168+ status_code = status_code ,
169+ body = response_body ,
170+ )
171+ self ._put_connection (connection )
172+ try :
173+ v3io_response = v3io .dataplane .response .Response (
174+ request .output , status_code , headers , response_body
175+ )
176+ v3io_response .raise_for_status (request .raise_for_status or raise_for_status )
177+ return v3io_response
178+ except v3io .dataplane .response .HttpResponseError as response_error :
179+ self ._logger .warn_with (f"Response error: { response_error } " )
180+ raise response_error
121181
122- self .log ("Rx" , connection = connection , status_code = status_code , body = response_body )
123-
124- response = v3io .dataplane .response .Response (request .output , status_code , headers , response_body )
125-
126- self ._free_connections .put (connection , block = True )
127-
128- response .raise_for_status (request .raise_for_status or raise_for_status )
129-
130- return response
131-
132- except v3io .dataplane .response .HttpResponseError as response_error :
133- self ._logger .warn_with (f"Response error: { response_error } " )
134- raise response_error
135182 except BaseException as e :
136- connection .close ()
137183 connection = self ._create_connection (self ._host , self ._ssl_context )
138-
139184 if num_retries == 0 :
140185 self ._logger .error_with (
141186 "Error occurred while waiting for response and ran out of retries" ,
@@ -145,11 +190,11 @@ def wait_response(self, request, raise_for_status=None, num_retries=1):
145190 status_code = status_code ,
146191 headers = headers ,
147192 )
148- self ._free_connections . put (connection , block = True )
193+ self ._put_connection (connection )
149194 raise e
150195
151196 self ._logger .debug_with (
152- "Error occurred while waiting for response – retrying" ,
197+ "Error occurred while waiting for response - retrying" ,
153198 retries_left = num_retries ,
154199 e = type (e ),
155200 e_msg = e ,
@@ -159,25 +204,46 @@ def wait_response(self, request, raise_for_status=None, num_retries=1):
159204 is_retry = True
160205
161206 def _send_request_on_connection (self , request , connection ):
162- request .transport .connection_used = connection
207+ """Sends a request on the specified connection.
208+
209+ This method attempts to send the given request over the provided connection.
210+ It handles potential connection errors, retries if necessary, and manages
211+ the request body's position for seekable streams. Note!! If the send operation fails,
212+ the connection is closed within this function.
213+
214+ Args:
215+ request (Request): The request object to send.
216+ connection (http.client.HTTPConnection): The connection to use for sending.
217+
218+ Returns:
219+ Request: The original request object.
220+ """
163221
164222 path = request .encode_path ()
165223
166224 self .log (
167- "Tx" , connection = connection , method = request .method , path = path , headers = request .headers , body = request .body
225+ "Tx" ,
226+ connection = connection ,
227+ method = request .method ,
228+ path = path ,
229+ headers = request .headers ,
230+ body = request .body ,
168231 )
169232
170- starting_offset = 0
171233 is_body_seekable = request .body and hasattr (request .body , "seek" ) and hasattr (request .body , "tell" )
172- if is_body_seekable :
173- starting_offset = request .body .tell ()
174-
234+ starting_offset = request .body .tell () if is_body_seekable else 0
175235 retries_left = Transport .get_request_max_retries ()
236+
176237 while True :
177238 try :
239+ request .transport .connection_used = connection
178240 connection .request (request .method , path , request .body , request .headers )
179- break
241+ return request
180242 except self ._send_request_exceptions as e :
243+ # Close failed connection
244+ with contextlib .suppress (Exception ):
245+ connection .close ()
246+
181247 self ._logger .debug_with (
182248 f"Disconnected while attempting to send request – "
183249 f"{ retries_left } out of { Transport .get_request_max_retries ()} retries left." ,
@@ -186,27 +252,36 @@ def _send_request_on_connection(self, request, connection):
186252 )
187253 if retries_left == 0 :
188254 raise
255+
189256 retries_left -= 1
190- connection .close ()
257+
258+ connection = self ._create_connection (self ._host , self ._ssl_context )
259+
191260 if is_body_seekable :
192261 # If the first connection fails, the pointer of the body might move at the size
193262 # of the first connection blocksize.
194263 # We need to reset the position of the pointer in order to send the whole file.
195- request . body . seek ( starting_offset )
196- connection = self . _create_connection ( self . _host , self . _ssl_context )
197- request . transport . connection_used = connection
264+ with contextlib . suppress ( Exception ):
265+ request . body . seek ( starting_offset )
266+
198267 except BaseException as e :
199268 self ._logger .error_with (
200- "Unhandled exception while sending request" , e = type (e ), e_msg = e , connection = connection
269+ "Unhandled exception while sending request" ,
270+ e = type (e ),
271+ e_msg = e ,
272+ connection = connection ,
201273 )
274+ # Close failed connection
275+ with contextlib .suppress (Exception ):
276+ connection .close ()
202277 raise e
203278
204279 return request
205280
206281 def _create_connections (self , num_connections , host , ssl_context ):
207282 for _ in range (num_connections ):
208283 connection = self ._create_connection (host , ssl_context )
209- self ._free_connections . put (connection , block = True )
284+ self ._put_connection (connection )
210285
211286 def _create_connection (self , host , ssl_context ):
212287 if ssl_context is None :
0 commit comments