From 4e6fdcfa2bf313182f8eca28b1e7d19aced37607 Mon Sep 17 00:00:00 2001 From: SoroushHoseini Date: Fri, 31 Oct 2025 22:40:10 -0500 Subject: [PATCH 1/8] Working PAX polarimeter --- .../pqn/drivers/thorlabs_polarimeter.py | 317 ++++++++++++++++++ 1 file changed, 317 insertions(+) create mode 100644 src/pqnstack/pqn/drivers/thorlabs_polarimeter.py diff --git a/src/pqnstack/pqn/drivers/thorlabs_polarimeter.py b/src/pqnstack/pqn/drivers/thorlabs_polarimeter.py new file mode 100644 index 00000000..632684d7 --- /dev/null +++ b/src/pqnstack/pqn/drivers/thorlabs_polarimeter.py @@ -0,0 +1,317 @@ +from __future__ import annotations + +import atexit +import contextlib as contextlib +import datetime as _dt +import logging +import threading +import time +from dataclasses import dataclass, field +from typing import Any, Optional, Tuple + +import numpy as np +import pandas as pd +import pyvisa + +from pqnstack.base.errors import DeviceNotStartedError +from pqnstack.base.instrument import Instrument, InstrumentInfo, log_operation, log_parameter + +logger = logging.getLogger(__name__) + +USB_FILTER = "USB?*::INSTR" +CMD_ENABLE_CALC = "SENS:CALC 1" +CMD_ENABLE_ROTATION = "INP:ROT:STAT 1" +CMD_DISABLE_CALC = "SENS:CALC 0" +CMD_DISABLE_ROTATION = "INP:ROT:STAT 0" +QRY_IS_CALC_ENABLED = "SENS:CALC?" +QRY_IS_ROTATION_ENABLED = "INP:ROT:STAT?" +QRY_WAVELENGTH_METERS_OR_NM = "SENS:CORR:WAV?" +QRY_LATEST = "SENS:DATA:LAT?" +QRY_IDN = "*IDN?" + + +@dataclass(frozen=True, slots=True) +class PAX1000IR2Info(InstrumentInfo): + wavelength_nm: float = np.nan + last_theta_deg: float = np.nan + last_eta_deg: float = np.nan + last_dop: float = np.nan + last_power_w: float = np.nan + logging_rows: int = 0 + + +@dataclass(slots=True) +class PAX1000IR2(Instrument): + name: str + desc: str + hw_address: str # VISA resource or empty for discovery + parameters: set[str] = field(default_factory=set) + operations: dict[str, Any] = field(default_factory=dict) + + pax_id_contains: Optional[str] = None + pax_idn_contains: str = "PAX1000" + + _rm: Any | None = field(default=None, init=False, repr=False) + _instr: Any | None = field(default=None, init=False, repr=False) + _timeout_ms: int = field(default=3000, init=False, repr=False) + + _wavelength_nm_cache: float = field(default=np.nan, init=False) + _last_theta_deg: float = field(default=np.nan, init=False) + _last_eta_deg: float = field(default=np.nan, init=False) + _last_dop: float = field(default=np.nan, init=False) + _last_power_w: float = field(default=np.nan, init=False) + + data_log_dataframe: pd.DataFrame = field( + default_factory=lambda: pd.DataFrame( + { + "elapsed_sec": [], + "iso_timestamp": [], + "pax_theta_deg": [], + "pax_eta_deg": [], + "pax_power_w": [], + "pax_dop": [], + "pax_wavelength_nm": [], + "interval_sec": [], + } + ), + init=False, + repr=False, + ) + log_start_time_perf_counter: float | None = field(default=None, init=False, repr=False) + logging_thread: threading.Thread | None = field(default=None, init=False, repr=False) + stop_logging_event: threading.Event = field(default_factory=threading.Event, init=False, repr=False) + + def _q(self, cmd: str) -> str: + if self._instr is None: + raise DeviceNotStartedError("Start the device first.") + try: + self._instr.write(f"{cmd}\n") + return str(self._instr.read()).strip() + except Exception: + try: + return str(self._instr.query(cmd)).strip() + except Exception: + return "" + + def _w(self, cmd: str) -> None: + if self._instr is None: + raise DeviceNotStartedError("Start the device first.") + try: + self._instr.write(f"{cmd}\n") + except Exception: + with contextlib.suppress(Exception): + self._instr.write(cmd) + + def start(self) -> None: + if self._instr is not None: + return + try: + self._rm = pyvisa.ResourceManager("@py") + except Exception as exc: + raise RuntimeError(f"VISA backend not available: {exc}") from exc + + resource = self.hw_address + if not resource: + try: + resources: Tuple[str, ...] = self._rm.list_resources(USB_FILTER) + except Exception as exc: + raise FileNotFoundError(f"VISA resource discovery failed: {exc}") from exc + if self.pax_id_contains: + resources = tuple(r for r in resources if self.pax_id_contains in r) + if not resources: + raise FileNotFoundError("No USB VISA resources matched filter.") + if len(resources) == 1 and not self.pax_idn_contains: + resource = resources[0] + else: + target = self.pax_idn_contains or "" + matched = [] + for rname in resources: + try: + with self._rm.open_resource(rname) as tmp: + tmp.timeout = self._timeout_ms + try: + tmp.write(f"{QRY_IDN}\n") + idn = str(tmp.read()).strip() + except Exception: + try: + idn = str(tmp.query(QRY_IDN)).strip() + except Exception: + idn = "" + if not target or target in idn: + matched.append(rname) + except Exception: + continue + if len(matched) != 1: + raise FileNotFoundError("PAX discovery ambiguous or no match.") + resource = matched[0] + + try: + self._instr = self._rm.open_resource(resource) + self._instr.timeout = self._timeout_ms + except Exception as exc: + self._instr = None + raise RuntimeError(f"Failed to open VISA resource {resource}: {exc}") from exc + + def write_and_confirm(set_cmd: str, qry_cmd: str, expect: str | int | float) -> bool: + self._w(set_cmd) + expected = str(expect) + last = "" + for _ in range(10): + try: + last = self._q(qry_cmd) + except Exception: + last = "" + if last.startswith(expected): + return True + time.sleep(0.05) + return False + + ok1 = write_and_confirm(CMD_ENABLE_CALC, QRY_IS_CALC_ENABLED, 1) + ok2 = write_and_confirm(CMD_ENABLE_ROTATION, QRY_IS_ROTATION_ENABLED, 1) + if not (ok1 and ok2): + with contextlib.suppress(Exception): + write_and_confirm(CMD_DISABLE_CALC, QRY_IS_CALC_ENABLED, 0) + write_and_confirm(CMD_DISABLE_ROTATION, QRY_IS_ROTATION_ENABLED, 0) + raise RuntimeError("PAX setup failed to enable calc/rotation.") + + try: + raw = self._q(QRY_WAVELENGTH_METERS_OR_NM) + val = float(raw) + self._wavelength_nm_cache = val * 1e9 if val < 10.0 else val + except Exception: + self._wavelength_nm_cache = float("nan") + + self.operations.update( + { + "start_logging": self.start_logging, + "stop_logging": self.stop_logging, + "clear_log": self.clear_log, + "save_csv": self.save_csv, + "snapshot": self.snapshot, + } + ) + atexit.register(self.close) + + def close(self) -> None: + self.stop_logging() + if self._instr is not None: + with contextlib.suppress(Exception): + self._w(CMD_DISABLE_CALC) + self._w(CMD_DISABLE_ROTATION) + _ = self._q(QRY_IS_CALC_ENABLED) + _ = self._q(QRY_IS_ROTATION_ENABLED) + with contextlib.suppress(Exception): + self._instr.close() + self._instr = None + if self._rm is not None: + with contextlib.suppress(Exception): + self._rm.close() + self._rm = None + + @property + def info(self) -> PAX1000IR2Info: + return PAX1000IR2Info( + name=self.name, + desc=self.desc, + hw_address=self.hw_address, + hw_status={"connected": self._instr is not None}, + wavelength_nm=self._wavelength_nm_cache, + last_theta_deg=self._last_theta_deg, + last_eta_deg=self._last_eta_deg, + last_dop=self._last_dop, + last_power_w=self._last_power_w, + logging_rows=len(self.data_log_dataframe), + ) + + @property + @log_parameter + def wavelength_nm(self) -> float: + return self._wavelength_nm_cache + + @log_operation + def snapshot(self) -> dict[str, float]: + if self._instr is None: + raise DeviceNotStartedError("Start the device first.") + raw = self._q(QRY_LATEST) + + tokens = [p for p in raw.replace(";", ",").split(",") if p] + vals: list[float | str] = [] + for t in tokens: + try: + vals.append(float(t)) + except Exception: + vals.append(t) + + def getf(i: int) -> float: + try: + v = vals[i] + return float(v) if isinstance(v, (float, int)) else float(str(v)) + except Exception: + return float("nan") + + self._last_theta_deg = getf(9) + self._last_eta_deg = getf(10) + self._last_dop = getf(11) + self._last_power_w = getf(12) + + return { + "pax_theta_deg": self._last_theta_deg, + "pax_eta_deg": self._last_eta_deg, + "pax_dop": self._last_dop, + "pax_power_w": self._last_power_w, + "pax_wavelength_nm": self._wavelength_nm_cache, + } + + @log_operation + def start_logging(self, interval_sec: float = 0.2) -> None: + if self._instr is None: + raise DeviceNotStartedError("Start the device before logging.") + if self.logging_thread and self.logging_thread.is_alive(): + return + self.stop_logging_event.clear() + self.log_start_time_perf_counter = time.perf_counter() + + def loop() -> None: + while not self.stop_logging_event.is_set(): + now = time.perf_counter() + row = self.snapshot() + self.data_log_dataframe = pd.concat( + [ + self.data_log_dataframe, + pd.DataFrame( + [ + { + "elapsed_sec": 0.0 + if self.log_start_time_perf_counter is None + else now - self.log_start_time_perf_counter, + "iso_timestamp": _dt.datetime.now(tz=_dt.UTC).isoformat(), + "pax_theta_deg": row["pax_theta_deg"], + "pax_eta_deg": row["pax_eta_deg"], + "pax_power_w": row["pax_power_w"], + "pax_dop": row["pax_dop"], + "pax_wavelength_nm": row["pax_wavelength_nm"], + "interval_sec": interval_sec, + } + ] + ), + ], + ignore_index=True, + ) + time.sleep(interval_sec) + + self.logging_thread = threading.Thread(target=loop, name=f"{self.name}-poll", daemon=True) + self.logging_thread.start() + + @log_operation + def stop_logging(self) -> None: + if self.logging_thread and self.logging_thread.is_alive(): + self.stop_logging_event.set() + self.logging_thread.join(timeout=2.0) + + @log_operation + def clear_log(self) -> None: + self.data_log_dataframe = self.data_log_dataframe.iloc[0:0] + + @log_operation + def save_csv(self, path: str) -> None: + self.data_log_dataframe.to_csv(path, index=False) From 1e3615a3af62b8497da42058ce57f7f21191a9c3 Mon Sep 17 00:00:00 2001 From: SoroushHoseini Date: Sun, 2 Nov 2025 16:15:47 -0600 Subject: [PATCH 2/8] ruff and mypy compliant --- .../pqn/drivers/thorlabs_polarimeter.py | 278 ++++++++++-------- 1 file changed, 155 insertions(+), 123 deletions(-) diff --git a/src/pqnstack/pqn/drivers/thorlabs_polarimeter.py b/src/pqnstack/pqn/drivers/thorlabs_polarimeter.py index 632684d7..d73e0fb3 100644 --- a/src/pqnstack/pqn/drivers/thorlabs_polarimeter.py +++ b/src/pqnstack/pqn/drivers/thorlabs_polarimeter.py @@ -1,20 +1,25 @@ from __future__ import annotations import atexit -import contextlib as contextlib +import contextlib import datetime as _dt import logging import threading import time -from dataclasses import dataclass, field -from typing import Any, Optional, Tuple +from dataclasses import dataclass +from dataclasses import field +from typing import Any import numpy as np import pandas as pd import pyvisa +from pyvisa.errors import Error as VisaError from pqnstack.base.errors import DeviceNotStartedError -from pqnstack.base.instrument import Instrument, InstrumentInfo, log_operation, log_parameter +from pqnstack.base.instrument import Instrument +from pqnstack.base.instrument import InstrumentInfo +from pqnstack.base.instrument import log_operation +from pqnstack.base.instrument import log_parameter logger = logging.getLogger(__name__) @@ -44,11 +49,11 @@ class PAX1000IR2Info(InstrumentInfo): class PAX1000IR2(Instrument): name: str desc: str - hw_address: str # VISA resource or empty for discovery + hw_address: str parameters: set[str] = field(default_factory=set) operations: dict[str, Any] = field(default_factory=dict) - pax_id_contains: Optional[str] = None + pax_id_contains: str | None = None pax_idn_contains: str = "PAX1000" _rm: Any | None = field(default=None, init=False, repr=False) @@ -81,113 +86,138 @@ class PAX1000IR2(Instrument): logging_thread: threading.Thread | None = field(default=None, init=False, repr=False) stop_logging_event: threading.Event = field(default_factory=threading.Event, init=False, repr=False) - def _q(self, cmd: str) -> str: + def _read_and_write(self, cmd: str, *, expect_response: bool) -> str: if self._instr is None: - raise DeviceNotStartedError("Start the device first.") - try: - self._instr.write(f"{cmd}\n") - return str(self._instr.read()).strip() - except Exception: + msg = "Start the device first." + raise DeviceNotStartedError(msg) + instr: Any = self._instr # vendor object lacks type stubs + if expect_response: try: - return str(self._instr.query(cmd)).strip() - except Exception: - return "" - - def _w(self, cmd: str) -> None: - if self._instr is None: - raise DeviceNotStartedError("Start the device first.") + instr.write(f"{cmd}\n") + return str(instr.read()).strip() + except (VisaError, OSError): + try: + return str(instr.query(cmd)).strip() + except (VisaError, OSError): + return "" try: - self._instr.write(f"{cmd}\n") - except Exception: - with contextlib.suppress(Exception): - self._instr.write(cmd) - - def start(self) -> None: - if self._instr is not None: - return + instr.write(f"{cmd}\n") + except (VisaError, OSError): + with contextlib.suppress(VisaError, OSError): + instr.write(cmd) + return "" + + def _list_usb_resources(self) -> tuple[str, ...]: + assert self._rm is not None try: - self._rm = pyvisa.ResourceManager("@py") - except Exception as exc: - raise RuntimeError(f"VISA backend not available: {exc}") from exc - - resource = self.hw_address - if not resource: - try: - resources: Tuple[str, ...] = self._rm.list_resources(USB_FILTER) - except Exception as exc: - raise FileNotFoundError(f"VISA resource discovery failed: {exc}") from exc - if self.pax_id_contains: - resources = tuple(r for r in resources if self.pax_id_contains in r) - if not resources: - raise FileNotFoundError("No USB VISA resources matched filter.") - if len(resources) == 1 and not self.pax_idn_contains: - resource = resources[0] - else: - target = self.pax_idn_contains or "" - matched = [] - for rname in resources: + return self._rm.list_resources(USB_FILTER) # type: ignore[no-any-return] + except VisaError as exc: + msg = f"VISA resource discovery failed: {exc}" + raise FileNotFoundError(msg) from exc + + def _filter_candidates(self, resources: tuple[str, ...]) -> tuple[str, ...]: + if self.pax_id_contains: + return tuple(r for r in resources if self.pax_id_contains in r) + return resources + + def _probe_idn(self, resource_name: str) -> str: + assert self._rm is not None + try: + with self._rm.open_resource(resource_name) as resource_handle: + visa_resource: Any = resource_handle # vendor object lacks type stubs + visa_resource.timeout = self._timeout_ms + try: + visa_resource.write(f"{QRY_IDN}\n") + return str(visa_resource.read()).strip() + except (VisaError, OSError): try: - with self._rm.open_resource(rname) as tmp: - tmp.timeout = self._timeout_ms - try: - tmp.write(f"{QRY_IDN}\n") - idn = str(tmp.read()).strip() - except Exception: - try: - idn = str(tmp.query(QRY_IDN)).strip() - except Exception: - idn = "" - if not target or target in idn: - matched.append(rname) - except Exception: - continue - if len(matched) != 1: - raise FileNotFoundError("PAX discovery ambiguous or no match.") - resource = matched[0] + return str(visa_resource.query(QRY_IDN)).strip() + except (VisaError, OSError): + return "" + except VisaError as exc: + logger.debug("Resource probe failed for %s: %s", resource_name, exc) + return "" + + def _discover_resource(self) -> str: + resources = self._filter_candidates(self._list_usb_resources()) + if not resources: + msg = "No USB VISA resources matched filter." + raise FileNotFoundError(msg) + + if len(resources) == 1 and not self.pax_idn_contains: + return resources[0] + + idn_substring = self.pax_idn_contains or "" + matched = [r for r in resources if (not idn_substring) or (idn_substring in self._probe_idn(r))] + + if len(matched) != 1: + msg = "PAX discovery ambiguous or no match." + raise FileNotFoundError(msg) + return matched[0] + + def _open_resource(self, resource_name: str) -> None: + assert self._rm is not None try: - self._instr = self._rm.open_resource(resource) + self._instr = self._rm.open_resource(resource_name) self._instr.timeout = self._timeout_ms - except Exception as exc: + except VisaError as exc: self._instr = None - raise RuntimeError(f"Failed to open VISA resource {resource}: {exc}") from exc - - def write_and_confirm(set_cmd: str, qry_cmd: str, expect: str | int | float) -> bool: - self._w(set_cmd) - expected = str(expect) - last = "" - for _ in range(10): - try: - last = self._q(qry_cmd) - except Exception: - last = "" - if last.startswith(expected): - return True - time.sleep(0.05) - return False - - ok1 = write_and_confirm(CMD_ENABLE_CALC, QRY_IS_CALC_ENABLED, 1) - ok2 = write_and_confirm(CMD_ENABLE_ROTATION, QRY_IS_ROTATION_ENABLED, 1) - if not (ok1 and ok2): + msg = f"Failed to open VISA resource {resource_name}: {exc}" + raise RuntimeError(msg) from exc + + def _write_and_confirm(self, set_cmd: str, qry_cmd: str, expect: str | float) -> bool: + _ = self._read_and_write(set_cmd, expect_response=False) + expected_prefix = str(expect) + last_response = "" + for _ in range(10): + try: + last_response = self._read_and_write(qry_cmd, expect_response=True) + except DeviceNotStartedError: + last_response = "" + if last_response.startswith(expected_prefix): + return True + time.sleep(0.05) + return False + + def _init_settings(self) -> None: + calc_ok = self._write_and_confirm(CMD_ENABLE_CALC, QRY_IS_CALC_ENABLED, 1) + rot_ok = self._write_and_confirm(CMD_ENABLE_ROTATION, QRY_IS_ROTATION_ENABLED, 1) + if not (calc_ok and rot_ok): with contextlib.suppress(Exception): - write_and_confirm(CMD_DISABLE_CALC, QRY_IS_CALC_ENABLED, 0) - write_and_confirm(CMD_DISABLE_ROTATION, QRY_IS_ROTATION_ENABLED, 0) - raise RuntimeError("PAX setup failed to enable calc/rotation.") + self._write_and_confirm(CMD_DISABLE_CALC, QRY_IS_CALC_ENABLED, 0) + self._write_and_confirm(CMD_DISABLE_ROTATION, QRY_IS_ROTATION_ENABLED, 0) + msg = "PAX setup failed to enable calc/rotation." + raise RuntimeError(msg) + def _read_wavelength_cache(self) -> None: try: - raw = self._q(QRY_WAVELENGTH_METERS_OR_NM) - val = float(raw) - self._wavelength_nm_cache = val * 1e9 if val < 10.0 else val - except Exception: + raw_value = self._read_and_write(QRY_WAVELENGTH_METERS_OR_NM, expect_response=True) + self._wavelength_nm_cache = float(raw_value) + except (ValueError, TypeError): self._wavelength_nm_cache = float("nan") + def start(self) -> None: + if self._instr is not None: + return + try: + self._rm = pyvisa.ResourceManager("@py") + except Exception as exc: + msg = f"VISA backend not available: {exc}" + raise RuntimeError(msg) from exc + + resource_name = self.hw_address or self._discover_resource() + self._open_resource(resource_name) + self._init_settings() + self._read_wavelength_cache() + self.operations.update( { "start_logging": self.start_logging, "stop_logging": self.stop_logging, "clear_log": self.clear_log, "save_csv": self.save_csv, - "snapshot": self.snapshot, + "read": self.read, } ) atexit.register(self.close) @@ -196,10 +226,10 @@ def close(self) -> None: self.stop_logging() if self._instr is not None: with contextlib.suppress(Exception): - self._w(CMD_DISABLE_CALC) - self._w(CMD_DISABLE_ROTATION) - _ = self._q(QRY_IS_CALC_ENABLED) - _ = self._q(QRY_IS_ROTATION_ENABLED) + _ = self._read_and_write(CMD_DISABLE_CALC, expect_response=False) + _ = self._read_and_write(CMD_DISABLE_ROTATION, expect_response=False) + _ = self._read_and_write(QRY_IS_CALC_ENABLED, expect_response=True) + _ = self._read_and_write(QRY_IS_ROTATION_ENABLED, expect_response=True) with contextlib.suppress(Exception): self._instr.close() self._instr = None @@ -229,30 +259,31 @@ def wavelength_nm(self) -> float: return self._wavelength_nm_cache @log_operation - def snapshot(self) -> dict[str, float]: + def read(self) -> dict[str, float]: if self._instr is None: - raise DeviceNotStartedError("Start the device first.") - raw = self._q(QRY_LATEST) + msg = "Start the device first." + raise DeviceNotStartedError(msg) + raw_reply = self._read_and_write(QRY_LATEST, expect_response=True) - tokens = [p for p in raw.replace(";", ",").split(",") if p] - vals: list[float | str] = [] - for t in tokens: + token_strs = [p for p in raw_reply.replace(";", ",").split(",") if p] + parsed_values: list[float | str] = [] + for token_str in token_strs: try: - vals.append(float(t)) - except Exception: - vals.append(t) + parsed_values.append(float(token_str)) + except (ValueError, TypeError): + parsed_values.append(token_str) - def getf(i: int) -> float: + def get_float_at(index: int) -> float: try: - v = vals[i] - return float(v) if isinstance(v, (float, int)) else float(str(v)) - except Exception: + value = parsed_values[index] + return float(value) if isinstance(value, (float, int)) else float(str(value)) + except (ValueError, TypeError, IndexError): return float("nan") - self._last_theta_deg = getf(9) - self._last_eta_deg = getf(10) - self._last_dop = getf(11) - self._last_power_w = getf(12) + self._last_theta_deg = get_float_at(9) + self._last_eta_deg = get_float_at(10) + self._last_dop = get_float_at(11) + self._last_power_w = get_float_at(12) return { "pax_theta_deg": self._last_theta_deg, @@ -265,7 +296,8 @@ def getf(i: int) -> float: @log_operation def start_logging(self, interval_sec: float = 0.2) -> None: if self._instr is None: - raise DeviceNotStartedError("Start the device before logging.") + msg = "Start the device before logging." + raise DeviceNotStartedError(msg) if self.logging_thread and self.logging_thread.is_alive(): return self.stop_logging_event.clear() @@ -273,8 +305,8 @@ def start_logging(self, interval_sec: float = 0.2) -> None: def loop() -> None: while not self.stop_logging_event.is_set(): - now = time.perf_counter() - row = self.snapshot() + now_perf = time.perf_counter() + read_data = self.read() self.data_log_dataframe = pd.concat( [ self.data_log_dataframe, @@ -283,13 +315,13 @@ def loop() -> None: { "elapsed_sec": 0.0 if self.log_start_time_perf_counter is None - else now - self.log_start_time_perf_counter, + else now_perf - self.log_start_time_perf_counter, "iso_timestamp": _dt.datetime.now(tz=_dt.UTC).isoformat(), - "pax_theta_deg": row["pax_theta_deg"], - "pax_eta_deg": row["pax_eta_deg"], - "pax_power_w": row["pax_power_w"], - "pax_dop": row["pax_dop"], - "pax_wavelength_nm": row["pax_wavelength_nm"], + "pax_theta_deg": read_data["pax_theta_deg"], + "pax_eta_deg": read_data["pax_eta_deg"], + "pax_power_w": read_data["pax_power_w"], + "pax_dop": read_data["pax_dop"], + "pax_wavelength_nm": read_data["pax_wavelength_nm"], "interval_sec": interval_sec, } ] From 976c1fa6f7b50d1543dc37d908cdb0d8b5ed1fd0 Mon Sep 17 00:00:00 2001 From: SoroushHoseini Date: Sun, 2 Nov 2025 17:46:48 -0600 Subject: [PATCH 3/8] Remove renduntant imports (moved to sensor class) --- .../pqn/drivers/thorlabs_polarimeter.py | 73 ------------------- 1 file changed, 73 deletions(-) diff --git a/src/pqnstack/pqn/drivers/thorlabs_polarimeter.py b/src/pqnstack/pqn/drivers/thorlabs_polarimeter.py index d73e0fb3..bc3c56dd 100644 --- a/src/pqnstack/pqn/drivers/thorlabs_polarimeter.py +++ b/src/pqnstack/pqn/drivers/thorlabs_polarimeter.py @@ -2,7 +2,6 @@ import atexit import contextlib -import datetime as _dt import logging import threading import time @@ -11,7 +10,6 @@ from typing import Any import numpy as np -import pandas as pd import pyvisa from pyvisa.errors import Error as VisaError @@ -66,22 +64,6 @@ class PAX1000IR2(Instrument): _last_dop: float = field(default=np.nan, init=False) _last_power_w: float = field(default=np.nan, init=False) - data_log_dataframe: pd.DataFrame = field( - default_factory=lambda: pd.DataFrame( - { - "elapsed_sec": [], - "iso_timestamp": [], - "pax_theta_deg": [], - "pax_eta_deg": [], - "pax_power_w": [], - "pax_dop": [], - "pax_wavelength_nm": [], - "interval_sec": [], - } - ), - init=False, - repr=False, - ) log_start_time_perf_counter: float | None = field(default=None, init=False, repr=False) logging_thread: threading.Thread | None = field(default=None, init=False, repr=False) stop_logging_event: threading.Event = field(default_factory=threading.Event, init=False, repr=False) @@ -292,58 +274,3 @@ def get_float_at(index: int) -> float: "pax_power_w": self._last_power_w, "pax_wavelength_nm": self._wavelength_nm_cache, } - - @log_operation - def start_logging(self, interval_sec: float = 0.2) -> None: - if self._instr is None: - msg = "Start the device before logging." - raise DeviceNotStartedError(msg) - if self.logging_thread and self.logging_thread.is_alive(): - return - self.stop_logging_event.clear() - self.log_start_time_perf_counter = time.perf_counter() - - def loop() -> None: - while not self.stop_logging_event.is_set(): - now_perf = time.perf_counter() - read_data = self.read() - self.data_log_dataframe = pd.concat( - [ - self.data_log_dataframe, - pd.DataFrame( - [ - { - "elapsed_sec": 0.0 - if self.log_start_time_perf_counter is None - else now_perf - self.log_start_time_perf_counter, - "iso_timestamp": _dt.datetime.now(tz=_dt.UTC).isoformat(), - "pax_theta_deg": read_data["pax_theta_deg"], - "pax_eta_deg": read_data["pax_eta_deg"], - "pax_power_w": read_data["pax_power_w"], - "pax_dop": read_data["pax_dop"], - "pax_wavelength_nm": read_data["pax_wavelength_nm"], - "interval_sec": interval_sec, - } - ] - ), - ], - ignore_index=True, - ) - time.sleep(interval_sec) - - self.logging_thread = threading.Thread(target=loop, name=f"{self.name}-poll", daemon=True) - self.logging_thread.start() - - @log_operation - def stop_logging(self) -> None: - if self.logging_thread and self.logging_thread.is_alive(): - self.stop_logging_event.set() - self.logging_thread.join(timeout=2.0) - - @log_operation - def clear_log(self) -> None: - self.data_log_dataframe = self.data_log_dataframe.iloc[0:0] - - @log_operation - def save_csv(self, path: str) -> None: - self.data_log_dataframe.to_csv(path, index=False) From cd61250acc61f917c754f56d8ffbdab7659899ca Mon Sep 17 00:00:00 2001 From: SoroushHoseini Date: Thu, 6 Nov 2025 15:00:49 -0600 Subject: [PATCH 4/8] Remove data logging / sensor things --- src/pqnstack/pqn/drivers/thorlabs_polarimeter.py | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/src/pqnstack/pqn/drivers/thorlabs_polarimeter.py b/src/pqnstack/pqn/drivers/thorlabs_polarimeter.py index bc3c56dd..91f14701 100644 --- a/src/pqnstack/pqn/drivers/thorlabs_polarimeter.py +++ b/src/pqnstack/pqn/drivers/thorlabs_polarimeter.py @@ -3,7 +3,6 @@ import atexit import contextlib import logging -import threading import time from dataclasses import dataclass from dataclasses import field @@ -64,15 +63,11 @@ class PAX1000IR2(Instrument): _last_dop: float = field(default=np.nan, init=False) _last_power_w: float = field(default=np.nan, init=False) - log_start_time_perf_counter: float | None = field(default=None, init=False, repr=False) - logging_thread: threading.Thread | None = field(default=None, init=False, repr=False) - stop_logging_event: threading.Event = field(default_factory=threading.Event, init=False, repr=False) - def _read_and_write(self, cmd: str, *, expect_response: bool) -> str: if self._instr is None: msg = "Start the device first." raise DeviceNotStartedError(msg) - instr: Any = self._instr # vendor object lacks type stubs + instr: Any = self._instr if expect_response: try: instr.write(f"{cmd}\n") @@ -195,17 +190,12 @@ def start(self) -> None: self.operations.update( { - "start_logging": self.start_logging, - "stop_logging": self.stop_logging, - "clear_log": self.clear_log, - "save_csv": self.save_csv, "read": self.read, } ) atexit.register(self.close) def close(self) -> None: - self.stop_logging() if self._instr is not None: with contextlib.suppress(Exception): _ = self._read_and_write(CMD_DISABLE_CALC, expect_response=False) @@ -232,7 +222,6 @@ def info(self) -> PAX1000IR2Info: last_eta_deg=self._last_eta_deg, last_dop=self._last_dop, last_power_w=self._last_power_w, - logging_rows=len(self.data_log_dataframe), ) @property From a484dc8432c7eec43c3465df9ea6ad09e589ba47 Mon Sep 17 00:00:00 2001 From: SoroushHoseini Date: Thu, 6 Nov 2025 19:43:15 -0600 Subject: [PATCH 5/8] Add wavelength setter --- .../pqn/drivers/thorlabs_polarimeter.py | 60 ++++++++++++------- 1 file changed, 40 insertions(+), 20 deletions(-) diff --git a/src/pqnstack/pqn/drivers/thorlabs_polarimeter.py b/src/pqnstack/pqn/drivers/thorlabs_polarimeter.py index 91f14701..93121431 100644 --- a/src/pqnstack/pqn/drivers/thorlabs_polarimeter.py +++ b/src/pqnstack/pqn/drivers/thorlabs_polarimeter.py @@ -25,6 +25,7 @@ CMD_ENABLE_ROTATION = "INP:ROT:STAT 1" CMD_DISABLE_CALC = "SENS:CALC 0" CMD_DISABLE_ROTATION = "INP:ROT:STAT 0" +CMD_SET_WAVELENGTH_METERS = "SENS:CORR:WAV" QRY_IS_CALC_ENABLED = "SENS:CALC?" QRY_IS_ROTATION_ENABLED = "INP:ROT:STAT?" QRY_WAVELENGTH_METERS_OR_NM = "SENS:CORR:WAV?" @@ -63,26 +64,30 @@ class PAX1000IR2(Instrument): _last_dop: float = field(default=np.nan, init=False) _last_power_w: float = field(default=np.nan, init=False) - def _read_and_write(self, cmd: str, *, expect_response: bool) -> str: + def _write(self, cmd: str) -> None: if self._instr is None: msg = "Start the device first." raise DeviceNotStartedError(msg) instr: Any = self._instr - if expect_response: - try: - instr.write(f"{cmd}\n") - return str(instr.read()).strip() - except (VisaError, OSError): - try: - return str(instr.query(cmd)).strip() - except (VisaError, OSError): - return "" try: instr.write(f"{cmd}\n") except (VisaError, OSError): with contextlib.suppress(VisaError, OSError): instr.write(cmd) - return "" + + def _query(self, cmd: str) -> str: + if self._instr is None: + msg = "Start the device first." + raise DeviceNotStartedError(msg) + instr: Any = self._instr + try: + instr.write(f"{cmd}\n") + return str(instr.read()).strip() + except (VisaError, OSError): + try: + return str(instr.query(cmd)).strip() + except (VisaError, OSError): + return "" def _list_usb_resources(self) -> tuple[str, ...]: assert self._rm is not None @@ -144,12 +149,15 @@ def _open_resource(self, resource_name: str) -> None: raise RuntimeError(msg) from exc def _write_and_confirm(self, set_cmd: str, qry_cmd: str, expect: str | float) -> bool: - _ = self._read_and_write(set_cmd, expect_response=False) + try: + self._write(set_cmd) + except DeviceNotStartedError: + return False expected_prefix = str(expect) last_response = "" for _ in range(10): try: - last_response = self._read_and_write(qry_cmd, expect_response=True) + last_response = self._query(qry_cmd) except DeviceNotStartedError: last_response = "" if last_response.startswith(expected_prefix): @@ -169,11 +177,22 @@ def _init_settings(self) -> None: def _read_wavelength_cache(self) -> None: try: - raw_value = self._read_and_write(QRY_WAVELENGTH_METERS_OR_NM, expect_response=True) - self._wavelength_nm_cache = float(raw_value) + raw_value = self._query(QRY_WAVELENGTH_METERS) + value_m = float(raw_value) + self._wavelength_nm_cache = value_m * 1e9 except (ValueError, TypeError): self._wavelength_nm_cache = float("nan") + @log_operation + def set_wavelength_nm(self, wavelength_nm: float) -> None: + try: + value_m = float(wavelength_nm) * 1e-9 + except (TypeError, ValueError) as exc: + msg = f"Invalid wavelength: {wavelength_nm}" + raise ValueError(msg) from exc + self._write(f"{CMD_SET_WAVELENGTH_METERS} {value_m}") + self._read_wavelength_cache() + def start(self) -> None: if self._instr is not None: return @@ -191,6 +210,7 @@ def start(self) -> None: self.operations.update( { "read": self.read, + "set_wavelength_nm": self.set_wavelength_nm, } ) atexit.register(self.close) @@ -198,10 +218,10 @@ def start(self) -> None: def close(self) -> None: if self._instr is not None: with contextlib.suppress(Exception): - _ = self._read_and_write(CMD_DISABLE_CALC, expect_response=False) - _ = self._read_and_write(CMD_DISABLE_ROTATION, expect_response=False) - _ = self._read_and_write(QRY_IS_CALC_ENABLED, expect_response=True) - _ = self._read_and_write(QRY_IS_ROTATION_ENABLED, expect_response=True) + self._write(CMD_DISABLE_CALC) + self._write(CMD_DISABLE_ROTATION) + _ = self._query(QRY_IS_CALC_ENABLED) + _ = self._query(QRY_IS_ROTATION_ENABLED) with contextlib.suppress(Exception): self._instr.close() self._instr = None @@ -234,7 +254,7 @@ def read(self) -> dict[str, float]: if self._instr is None: msg = "Start the device first." raise DeviceNotStartedError(msg) - raw_reply = self._read_and_write(QRY_LATEST, expect_response=True) + raw_reply = self._query(QRY_LATEST) token_strs = [p for p in raw_reply.replace(";", ",").split(",") if p] parsed_values: list[float | str] = [] From 7e8e2252dae1836a843a26372aa4322dfc766274 Mon Sep 17 00:00:00 2001 From: SoroushHoseini Date: Thu, 6 Nov 2025 22:12:03 -0600 Subject: [PATCH 6/8] Working polarimeter --- src/pqnstack/pqn/drivers/thorlabs_polarimeter.py | 2 +- test.py | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) create mode 100644 test.py diff --git a/src/pqnstack/pqn/drivers/thorlabs_polarimeter.py b/src/pqnstack/pqn/drivers/thorlabs_polarimeter.py index 93121431..ebd09123 100644 --- a/src/pqnstack/pqn/drivers/thorlabs_polarimeter.py +++ b/src/pqnstack/pqn/drivers/thorlabs_polarimeter.py @@ -28,7 +28,7 @@ CMD_SET_WAVELENGTH_METERS = "SENS:CORR:WAV" QRY_IS_CALC_ENABLED = "SENS:CALC?" QRY_IS_ROTATION_ENABLED = "INP:ROT:STAT?" -QRY_WAVELENGTH_METERS_OR_NM = "SENS:CORR:WAV?" +QRY_WAVELENGTH_METERS = "SENS:CORR:WAV?" QRY_LATEST = "SENS:DATA:LAT?" QRY_IDN = "*IDN?" diff --git a/test.py b/test.py new file mode 100644 index 00000000..28f53581 --- /dev/null +++ b/test.py @@ -0,0 +1,9 @@ +from time import sleep + +from pqnstack.pqn.drivers.powermeter import PM100DDevice + +powermeter = PM100DDevice(name="bob", desc="desk", address="/dev/usbtmc0") +powermeter.start() +sleep(5) +while True: + sleep(0.5) From 9a4606919bbf435a0bbdbea91ea91e28565e6928 Mon Sep 17 00:00:00 2001 From: SoroushHoseini Date: Thu, 6 Nov 2025 22:14:11 -0600 Subject: [PATCH 7/8] Remove Numpy --- .../pqn/drivers/thorlabs_polarimeter.py | 84 +++++++------------ 1 file changed, 31 insertions(+), 53 deletions(-) diff --git a/src/pqnstack/pqn/drivers/thorlabs_polarimeter.py b/src/pqnstack/pqn/drivers/thorlabs_polarimeter.py index ebd09123..48c324f7 100644 --- a/src/pqnstack/pqn/drivers/thorlabs_polarimeter.py +++ b/src/pqnstack/pqn/drivers/thorlabs_polarimeter.py @@ -8,7 +8,6 @@ from dataclasses import field from typing import Any -import numpy as np import pyvisa from pyvisa.errors import Error as VisaError @@ -35,11 +34,11 @@ @dataclass(frozen=True, slots=True) class PAX1000IR2Info(InstrumentInfo): - wavelength_nm: float = np.nan - last_theta_deg: float = np.nan - last_eta_deg: float = np.nan - last_dop: float = np.nan - last_power_w: float = np.nan + wavelength_nm: float = float("nan") + last_theta_deg: float = float("nan") + last_eta_deg: float = float("nan") + last_dop: float = float("nan") + last_power_w: float = float("nan") logging_rows: int = 0 @@ -58,34 +57,30 @@ class PAX1000IR2(Instrument): _instr: Any | None = field(default=None, init=False, repr=False) _timeout_ms: int = field(default=3000, init=False, repr=False) - _wavelength_nm_cache: float = field(default=np.nan, init=False) - _last_theta_deg: float = field(default=np.nan, init=False) - _last_eta_deg: float = field(default=np.nan, init=False) - _last_dop: float = field(default=np.nan, init=False) - _last_power_w: float = field(default=np.nan, init=False) + _wavelength_nm_cache: float = field(default=float("nan"), init=False) + _last_theta_deg: float = field(default=float("nan"), init=False) + _last_eta_deg: float = field(default=float("nan"), init=False) + _last_dop: float = field(default=float("nan"), init=False) + _last_power_w: float = field(default=float("nan"), init=False) def _write(self, cmd: str) -> None: if self._instr is None: - msg = "Start the device first." - raise DeviceNotStartedError(msg) - instr: Any = self._instr + raise DeviceNotStartedError("Start the device first.") try: - instr.write(f"{cmd}\n") + self._instr.write(f"{cmd}\n") except (VisaError, OSError): with contextlib.suppress(VisaError, OSError): - instr.write(cmd) + self._instr.write(cmd) def _query(self, cmd: str) -> str: if self._instr is None: - msg = "Start the device first." - raise DeviceNotStartedError(msg) - instr: Any = self._instr + raise DeviceNotStartedError("Start the device first.") try: - instr.write(f"{cmd}\n") - return str(instr.read()).strip() + self._instr.write(f"{cmd}\n") + return str(self._instr.read()).strip() except (VisaError, OSError): try: - return str(instr.query(cmd)).strip() + return str(self._instr.query(cmd)).strip() except (VisaError, OSError): return "" @@ -94,8 +89,7 @@ def _list_usb_resources(self) -> tuple[str, ...]: try: return self._rm.list_resources(USB_FILTER) # type: ignore[no-any-return] except VisaError as exc: - msg = f"VISA resource discovery failed: {exc}" - raise FileNotFoundError(msg) from exc + raise FileNotFoundError(f"VISA resource discovery failed: {exc}") from exc def _filter_candidates(self, resources: tuple[str, ...]) -> tuple[str, ...]: if self.pax_id_contains: @@ -106,7 +100,7 @@ def _probe_idn(self, resource_name: str) -> str: assert self._rm is not None try: with self._rm.open_resource(resource_name) as resource_handle: - visa_resource: Any = resource_handle # vendor object lacks type stubs + visa_resource: Any = resource_handle visa_resource.timeout = self._timeout_ms try: visa_resource.write(f"{QRY_IDN}\n") @@ -122,20 +116,14 @@ def _probe_idn(self, resource_name: str) -> str: def _discover_resource(self) -> str: resources = self._filter_candidates(self._list_usb_resources()) - if not resources: - msg = "No USB VISA resources matched filter." - raise FileNotFoundError(msg) - + raise FileNotFoundError("No USB VISA resources matched filter.") if len(resources) == 1 and not self.pax_idn_contains: return resources[0] - idn_substring = self.pax_idn_contains or "" matched = [r for r in resources if (not idn_substring) or (idn_substring in self._probe_idn(r))] - if len(matched) != 1: - msg = "PAX discovery ambiguous or no match." - raise FileNotFoundError(msg) + raise FileNotFoundError("PAX discovery ambiguous or no match.") return matched[0] def _open_resource(self, resource_name: str) -> None: @@ -145,8 +133,7 @@ def _open_resource(self, resource_name: str) -> None: self._instr.timeout = self._timeout_ms except VisaError as exc: self._instr = None - msg = f"Failed to open VISA resource {resource_name}: {exc}" - raise RuntimeError(msg) from exc + raise RuntimeError(f"Failed to open VISA resource {resource_name}: {exc}") from exc def _write_and_confirm(self, set_cmd: str, qry_cmd: str, expect: str | float) -> bool: try: @@ -154,7 +141,6 @@ def _write_and_confirm(self, set_cmd: str, qry_cmd: str, expect: str | float) -> except DeviceNotStartedError: return False expected_prefix = str(expect) - last_response = "" for _ in range(10): try: last_response = self._query(qry_cmd) @@ -172,14 +158,12 @@ def _init_settings(self) -> None: with contextlib.suppress(Exception): self._write_and_confirm(CMD_DISABLE_CALC, QRY_IS_CALC_ENABLED, 0) self._write_and_confirm(CMD_DISABLE_ROTATION, QRY_IS_ROTATION_ENABLED, 0) - msg = "PAX setup failed to enable calc/rotation." - raise RuntimeError(msg) + raise RuntimeError("PAX setup failed to enable calc/rotation.") def _read_wavelength_cache(self) -> None: try: raw_value = self._query(QRY_WAVELENGTH_METERS) - value_m = float(raw_value) - self._wavelength_nm_cache = value_m * 1e9 + self._wavelength_nm_cache = float(raw_value) * 1e9 except (ValueError, TypeError): self._wavelength_nm_cache = float("nan") @@ -188,8 +172,7 @@ def set_wavelength_nm(self, wavelength_nm: float) -> None: try: value_m = float(wavelength_nm) * 1e-9 except (TypeError, ValueError) as exc: - msg = f"Invalid wavelength: {wavelength_nm}" - raise ValueError(msg) from exc + raise ValueError(f"Invalid wavelength: {wavelength_nm}") from exc self._write(f"{CMD_SET_WAVELENGTH_METERS} {value_m}") self._read_wavelength_cache() @@ -199,20 +182,17 @@ def start(self) -> None: try: self._rm = pyvisa.ResourceManager("@py") except Exception as exc: - msg = f"VISA backend not available: {exc}" - raise RuntimeError(msg) from exc + raise RuntimeError(f"VISA backend not available: {exc}") from exc resource_name = self.hw_address or self._discover_resource() self._open_resource(resource_name) self._init_settings() self._read_wavelength_cache() - self.operations.update( - { - "read": self.read, - "set_wavelength_nm": self.set_wavelength_nm, - } - ) + self.operations.update({ + "read": self.read, + "set_wavelength_nm": self.set_wavelength_nm, + }) atexit.register(self.close) def close(self) -> None: @@ -252,10 +232,8 @@ def wavelength_nm(self) -> float: @log_operation def read(self) -> dict[str, float]: if self._instr is None: - msg = "Start the device first." - raise DeviceNotStartedError(msg) + raise DeviceNotStartedError("Start the device first.") raw_reply = self._query(QRY_LATEST) - token_strs = [p for p in raw_reply.replace(";", ",").split(",") if p] parsed_values: list[float | str] = [] for token_str in token_strs: From 39f4fccdffac1aff9679073e312984f18ca5a2c5 Mon Sep 17 00:00:00 2001 From: SoroushHoseini Date: Thu, 6 Nov 2025 22:15:28 -0600 Subject: [PATCH 8/8] Ruff any mypy fixes --- .../pqn/drivers/thorlabs_polarimeter.py | 40 ++++++++++++------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/src/pqnstack/pqn/drivers/thorlabs_polarimeter.py b/src/pqnstack/pqn/drivers/thorlabs_polarimeter.py index 48c324f7..a1b57a67 100644 --- a/src/pqnstack/pqn/drivers/thorlabs_polarimeter.py +++ b/src/pqnstack/pqn/drivers/thorlabs_polarimeter.py @@ -65,7 +65,8 @@ class PAX1000IR2(Instrument): def _write(self, cmd: str) -> None: if self._instr is None: - raise DeviceNotStartedError("Start the device first.") + msg = "Start the device first." + raise DeviceNotStartedError(msg) try: self._instr.write(f"{cmd}\n") except (VisaError, OSError): @@ -74,7 +75,8 @@ def _write(self, cmd: str) -> None: def _query(self, cmd: str) -> str: if self._instr is None: - raise DeviceNotStartedError("Start the device first.") + msg = "Start the device first." + raise DeviceNotStartedError(msg) try: self._instr.write(f"{cmd}\n") return str(self._instr.read()).strip() @@ -89,7 +91,8 @@ def _list_usb_resources(self) -> tuple[str, ...]: try: return self._rm.list_resources(USB_FILTER) # type: ignore[no-any-return] except VisaError as exc: - raise FileNotFoundError(f"VISA resource discovery failed: {exc}") from exc + msg = f"VISA resource discovery failed: {exc}" + raise FileNotFoundError(msg) from exc def _filter_candidates(self, resources: tuple[str, ...]) -> tuple[str, ...]: if self.pax_id_contains: @@ -117,13 +120,15 @@ def _probe_idn(self, resource_name: str) -> str: def _discover_resource(self) -> str: resources = self._filter_candidates(self._list_usb_resources()) if not resources: - raise FileNotFoundError("No USB VISA resources matched filter.") + msg = "No USB VISA resources matched filter." + raise FileNotFoundError(msg) if len(resources) == 1 and not self.pax_idn_contains: return resources[0] idn_substring = self.pax_idn_contains or "" matched = [r for r in resources if (not idn_substring) or (idn_substring in self._probe_idn(r))] if len(matched) != 1: - raise FileNotFoundError("PAX discovery ambiguous or no match.") + msg = "PAX discovery ambiguous or no match." + raise FileNotFoundError(msg) return matched[0] def _open_resource(self, resource_name: str) -> None: @@ -133,7 +138,8 @@ def _open_resource(self, resource_name: str) -> None: self._instr.timeout = self._timeout_ms except VisaError as exc: self._instr = None - raise RuntimeError(f"Failed to open VISA resource {resource_name}: {exc}") from exc + msg = f"Failed to open VISA resource {resource_name}: {exc}" + raise RuntimeError(msg) from exc def _write_and_confirm(self, set_cmd: str, qry_cmd: str, expect: str | float) -> bool: try: @@ -158,7 +164,8 @@ def _init_settings(self) -> None: with contextlib.suppress(Exception): self._write_and_confirm(CMD_DISABLE_CALC, QRY_IS_CALC_ENABLED, 0) self._write_and_confirm(CMD_DISABLE_ROTATION, QRY_IS_ROTATION_ENABLED, 0) - raise RuntimeError("PAX setup failed to enable calc/rotation.") + msg = "PAX setup failed to enable calc/rotation." + raise RuntimeError(msg) def _read_wavelength_cache(self) -> None: try: @@ -172,7 +179,8 @@ def set_wavelength_nm(self, wavelength_nm: float) -> None: try: value_m = float(wavelength_nm) * 1e-9 except (TypeError, ValueError) as exc: - raise ValueError(f"Invalid wavelength: {wavelength_nm}") from exc + msg = f"Invalid wavelength: {wavelength_nm}" + raise ValueError(msg) from exc self._write(f"{CMD_SET_WAVELENGTH_METERS} {value_m}") self._read_wavelength_cache() @@ -182,17 +190,20 @@ def start(self) -> None: try: self._rm = pyvisa.ResourceManager("@py") except Exception as exc: - raise RuntimeError(f"VISA backend not available: {exc}") from exc + msg = f"VISA backend not available: {exc}" + raise RuntimeError(msg) from exc resource_name = self.hw_address or self._discover_resource() self._open_resource(resource_name) self._init_settings() self._read_wavelength_cache() - self.operations.update({ - "read": self.read, - "set_wavelength_nm": self.set_wavelength_nm, - }) + self.operations.update( + { + "read": self.read, + "set_wavelength_nm": self.set_wavelength_nm, + } + ) atexit.register(self.close) def close(self) -> None: @@ -232,7 +243,8 @@ def wavelength_nm(self) -> float: @log_operation def read(self) -> dict[str, float]: if self._instr is None: - raise DeviceNotStartedError("Start the device first.") + msg = "Start the device first." + raise DeviceNotStartedError(msg) raw_reply = self._query(QRY_LATEST) token_strs = [p for p in raw_reply.replace(";", ",").split(",") if p] parsed_values: list[float | str] = []