Skip to content

Commit 5ee90a4

Browse files
author
Jason Mobarak
authored
Add TCP reconnect, purge "Skylark broker" support (#678)
1 parent 6cb5960 commit 5ee90a4

File tree

3 files changed

+43
-399
lines changed

3 files changed

+43
-399
lines changed

python/requirements.txt

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,8 @@
11
construct==2.9.33
2-
futures>=2.2.0
3-
httpretty==0.9.4
4-
# httpretty 0.9.5 is causing issues on travis: https://github.com/gabrielfalcao/HTTPretty/issues/340
52
pyftdi==0.13.4
63
pylibftdi
74
pyserial
85
requests>=2.8.1
9-
requests-futures>=0.9.5
106
llvmlite==0.26.0
117
numpy==1.16.2
128
numba==0.41.0

python/sbp/client/drivers/network_drivers.py

Lines changed: 42 additions & 275 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,19 @@
1111
1212
"""
1313

14-
from .base_driver import BaseDriver
15-
from concurrent.futures import ThreadPoolExecutor
16-
from requests.adapters import DEFAULT_POOLBLOCK, DEFAULT_POOLSIZE, HTTPAdapter
17-
from requests.packages.urllib3.util import Retry
18-
from requests_futures.sessions import FuturesSession
19-
import requests
20-
from functools import partial
14+
import sys
2115
import errno
2216
import socket
2317
import threading
2418
import time
25-
import warnings
19+
20+
from functools import partial
21+
22+
from .base_driver import BaseDriver
23+
24+
25+
MAX_RECONNECT_RETRIES = 30
26+
RECONNECT_SLEEP_S = 1
2627

2728

2829
class TCPDriver(BaseDriver):
@@ -40,7 +41,7 @@ class TCPDriver(BaseDriver):
4041
4142
"""
4243

43-
def __init__(self, host, port, timeout=5, raise_initial_timeout=False):
44+
def __init__(self, host, port, timeout=5, raise_initial_timeout=False, reconnect=False):
4445
self._address = (host, port)
4546
print((host, port))
4647
self._create_connection = partial(socket.create_connection,
@@ -50,6 +51,8 @@ def __init__(self, host, port, timeout=5, raise_initial_timeout=False):
5051
self._connect(timeout_raises=raise_initial_timeout)
5152
super(TCPDriver, self).__init__(self.handle)
5253
self._write_lock = threading.Lock()
54+
self._reconnect_count = 0
55+
self._reconnect_supported = reconnect
5356

5457
def _connect(self, timeout_raises=False):
5558
while True:
@@ -60,6 +63,21 @@ def _connect(self, timeout_raises=False):
6063
if timeout_raises:
6164
raise
6265

66+
def _reconnect(self, exc):
67+
if not self._reconnect_supported:
68+
raise exc
69+
while True:
70+
if self._reconnect_count >= MAX_RECONNECT_RETRIES:
71+
raise exc
72+
try:
73+
self._connect(timeout_raises=True)
74+
self._reconnect_count = 0
75+
except socket.error:
76+
self._reconnect_count += 1
77+
time.sleep(RECONNECT_SLEEP_S)
78+
continue
79+
break
80+
6381
def read(self, size):
6482
"""
6583
Read wrapper.
@@ -69,19 +87,21 @@ def read(self, size):
6987
size : int
7088
Number of bytes to read
7189
"""
72-
try:
73-
data = self.handle.recv(size)
90+
data = None
91+
while True:
92+
try:
93+
data = self.handle.recv(size)
94+
except socket.timeout as socket_error:
95+
self._reconnect(socket_error)
96+
except socket.error as socket_error:
97+
# this is fine, just retry
98+
if socket_error.errno == errno.EINTR:
99+
continue
100+
self._reconnect(IOError)
74101
if not data:
75-
raise IOError
76-
return data
77-
except socket.timeout:
78-
self._connect()
79-
except socket.error as socket_error:
80-
# this is fine
81-
if socket_error.errno == errno.EINTR:
82-
return
83-
# we really shouldn't be doing this
84-
raise IOError
102+
self._reconnect(IOError)
103+
break
104+
return data
85105

86106
def flush(self):
87107
pass
@@ -100,263 +120,10 @@ def write(self, s):
100120
self.handle.sendall(s)
101121
except socket.timeout:
102122
self._connect()
103-
except socket.error as msg:
123+
except socket.error:
104124
raise IOError
105125
finally:
106126
self._write_lock.release()
107127

108128

109-
class HTTPException(Exception):
110-
pass
111-
112-
113-
DEFAULT_CONNECT_TIMEOUT = 30
114-
DEFAULT_READ_TIMEOUT = 120
115-
DEFAULT_TIMEOUT = (DEFAULT_CONNECT_TIMEOUT, DEFAULT_READ_TIMEOUT)
116-
MAX_CONNECT_RETRIES = 5
117-
MAX_READ_RETRIES = 3
118-
DEFAULT_RETRIES = (MAX_CONNECT_RETRIES, MAX_READ_RETRIES)
119-
MAX_REDIRECTS = 0
120-
DEFAULT_BACKOFF_FACTOR = 0.2
121-
BROKER_SBP_TYPE = 'application/vnd.swiftnav.broker.v1+sbp2'
122-
123-
124-
class HTTPDriver(BaseDriver):
125-
"""HTTPDriver
126-
127-
The :class:`HTTPDriver` class reads SBP messages from an HTTP
128-
service for a device and writes out to a stream. This driver is like
129-
a file-handle with read and writes over two separately HTTP
130-
connections, but can also be enabled and disabled by its consumer.
131-
132-
Parameters
133-
----------
134-
device_uid : uid
135-
Device unique id
136-
url : str
137-
HTTP endpoint
138-
retries : tuple
139-
Configure connect and read retry count. Defaults to
140-
(MAX_CONNECT_RETRIES, MAX_READ_RETRIES).
141-
timeout : tuple
142-
Configure connect and read timeouts. Defaults to
143-
(DEFAULT_CONNECT_TIMEOUT, DEFAULT_READ_TIMEOUT).
144-
145-
"""
146-
147-
def __init__(
148-
self,
149-
device_uid=None,
150-
url="https://broker.staging.skylark.swiftnav.com",
151-
retries=DEFAULT_RETRIES,
152-
timeout=DEFAULT_TIMEOUT, ):
153-
self._retry = Retry(
154-
connect=DEFAULT_RETRIES[0],
155-
read=DEFAULT_RETRIES[1],
156-
redirect=MAX_REDIRECTS,
157-
status_forcelist=[500],
158-
backoff_factor=DEFAULT_BACKOFF_FACTOR)
159-
self.url = url
160-
self.read_session = requests.Session()
161-
self.read_session.mount("http://",
162-
HTTPAdapter(
163-
pool_connections=DEFAULT_POOLSIZE,
164-
pool_maxsize=DEFAULT_POOLSIZE,
165-
pool_block=DEFAULT_POOLBLOCK,
166-
max_retries=self._retry))
167-
self.read_session.mount("https://",
168-
HTTPAdapter(
169-
pool_connections=DEFAULT_POOLSIZE,
170-
pool_maxsize=DEFAULT_POOLSIZE,
171-
pool_block=DEFAULT_POOLBLOCK,
172-
max_retries=self._retry))
173-
self.write_session = None
174-
self.device_uid = device_uid
175-
self.timeout = timeout
176-
self.read_response = None
177-
self.write_response = None
178-
self.source = None
179-
180-
def flush(self):
181-
"""File-flush wrapper (noop).
182-
183-
"""
184-
pass
185-
186-
def close(self):
187-
"""File-handle close wrapper (noop).
188-
189-
"""
190-
try:
191-
self.read_close()
192-
self.write_close()
193-
except:
194-
pass
195-
196-
@property
197-
def write_ok(self):
198-
"""
199-
Are we connected for writes?
200-
"""
201-
# Note that self.write_response is either None or a Response
202-
# object, which cast to False for 4xx and 5xx HTTP codes.
203-
return bool(self.write_response)
204-
205-
def connect_write(self, source, whitelist, device_uid=None, pragma=None):
206-
"""Initialize a streaming write HTTP response. Manually connects the
207-
underlying file-handle. In the event of a network disconnection,
208-
use to manually reinitiate an HTTP session.
209-
210-
Parameters
211-
----------
212-
source : sbp.client.handler.Handler
213-
Iterable source of SBP messages.
214-
whitelist : [int]
215-
Whitelist of messages to write
216-
217-
"""
218-
header_device_uid = device_uid or self.device_uid
219-
headers = {
220-
'Device-Uid': header_device_uid,
221-
'Content-Type': BROKER_SBP_TYPE,
222-
'Pragma': pragma
223-
}
224-
if not pragma:
225-
del headers['Pragma']
226-
try:
227-
self.executor = ThreadPoolExecutor(max_workers=DEFAULT_POOLSIZE)
228-
self.write_session = FuturesSession(executor=self.executor)
229-
self.write_session.mount("http://",
230-
HTTPAdapter(
231-
pool_connections=DEFAULT_POOLSIZE,
232-
pool_maxsize=DEFAULT_POOLSIZE,
233-
pool_block=DEFAULT_POOLBLOCK,
234-
max_retries=self._retry))
235-
self.write_session.mount("https://",
236-
HTTPAdapter(
237-
pool_connections=DEFAULT_POOLSIZE,
238-
pool_maxsize=DEFAULT_POOLSIZE,
239-
pool_block=DEFAULT_POOLBLOCK,
240-
max_retries=self._retry))
241-
self.source = source.filter(whitelist)
242-
gen = (msg.pack() for msg, _ in self.source)
243-
self.write_session.put(self.url, data=gen, headers=headers)
244-
self.write_response = True
245-
except requests.exceptions.ConnectionError:
246-
msg = "Client connection error to %s with [PUT] headers %s" \
247-
% (self.url, headers)
248-
warnings.warn(msg)
249-
except requests.exceptions.ConnectTimeout:
250-
msg = "Client connection timeout to %s with [PUT] headers %s" \
251-
% (self.url, headers)
252-
warnings.warn(msg)
253-
except requests.exceptions.RetryError:
254-
msg = "Client retry error to %s with [PUT] headers %s" \
255-
% (self.url, headers)
256-
warnings.warn(msg)
257-
except requests.exceptions.ReadTimeout:
258-
msg = "Client read timeout to %s with [PUT] headers %s" \
259-
% (self.url, headers)
260-
warnings.warn(msg)
261-
return self.write_ok
262-
263-
def write(self, data):
264-
"""Write wrapper (noop). Actual stream is initiated by the write
265-
connection.
266-
267-
Parameters
268-
----------
269-
data : object
270-
Data to write.
271-
272-
"""
273-
pass
274-
275-
def write_close(self):
276-
"""File-handle close wrapper (noop).
277-
278-
"""
279-
try:
280-
self.write_session.close()
281-
self.executor.shutdown(wait=False)
282-
self.source.breakiter()
283-
self.source = None
284-
self.executor = None
285-
self.write_session = None
286-
except:
287-
pass
288-
289-
@property
290-
def read_ok(self):
291-
"""
292-
Are we connected for reads?
293-
"""
294-
return bool(self.read_response)
295-
296-
def connect_read(self, device_uid=None, pragma=None):
297-
"""Initialize a streaming read/write HTTP response. Manually connects
298-
the underlying file-handle. In the event of a network
299-
disconnection, use to manually reinitiate an HTTP session.
300-
301-
"""
302-
header_device_uid = device_uid or self.device_uid
303-
headers = {
304-
'Device-Uid': header_device_uid,
305-
'Accept': BROKER_SBP_TYPE,
306-
'Pragma': pragma
307-
}
308-
if not pragma:
309-
del headers['Pragma']
310-
try:
311-
self.read_response = self.read_session.get(
312-
self.url, stream=True, headers=headers, timeout=self.timeout)
313-
except requests.exceptions.ConnectionError:
314-
msg = "Client connection error to %s with [GET] headers %s" \
315-
% (self.url, headers)
316-
warnings.warn(msg)
317-
except requests.exceptions.ConnectTimeout:
318-
msg = "Client connection timeout to %s with [GET] headers %s" \
319-
% (self.url, headers)
320-
warnings.warn(msg)
321-
except requests.exceptions.RetryError:
322-
msg = "Client retry error to %s with [GET] headers %s" \
323-
% (self.url, headers)
324-
warnings.warn(msg)
325-
except requests.exceptions.ReadTimeout:
326-
msg = "Client read timeout to %s with [GET] headers %s" \
327-
% (self.url, headers)
328-
warnings.warn(msg)
329-
return self.read_ok
330-
331-
def read(self, size):
332-
"""Read wrapper. If the client connection is closed or some other
333-
exception is thrown, raises an IOError.
334-
335-
Parameters
336-
----------
337-
size : int
338-
Size to read (in bytes).
339-
340-
Returns
341-
----------
342-
bytearray, or None
343-
344-
"""
345-
if self.read_response is None or not self.device_uid:
346-
raise ValueError("Invalid/insufficient HTTP request parameters!")
347-
elif not self.read_ok or self.read_response.raw.closed:
348-
raise IOError("HTTP read closed?!")
349-
try:
350-
return self.read_response.raw.read(size)
351-
except:
352-
raise IOError("HTTP read error!")
353-
354-
def read_close(self):
355-
"""File-handle close wrapper (noop).
356129

357-
"""
358-
try:
359-
self.read_response.close()
360-
self.read_response = None
361-
except:
362-
pass

0 commit comments

Comments
 (0)