Skip to content

Commit abd4808

Browse files
committed
More refactoring
1 parent 9b41784 commit abd4808

File tree

1 file changed

+99
-114
lines changed

1 file changed

+99
-114
lines changed

v3io/dataplane/transport/httpclient.py

Lines changed: 99 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,12 @@
2828
class Transport(abstract.Transport):
2929
_connection_timeout_seconds = 20
3030
_request_max_retries = 2
31+
3132

3233
def __init__(self, logger, endpoint=None, max_connections=None, timeout=None, verbosity=None):
3334
super(Transport, self).__init__(logger, endpoint, max_connections, timeout, verbosity)
3435

35-
self._free_connections = queue.Queue()
36+
self._free_connections = queue.Queue(self.max_connections)
3637
self._lock = threading.RLock() # Reentrant lock for thread safety
3738
self._closed = False
3839

@@ -50,57 +51,80 @@ def __init__(self, logger, endpoint=None, max_connections=None, timeout=None, ve
5051
)
5152
self._get_status_and_headers = self._get_status_and_headers_py3
5253

53-
def close(self):
54+
def _put_connection(self, connection):
5455
with self._lock:
55-
# Avoid redundant calls to close
5656
if self._closed:
57+
with contextlib.suppress(Exception):
58+
connection.close()
5759
return
60+
try:
61+
self._free_connections.put(connection, block=False)
62+
except Exception as conn_error:
63+
self._logger.error(f"Failed to return connection to the pool: {conn_error}")
64+
with contextlib.suppress(Exception):
65+
connection.close()
66+
raise conn_error
67+
68+
69+
def _get_connection(self):
70+
# First, check state under lock and decide what to do
71+
while True:
72+
with self._lock:
73+
if self._closed:
74+
raise RuntimeError("Cannot send request on a closed client")
5875

76+
# Try non-blocking get first
77+
if not self._free_connections.empty():
78+
with contextlib.suppress(queue.Empty):
79+
return self._free_connections.get_nowait()
80+
81+
# Wait outside the lock
82+
try:
83+
connection = self._free_connections.get(block=True, timeout=0.01)
84+
except queue.Empty:
85+
continue # Go back to the start of the loop
86+
except Exception as e:
87+
raise RuntimeError(f"Cannot get connection , {e}") from e
88+
89+
# We got a connection, verify client is still open
90+
if self._closed:
91+
with contextlib.suppress(Exception):
92+
connection.close()
93+
raise RuntimeError("Cannot send request on a closed client")
94+
return connection
95+
96+
def close(self):
97+
with self._lock:
98+
if self._closed:
99+
return
59100
# Mark as closed before draining the queue to prevent race conditions
60101
self._closed = True
61-
62102
connections = []
103+
# Move free connections to local variable to release the lock as soon as possible
63104
with contextlib.suppress(queue.Empty):
64105
while not self._free_connections.empty():
65106
conn = self._free_connections.get_nowait()
66107
connections.append(conn)
67-
self._logger.debug(f"Closing all {len(connections)} v3io transport connections")
68-
for conn in connections:
69-
try:
70-
conn.close()
71-
except Exception as e:
72-
self._logger.debug(f"Error closing connection: {e}")
108+
109+
self._logger.debug(f"Closing all {len(connections)} v3io transport connections")
110+
for conn in connections:
111+
try:
112+
conn.close()
113+
except Exception as e:
114+
self._logger.debug(f"Error closing connection: {e}")
73115

74116
def requires_access_key(self):
75117
return True
76118

