|
| 1 | +import json |
| 2 | +from dataclasses import dataclass |
| 3 | +from typing import Any, Literal |
| 4 | + |
| 5 | +from pydantic import BaseModel, ConfigDict, ValidationError |
| 6 | + |
| 7 | +from fastcs.attributes import Attribute, AttrR, AttrRW, AttrW, Handler |
| 8 | +from fastcs.connections import IPConnection, IPConnectionSettings |
| 9 | +from fastcs.controller import BaseController, Controller, SubController |
| 10 | +from fastcs.datatypes import Bool, DataType, Float, Int, String |
| 11 | +from fastcs.launch import FastCS |
| 12 | +from fastcs.transport.epics.ca.options import EpicsCAOptions |
| 13 | +from fastcs.transport.epics.options import EpicsIOCOptions |
| 14 | + |
| 15 | + |
| 16 | +class TemperatureControllerParameter(BaseModel): |
| 17 | + model_config = ConfigDict(extra="forbid") |
| 18 | + |
| 19 | + command: str |
| 20 | + type: Literal["bool", "int", "float", "str"] |
| 21 | + access_mode: Literal["r", "rw"] |
| 22 | + |
| 23 | + @property |
| 24 | + def fastcs_datatype(self) -> DataType: |
| 25 | + match self.type: |
| 26 | + case "bool": |
| 27 | + return Bool() |
| 28 | + case "int": |
| 29 | + return Int() |
| 30 | + case "float": |
| 31 | + return Float() |
| 32 | + case "str": |
| 33 | + return String() |
| 34 | + |
| 35 | + |
| 36 | +def create_attributes(parameters: dict[str, Any]) -> dict[str, Attribute]: |
| 37 | + attributes: dict[str, Attribute] = {} |
| 38 | + for name, parameter in parameters.items(): |
| 39 | + name = name.replace(" ", "_").lower() |
| 40 | + |
| 41 | + try: |
| 42 | + parameter = TemperatureControllerParameter.model_validate(parameter) |
| 43 | + except ValidationError as e: |
| 44 | + print(f"Failed to validate parameter '{parameter}'\n{e}") |
| 45 | + continue |
| 46 | + |
| 47 | + handler = TemperatureControllerHandler(parameter.command) |
| 48 | + match parameter.access_mode: |
| 49 | + case "r": |
| 50 | + attributes[name] = AttrR(parameter.fastcs_datatype, handler=handler) |
| 51 | + case "rw": |
| 52 | + attributes[name] = AttrRW(parameter.fastcs_datatype, handler=handler) |
| 53 | + |
| 54 | + return attributes |
| 55 | + |
| 56 | + |
| 57 | +@dataclass |
| 58 | +class TemperatureControllerHandler(Handler): |
| 59 | + command_name: str |
| 60 | + update_period: float | None = 0.2 |
| 61 | + |
| 62 | + async def update(self, controller: BaseController, attr: AttrR): |
| 63 | + assert isinstance(controller, TemperatureController | TemperatureRampController) |
| 64 | + |
| 65 | + response = await controller.connection.send_query(f"{self.command_name}?\r\n") |
| 66 | + value = response.strip("\r\n") |
| 67 | + |
| 68 | + await attr.set(attr.dtype(value)) |
| 69 | + |
| 70 | + async def put(self, controller: BaseController, attr: AttrW, value: Any): |
| 71 | + assert isinstance(controller, TemperatureController | TemperatureRampController) |
| 72 | + |
| 73 | + await controller.connection.send_command(f"{self.command_name}={value}\r\n") |
| 74 | + |
| 75 | + |
| 76 | +class TemperatureRampController(SubController): |
| 77 | + def __init__(self, index: int, connection: IPConnection): |
| 78 | + super().__init__(f"Ramp {index}") |
| 79 | + |
| 80 | + self.connection = connection |
| 81 | + |
| 82 | + async def initialise(self, parameters: dict[str, Any]): |
| 83 | + self.attributes.update(create_attributes(parameters)) |
| 84 | + |
| 85 | + |
| 86 | +class TemperatureController(Controller): |
| 87 | + def __init__(self, settings: IPConnectionSettings): |
| 88 | + super().__init__() |
| 89 | + |
| 90 | + self._ip_settings = settings |
| 91 | + self.connection = IPConnection() |
| 92 | + |
| 93 | + async def connect(self): |
| 94 | + await self.connection.connect(self._ip_settings) |
| 95 | + |
| 96 | + async def initialise(self): |
| 97 | + await self.connect() |
| 98 | + |
| 99 | + api = json.loads((await self.connection.send_query("API?\r\n")).strip("\r\n")) |
| 100 | + |
| 101 | + ramps_api = api.pop("Ramps") |
| 102 | + self.attributes.update(create_attributes(api)) |
| 103 | + |
| 104 | + for idx, ramp_parameters in enumerate(ramps_api): |
| 105 | + ramp_controller = TemperatureRampController(idx + 1, self.connection) |
| 106 | + self.register_sub_controller(f"Ramp{idx + 1:02d}", ramp_controller) |
| 107 | + await ramp_controller.initialise(ramp_parameters) |
| 108 | + |
| 109 | + await self.connection.close() |
| 110 | + |
| 111 | + |
| 112 | +epics_options = EpicsCAOptions(ioc=EpicsIOCOptions(pv_prefix="DEMO")) |
| 113 | +connection_settings = IPConnectionSettings("localhost", 25565) |
| 114 | +fastcs = FastCS(TemperatureController(connection_settings), [epics_options]) |
| 115 | + |
| 116 | +# fastcs.run() # Commented as this will block |
0 commit comments