|
52 | 52 | Base definitions for command protocols and responses. |
53 | 53 |
|
54 | 54 | """ |
55 | | -from __future__ import annotations |
56 | | - |
57 | | -from enum import Enum |
58 | | -from typing import Callable, Any, Iterable |
59 | | -from logging import Logger |
60 | | - |
61 | | -from .data import Angle, Byte |
62 | | -from .communication import Connection, get_logger |
63 | | - |
64 | 55 | try: |
65 | 56 | from ._version import __version__ |
66 | 57 | except Exception: |
67 | 58 | __version__ = "0.0.0" # Placeholder value for source installs |
68 | | - |
69 | | - |
70 | | -class GeoComReturnCode(Enum): |
71 | | - """Base class for all GeoCom return code enums.""" |
72 | | - |
73 | | - |
74 | | -class GeoComResponse: |
75 | | - """ |
76 | | - Container class for parsed GeoCom responses. |
77 | | -
|
78 | | - """ |
79 | | - |
80 | | - def __init__( |
81 | | - self, |
82 | | - rpcname: str, |
83 | | - cmd: str, |
84 | | - response: str, |
85 | | - comcode: GeoComReturnCode, |
86 | | - rpccode: GeoComReturnCode, |
87 | | - trans: int, |
88 | | - params: dict[str, Any] |
89 | | - ): |
90 | | - """ |
91 | | - Parameters |
92 | | - ---------- |
93 | | - rpcname : str |
94 | | - Name of the GeoCom function, that corresponds to the RPC, |
95 | | - that invoked this response. |
96 | | - cmd : str |
97 | | - Full, serialized request, that invoked this response. |
98 | | - response : str |
99 | | - Full, received response. |
100 | | - comcode : GeoComReturnCode |
101 | | - Parsed COM return code indicating the success/failure of |
102 | | - communication. |
103 | | - rpccode : GeoComReturnCode |
104 | | - Parsed RPC return code indicating the success/failure of |
105 | | - the command. |
106 | | - trans : int |
107 | | - Parsed transaction ID. |
108 | | - params : dict[str, Any] |
109 | | - Collection of parsed response parameters. The content |
110 | | - is dependent on the executed function. |
111 | | - """ |
112 | | - self.rpcname: str = rpcname |
113 | | - """Name of the GeoCom function, that correspondes to the RPC, |
114 | | - that invoked this response.""" |
115 | | - self.cmd: str = cmd |
116 | | - """Full, serialized request, that invoked this response.""" |
117 | | - self.response: str = response |
118 | | - """Full, received response.""" |
119 | | - self.comcode: GeoComReturnCode = comcode |
120 | | - """Parsed COM return code indicating the success/failure of |
121 | | - communication.""" |
122 | | - self.rpccode: GeoComReturnCode = rpccode |
123 | | - """Parsed RPC return code indicating the success/failure of |
124 | | - the command.""" |
125 | | - self.trans: int = trans |
126 | | - """Parsed transaction ID.""" |
127 | | - self.params: dict[str, Any] = params |
128 | | - """Collection of parsed response parameters. The content |
129 | | - is dependent on the executed function.""" |
130 | | - |
131 | | - def __str__(self) -> str: |
132 | | - return ( |
133 | | - f"GeoComResponse({self.rpcname}) com: {self.comcode.name:s}, " |
134 | | - f"rpc: {self.rpccode.name:s}, " |
135 | | - f"tr: {self.trans:d}, " |
136 | | - f"params: {self.params}, " |
137 | | - f"(cmd: '{self.cmd}', response: '{self.response}')" |
138 | | - ) |
139 | | - |
140 | | - def __bool__(self) -> bool: |
141 | | - return bool(self.comcode) and bool(self.rpccode) |
142 | | - |
143 | | - |
144 | | -class GeoComSubsystem: |
145 | | - """ |
146 | | - Base class for GeoCom subsystems. |
147 | | -
|
148 | | - """ |
149 | | - |
150 | | - def __init__(self, parent: GeoComProtocol): |
151 | | - """ |
152 | | - Parameters |
153 | | - ---------- |
154 | | - parent : GeoComProtocol |
155 | | - The parent protocol instance of this subsystem. |
156 | | - """ |
157 | | - self._parent: GeoComProtocol = parent |
158 | | - """Parent protocol instance""" |
159 | | - self._request = self._parent.request |
160 | | - """Shortcut to the `request` method of the parent protocol.""" |
161 | | - |
162 | | - |
163 | | -class GeoComProtocol: |
164 | | - """ |
165 | | - Base class for GeoCom protocol versions. |
166 | | -
|
167 | | - """ |
168 | | - |
169 | | - def __init__( |
170 | | - self, |
171 | | - connection: Connection, |
172 | | - logger: Logger | None = None |
173 | | - ): |
174 | | - """ |
175 | | - Parameters |
176 | | - ---------- |
177 | | - connection : Connection |
178 | | - Connection to use for communication |
179 | | - (usually :class:`~communication.SerialConnection`). |
180 | | - logger : ~logging.Logger | None, optional |
181 | | - Logger to log all requests and responses, by default None |
182 | | -
|
183 | | - """ |
184 | | - self._conn: Connection = connection |
185 | | - if logger is None: |
186 | | - logger = get_logger("/dev/null") |
187 | | - self._logger: Logger = logger |
188 | | - |
189 | | - def request( |
190 | | - self, |
191 | | - rpc: int, |
192 | | - params: Iterable[int | float | bool | str | Angle | Byte] = [], |
193 | | - parsers: dict[str, Callable[[str], Any]] | None = None |
194 | | - ) -> GeoComResponse: |
195 | | - """ |
196 | | - Executes an RPC request and returns the parsed GeoCom response. |
197 | | -
|
198 | | - Constructs a request (from the given RPC code and parameters), |
199 | | - writes it to the serial line, then reads the response. The |
200 | | - response is then parsed using the provided parser functions. |
201 | | -
|
202 | | - Parameters |
203 | | - ---------- |
204 | | - rpc : int |
205 | | - Number of the RPC to execute. |
206 | | - params : Iterable[int | float | bool | str | Angle | Byte], optional |
207 | | - Parameters for the request, by default [] |
208 | | - parsers : dict[str, Callable[[str], Any]] | None, optional |
209 | | - Parser functions for the values in the RPC response |
210 | | - (Maps the parser functions to the names of the parameters), |
211 | | - by default None |
212 | | -
|
213 | | - Returns |
214 | | - ------- |
215 | | - GeoComResponse |
216 | | - Parsed return codes and parameters from the RPC response. |
217 | | -
|
218 | | - Raises |
219 | | - ------ |
220 | | - NotImplementedError |
221 | | - If the method is not implemented on the class. |
222 | | -
|
223 | | - """ |
224 | | - raise NotImplementedError() |
225 | | - |
226 | | - def parse_response( |
227 | | - cls, |
228 | | - cmd: str, |
229 | | - response: str, |
230 | | - parsers: dict[str, Callable[[str], Any]] |
231 | | - ) -> GeoComResponse: |
232 | | - """ |
233 | | - Parses RPC response and constructs :class:`GeoComResponse` |
234 | | - instance. |
235 | | -
|
236 | | - Parameters |
237 | | - ---------- |
238 | | - cmd : str |
239 | | - Full, serialized request, that invoked the response. |
240 | | - response : str |
241 | | - Full, received response. |
242 | | - parsers : dict[str, Callable[[str], Any]] |
243 | | - Parser functions for the values in the RPC response. |
244 | | - (Maps the parser functions to the names of the parameters.) |
245 | | -
|
246 | | - Returns |
247 | | - ------- |
248 | | - GeoComResponse |
249 | | - Parsed return codes and parameters from the RPC response. |
250 | | -
|
251 | | - Raises |
252 | | - ------ |
253 | | - NotImplementedError |
254 | | - If the method is not implemented on the class. |
255 | | -
|
256 | | - """ |
257 | | - raise NotImplementedError() |
0 commit comments