|
| 1 | +from typing import Union |
| 2 | + |
| 3 | +from ..bl.delay_execution import DelayExecution |
| 4 | +from ..bl.event import Event |
| 5 | +from ..interfaces.accelerator_interface import AcceleratorInterface |
| 6 | +from ..interfaces.element_interface import ElementInterface |
| 7 | + |
| 8 | + |
| 9 | +class AcceleratorImpl(AcceleratorInterface): |
| 10 | + def __init__(self, acc, proxy_factory, twiss_calculator, orbit_calculator, *, delay=0.1): |
| 11 | + """ |
| 12 | + Initializes the accelerator implementation. |
| 13 | + This manages the accelerator state, interactions, and calculation flow. |
| 14 | + Registers event handlers and calculations. |
| 15 | +
|
| 16 | + Args: |
| 17 | + acc: Accelerator model. |
| 18 | + proxy_factory: Factory for creating element proxies. |
| 19 | + twiss_calculator: Calculator for twiss parameters. |
| 20 | + orbit_calculator: Calculator for orbit calculations. |
| 21 | + delay (float): Delay for calculation executions. |
| 22 | + """ |
| 23 | + super().__init__() |
| 24 | + self.acc = acc |
| 25 | + self.proxy_factory = proxy_factory |
| 26 | + self.twiss_calculator = twiss_calculator |
| 27 | + self.orbit_calculator = orbit_calculator |
| 28 | + |
| 29 | + # Store calculation results |
| 30 | + self.twiss = twiss_calculator.calculate() |
| 31 | + self.orbit = orbit_calculator.calculate() |
| 32 | + |
| 33 | + # Events for notification |
| 34 | + self.on_new_orbit = Event() |
| 35 | + self.on_new_twiss = Event() |
| 36 | + |
| 37 | + async def cb_twiss(): |
| 38 | + """Internal method to calculate twiss parameters.""" |
| 39 | + self.twiss = self.twiss_calculator.calculate() |
| 40 | + await self.on_new_twiss.trigger(self.twiss) |
| 41 | + |
| 42 | + async def cb_orbit(): |
| 43 | + """Internal method to calculate orbit parameters.""" |
| 44 | + self.orbit = self.orbit_calculator.calculate() |
| 45 | + await self.on_new_orbit.trigger(self.orbit) |
| 46 | + |
| 47 | + self.twiss_calculation_delay = DelayExecution(callback=cb_twiss, delay=delay) |
| 48 | + self.orbit_calculation_delay = DelayExecution(callback=cb_orbit, delay=delay) |
| 49 | + |
| 50 | + self.on_changed_value = Event() |
| 51 | + |
| 52 | + self.on_orbit_calculation_request = self.orbit_calculation_delay.on_calculation_requested |
| 53 | + self.on_orbit_calculation = self.orbit_calculation_delay.on_calculation |
| 54 | + self.on_twiss_calculation_request = self.twiss_calculation_delay.on_calculation_requested |
| 55 | + self.on_twiss_calculation = self.twiss_calculation_delay.on_calculation |
| 56 | + |
| 57 | + def set_delay(self, delay: Union[float, None]): |
| 58 | + """How much to delay twiss and orbit calculation after last received command""" |
| 59 | + self.twiss_calculation_delay.set_delay(delay) |
| 60 | + self.orbit_calculation_delay.set_delay(delay) |
| 61 | + |
| 62 | + async def get_element(self, element_id) -> ElementInterface: |
| 63 | + """ |
| 64 | + Retrieves an element and subscribes to its changes. |
| 65 | +
|
| 66 | + Args: |
| 67 | + element_id (str): The element identifier. |
| 68 | +
|
| 69 | + Returns: |
| 70 | + ElementInterface: Proxy object for the element. |
| 71 | + """ |
| 72 | + proxy = self.proxy_factory.get(element_id) |
| 73 | + proxy.on_changed_value.subscribe(self.on_changed_value.trigger) |
| 74 | + |
| 75 | + async def cb(unused): |
| 76 | + await self.orbit_calculation_delay.request_execution() |
| 77 | + await self.twiss_calculation_delay.request_execution() |
| 78 | + |
| 79 | + proxy.on_update_finished.subscribe(cb) |
| 80 | + return proxy |
| 81 | + |
| 82 | + def calculate_twiss(self): |
| 83 | + return self.twiss_calculator.calculate() |
| 84 | + |
| 85 | + def calculate_orbit(self): |
| 86 | + return self.orbit_calculator.calculate() |
0 commit comments