Skip to content

Commit 054e774

Browse files
author
Alex Toker
committed
Make httpclient thread safe
1 parent 6118403 commit 054e774

File tree

1 file changed

+140
-55
lines changed

1 file changed

+140
-55
lines changed

v3io/dataplane/transport/httpclient.py

Lines changed: 140 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
#
15+
import contextlib
1516
import http.client
1617
import queue
1718
import socket
1819
import ssl
19-
20+
import threading
21+
import time
2022
import v3io.dataplane.request
2123
import v3io.dataplane.response
2224

@@ -31,7 +33,9 @@ class Transport(abstract.Transport):
3133
def __init__(self, logger, endpoint=None, max_connections=None, timeout=None, verbosity=None):
3234
super(Transport, self).__init__(logger, endpoint, max_connections, timeout, verbosity)
3335

34-
self._free_connections = queue.Queue()
36+
self._free_connections = queue.Queue(self.max_connections)
37+
self._lock = threading.RLock() # Reentrant lock for thread safety
38+
self._closed = False
3539

3640
# based on scheme, create a host and context for _create_connection
3741
self._host, self._ssl_context = self._parse_endpoint(self._endpoint)
@@ -52,6 +56,10 @@ def get_connection_timeout(cls):
5256
global _connection_timeout_seconds
5357
return _connection_timeout_seconds
5458

59+
@classmethod
60+
def get_connection_acquire_timeout(cls):
61+
return cls.get_connection_timeout() * 10
62+
5563
@classmethod
5664
def set_connection_timeout(cls, timeout):
5765
global _connection_timeout_seconds
@@ -67,39 +75,85 @@ def get_request_max_retries(cls):
6775
global _request_max_retries
6876
return _request_max_retries
6977

78+
def _put_connection(self, connection):
79+
with self._lock:
80+
if self._closed:
81+
with contextlib.suppress(Exception):
82+
connection.close()
83+
return
84+
try:
85+
self._free_connections.put(connection, block=False)
86+
except Exception as conn_error:
87+
self._logger.error(f"Failed to return connection to the pool: {conn_error}")
88+
with contextlib.suppress(Exception):
89+
connection.close()
90+
raise conn_error
91+
92+
def _get_connection(self):
93+
# First, check state under lock and decide what to do
94+
start_time = time.time()
95+
while True:
96+
# Check if we've exceeded the total timeout
97+
if time.time() - start_time > Transport.get_connection_acquire_timeout():
98+
raise TimeoutError(
99+
f"Could not get a connection within {Transport.get_connection_acquire_timeout()} seconds"
100+
)
101+
with self._lock:
102+
if self._closed:
103+
raise RuntimeError("Cannot send request on a closed client")
104+
105+
# Try non-blocking get first
106+
if not self._free_connections.empty():
107+
with contextlib.suppress(queue.Empty):
108+
return self._free_connections.get_nowait()
109+
110+
# Wait outside the lock
111+
try:
112+
connection = self._free_connections.get(block=True, timeout=0.01)
113+
except queue.Empty:
114+
continue # Go back to the start of the loop
115+
except Exception as e:
116+
raise RuntimeError(f"Cannot get connection , {e}") from e
117+
118+
# We got a connection, verify client is still open
119+
if self._closed:
120+
with contextlib.suppress(Exception):
121+
connection.close()
122+
raise RuntimeError("Cannot send request on a closed client")
123+
return connection
124+
70125
def close(self):
71-
# Ignore redundant calls to close
72-
if not self._free_connections:
73-
return
74-
75-
connections = []
76-
while not self._free_connections.empty():
77-
conn = self._free_connections.get()
78-
connections.append(conn)
79-
# In case anyone tries to reuse this object, we want them to get an error and not hang
80-
self._free_connections = None
126+
with self._lock:
127+
if self._closed:
128+
return
129+
# Mark as closed before draining the queue to prevent race conditions
130+
self._closed = True
131+
connections = []
132+
# Move free connections to local variable to release the lock as soon as possible
133+
with contextlib.suppress(queue.Empty):
134+
while not self._free_connections.empty():
135+
conn = self._free_connections.get_nowait()
136+
connections.append(conn)
137+
81138
self._logger.debug(f"Closing all {len(connections)} v3io transport connections")
82139
for conn in connections:
83-
conn.close()
140+
try:
141+
conn.close()
142+
except Exception as e:
143+
self._logger.debug(f"Error closing connection: {e}")
84144

85145
def requires_access_key(self):
86146
return True
87147

88148
def send_request(self, request):
89-
if not self._free_connections:
90-
raise RuntimeError("Cannot send request on a closed client")
91-
92-
# TODO: consider getting param of whether we should block or
93-
# not (wait for connection to be free or raise exception)
94-
connection = self._free_connections.get(block=True, timeout=None)
95-
149+
connection = self._get_connection()
96150
try:
97151
return self._send_request_on_connection(request, connection)
98-
except BaseException as e:
99-
request.transport.connection_used.close()
100-
connection = self._create_connection(self._host, self._ssl_context)
101-
self._free_connections.put(connection, block=True)
102-
raise e
152+
except BaseException:
153+
new_connection = self._create_connection(self._host, self._ssl_context)
154+
self._put_connection(new_connection)
155+
with contextlib.suppress(Exception):
156+
connection.close()
103157

