11"""Module for simulated monitor devices."""
22
3+ from dataclasses import dataclass
4+
35import numpy as np
46from bec_lib import messages
57from bec_lib .endpoints import MessageEndpoints
1517logger = bec_logger .logger
1618
1719
20+ @dataclass
21+ class RegisteredCallback :
22+
23+ motor : str
24+ callback_id : int
25+
26+
1827class SimMonitor (ReadOnlySignal ):
1928 """
2029 A simulated device mimic any 1D Axis (position, temperature, beam).
@@ -62,10 +71,7 @@ def __init__(
6271 self .sim_init = sim_init
6372 self .device_manager = device_manager
6473 self ._registered_proxies = {}
65- self ._callback_id = (
66- None ,
67- None ,
68- ) # Tuple to store motor_name and callback_id for unregistration
74+ self ._registered_callback : RegisteredCallback | None = None
6975 self .sim = self .sim_cls (parent = self , ** kwargs )
7076
7177 super ().__init__ (
@@ -81,7 +87,7 @@ def __init__(
8187 self .sim .set_init (self .sim_init )
8288
8389 @property
84- def registered_proxies (self ) -> None :
90+ def registered_proxies (self ) -> dict :
8591 """Dictionary of registered signal_names and proxies."""
8692 return self ._registered_proxies
8793
@@ -93,20 +99,20 @@ def setup_readback_monitor(self, motor_name: str) -> None:
9399 motor_name (str): The name of the motor to monitor.
94100 """
95101
96- if self ._callback_id [0 ] == motor_name :
97- if self ._callback_id [1 ] is not None : # Already registered callback
102+ if self ._registered_callback :
103+ if self ._registered_callback .motor == motor_name :
104+ # Already registered callback
98105 return
99- else : # Unregister callback from previous motor if necessary
100- if self ._callback_id [1 ] is not None :
101- motor = self .device_manager .devices .get (self ._callback_id [0 ], None )
106+ else : # Unregister callback from previous motor if necessary
107+ motor = self .device_manager .devices .get (self ._registered_callback .motor , None )
102108 if motor :
103- motor .unsubscribe (self ._callback_id [ 1 ] )
109+ motor .unsubscribe (self ._registered_callback . callback_id )
104110
105111 # Register new callback
106112 motor = self .device_manager .devices .get (motor_name , None )
107113 if motor :
108114 cb_id = motor .subscribe (self ._update_readback , run = True )
109- self ._callback_id = ( motor_name , cb_id )
115+ self ._registered_callback = RegisteredCallback ( motor = motor_name , callback_id = cb_id )
110116
111117 def _update_readback (self , value , ** kwargs ):
112118 """Callback function to update the readback value."""
0 commit comments