|
| 1 | +"""This following code ensures assertion of expected parameters with Mavlink data. |
| 2 | +It reads a baseline file of expected Pixhawk parameters and compares them |
| 3 | +against values retrieved over MAVLink. |
| 4 | +""" |
| 5 | + |
| 6 | +from __future__ import annotations |
| 7 | + |
| 8 | +import os |
| 9 | +import re |
| 10 | +import sys |
| 11 | +import time |
| 12 | +from pathlib import Path |
| 13 | +from typing import Dict, List, Tuple, Union |
| 14 | + |
| 15 | +# Default connection settings used if environment variables are not set. |
| 16 | +DEFAULT_PIXHAWK_ADDRESS = "/dev/ttyAMA0" |
| 17 | +DEFAULT_PIXHAWK_BAUD = 57600 |
| 18 | + |
| 19 | +ParamValue = Union[int, float, str] |
| 20 | +Pathish = Union[str, Path] |
| 21 | + |
| 22 | +# parses expectations file by spliting each expected parameter into parameter |
| 23 | +# name (before comma) and value |
| 24 | +_LINE_RE = re.compile(r"^([^,]+)\s*,\s*(.+)$") |
| 25 | + |
| 26 | + |
| 27 | +def _coerce_value(value_str: str) -> ParamValue: |
| 28 | + """ |
| 29 | + This function allows for value interpretation as a number. |
| 30 | +
|
| 31 | + It tries to convert a string into a float, and if that float is an integer |
| 32 | + value, it converts it to an int. Otherwise, it leaves the string as-is. |
| 33 | + """ |
| 34 | + try: |
| 35 | + float_value = float(value_str) |
| 36 | + return int(float_value) if float_value.is_integer() else float_value |
| 37 | + except ValueError: |
| 38 | + return value_str |
| 39 | + |
| 40 | + |
| 41 | +def parse_params_file(path: Pathish) -> Dict[str, ParamValue]: |
| 42 | + """ |
| 43 | + reads param_expectations line by line and builds a dictionary |
| 44 | +
|
| 45 | + Each non-empty, non-comment line is expected to look like: |
| 46 | + PARAM_NAME, VALUE # optional inline comment |
| 47 | +
|
| 48 | + Returns: |
| 49 | + dict mapping parameter name -> coerced value. |
| 50 | + """ |
| 51 | + param_path = Path(path) |
| 52 | + params: Dict[str, ParamValue] = {} |
| 53 | + |
| 54 | + with param_path.open("r", encoding="utf-8", errors="replace") as file: |
| 55 | + for line in file: |
| 56 | + line = line.strip() |
| 57 | + if not line or line.startswith("#"): |
| 58 | + continue |
| 59 | + |
| 60 | + match = _LINE_RE.match(line) |
| 61 | + if not match: |
| 62 | + continue |
| 63 | + |
| 64 | + key, raw_value = match.group(1).strip(), match.group(2).strip() |
| 65 | + |
| 66 | + # allow inline comments after value |
| 67 | + if "#" in raw_value: |
| 68 | + raw_value = raw_value.split("#", 1)[0].strip() |
| 69 | + |
| 70 | + params[key] = _coerce_value(raw_value) |
| 71 | + |
| 72 | + return params |
| 73 | + |
| 74 | + |
| 75 | +def _float_equal(a: float, b: float, eps: float = 1e-6) -> bool: |
| 76 | + """Helper that checks if two floats are equal within a small epsilon.""" |
| 77 | + return abs(a - b) <= eps |
| 78 | + |
| 79 | + |
| 80 | +def _values_equal(a: ParamValue, b: ParamValue) -> bool: |
| 81 | + """ |
| 82 | + Ensures values are approprialtley cmpared via rounding / tolerance. |
| 83 | +
|
| 84 | + If both values are numeric, they are compared using a float tolerance. |
| 85 | + Otherwise, a normal equality comparison is used. |
| 86 | + """ |
| 87 | + if isinstance(a, (int, float)) and isinstance(b, (int, float)): |
| 88 | + return _float_equal(float(a), float(b)) |
| 89 | + return a == b |
| 90 | + |
| 91 | + |
| 92 | +def diff_params( |
| 93 | + actual: Dict[str, ParamValue], |
| 94 | + expected: Dict[str, ParamValue], |
| 95 | +) -> Tuple[List[str], List[Tuple[str, ParamValue, ParamValue]], List[str]]: |
| 96 | + """ |
| 97 | + Takes actual and expected values as parameters to build three lists of |
| 98 | + missing, mismatched and extras. |
| 99 | +
|
| 100 | + Returns: |
| 101 | + missing: parameters that are in expected but not found in actual. |
| 102 | + mismatches: (name, expected_value, actual_value) for value differences. |
| 103 | + extras: parameters that are found on dron but not in file. |
| 104 | + """ |
| 105 | + missing: List[str] = [] |
| 106 | + mismatches: List[Tuple[str, ParamValue, ParamValue]] = [] |
| 107 | + extras: List[str] = [] |
| 108 | + |
| 109 | + # builds missing and mismatches lists in case |
| 110 | + for name, expected_value in expected.items(): |
| 111 | + if name not in actual: |
| 112 | + missing.append(name) |
| 113 | + elif not _values_equal(actual[name], expected_value): |
| 114 | + mismatches.append((name, expected_value, actual[name])) |
| 115 | + |
| 116 | + # checks for parameters that are fouund on dron but not in file |
| 117 | + for name in actual: |
| 118 | + if name not in expected: |
| 119 | + extras.append(name) |
| 120 | + |
| 121 | + missing.sort() |
| 122 | + mismatches.sort(key=lambda item: item[0]) |
| 123 | + extras.sort() |
| 124 | + |
| 125 | + return missing, mismatches, extras |
| 126 | + |
| 127 | + |
| 128 | +def fetch_all_params_mavlink( |
| 129 | + address: str, |
| 130 | + baud: int = DEFAULT_PIXHAWK_BAUD, |
| 131 | + timeout_s: int = 25, |
| 132 | +) -> Dict[str, ParamValue]: |
| 133 | + """ |
| 134 | + Opens MAVLink connection directly and builds a dictionary by listening in |
| 135 | + for PARAM_VALUE messages. |
| 136 | +
|
| 137 | + It: |
| 138 | + - checks if vehicle is alive using a heartbeat |
| 139 | + - requests params |
| 140 | + - decrypts and strips messages into values |
| 141 | + """ |
| 142 | + # imported here so other modules can import this file without pymavlink |
| 143 | + from pymavlink import mavutil # pylint: disable=import-outside-toplevel |
| 144 | + |
| 145 | + # checks if vehicle is alive |
| 146 | + connection = mavutil.mavlink_connection(address, baud=baud) |
| 147 | + heartbeat = connection.wait_heartbeat(timeout=timeout_s) |
| 148 | + if not heartbeat: |
| 149 | + raise RuntimeError("Drone is not operating") |
| 150 | + |
| 151 | + # requests params |
| 152 | + connection.mav.param_request_list_send( |
| 153 | + connection.target_system, |
| 154 | + connection.target_component, |
| 155 | + ) |
| 156 | + |
| 157 | + params: Dict[str, ParamValue] = {} |
| 158 | + seen_names: set[str] = set() |
| 159 | + total_count: int | None = None |
| 160 | + end_time = time.time() + timeout_s |
| 161 | + |
| 162 | + try: |
| 163 | + while time.time() < end_time: |
| 164 | + msg = connection.recv_match( |
| 165 | + type="PARAM_VALUE", |
| 166 | + blocking=True, |
| 167 | + timeout=2.0, |
| 168 | + ) |
| 169 | + if not msg: |
| 170 | + continue |
| 171 | + |
| 172 | + # decrypt and strip message into values |
| 173 | + raw_id = msg.param_id |
| 174 | + if isinstance(raw_id, bytes): |
| 175 | + name = raw_id.decode("ascii", errors="ignore").strip("\x00") |
| 176 | + else: |
| 177 | + name = str(raw_id).strip("\x00") |
| 178 | + |
| 179 | + params[name] = float(msg.param_value) |
| 180 | + seen_names.add(name) # keeps track of how many parameters collected |
| 181 | + |
| 182 | + try: |
| 183 | + total_count = int(msg.param_count) |
| 184 | + except (TypeError, ValueError): |
| 185 | + total_count = None |
| 186 | + |
| 187 | + # break early if all have been collected |
| 188 | + if total_count and len(seen_names) >= total_count: |
| 189 | + break |
| 190 | + finally: |
| 191 | + # best-effort close, failure here is non-fatal |
| 192 | + try: |
| 193 | + connection.close() |
| 194 | + except Exception: # pylint: disable=broad-except |
| 195 | + pass |
| 196 | + |
| 197 | + return params |
| 198 | + |
| 199 | + |
| 200 | +def main() -> int: |
| 201 | + """ |
| 202 | + Main function which calls all others and compares expected parameters to |
| 203 | + actual parameters. |
| 204 | + """ |
| 205 | + address = os.environ.get("PIXHAWK_ADDRESS", DEFAULT_PIXHAWK_ADDRESS) |
| 206 | + baud_str = os.environ.get("PIXHAWK_BAUD", str(DEFAULT_PIXHAWK_BAUD)) |
| 207 | + baud = int(baud_str) |
| 208 | + |
| 209 | + baseline = Path(__file__).with_name("parameter_expectations.param") |
| 210 | + |
| 211 | + if not baseline.exists(): |
| 212 | + print(f"Baseline not found: {baseline}") |
| 213 | + return 2 # checks for expectations file |
| 214 | + |
| 215 | + try: |
| 216 | + actual = fetch_all_params_mavlink(address, baud=baud, timeout_s=25) |
| 217 | + except Exception as exc: # pylint: disable=broad-exception-caught |
| 218 | + # attempts to connect to mavlink and returns 2 if failiyre |
| 219 | + print(f"Failed to fetch params via MAVLink: {exc}") |
| 220 | + return 2 |
| 221 | + |
| 222 | + expected = parse_params_file(baseline) |
| 223 | + missing, mismatches, extras = diff_params(actual, expected) |
| 224 | + |
| 225 | + if not missing and not mismatches and not extras: |
| 226 | + print("All Pixhawk parameters match baseline.") |
| 227 | + return 0 # success |
| 228 | + |
| 229 | + # prints exactly which parameters were not found or are not right |
| 230 | + print("Differences found:") |
| 231 | + if missing: |
| 232 | + print(f"- Missing ({len(missing)}): {missing}") |
| 233 | + if mismatches: |
| 234 | + lines = [ |
| 235 | + f"{name}: expected={exp} actual={act}" |
| 236 | + for name, exp, act in mismatches |
| 237 | + ] |
| 238 | + print(f"- Mismatches ({len(mismatches)}):") |
| 239 | + print(" " + "\n ".join(lines)) |
| 240 | + if extras: |
| 241 | + print(f"- Extras ({len(extras)}): {extras}") |
| 242 | + |
| 243 | + return 1 # returns 1 if differences are found |
| 244 | + |
| 245 | + |
| 246 | +if __name__ == "__main__": |
| 247 | + sys.exit(main()) |
0 commit comments