-
Notifications
You must be signed in to change notification settings - Fork 37
Typed Command with multiple input values to replace SignalX #1138
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 17 commits
f1adbe9
a1b5a2f
f867bd6
cf50bc6
e2f2c9f
397c28c
c3f6f41
5895971
1f07946
c86ac96
e4a32c0
f963ed7
023abb8
807ea5e
634750b
2be1bc6
52e91d3
ae851f0
6d49587
ffef220
c4e22ec
4c8dbb8
8f519f6
83ceac8
768ab53
1d1c46c
5d983a5
52ceedd
f9bd95c
c439f02
944deb0
8d243d8
40605ee
8b26da1
131dee6
8576376
362c749
126fc3c
4afd859
f444830
035aee6
8fd3eb9
70c2389
34a5012
2288c9c
41b9363
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,340 @@ | ||||||
| from __future__ import annotations | ||||||
|
|
||||||
| import asyncio | ||||||
| import inspect | ||||||
| from abc import abstractmethod | ||||||
| from collections.abc import Awaitable, Callable, Sequence | ||||||
| from typing import ( | ||||||
| Generic, | ||||||
| ParamSpec, | ||||||
| Protocol, | ||||||
| TypeVar, | ||||||
| cast, | ||||||
| get_args, | ||||||
| get_origin, | ||||||
| get_type_hints, | ||||||
| ) | ||||||
| from unittest.mock import AsyncMock | ||||||
|
|
||||||
| from ._device import Device, DeviceConnector, LazyMock | ||||||
| from ._utils import DEFAULT_TIMEOUT | ||||||
|
|
||||||
| P = ParamSpec("P") | ||||||
| T_co = TypeVar("T_co", covariant=True) | ||||||
| T = TypeVar("T") | ||||||
|
|
||||||
|
|
||||||
| class CommandError(Exception): | ||||||
| """Base class for all Command related errors.""" | ||||||
|
|
||||||
|
|
||||||
| class ConnectionError(CommandError): | ||||||
| """Raised when a Command cannot connect to its backend.""" | ||||||
|
|
||||||
|
|
||||||
| class ConnectionTimeoutError(ConnectionError): | ||||||
| """Raised when a Command connection times out.""" | ||||||
|
|
||||||
|
|
||||||
| class ExecutionError(CommandError): | ||||||
| """Raised when a Command fails during execution.""" | ||||||
|
|
||||||
|
|
||||||
| class CommandBackend(Protocol[P, T_co]): | ||||||
| """A backend for a Command.""" | ||||||
|
|
||||||
| @abstractmethod | ||||||
| def source(self, name: str, read: bool) -> str: | ||||||
| """Return source of command.""" | ||||||
burkeds marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
|
||||||
| @abstractmethod | ||||||
| async def connect(self, timeout: float) -> None: | ||||||
| """Connect to underlying hardware.""" | ||||||
|
|
||||||
| @abstractmethod | ||||||
| async def call(self, *args: P.args, **kwargs: P.kwargs) -> T_co: | ||||||
| """Execute the command and return its result.""" | ||||||
|
|
||||||
|
|
||||||
| class CommandConnector(DeviceConnector): | ||||||
| """A connector for a Command.""" | ||||||
|
|
||||||
| def __init__(self, backend: CommandBackend): | ||||||
| self.backend = backend | ||||||
|
|
||||||
| async def connect_mock(self, device: Device, mock: LazyMock): | ||||||
| """Connect the backend in mock mode.""" | ||||||
| self.backend = MockCommandBackend(self.backend, mock) | ||||||
|
|
||||||
| async def connect_real(self, device: Device, timeout: float, force_reconnect: bool): | ||||||
| """Connect the backend to real hardware.""" | ||||||
| await self.backend.connect(timeout) | ||||||
|
|
||||||
|
|
||||||
| class Command(Device, Generic[P, T]): | ||||||
| """A Device that can be called to execute a command. | ||||||
|
|
||||||
| :param backend: The backend for executing the command. | ||||||
| :param timeout: The default timeout for calling the command. | ||||||
| :param name: The name of the command. | ||||||
| """ | ||||||
|
|
||||||
| _connector: CommandConnector | ||||||
|
|
||||||
| def __init__( | ||||||
| self, | ||||||
| backend: CommandBackend[P, T], | ||||||
| timeout: float | None = DEFAULT_TIMEOUT, | ||||||
| name: str = "", | ||||||
| ): | ||||||
| super().__init__(name=name, connector=CommandConnector(backend)) | ||||||
| self._timeout = timeout | ||||||
|
|
||||||
| @property | ||||||
| def source(self) -> str: | ||||||
| """Returns the source of the command.""" | ||||||
| return self._connector.backend.source(self.name, True) | ||||||
burkeds marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||
|
|
||||||
| async def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T: | ||||||
|
||||||
| async def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T: | |
| async def trigger(self, *args: P.args, **kwargs: P.kwargs) -> T: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@runtime_checkable
class Triggerable(Protocol):
@abstractmethod
def trigger(self) -> Status:
"""Return a ``Status`` that is marked done when the device is done triggering."""
...The problem here is that it won't match the protocol signature.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, Triggerable doesn't take args and kwargs because it was difficult to type hint at the time, but now we ParamSpec then maybe we can open that up again?
What's the intended use of Tango commands, would they be triggered directly from plans, or always wrapped in some other device that calls them?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They should be callable directly from plans. There are no "rules" or best practices that are adopted community-wide in Tango so commands could do anything and everything. I would propose altering Trlggerable to take optional arguments rather than implementing a new Bluesky verb.
Uh oh!
There was an error while loading. Please reload this page.