77119
def send_request(self, request):
78-
with self._lock:
79-
if self._closed:
80-
raise RuntimeError("Cannot send request on a closed client")
81-
82-
# Get a connection from the pool (thread-safe operation)
83-
try:
84-
connection = self._free_connections.get(block=True, timeout=30)
85-
except queue.Empty as e:
86-
raise RuntimeError("Timed out waiting for an available connection") from e
87-
120+
connection = self._get_connection()
88121
try:
89122
return self._send_request_on_connection(request, connection)
90-
except BaseException as e:
91-
# Handle connection error in a thread-safe way
92-
with self._lock:
93-
if not self._closed:
94-
with contextlib.suppress(Exception):
95-
connection.close()
96-
# Only create and add a new connection if we're not closed
97-
try:
98-
new_connection = self._create_connection(self._host, self._ssl_context)
99-
self._free_connections.put(new_connection, block=False)
100-
except Exception as conn_error:
101-
self._logger.error(f"Failed to create replacement connection: {conn_error}")
102-
103-
raise e
123+
except BaseException:
124+
new_connection = self._create_connection(self._host, self._ssl_context)
125+
self._put_connection(new_connection)
126+
with contextlib.suppress(Exception):
127+
connection.close()
104128

105129
def wait_response(self, request, raise_for_status=None, num_retries=1):
106130
connection = request.transport.connection_used
@@ -112,56 +136,25 @@ def wait_response(self, request, raise_for_status=None, num_retries=1):
112136
headers = None
113137
try:
114138
if is_retry:
115-
with self._lock:
116-
if self._closed:
117-
raise RuntimeError("Transport closed during request retry")
118139
request = self._send_request_on_connection(request, connection)
119140
connection = request.transport.connection_used
120141

121142
response = connection.getresponse()
122143
response_body = response.read()
123144

124145
status_code, headers = self._get_status_and_headers(response)
125-
126146
self.log("Rx", connection=connection, status_code=status_code, body=response_body)
127-
128-
response = v3io.dataplane.response.Response(request.output, status_code, headers, response_body)
129-
130-
# Return connection to pool if successful
131-
with self._lock:
132-
if not self._closed:
133-
try:
134-
self._free_connections.put(connection, block=False)
135-
except Exception as e:
136-
self._logger.warn_with(
137-
"Failed to return connection to pool", exception=str(e), connection_id=id(connection)
138-
)
139-
connection.close()
140-
response.raise_for_status(request.raise_for_status or raise_for_status)
141-
return response
142-
143-
except v3io.dataplane.response.HttpResponseError as response_error:
144-
self._logger.warn_with(f"Response error: {response_error}")
145-
# Return connection to pool even on HTTP errors, as the connection is still valid
146-
with self._lock:
147-
if not self._closed:
148-
try:
149-
self._free_connections.put(connection, block=False)
150-
except Exception as e:
151-
self._logger.warn_with(
152-
"Failed to return connection to pool", exception=str(e), connection_id=id(connection)
153-
)
154-
connection.close()
155-
raise response_error
147+
self._put_connection(connection)
148+
try:
149+
v3io_response = v3io.dataplane.response.Response(request.output, status_code, headers, response_body)
150+
v3io_response.raise_for_status(request.raise_for_status or raise_for_status)
151+
return v3io_response
152+
except v3io.dataplane.response.HttpResponseError as response_error:
153+
self._logger.warn_with(f"Response error: {response_error}")
154+
raise response_error
155+
156156
except BaseException as e:
157-
# Handle connection error in thread-safe way
158-
with contextlib.suppress(Exception):
159-
connection.close()
160-
with self._lock:
161-
if self._closed:
162-
raise RuntimeError("Transport closed during response handling") from e
163-
connection = self._create_connection(self._host, self._ssl_context)
164-
157+
connection = self._create_connection(self._host, self._ssl_context)
165158
if num_retries == 0:
166159
self._logger.error_with(
167160
"Error occurred while waiting for response and ran out of retries",
@@ -171,24 +164,11 @@ def wait_response(self, request, raise_for_status=None, num_retries=1):
171164
status_code=status_code,
172165
headers=headers,
173166
)
174-
175-
# Return the new connection to the pool
176-
with self._lock:
177-
if not self._closed:
178-
try:
179-
self._free_connections.put(connection, block=False)
180-
except Exception as e:
181-
self._logger.warn_with(
182-
"Failed to return connection to pool",
183-
exception=str(e),
184-
connection_id=id(connection),
185-
)
186-
connection.close()
187-
167+
self._put_connection(connection)
188168
raise e
189169