104158
def wait_response(self, request, raise_for_status=None, num_retries=1):
105159
connection = request.transport.connection_used
@@ -118,24 +172,25 @@ def wait_response(self, request, raise_for_status=None, num_retries=1):
118172
response_body = response.read()
119173

120174
status_code, headers = self._get_status_and_headers(response)
175+
self.log(
176+
"Rx",
177+
connection=connection,
178+
status_code=status_code,
179+
body=response_body,
180+
)
181+
self._put_connection(connection)
182+
try:
183+
v3io_response = v3io.dataplane.response.Response(
184+
request.output, status_code, headers, response_body
185+
)
186+
v3io_response.raise_for_status(request.raise_for_status or raise_for_status)
187+
return v3io_response
188+
except v3io.dataplane.response.HttpResponseError as response_error:
189+
self._logger.warn_with(f"Response error: {response_error}")
190+
raise response_error
121191

122-
self.log("Rx", connection=connection, status_code=status_code, body=response_body)
123-
124-
response = v3io.dataplane.response.Response(request.output, status_code, headers, response_body)
125-
126-
self._free_connections.put(connection, block=True)
127-
128-
response.raise_for_status(request.raise_for_status or raise_for_status)
129-
130-
return response
131-
132-
except v3io.dataplane.response.HttpResponseError as response_error:
133-
self._logger.warn_with(f"Response error: {response_error}")
134-
raise response_error
135192
except BaseException as e:
136-
connection.close()
137193
connection = self._create_connection(self._host, self._ssl_context)
138-
139194
if num_retries == 0:
140195
self._logger.error_with(
141196
"Error occurred while waiting for response and ran out of retries",
@@ -145,11 +200,11 @@ def wait_response(self, request, raise_for_status=None, num_retries=1):
145200
status_code=status_code,
146201
headers=headers,
147202
)
148-
self._free_connections.put(connection, block=True)
203+
self._put_connection(connection)
149204
raise e
150205

151206
self._logger.debug_with(
152-
"Error occurred while waiting for response retrying",
207+
"Error occurred while waiting for response - retrying",
153208
retries_left=num_retries,
154209
e=type(e),
155210
e_msg=e,
@@ -159,25 +214,46 @@ def wait_response(self, request, raise_for_status=None, num_retries=1):
159214
is_retry = True
160215

161216
def _send_request_on_connection(self, request, connection):
162-
request.transport.connection_used = connection
217+
"""Sends a request on the specified connection.
218+
219+
This method attempts to send the given request over the provided connection.
220+
It handles potential connection errors, retries if necessary, and manages
221+
the request body's position for seekable streams. Note!! If the send operation fails,
222+
the connection is closed within this function.
223+
224+
Args:
225+
request (Request): The request object to send.
226+
connection (http.client.HTTPConnection): The connection to use for sending.
227+
228+
Returns:
229+
Request: The original request object.
230+
"""
163231

164232
path = request.encode_path()
165233

166234
self.log(
167-
"Tx", connection=connection, method=request.method, path=path, headers=request.headers, body=request.body
235+
"Tx",
236+
connection=connection,
237+
method=request.method,
238+
path=path,
239+
headers=request.headers,
240+
body=request.body,
168241
)
169242

170-
starting_offset = 0
171243
is_body_seekable = request.body and hasattr(request.body, "seek") and hasattr(request.body, "tell")
172-
if is_body_seekable:
173-
starting_offset = request.body.tell()
174-
244+
starting_offset = request.body.tell() if is_body_seekable else 0
175245
retries_left = Transport.get_request_max_retries()
246+
176247
while True:
177248
try:
249+
request.transport.connection_used = connection
178250
connection.request(request.method, path, request.body, request.headers)
179-
break
251+
return request
180252
except self._send_request_exceptions as e:
253+
# Close failed connection
254+
with contextlib.suppress(Exception):
255+
connection.close()
256+
181257
self._logger.debug_with(
182258
f"Disconnected while attempting to send request – "
183259
f"{retries_left} out of {Transport.get_request_max_retries()} retries left.",
@@ -186,27 +262,36 @@ def _send_request_on_connection(self, request, connection):
186262
)
187263
if retries_left == 0:
188264
raise
265+
189266
retries_left -= 1
190-
connection.close()
267+
268+
connection = self._create_connection(self._host, self._ssl_context)
269+
191270
if is_body_seekable:
192271
# If the first connection fails, the pointer of the body might move at the size
193272
# of the first connection blocksize.
194273
# We need to reset the position of the pointer in order to send the whole file.
195-
request.body.seek(starting_offset)
196-
connection = self._create_connection(self._host, self._ssl_context)
197-
request.transport.connection_used = connection
274+
with contextlib.suppress(Exception):
275+
request.body.seek(starting_offset)
276+
198277
except BaseException as e:
199278
self._logger.error_with(
200-
"Unhandled exception while sending request", e=type(e), e_msg=e, connection=connection
279+
"Unhandled exception while sending request",
280+
e=type(e),
281+
e_msg=e,
282+
connection=connection,
201283
)
284+
# Close failed connection
285+
with contextlib.suppress(Exception):
286+
connection.close()
202287
raise e
203288

204289
return request
205290

206291
def _create_connections(self, num_connections, host, ssl_context):
207292
for _ in range(num_connections):
208293
connection = self._create_connection(host, ssl_context)
209-
self._free_connections.put(connection, block=True)
294+
self._put_connection(connection)
210295

211296
def _create_connection(self, host, ssl_context):
212297
if ssl_context is None:

0 commit comments

Comments
 (0)