Skip to content

Commit

Permalink
save...
Browse files Browse the repository at this point in the history
  • Loading branch information
vegano1 committed Jul 31, 2024
1 parent ad870e7 commit f98063e
Show file tree
Hide file tree
Showing 11 changed files with 376 additions and 244 deletions.
12 changes: 11 additions & 1 deletion api/src/opentrons/drivers/absorbance_reader/abstract.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import Dict, List
from typing import Dict, List, Tuple
from opentrons.drivers.types import (
AbsorbanceReaderLidStatus,
AbsorbanceReaderDeviceState,
Expand Down Expand Up @@ -48,8 +48,18 @@ async def get_device_info(self) -> Dict[str, str]:
"""Get device info"""
...

@abstractmethod
async def get_uptime(self) -> Dict[str, str]:
"""Get device uptime"""
...

@abstractmethod
async def get_plate_presence(self) -> AbsorbanceReaderPlatePresence:
"""Check if there is a plate in the reader."""
...

@abstractmethod
async def update_firmware(self, firmware_file_path: str) -> Tuple[bool, str]:
"""Updates the firmware on the device."""
...

286 changes: 151 additions & 135 deletions api/src/opentrons/drivers/absorbance_reader/async_byonoy.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
from __future__ import annotations

import asyncio
import logging
import os
import re
from concurrent.futures.thread import ThreadPoolExecutor
from functools import partial
from typing import Optional, List, Dict
from sys import version
from typing import Optional, List, Dict, Tuple

# TODO: Make sure we move this back into the AsyncByonoy
import pybyonoy_device_library as byonoy # type: ignore[import-not-found]

# for debugging
byonoy.byonoy_enable_logging(False)


from .hid_protocol import (
Expand All @@ -20,8 +29,11 @@
)
from opentrons.drivers.rpi_drivers.types import USBPort

log = logging.getLogger(__name__)


SN_PARSER = re.compile(r'ATTRS{serial}=="(?P<serial>.+?)"')
VERSION_PARSER = re.compile(r'Absorbance (?P<version>V\d+\.\d+\.\d+)')


class AsyncByonoy:
Expand Down Expand Up @@ -71,8 +83,6 @@ async def create(
loop = loop or asyncio.get_running_loop()
executor = ThreadPoolExecutor(max_workers=1)

import pybyonoy_device_library as byonoy # type: ignore[import-not-found]

interface: AbsProtocol = byonoy

device_sn = cls.serial_number_from_port(usb_port.name)
Expand Down Expand Up @@ -111,61 +121,139 @@ def __init__(
self._device_handle: Optional[int] = None
self._current_config: Optional[AbsProtocol.MeasurementConfig] = None

def _cleanup(self) -> None:
self._device_handle = None
async def open(self) -> int:
"""
Open the connection.
def _open(self) -> None:
err, device_handle = self._interface.byonoy_open_device(self._device)
if err.name != "BYONOY_ERROR_NO_ERROR":
raise RuntimeError(f"Error opening device: {err}")
Returns: boolean denoting connection success.
"""

log.warning("Opening connection!")
err, device_handle = await self._loop.run_in_executor(
executor=self._executor, func=partial(
self._interface.byonoy_open_device, self._device),
)
self._raise_if_error(err.name, f"Error opening device: {err}")
self._device_handle = device_handle
log.warning(f"Connection Opened: <{device_handle}>")
return bool(device_handle)

def _free(self) -> None:
if self._device_handle:
self._interface.byonoy_free_device(self._device_handle)
self._cleanup()
async def close(self) -> None:
"""Close the connection."""
handle = self._verify_device_handle()
await self._loop.run_in_executor(
executor=self._executor, func=partial(
self._interface.byonoy_free_device, handle)
)
self._device_handle = None

def verify_device_handle(self) -> int:
assert self._device_handle is not None, RuntimeError(
"Device handle not set up."
async def is_open(self) -> bool:
"""True if connection is open."""
handle = self._verify_device_handle()
return await self._loop.run_in_executor(
executor=self._executor, func=partial(
self._interface.byonoy_device_open, handle)
)
return self._device_handle

def _raise_if_error(
self,
err_name: ErrorCodeNames,
msg: str = "Error occurred: ",
) -> None:
if err_name != "BYONOY_ERROR_NO_ERROR":
raise RuntimeError(msg, err_name)
async def get_device_information(self) -> Dict[str, str]:
"""Get serial number and version info."""
handle = self._verify_device_handle()
err, device_info = await self._loop.run_in_executor(
executor=self._executor, func=partial(
self._interface.byonoy_get_device_information, handle)
)
self._raise_if_error(err.name, f"Error getting device information: {err}")

def _get_device_information(self) -> AbsProtocol.DeviceInfo:
handle = self.verify_device_handle()
err, device_info = self._interface.byonoy_get_device_information(handle)
self._raise_if_error(err.name, "Error getting device information: ")
return device_info
match = VERSION_PARSER.match(device_info.version)
version = match["version"].lower() if match else "v0.0.0"
info = {
"serial": self._device.sn,
"model": "ABS96",
"version": version,
}
log.warning(f"ABS_DEVICE_INFO: {info}")
log.warning(f"SERIAL_NUMBER: {self._device.sn}")
return info

def _get_device_status(self) -> AbsProtocol.DeviceState:
handle = self.verify_device_handle()
err, status = self._interface.byonoy_get_device_status(handle)
async def get_device_status(self) -> AbsorbanceReaderDeviceState:
"""Get state information of the device."""
handle = self._verify_device_handle()
err, status = await self._loop.run_in_executor(
executor=self._executor, func=partial(
self._interface.byonoy_get_device_status, handle)
)
self._raise_if_error(err.name, "Error getting device status: ")
return status
return self.convert_device_state(status.name)

async def update_firmware(self, firmware_file_path: str) -> Tuple[bool, str]:
"""Updates the firmware of the device."""
handle = self._verify_device_handle()
if not os.path.exists(firmware_file_path):
return False, f"Firmware file not found: {firmware_file_path}"
# TODO: validate file somehow?

# TODO: verify if this needs to run in this context or executor thread
err = self._interface.byonoy_update_device(handle, firmware_file_path)
if err.name != "BYONOY_ERROR_NO_ERROR":
return False, f"Byonoy update failed with error: {err}"
return True, ""

async def get_device_uptime(self) -> int:
"""Get how long in seconds the device has been running for."""
handle = self._verify_device_handle()
err, uptime = await self._loop.run_in_executor(
executor=self._executor, func=partial(
self._interface.byonoy_get_device_uptime, handle)
)
self._raise_if_error(err.name, "Error getting device uptime: ")
return uptime

async def get_lid_status(self) -> AbsorbanceReaderLidStatus:
"""Get the state of the absorbance lid."""
handle = self._verify_device_handle()
err, lid_info = await self._loop.run_in_executor(
executor=self._executor, func=partial(
self._interface.byonoy_get_device_parts_aligned, handle)
)
self._raise_if_error(err.name, f"Error getting lid status: {err}")
return (
AbsorbanceReaderLidStatus.ON if lid_info else AbsorbanceReaderLidStatus.OFF
)

async def get_supported_wavelengths(self) -> list[int]:
"""Get a list of the wavelength readings this device supports."""
handle = self._verify_device_handle()
err, wavelengths = await self._loop.run_in_executor(
executor=self._executor, func=partial(
self._interface.byonoy_abs96_get_available_wavelengths, handle)
)
self._raise_if_error(err.name, "Error getting available wavelengths: ")
self._supported_wavelengths = wavelengths
return wavelengths

def _get_slot_status(self) -> AbsProtocol.SlotState:
handle = self.verify_device_handle()
err, slot_status = self._interface.byonoy_get_device_slot_status(handle)
self._raise_if_error(err.name, "Error getting slot status: ")
return slot_status
async def get_single_measurement(self, wavelength: int) -> List[float]:
"""Get a single measurement based on the current configuration."""
handle = self._verify_device_handle()
assert self._current_config and self._current_config.sample_wavelength == wavelength
err, measurements = await self._loop.run_in_executor(
executor=self._executor, func=partial(
self._interface.byonoy_abs96_single_measure, handle, self._current_config)
)
self._raise_if_error(err.name, f"Error getting single measurement: {err}")
return measurements

def _get_lid_status(self) -> bool:
handle = self.verify_device_handle()
lid_on: bool
err, lid_on = self._interface.byonoy_get_device_parts_aligned(handle)
self._raise_if_error(err.name, "Error getting lid status: ")
return lid_on
async def get_plate_presence(self) -> AbsorbanceReaderPlatePresence:
"""Get the state of the plate for the reader."""
handle = self._verify_device_handle()
err, presence = await self._loop.run_in_executor(
executor=self._executor, func=partial(
self._interface.byonoy_get_device_slot_status, handle)
)
self._raise_if_error(err.name, f"Error getting slot status: {err}")
return self.convert_plate_presence(presence.name)

def _get_supported_wavelengths(self) -> List[int]:
handle = self.verify_device_handle()
handle = self._verify_device_handle()
wavelengths: List[int]
err, wavelengths = self._interface.byonoy_abs96_get_available_wavelengths(
handle
Expand All @@ -175,18 +263,11 @@ def _get_supported_wavelengths(self) -> List[int]:
return wavelengths

def _initialize_measurement(self, conf: AbsProtocol.MeasurementConfig) -> None:
handle = self.verify_device_handle()
handle = self._verify_device_handle()
err = self._interface.byonoy_abs96_initialize_single_measurement(handle, conf)
self._raise_if_error(err.name, "Error initializing measurement: ")
self._current_config = conf

def _single_measurement(self, conf: AbsProtocol.MeasurementConfig) -> List[float]:
handle = self.verify_device_handle()
measurements: List[float]
err, measurements = self._interface.byonoy_abs96_single_measure(handle, conf)
self._raise_if_error(err.name, "Error getting single measurement: ")
return measurements

def _set_sample_wavelength(self, wavelength: int) -> AbsProtocol.MeasurementConfig:
if not self._supported_wavelengths:
self._get_supported_wavelengths()
Expand All @@ -204,100 +285,35 @@ def _initialize(self, wavelength: int) -> None:
conf = self._set_sample_wavelength(wavelength)
self._initialize_measurement(conf)

def _get_single_measurement(self, wavelength: int) -> List[float]:
initialized = self._current_config
assert initialized and initialized.sample_wavelength == wavelength
return self._single_measurement(initialized)

async def open(self) -> None:
"""
Open the connection.
Returns: None
"""
return await self._loop.run_in_executor(
executor=self._executor, func=self._open
)

async def close(self) -> None:
"""
Close the connection
Returns: None
"""
await self._loop.run_in_executor(executor=self._executor, func=self._free)

async def is_open(self) -> bool:
"""
Check if connection is open.
Returns: boolean
"""
return self._device_handle is not None

async def get_device_static_info(self) -> Dict[str, str]:
return {
"serial": self._device.sn,
"model": "ABS96",
"version": "1.0",
}

async def get_device_information(self) -> Dict[str, str]:
device_info = await self._loop.run_in_executor(
executor=self._executor, func=self._get_device_information
)
return {
"serial_number": device_info.sn,
"reference_number": device_info.ref_no,
"version": device_info.version,
}

async def get_lid_status(self) -> AbsorbanceReaderLidStatus:
lid_info = await self._loop.run_in_executor(
executor=self._executor, func=self._get_lid_status
)
return (
AbsorbanceReaderLidStatus.ON if lid_info else AbsorbanceReaderLidStatus.OFF
)

async def get_supported_wavelengths(self) -> list[int]:
return await self._loop.run_in_executor(
executor=self._executor, func=self._get_supported_wavelengths
)

async def initialize(self, wavelength: int) -> None:
return await self._loop.run_in_executor(
"""???"""
await self._loop.run_in_executor(
executor=self._executor, func=partial(self._initialize, wavelength)
)

async def get_single_measurement(self, wavelength: int) -> List[float]:
return await self._loop.run_in_executor(
executor=self._executor,
func=partial(self._get_single_measurement, wavelength),
)

async def get_plate_presence(self) -> AbsorbanceReaderPlatePresence:
presence = await self._loop.run_in_executor(
executor=self._executor, func=self._get_slot_status
def _verify_device_handle(self) -> int:
assert self._device_handle is not None, RuntimeError(
"Device handle not set up."
)
return self.convert_plate_presence(presence.name)
return self._device_handle

async def get_device_status(self) -> AbsorbanceReaderDeviceState:
status = await self._loop.run_in_executor(
executor=self._executor,
func=self._get_device_status,
)
return self.convert_device_state(status.name)
def _raise_if_error(
self,
err_name: ErrorCodeNames,
msg: str = "Error occurred: ",
) -> None:
if err_name != "BYONOY_ERROR_NO_ERROR":
raise RuntimeError(msg, err_name)

@staticmethod
def convert_device_state(
device_state: DeviceStateNames,
) -> AbsorbanceReaderDeviceState:
state_map: Dict[DeviceStateNames, AbsorbanceReaderDeviceState] = {
"BYONOY_DEVICE_STATE_UNKNOWN": AbsorbanceReaderDeviceState.UNKNOWN,
"BYONOY_DEVICE_STATE_OK": AbsorbanceReaderDeviceState.OK,
"BYONOY_DEVICE_STATE_BROKEN_FW": AbsorbanceReaderDeviceState.BROKEN_FW,
"BYONOY_DEVICE_STATE_ERROR": AbsorbanceReaderDeviceState.ERROR,
"UNKNOWN": AbsorbanceReaderDeviceState.UNKNOWN,
"OK": AbsorbanceReaderDeviceState.OK,
"BROKEN_FW": AbsorbanceReaderDeviceState.BROKEN_FW,
"ERROR": AbsorbanceReaderDeviceState.ERROR,
}
return state_map[device_state]

Expand Down
Loading

0 comments on commit f98063e

Please sign in to comment.