190170
self._logger.debug_with(
191-
"Error occurred while waiting for response retrying",
171+
"Error occurred while waiting for response - retrying",
192172
retries_left=num_retries,
193173
e=type(e),
194174
e_msg=e,
@@ -198,7 +178,20 @@ def wait_response(self, request, raise_for_status=None, num_retries=1):
198178
is_retry = True
199179

200180
def _send_request_on_connection(self, request, connection):
201-
request.transport.connection_used = connection
181+
"""Sends a request on the specified connection.
182+
183+
This method attempts to send the given request over the provided connection.
184+
It handles potential connection errors, retries if necessary, and manages
185+
the request body's position for seekable streams. Note!! If the send operation fails,
186+
the connection is closed within this function.
187+
188+
Args:
189+
request (Request): The request object to send.
190+
connection (http.client.HTTPConnection): The connection to use for sending.
191+
192+
Returns:
193+
Request: The original request object.
194+
"""
202195

203196
path = request.encode_path()
204197

@@ -209,17 +202,17 @@ def _send_request_on_connection(self, request, connection):
209202
is_body_seekable = request.body and hasattr(request.body, "seek") and hasattr(request.body, "tell")
210203
starting_offset = request.body.tell() if is_body_seekable else 0
211204
retries_left = self._request_max_retries
212-
current_connection = connection # Track the current connection for thread safety
213205

214206
while True:
215207
try:
216-
# Check if transport is closed before proceeding
217-
with self._lock:
218-
if self._closed:
219-
raise RuntimeError("Transport closed during request sending")
220-
current_connection.request(request.method, path, request.body, request.headers)
221-
break
208+
request.transport.connection_used = connection
209+
connection.request(request.method, path, request.body, request.headers)
210+
return request
222211
except self._send_request_exceptions as e:
212+
# Close failed connection
213+
with contextlib.suppress(Exception):
214+
connection.close()
215+
223216
self._logger.debug_with(
224217
f"Disconnected while attempting to send request – "
225218
f"{retries_left} out of {self._request_max_retries} retries left.",
@@ -231,38 +224,30 @@ def _send_request_on_connection(self, request, connection):
231224

232225
retries_left -= 1
233226

234-
# Close failed connection
235-
with contextlib.suppress(Exception):
236-
current_connection.close()
237-
# Create a new connection for retry, thread-safe
238-
with self._lock:
239-
if self._closed:
240-
raise RuntimeError("Transport closed during request retry") from e
241-
current_connection = self._create_connection(self._host, self._ssl_context)
227+
connection = self._create_connection(self._host, self._ssl_context)
242228

243229
if is_body_seekable:
244230
# If the first connection fails, the pointer of the body might move at the size
245231
# of the first connection blocksize.
246232
# We need to reset the position of the pointer in order to send the whole file.
247-
request.body.seek(starting_offset)
233+
with contextlib.suppress(Exception):
234+
request.body.seek(starting_offset)
248235

249-
# Update the connection in the request
250-
request.transport.connection_used = current_connection
251236
except BaseException as e:
252237
self._logger.error_with(
253-
"Unhandled exception while sending request", e=type(e), e_msg=e, connection=current_connection
238+
"Unhandled exception while sending request", e=type(e), e_msg=e, connection=connection
254239
)
240+
# Close failed connection
241+
with contextlib.suppress(Exception):
242+
connection.close()
255243
raise e
256244

257-
# Update request with the potentially new connection
258-
request.transport.connection_used = current_connection
259245
return request
260246

261247
def _create_connections(self, num_connections, host, ssl_context):
262-
with self._lock:
263248
for _ in range(num_connections):
264249
connection = self._create_connection(host, ssl_context)
265-
self._free_connections.put(connection, block=True)
250+
self._put_connection(connection)
266251

267252
def _create_connection(self, host, ssl_context):
268253
if ssl_context is None:

0 commit comments

Comments
 (0)