-
Notifications
You must be signed in to change notification settings - Fork 37
Fix DeviceVector introspection #1198
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 13 commits
773a8c1
2317b5f
7776da5
1727010
0d6f5fa
fda562a
91bad8b
04002d4
c65e603
4a5a1c1
150cef9
81c49c7
e284882
772e48e
77187f3
cdf18a1
e1ba902
345e706
7ef99a7
746f9ab
d138b37
bdf9f8c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,8 +1,11 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from typing import Literal, cast | ||
| import re | ||
|
|
||
| from pydantic import Field | ||
|
|
||
| from ophyd_async.core import ( | ||
| ConfinedModel, | ||
| Device, | ||
| DeviceConnector, | ||
| DeviceFiller, | ||
|
|
@@ -12,56 +15,12 @@ | |
| SignalRW, | ||
| SignalW, | ||
| SignalX, | ||
| gather_dict, | ||
| ) | ||
|
|
||
| from ._epics_connector import fill_backend_with_prefix | ||
| from ._signal import PvaSignalBackend, pvget_with_timeout | ||
|
|
||
| Entry = dict[str, str] | ||
|
|
||
| OldPVIVector = list[Entry | None] | ||
| # The older PVI structure has vectors of the form | ||
| # structure[] ttlout | ||
| # (none) | ||
| # structure | ||
| # string d PANDABLOCKS_IOC:TTLOUT1:PVI | ||
| # structure | ||
| # string d PANDABLOCKS_IOC:TTLOUT2:PVI | ||
| # structure | ||
| # string d PANDABLOCKS_IOC:TTLOUT3:PVI | ||
|
|
||
|
|
||
| FastCSPVIVector = dict[Literal["d"], Entry] | ||
| # The newer pva FastCS PVI structure has vectors of the form | ||
| # structure ttlout | ||
| # structure d | ||
| # string v1 FASTCS_PANDA:Ttlout1:PVI | ||
| # string v2 FASTCS_PANDA:Ttlout2:PVI | ||
| # string v3 FASTCS_PANDA:Ttlout3:PVI | ||
| # string v4 FASTCS_PANDA:Ttlout4:PVI | ||
|
|
||
|
|
||
| def _get_signal_details(entry: Entry) -> tuple[type[Signal], str, str]: | ||
| match entry: | ||
| case {"r": read_pv, "w": write_pv}: | ||
| return SignalRW, read_pv, write_pv | ||
| case {"r": read_pv}: | ||
| return SignalR, read_pv, read_pv | ||
| case {"w": write_pv}: | ||
| return SignalW, write_pv, write_pv | ||
| case {"rw": read_write_pv}: | ||
| return SignalRW, read_write_pv, read_write_pv | ||
| case {"x": execute_pv}: | ||
| return SignalX, execute_pv, execute_pv | ||
| case _: | ||
| raise TypeError(f"Can't process entry {entry}") | ||
|
|
||
|
|
||
| def _is_device_vector_entry(entry: Entry | OldPVIVector | FastCSPVIVector) -> bool: | ||
| return isinstance(entry, list) or ( | ||
| entry.keys() == {"d"} and isinstance(entry["d"], dict) | ||
| ) | ||
|
|
||
|
|
||
| class PviDeviceConnector(DeviceConnector): | ||
| """Connect to PVI structure served over PVA. | ||
|
|
@@ -78,6 +37,7 @@ | |
| """ | ||
|
|
||
| mock_device_vector_len: int = 2 | ||
| pvi_tree: PviTree | None = None | ||
|
|
||
| def __init__(self, prefix: str = "", error_hint: str = "") -> None: | ||
| # TODO: what happens if we get a leading "pva://" here? | ||
|
|
@@ -101,52 +61,215 @@ | |
| fill_backend_with_prefix(self.prefix, backend, annotations) | ||
| self.filler.check_created() | ||
|
|
||
| def _fill_child(self, name: str, entry: Entry, vector_index: int | None = None): | ||
| if set(entry) == {"d"}: | ||
| connector = self.filler.fill_child_device(name, vector_index=vector_index) | ||
| connector.pvi_pv = entry["d"] | ||
| else: | ||
| signal_type, read_pv, write_pv = _get_signal_details(entry) | ||
| backend = self.filler.fill_child_signal(name, signal_type, vector_index) | ||
| backend.read_pv = read_pv | ||
| backend.write_pv = write_pv | ||
|
|
||
| async def connect_mock(self, device: Device, mock: LazyMock): | ||
| self.filler.create_device_vector_entries_to_mock(self.mock_device_vector_len) | ||
| # Set the name of the device to name all children | ||
| device.set_name(device.name) | ||
| return await super().connect_mock(device, mock) | ||
|
|
||
| def _fill_vector_child(self, name: str, entry: OldPVIVector | FastCSPVIVector): | ||
| if isinstance(entry, list): | ||
| for i, e in enumerate(entry): | ||
| if e: | ||
| self._fill_child(name, e, i) | ||
| else: | ||
| for i_string, e in entry["d"].items(): | ||
| self._fill_child(name, {"d": e}, int(i_string.lstrip("v"))) | ||
|
|
||
| async def connect_real( | ||
| self, device: Device, timeout: float, force_reconnect: bool | ||
| ) -> None: | ||
| pvi_structure = await pvget_with_timeout(self.pvi_pv, timeout) | ||
|
|
||
| entries: dict[str, Entry | OldPVIVector | FastCSPVIVector] = pvi_structure[ | ||
| "value" | ||
| ].todict() | ||
| # Fill based on what PVI gives us | ||
| for name, entry in entries.items(): | ||
| if _is_device_vector_entry(entry): | ||
| self._fill_vector_child( | ||
| name, cast(OldPVIVector | FastCSPVIVector, entry) | ||
| ) | ||
| if not self.pvi_tree: | ||
| # Top-level device, so discover PVI tree | ||
| self.pvi_tree = await PviTree.build_device_tree( | ||
| name=device.name, pvi_pv=self.pvi_pv, timeout=timeout | ||
| ) | ||
| # Fill all signals | ||
| for signal_name, signal_details in self.pvi_tree.signals.items(): | ||
| backend = self.filler.fill_child_signal( | ||
| signal_name, signal_details.signal_type, None | ||
| ) | ||
| backend.read_pv = signal_details.read_pv | ||
| backend.write_pv = signal_details.write_pv | ||
| # Fill all sub devices | ||
| for device_name, device_sub_tree in self.pvi_tree.sub_devices.items(): | ||
| if device_sub_tree.vector_children: | ||
| # This is a DeviceVector | ||
| for ( | ||
| vector_index, | ||
| vector_child, | ||
| ) in device_sub_tree.vector_children.items(): | ||
| connector = self.filler.fill_child_device( | ||
| device_name, vector_index=vector_index | ||
| ) | ||
| connector.pvi_tree = vector_child | ||
| connector.pvi_pv = vector_child.pvi_pv | ||
| else: | ||
| # This is a child | ||
| self._fill_child(name, cast(Entry, entry)) | ||
| # This is a Device | ||
| connector = self.filler.fill_child_device(device_name) | ||
| connector.pvi_tree = device_sub_tree | ||
| connector.pvi_pv = device_sub_tree.pvi_pv | ||
|
|
||
| # Check that all the requested children have been filled | ||
| suffix = f"\n{self.error_hint}" if self.error_hint else "" | ||
| self.filler.check_filled(f"{self.pvi_pv}: {entries}{suffix}") | ||
| self.filler.check_filled(f"{self.pvi_pv}: {self.pvi_tree}{suffix}") | ||
| # Set the name of the device to name all children | ||
| device.set_name(device.name) | ||
| return await super().connect_real(device, timeout, force_reconnect) | ||
|
|
||
|
|
||
| class SignalDetails(ConfinedModel): | ||
| """Representation of a Signal to be constructed.""" | ||
|
|
||
| signal_type: type[Signal] | ||
| read_pv: str | ||
| write_pv: str | ||
|
|
||
| @classmethod | ||
| def from_entry(cls, entry: dict[str, str]) -> SignalDetails: | ||
Check noticeCode scanning / CodeQL Explicit returns mixed with implicit (fall through) returns Note
Mixing implicit and explicit returns may indicate an error, as implicit returns always return None.
Copilot AutofixAI 2 days ago Copilot could not generate an autofix suggestion Copilot could not generate an autofix suggestion for this alert. Try pushing a new commit or if the problem persists contact support. |
||
| match entry: | ||
| case {"r": read_pv, "w": write_pv}: | ||
| return cls(signal_type=SignalRW, read_pv=read_pv, write_pv=write_pv) | ||
|
|
||
| case {"rw": pv}: | ||
| return cls(signal_type=SignalRW, read_pv=pv, write_pv=pv) | ||
|
|
||
| case {"r": read_pv}: | ||
| return cls(signal_type=SignalR, read_pv=read_pv, write_pv=read_pv) | ||
|
|
||
| case {"w": write_pv}: | ||
| return cls(signal_type=SignalW, read_pv=write_pv, write_pv=write_pv) | ||
|
|
||
| case {"x": execute_pv}: | ||
| return cls(signal_type=SignalX, read_pv=execute_pv, write_pv=execute_pv) | ||
|
|
||
| case _: | ||
| raise TypeError(f"Can't process entry {entry}") | ||
shihab-dls marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
|
|
||
| class PviTree(ConfinedModel): | ||
| """Representation of a PVI structure of devices and signals in a PVI query. | ||
|
|
||
| Example 1: A device with sub-devices and signals | ||
| -------------------------------------- | ||
| For a PVI structure such as: | ||
|
|
||
| ```json | ||
| { | ||
| "bit": {"d": "TEST-PANDA:Bits:PVI"}, | ||
| "calc": {"d": "TEST-PANDA:Calc:PVI"}, | ||
| "a": {"rw": "TEST-PANDA:Bits:A"} | ||
| } | ||
| ``` | ||
|
|
||
| From "TEST-PANDA:PVI", This would be represented as: | ||
|
|
||
| ```python | ||
| PviTree( | ||
| pvi_pv="TEST-PANDA:PVI", | ||
| signals={ | ||
| "a": SignalDetails( | ||
| signal_type=SignalRW, | ||
| read_pv="TEST-PANDA:Bits:A", | ||
| write_pv="TEST-PANDA:Bits:A") | ||
| }, | ||
| sub_devices={ | ||
| "bit": PviTree(...), | ||
| "calc": PviTree(...) | ||
| }, | ||
| vector_children=[] | ||
| ) | ||
| ``` | ||
|
|
||
| Example 2: A device with vector children | ||
| ----------------------------------------- | ||
| If an entry like `"calc"` is a **DeviceVector** | ||
| (e.g., mirroring a fastCS controller vector), the PVI entries will look like this: | ||
|
|
||
| ```json | ||
| { | ||
| "__1": {"d": "TEST-PANDA:Calc:2:PVI"}, | ||
| "__2": {"d": "TEST-PANDA:Calc:1:PVI"} | ||
| } | ||
| ``` | ||
|
|
||
| This would be represented as: | ||
|
|
||
| ```python | ||
| PviTree( | ||
| pvi_pv="TEST-PANDA:Calc:PVI", | ||
| signals={}, | ||
| sub_devices={}, | ||
| vector_children=[ | ||
| PviTree(pvi_pv="TEST-PANDA:Calc:2:PVI", signals={}, ...), | ||
| PviTree(pvi_pv="TEST-PANDA:Calc:1:PVI", signals={}, ...) | ||
| ] | ||
| ) | ||
| ``` | ||
|
|
||
| :param pvi_pv: | ||
| The PVI PV of the device. | ||
|
|
||
| :param signals: | ||
| A dictionary mapping signal names to `SignalDetails` objects. | ||
|
|
||
| :param sub_devices: | ||
| A dictionary mapping sub-device names to their corresponding `PviTree` objects. | ||
|
|
||
| :param vector_children: | ||
| A list of `PviTree` objects representing child devices of a vector device. | ||
| """ | ||
|
|
||
| pvi_pv: str | ||
| signals: dict[str, SignalDetails] = Field(default={}) | ||
| sub_devices: dict[str, PviTree] = Field(default={}) | ||
| vector_children: dict[int, PviTree] = Field(default={}) | ||
|
|
||
| @classmethod | ||
| async def build_device_tree(cls, name: str, pvi_pv: str, timeout: float) -> PviTree: | ||
shihab-dls marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| """Recursively build a PviTree from a top level device. | ||
|
|
||
| Starting from the top-level device, this classmethod performs | ||
| post-order traversal over the served PVI structure, populating | ||
| a PviTree from the bottom up. | ||
|
|
||
| :param name: Device name | ||
| :param pvi_pv: Device PVI PV | ||
| :param timeout: Timeout on pvget | ||
| """ | ||
| pvi_structure = await pvget_with_timeout(pvi_pv, timeout) | ||
|
|
||
| # An example entry is: {"d": "Prefix:Device:PVI", "rw": "Prefix:A"} | ||
| # these entries are stored under the parent PVI structure name | ||
| # for example, {"device": {"d": "Prefix:Device:PVI", "rw": "Prefix:A"}} | ||
| entries: dict[str, dict[str, str]] = pvi_structure["value"].todict() | ||
|
|
||
| signal_details = { | ||
| entry_name: SignalDetails.from_entry(entries.pop(entry_name)) | ||
| for entry_name in entries | ||
shihab-dls marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if set(entries[entry_name]) != {"d"} | ||
| } | ||
shihab-dls marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| sub_trees = await gather_dict( | ||
| { | ||
| entry_name: cls.build_device_tree(entry_name, entry["d"], timeout) | ||
| for entry_name, entry in entries.items() | ||
| } | ||
| ) | ||
coretl marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| vector_children: dict[int, PviTree] = {} | ||
| # Filter vector children out of stand-alone devices | ||
| for child_name in list(sub_trees): | ||
| # Check if any sub-devices are named "__#" (e.g., "__1") | ||
| if m := re.match(r"^__(\d+)$", child_name): | ||
shihab-dls marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
shihab-dls marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| sub_tree = sub_trees.pop(child_name) | ||
| vector_children[int(m.group(1))] = sub_tree | ||
|
|
||
| return PviTree( | ||
| pvi_pv=pvi_pv, | ||
| signals=signal_details, | ||
| sub_devices=sub_trees, | ||
| vector_children=vector_children, | ||
| ) | ||
|
|
||
| def __str__(self) -> str: | ||
| """Print a readable top layer of the PviTree.""" | ||
| children = { | ||
| child_name: tree.pvi_pv | ||
| for child_name, tree in {**self.sub_devices, **self.vector_children}.items() | ||
| } | ||
| signals = { | ||
| signal_name: (detail.signal_type.__name__, detail.read_pv, detail.write_pv) | ||
| for signal_name, detail in self.signals.items() | ||
| } | ||
| return f"sub_devices={children}\nsignals={signals}" | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It occurs to me that we could relatively easily still support this structure for backwards compatibility, which would make it easier for people to upgrade...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've pushed changes to support this, and amended
panda.dbto have a block that uses this legacy structure. Do you see a point in supporting:as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope, let's drop the old old style...