Skip to content

Commit 1ceb24d

Browse files
authored
Add request timeout (#121)
* Add request timeout [ML-5598](https://iguazio.atlassian.net/browse/ML-5598) * lint
1 parent b16fbf8 commit 1ceb24d

File tree

2 files changed

+34
-39
lines changed

2 files changed

+34
-39
lines changed

v3io/dataplane/kv_timestamp.py

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,27 +13,19 @@
1313
# limitations under the License.
1414
#
1515
import datetime
16-
import sys
1716

1817
# used only n py2
1918
BASE_DATETIME = datetime.datetime(1970, 1, 1)
2019

2120

22-
def _get_timestamp_from_datetime_py3(dt):
21+
def _get_timestamp_from_datetime(dt):
2322
return dt.astimezone(datetime.timezone.utc).timestamp()
2423

2524

2625
def _get_timestamp_from_datetime_py2(dt):
2726
return (dt - BASE_DATETIME).total_seconds()
2827

2928

30-
# _get_timestamp_from_datetime is python version specific. resolve this once
31-
if sys.version_info[0] >= 3:
32-
_get_timestamp_from_datetime = _get_timestamp_from_datetime_py3
33-
else:
34-
_get_timestamp_from_datetime = _get_timestamp_from_datetime_py2
35-
36-
3729
def encode(dt):
3830
timestamp = _get_timestamp_from_datetime(dt)
3931

v3io/dataplane/transport/httpclient.py

Lines changed: 33 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import queue
1717
import socket
1818
import ssl
19-
import sys
2019

2120
import v3io.dataplane.request
2221
import v3io.dataplane.response
@@ -25,6 +24,9 @@
2524

2625

2726
class Transport(abstract.Transport):
27+
_connection_timeout_seconds = 20
28+
_request_max_retries = 2
29+
2830
def __init__(self, logger, endpoint=None, max_connections=None, timeout=None, verbosity=None):
2931
super(Transport, self).__init__(logger, endpoint, max_connections, timeout, verbosity)
3032

@@ -36,24 +38,19 @@ def __init__(self, logger, endpoint=None, max_connections=None, timeout=None, ve
3638
# create the pool connection
3739
self._create_connections(self.max_connections, self._host, self._ssl_context)
3840

39-
# python 2 and 3 have different exceptions
40-
if sys.version_info[0] >= 3:
41-
self._wait_response_exceptions = (
42-
http.client.RemoteDisconnected,
43-
ConnectionResetError,
44-
ConnectionRefusedError,
45-
http.client.ResponseNotReady,
46-
)
47-
self._send_request_exceptions = (
48-
BrokenPipeError,
49-
http.client.CannotSendRequest,
50-
http.client.RemoteDisconnected,
51-
)
52-
self._get_status_and_headers = self._get_status_and_headers_py3
53-
else:
54-
self._wait_response_exceptions = (http.client.BadStatusLine, socket.error)
55-
self._send_request_exceptions = (http.client.CannotSendRequest, http.client.BadStatusLine)
56-
self._get_status_and_headers = self._get_status_and_headers_py2
41+
self._wait_response_exceptions = (
42+
http.client.RemoteDisconnected,
43+
ConnectionResetError,
44+
ConnectionRefusedError,
45+
http.client.ResponseNotReady,
46+
)
47+
self._send_request_exceptions = (
48+
BrokenPipeError,
49+
http.client.CannotSendRequest,
50+
http.client.RemoteDisconnected,
51+
socket.timeout,
52+
)
53+
self._get_status_and_headers = self._get_status_and_headers_py3
5754

5855
def close(self):
5956
# Ignore redundant calls to close
@@ -154,20 +151,27 @@ def _send_request_on_connection(self, request, connection):
154151
self.log(
155152
"Tx", connection=connection, method=request.method, path=path, headers=request.headers, body=request.body
156153
)
154+
157155
starting_offset = 0
158156
is_body_seekable = request.body and hasattr(request.body, "seek") and hasattr(request.body, "tell")
159157
if is_body_seekable:
160158
starting_offset = request.body.tell()
161-
try:
159+
160+
retries_left = self._request_max_retries
161+
while True:
162162
try:
163163
connection.request(request.method, path, request.body, request.headers)
164+
break
164165
except self._send_request_exceptions as e:
165166
self._logger.debug_with(
166-
"Disconnected while attempting to send. Recreating connection and retrying",
167+
f"Disconnected while attempting to send request – "
168+
f"{retries_left} out of {self._request_max_retries} retries left.",
167169
e=type(e),
168170
e_msg=e,
169-
connection=connection,
170171
)
172+
if retries_left == 0:
173+
raise
174+
retries_left -= 1
171175
connection.close()
172176
if is_body_seekable:
173177
# If the first connection fails, the pointer of the body might move at the size
@@ -176,12 +180,11 @@ def _send_request_on_connection(self, request, connection):
176180
request.body.seek(starting_offset)
177181
connection = self._create_connection(self._host, self._ssl_context)
178182
request.transport.connection_used = connection
179-
connection.request(request.method, path, request.body, request.headers)
180-
except BaseException as e:
181-
self._logger.error_with(
182-
"Unhandled exception while sending request", e=type(e), e_msg=e, connection=connection
183-
)
184-
raise e
183+
except BaseException as e:
184+
self._logger.error_with(
185+
"Unhandled exception while sending request", e=type(e), e_msg=e, connection=connection
186+
)
187+
raise e
185188

186189
return request
187190

@@ -192,9 +195,9 @@ def _create_connections(self, num_connections, host, ssl_context):
192195

193196
def _create_connection(self, host, ssl_context):
194197
if ssl_context is None:
195-
return http.client.HTTPConnection(host)
198+
return http.client.HTTPConnection(host, timeout=self._connection_timeout_seconds)
196199

197-
return http.client.HTTPSConnection(host, context=ssl_context)
200+
return http.client.HTTPSConnection(host, timeout=self._connection_timeout_seconds, context=ssl_context)
198201

199202
def _parse_endpoint(self, endpoint):
200203
if endpoint.startswith("http://"):

0 commit comments

Comments
 (0)