Skip to content

Commit 30cbcf3

Browse files
author
Alex Toker
committed
Make httpclient thread safe
1 parent 6118403 commit 30cbcf3

File tree

1 file changed

+129
-54
lines changed

1 file changed

+129
-54
lines changed

v3io/dataplane/transport/httpclient.py

Lines changed: 129 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
#
15+
import contextlib
1516
import http.client
1617
import queue
1718
import socket
1819
import ssl
20+
import threading
1921

2022
import v3io.dataplane.request
2123
import v3io.dataplane.response
@@ -31,7 +33,9 @@ class Transport(abstract.Transport):
3133
def __init__(self, logger, endpoint=None, max_connections=None, timeout=None, verbosity=None):
3234
super(Transport, self).__init__(logger, endpoint, max_connections, timeout, verbosity)
3335

34-
self._free_connections = queue.Queue()
36+
self._free_connections = queue.Queue(self.max_connections)
37+
self._lock = threading.RLock() # Reentrant lock for thread safety
38+
self._closed = False
3539

3640
# based on scheme, create a host and context for _create_connection
3741
self._host, self._ssl_context = self._parse_endpoint(self._endpoint)
@@ -67,39 +71,79 @@ def get_request_max_retries(cls):
6771
global _request_max_retries
6872
return _request_max_retries
6973

74+
def _put_connection(self, connection):
    """Hand a connection back to the free pool, or dispose of it.

    When the transport has already been closed, the connection is closed
    (best effort) and nothing is queued. Otherwise it is queued without
    blocking; if queuing fails, the failure is logged, the connection is
    closed (best effort) and the error is re-raised.

    Args:
        connection: The connection to return; it must expose ``close()``.

    Raises:
        Exception: Whatever the non-blocking put raised.
    """
    with self._lock:
        if not self._closed:
            try:
                self._free_connections.put_nowait(connection)
            except Exception as put_error:
                self._logger.error(f"Failed to return connection to the pool: {put_error}")
                # The connection has no home any more, so do not leave it open.
                with contextlib.suppress(Exception):
                    connection.close()
                raise put_error
            return

        # The transport is shut down: nothing may be queued from here on.
        with contextlib.suppress(Exception):
            connection.close()
87+
88+
def _get_connection(self, poll_interval=0.01):
    """Take a free connection out of the pool, waiting until one is available.

    The method loops until it either obtains a connection or notices that
    the transport was closed. Each iteration first tries a non-blocking
    take while holding the lock, then waits on the queue for at most
    ``poll_interval`` seconds without holding the lock.

    Args:
        poll_interval: Seconds to wait on the pool per attempt before the
            closed flag is re-checked. Defaults to 0.01, the value that was
            previously hard-coded.

    Returns:
        A connection taken from the free pool.

    Raises:
        RuntimeError: If the transport is closed, or if waiting on the pool
            fails with anything other than ``queue.Empty``.
    """
    while True:
        with self._lock:
            if self._closed:
                raise RuntimeError("Cannot send request on a closed client")

            # Just try the take: get_nowait() already signals an empty pool
            # with queue.Empty, so a separate empty() check adds nothing.
            with contextlib.suppress(queue.Empty):
                return self._free_connections.get_nowait()

        # Wait without holding the lock so that threads returning a
        # connection, or closing the transport, are not held up by waiters.
        try:
            connection = self._free_connections.get(block=True, timeout=poll_interval)
        except queue.Empty:
            continue  # nothing arrived in time; re-check the closed flag
        except Exception as e:
            raise RuntimeError(f"Cannot get connection , {e}") from e

        # The transport may have been closed while this thread was waiting.
        if self._closed:
            with contextlib.suppress(Exception):
                connection.close()
            raise RuntimeError("Cannot send request on a closed client")
        return connection
