Skip to content

Commit d7f6df7

Browse files
committed
Updated VivaTPS to new geocom protocol
1 parent 7de246d commit d7f6df7

File tree

11 files changed

+246
-305
lines changed

11 files changed

+246
-305
lines changed

src/geocompy/protocols.py

Lines changed: 4 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -174,69 +174,15 @@ def request(
174174
rpc: int,
175175
params: Iterable[int | float | bool | str | Angle | Byte] = (),
176176
parsers: Callable[[str], _T] | None = None
177-
) -> GeoComResponse[_T]:
178-
"""
179-
Executes an RPC request and returns the parsed GeoCom response.
180-
181-
Constructs a request (from the given RPC code and parameters),
182-
writes it to the serial line, then reads the response. The
183-
response is then parsed using the provided parser function.
184-
185-
Parameters
186-
----------
187-
rpc : int
188-
Number of the RPC to execute.
189-
params : Iterable[int | float | bool | str | Angle | Byte], optional
190-
Parameters for the request, by default []
191-
parsers : Callable[[str], Any] | None, optional
192-
Parser function for the value in the RPC response,
193-
by default None
194-
195-
Returns
196-
-------
197-
GeoComResponse
198-
Parsed return codes and parameter from the RPC response.
199-
200-
Raises
201-
------
202-
NotImplementedError
203-
If the method is not implemented on the class.
204-
"""
177+
) -> GeoComResponse[_T]: ...
205178

206179
@overload
207180
def request(
208181
self,
209182
rpc: int,
210183
params: Iterable[int | float | bool | str | Angle | Byte] = (),
211184
parsers: Iterable[Callable[[str], Any]] | None = None
212-
) -> GeoComResponse[tuple]:
213-
"""
214-
Executes an RPC request and returns the parsed GeoCom response.
215-
216-
Constructs a request (from the given RPC code and parameters),
217-
writes it to the serial line, then reads the response. The
218-
response is then parsed using the provided parser functions.
219-
220-
Parameters
221-
----------
222-
rpc : int
223-
Number of the RPC to execute.
224-
params : Iterable[int | float | bool | str | Angle | Byte], optional
225-
Parameters for the request, by default ()
226-
parsers : Iterable[Callable[[str], Any]] | None, optional
227-
Parser functions for the values in the RPC response,
228-
by default None
229-
230-
Returns
231-
-------
232-
GeoComResponse
233-
Parsed return codes and parameters from the RPC response.
234-
235-
Raises
236-
------
237-
NotImplementedError
238-
If the method is not implemented on the class.
239-
"""
185+
) -> GeoComResponse[tuple]: ...
240186

241187
def request(
242188
self,
@@ -284,63 +230,15 @@ def parse_response(
284230
cmd: str,
285231
response: str,
286232
parsers: Callable[[str], _T] | None = None
287-
) -> GeoComResponse[_T]:
288-
"""
289-
Parses RPC response and constructs :class:`GeoComResponse`
290-
instance.
291-
292-
Parameters
293-
----------
294-
cmd : str
295-
Full, serialized request, that invoked the response.
296-
response : str
297-
Full, received response.
298-
parsers : Callable[[str], Any] | None, optional
299-
Parser function for the value in the RPC response, by
300-
default None
301-
302-
Returns
303-
-------
304-
GeoComResponse
305-
Parsed return codes and parameter from the RPC response.
306-
307-
Raises
308-
------
309-
NotImplementedError
310-
If the method is not implemented on the class.
311-
"""
233+
) -> GeoComResponse[_T]: ...
312234

313235
@overload
314236
def parse_response(
315237
cls,
316238
cmd: str,
317239
response: str,
318240
parsers: Iterable[Callable[[str], Any]] | None = None
319-
) -> GeoComResponse[tuple]:
320-
"""
321-
Parses RPC response and constructs :class:`GeoComResponse`
322-
instance.
323-
324-
Parameters
325-
----------
326-
cmd : str
327-
Full, serialized request, that invoked the response.
328-
response : str
329-
Full, received response.
330-
parsers : Iterable[Callable[[str], Any]], optional
331-
Parser functions for the values in the RPC response, by
332-
default None
333-
334-
Returns
335-
-------
336-
GeoComResponse
337-
Parsed return codes and parameters from the RPC response.
338-
339-
Raises
340-
------
341-
NotImplementedError
342-
If the method is not implemented on the class.
343-
"""
241+
) -> GeoComResponse[tuple]: ...
344242

