|
2 | 2 | import asyncio
|
3 | 3 | import logging
|
4 | 4 | import re
|
5 |
| -from pkg_resources import parse_version |
6 |
| -from typing import ClassVar, Mapping, Optional, cast, TypeVar |
7 |
| - |
| 5 | +from typing import ClassVar, Mapping, Optional, TypeVar |
| 6 | +from packaging.version import InvalidVersion, parse, Version |
8 | 7 | from opentrons.config import IS_ROBOT, ROBOT_FIRMWARE_DIR
|
9 | 8 | from opentrons.drivers.rpi_drivers.types import USBPort
|
10 | 9 |
|
|
16 | 15 | TaskPayload = TypeVar("TaskPayload")
|
17 | 16 |
|
18 | 17 |
|
| 18 | +def parse_fw_version(version: str) -> Version: |
| 19 | + try: |
| 20 | + device_version = parse(version) |
| 21 | + except InvalidVersion: |
| 22 | + device_version = parse("v0.0.0") |
| 23 | + return device_version |
| 24 | + |
| 25 | + |
19 | 26 | class AbstractModule(abc.ABC):
|
20 | 27 | """Defines the common methods of a module."""
|
21 | 28 |
|
@@ -88,9 +95,9 @@ def get_bundled_fw(self) -> Optional[BundledFirmware]:
|
88 | 95 | def has_available_update(self) -> bool:
|
89 | 96 | """Return whether a newer firmware file is available"""
|
90 | 97 | if self.device_info and self._bundled_fw:
|
91 |
| - device_version = parse_version(self.device_info["version"]) |
92 |
| - available_version = parse_version(self._bundled_fw.version) |
93 |
| - return cast(bool, available_version > device_version) |
| 98 | + device_version = parse_fw_version(self.device_info["version"]) |
| 99 | + available_version = parse_fw_version(self._bundled_fw.version) |
| 100 | + return available_version > device_version |
94 | 101 | return False
|
95 | 102 |
|
96 | 103 | async def wait_for_is_running(self) -> None:
|
|
0 commit comments