114+
70115
def close(self):
    """Shut the transport down and close every idle pooled connection.

    Repeated calls are ignored. The closed flag is set and the free pool is
    emptied while holding the lock; the drained connections are then closed
    one by one, and a failure to close any of them is only logged at debug
    level.
    """
    with self._lock:
        if self._closed:
            # Already shut down: nothing left to do.
            return

        # Flip the flag before draining so nothing is handed out or queued
        # once the pool starts being emptied.
        self._closed = True

        # Collect the idle connections locally so they can be closed after
        # the lock has been released.
        idle = []
        while True:
            try:
                idle.append(self._free_connections.get_nowait())
            except queue.Empty:
                break

    self._logger.debug(f"Closing all {len(idle)} v3io transport connections")
    for pooled in idle:
        try:
            pooled.close()
        except Exception as close_error:
            self._logger.debug(f"Error closing connection: {close_error}")
84134

85135
def requires_access_key(self):
    """Report whether this transport needs an access key; always ``True``."""
    return True
87137

88138
def send_request(self, request):
    """Send a request on a pooled connection.

    A connection is taken from the pool and the request is sent on it. If
    sending fails, the connection that was taken is closed (best effort), a
    freshly created connection is put into the pool in its place so the pool
    keeps its size, and the original error is re-raised to the caller.

    Args:
        request: The request object to send.

    Returns:
        Whatever ``_send_request_on_connection`` returns for the request.

    Raises:
        RuntimeError: If the transport is closed (from ``_get_connection``).
        BaseException: Any error raised while sending, re-raised unchanged.
    """
    connection = self._get_connection()
    try:
        return self._send_request_on_connection(request, connection)
    except BaseException:
        # Release the broken connection first, so it is not left open if
        # creating the replacement fails.
        with contextlib.suppress(Exception):
            connection.close()

        # Put a fresh connection in the slot taken out of the pool.
        replacement = self._create_connection(self._host, self._ssl_context)
        self._put_connection(replacement)

        # Re-raise: without this the failure is swallowed and the caller
        # gets None instead of the error.
        raise
103147

104148
def wait_response(self, request, raise_for_status=None, num_retries=1):
105149
connection = request.transport.connection_used
@@ -118,24 +162,25 @@ def wait_response(self, request, raise_for_status=None, num_retries=1):
118162
response_body = response.read()
119163

120164
status_code, headers = self._get_status_and_headers(response)
165+
self.log(
166+
"Rx",
167+
connection=connection,
168+
status_code=status_code,
169+
body=response_body,
170+
)
171+
self._put_connection(connection)
172+
try:
173+
v3io_response = v3io.dataplane.response.Response(
174+
request.output, status_code, headers, response_body
175+
)
176+
v3io_response.raise_for_status(request.raise_for_status or raise_for_status)
177+
return v3io_response
178+
except v3io.dataplane.response.HttpResponseError as response_error:
179+
self._logger.warn_with(f"Response error: {response_error}")
180+
raise response_error
121181

122-
self.log("Rx", connection=connection, status_code=status_code, body=response_body)
123-
124-
response = v3io.dataplane.response.Response(request.output, status_code, headers, response_body)
125-
126-
self._free_connections.put(connection, block=True)
127-
128-
response.raise_for_status(request.raise_for_status or raise_for_status)
129-
130-
return response
131-
132-
except v3io.dataplane.response.HttpResponseError as response_error:
133-
self._logger.warn_with(f"Response error: {response_error}")
134-
raise response_error
135182
except BaseException as e:
136-
connection.close()
137183
connection = self._create_connection(self._host, self._ssl_context)
138-
139184
if num_retries == 0:
140185
self._logger.error_with(
141186
"Error occurred while waiting for response and ran out of retries",
@@ -145,11 +190,11 @@ def wait_response(self, request, raise_for_status=None, num_retries=1):
145190
status_code=status_code,
146191
headers=headers,
147192
)
148-
self._free_connections.put(connection, block=True)
193+
self._put_connection(connection)
149194
raise e
150195