345243
def parse_response(
346244
cls,

src/geocompy/vivatps/__init__.py

Lines changed: 78 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
import logging
4040
from time import sleep
4141
from traceback import format_exc
42-
from typing import Callable, Iterable, Any
42+
from typing import Callable, Iterable, Any, overload, TypeVar
4343

4444
from serial import SerialException, SerialTimeoutException
4545

@@ -69,6 +69,9 @@
6969
from .grc import VivaTPSGRC, rpcnames
7070

7171

72+
_T = TypeVar("_T")
73+
74+
7275
class VivaTPS(GeoComProtocol):
7376
"""
7477
VivaTPS GeoCom protocol handler.
@@ -193,12 +196,17 @@ def __init__(
193196

194197
self._precision = 15
195198
resp = self.get_double_precision()
196-
if resp.comcode and resp.rpccode:
197-
self._precision = resp.params["digits"]
199+
if resp.params is not None:
200+
self._precision = resp.params
201+
else:
202+
self._logger.error(
203+
f"Could not syncronize double precision, "
204+
f"defaulting to {self._precision:d}"
205+
)
198206

199207
self._logger.info("Connection initialized")
200208

201-
def get_double_precision(self) -> GeoComResponse:
209+
def get_double_precision(self) -> GeoComResponse[int]:
202210
"""
203211
RPC 108, ``COM_GetDoublePrecision``
204212
@@ -218,13 +226,13 @@ def get_double_precision(self) -> GeoComResponse:
218226
"""
219227
return self.request(
220228
108,
221-
parsers={"digits": int}
229+
parsers=int
222230
)
223231

224232
def set_double_precision(
225233
self,
226234
digits: int
227-
) -> GeoComResponse:
235+
) -> GeoComResponse[None]:
228236
"""
229237
RPC 107, ``COM_SetDoublePrecision``
230238
@@ -245,16 +253,36 @@ def set_double_precision(
245253
get_double_precision
246254
247255
"""
248-
response = self.request(107, [digits])
256+
response: GeoComResponse[None] = self.request(107, [digits])
249257
if response.comcode and response.rpccode:
250258
self._precision = digits
251259
return response
252260

261+
@overload
262+
def request(
263+
self,
264+
rpc: int,
265+
params: Iterable[int | float | bool | str | Angle | Byte] = (),
266+
parsers: Callable[[str], _T] | None = None
267+
) -> GeoComResponse[_T]: ...
268+
269+
@overload
270+
def request(
271+
self,
272+
rpc: int,
273+
params: Iterable[int | float | bool | str | Angle | Byte] = (),
274+
parsers: Iterable[Callable[[str], Any]] | None = None
275+
) -> GeoComResponse[tuple]: ...
276+
253277
def request(
254278
self,
255279
rpc: int,
256280
params: Iterable[int | float | bool | str | Angle | Byte] = [],
257-
parsers: dict[str, Callable[[str], Any]] | None = None
281+
parsers: (
282+
Iterable[Callable[[str], Any]]
283+
| Callable[[str], Any]
284+
| None
285+
) = None
258286
) -> GeoComResponse:
259287
"""
260288
Executes a VivaTPS RPC request and returns the parsed GeoCom
@@ -377,16 +405,36 @@ def request(
377405
response = self.parse_response(
378406
cmd,
379407
answer,
380-
parsers if parsers is not None else {}
408+
parsers
381409
)
382410
self._logger.debug(response)
383411
return response
384412

413+
@overload
385414
def parse_response(
386415
self,
387416
cmd: str,
388417
response: str,
389-
parsers: dict[str, Callable[[str], Any]]
418+
parsers: Callable[[str], _T] | None = None
419+
) -> GeoComResponse[_T]: ...
420+
421+
@overload
422+
def parse_response(
423+
self,
424+
cmd: str,
425+
response: str,
426+
parsers: Iterable[Callable[[str], Any]] | None = None
427+
) -> GeoComResponse[tuple]: ...
428+
429+
def parse_response(
430+
self,
431+
cmd: str,
432+
response: str,
433+
parsers: (
434+
Iterable[Callable[[str], Any]]
435+
| Callable[[str], Any]
436+
| None
437+
) = None
390438
) -> GeoComResponse:
391439
"""
392440
Parses RPC response and constructs GeoComResponse
@@ -425,37 +473,49 @@ def parse_response(
425473
response,
426474
VivaTPSGRC.COM_CANT_DECODE,
427475
VivaTPSGRC.UNDEFINED,
428-
0,
429-
{}
476+
0
430477
)
431478

432479
groups = m.groupdict()
433480
values = groups.get("params", "")
434481
if values is None:
435482
values = ""
436-
params: dict = {}
483+
484+
if parsers is None:
485+
parsers = ()
486+
elif not isinstance(parsers, Iterable):
487+
parsers = (parsers,)
488+
489+
params: list = []
437490
try:
438-
for (name, func), value in zip(parsers.items(), values.split(",")):
439-
params[name] = func(value)
491+
for func, value in zip(parsers, values.split(",")):
492+
params.append(func(value))
440493
except Exception:
441494
return GeoComResponse(
442495
rpcname,
443496
cmd,
444497
response,
445498
VivaTPSGRC.COM_CANT_DECODE,
446499
VivaTPSGRC.UNDEFINED,
447-
0,
448-
{}
500+
0
449501
)
450502

451503
comrc = VivaTPSGRC(int(groups["comrc"]))
452504
rc = VivaTPSGRC(int(groups["rc"]))
505+
match len(params):
506+
case 0:
507+
params_final = None
508+
case 1:
509+
params_final = params[1]
510+
case _:
511+
params_final = tuple(params)
512+
453513
return GeoComResponse(
454514
rpcname,
455515
cmd,
456516
response,
457517
comrc,
458518
rc,
459519
int(groups["tr"]),
460-
params
520+
params_final
461521
)

src/geocompy/vivatps/aut.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ class ONOFF(Enum):
4343
def set_lock_fly_mode(
4444
self,
4545
state: ONOFF | str
46-
) -> GeoComResponse:
46+
) -> GeoComResponse[None]:
4747
"""
4848
RPC 9103, ``AUT_SetLockFlyMode``
4949
@@ -68,7 +68,7 @@ def set_lock_fly_mode(
6868
[_state.value]
6969
)
7070

71-
def get_lock_fly_mode(self) -> GeoComResponse:
71+
def get_lock_fly_mode(self) -> GeoComResponse[ONOFF]:
7272
"""
7373
RPC 9102, ``AUT_GetLockFlyMode``
7474
@@ -86,17 +86,15 @@ def get_lock_fly_mode(self) -> GeoComResponse:
8686
"""
8787
return self._request(
8888
9102,
89-
parsers={
90-
"state": enumparser(self.ONOFF)
91-
}
89+
parsers=enumparser(self.ONOFF)
9290
)
9391

9492
def cam_posit_to_pixel_coord(
9593
self,
9694
x: int,
9795
y: int,
9896
camtype: VivaTPSCAM.CAMTYPE | str = VivaTPSCAM.CAMTYPE.OVC
99-
) -> GeoComResponse:
97+
) -> GeoComResponse[None]:
10098
"""
10199
RPC 9081, ``AUT_CAM_PositToPixelCoord``
102100

src/geocompy/vivatps/bap.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class ONOFF(Enum):
3737
OFF = 0
3838
ON = 1
3939

40-
def get_atr_precise(self) -> GeoComResponse:
40+
def get_atr_precise(self) -> GeoComResponse[ONOFF]:
4141
"""
4242
RPC 17039, ``BAP_GetATRPrecise``
4343
@@ -55,15 +55,13 @@ def get_atr_precise(self) -> GeoComResponse:
5555
"""
5656
return self._request(
5757
17039,
58-
parsers={
59-
"state": enumparser(self.ONOFF)
60-
}
58+
parsers=enumparser(self.ONOFF)
6159
)
6260

6361
def set_atr_precise(
6462
self,
6563
state: ONOFF | str
66-
) -> GeoComResponse:
64+
) -> GeoComResponse[None]:
6765
"""
6866
RPC 17040, ``BAP_SetATRPrecise``
6967

0 commit comments

Comments
 (0)