151196
self._logger.debug_with(
152-
"Error occurred while waiting for response retrying",
197+
"Error occurred while waiting for response - retrying",
153198
retries_left=num_retries,
154199
e=type(e),
155200
e_msg=e,
@@ -159,25 +204,46 @@ def wait_response(self, request, raise_for_status=None, num_retries=1):
159204
is_retry = True
160205

161206
def _send_request_on_connection(self, request, connection):
162-
request.transport.connection_used = connection
207+
"""Sends a request on the specified connection.
208+
209+
This method attempts to send the given request over the provided connection.
210+
It handles potential connection errors, retries if necessary, and manages
211+
the request body's position for seekable streams. Note!! If the send operation fails,
212+
the connection is closed within this function.
213+
214+
Args:
215+
request (Request): The request object to send.
216+
connection (http.client.HTTPConnection): The connection to use for sending.
217+
218+
Returns:
219+
Request: The original request object.
220+
"""
163221

164222
path = request.encode_path()
165223

166224
self.log(
167-
"Tx", connection=connection, method=request.method, path=path, headers=request.headers, body=request.body
225+
"Tx",
226+
connection=connection,
227+
method=request.method,
228+
path=path,
229+
headers=request.headers,
230+
body=request.body,
168231
)
169232

170-
starting_offset = 0
171233
is_body_seekable = request.body and hasattr(request.body, "seek") and hasattr(request.body, "tell")
172-
if is_body_seekable:
173-
starting_offset = request.body.tell()
174-
234+
starting_offset = request.body.tell() if is_body_seekable else 0
175235
retries_left = Transport.get_request_max_retries()
236+
176237
while True:
177238
try:
239+
request.transport.connection_used = connection
178240
connection.request(request.method, path, request.body, request.headers)
179-
break
241+
return request
180242
except self._send_request_exceptions as e:
243+
# Close failed connection
244+
with contextlib.suppress(Exception):
245+
connection.close()
246+
181247
self._logger.debug_with(
182248
f"Disconnected while attempting to send request – "
183249
f"{retries_left} out of {Transport.get_request_max_retries()} retries left.",
@@ -186,27 +252,36 @@ def _send_request_on_connection(self, request, connection):
186252
)
187253
if retries_left == 0:
188254
raise
255+
189256
retries_left -= 1
190-
connection.close()
257+
258+
connection = self._create_connection(self._host, self._ssl_context)
259+
191260
if is_body_seekable:
192261
# If the first connection fails, the pointer of the body might move at the size
193262
# of the first connection blocksize.
194263
# We need to reset the position of the pointer in order to send the whole file.
195-
request.body.seek(starting_offset)
196-
connection = self._create_connection(self._host, self._ssl_context)
197-
request.transport.connection_used = connection
264+
with contextlib.suppress(Exception):
265+
request.body.seek(starting_offset)
266+
198267
except BaseException as e:
199268
self._logger.error_with(
200-
"Unhandled exception while sending request", e=type(e), e_msg=e, connection=connection
269+
"Unhandled exception while sending request",
270+
e=type(e),
271+
e_msg=e,
272+
connection=connection,
201273
)
274+
# Close failed connection
275+
with contextlib.suppress(Exception):
276+
connection.close()
202277
raise e
203278

204279
return request
205280

206281
def _create_connections(self, num_connections, host, ssl_context):
    """Create ``num_connections`` connections and add each one to the pool.

    Args:
        num_connections: How many connections to create.
        host: Host passed through to ``_create_connection``.
        ssl_context: SSL context passed through to ``_create_connection``.
    """
    for _ in range(num_connections):
        # Each connection is pooled as soon as it has been created.
        self._put_connection(self._create_connection(host, ssl_context))
210285

211286
def _create_connection(self, host, ssl_context):
212287
if ssl_context is None:

0 commit comments

Comments
 (0)