Skip to content

Commit 429a5d8

Browse files
authored
Merge pull request #6 from hz-b/dev/feature/periodic-publisher
[TASK] Split view into data that produced by calculation and data republished as it is also done by the machine
2 parents e6ab762 + 7f77282 commit 429a5d8

File tree

16 files changed

+318
-109
lines changed

16 files changed

+318
-109
lines changed

src/dt4acc/core/accelerators/accelerator_manager.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,5 +81,4 @@ def setup_event_subscriptions(self):
8181
self.accelerator.on_new_twiss.subscribe(self.view.push_twiss)
8282
self.accelerator.on_new_orbit.subscribe(self.view.push_orbit)
8383

84-
self.accelerator.on_new_orbit.subscribe(self.view.push_bpms)
8584
self.accelerator.on_changed_value.subscribe(self.view.push_value)

src/dt4acc/core/bl/delay_execution.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ async def request_execution(self):
3939
self.pending_task.cancel()
4040

4141
# Schedule a new execution task after the delay
42-
self.pending_task = asyncio.create_task(self._delayed_execution())
42+
self.pending_task = asyncio.create_task(self._delayed_execution(), name="delayed-execution")
4343

4444
async def _delayed_execution(self):
4545
"""
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from abc import ABCMeta, abstractmethod
2+
3+
4+
class ViewInterface(metaclass=ABCMeta):
5+
@abstractmethod
6+
async def push(self, data):
7+
"""
8+
"""
9+
raise NotImplementedError("use base class instead")

src/dt4acc/core/model/twiss.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,11 @@ def get_plane(self, plane: Planes):
4040
elif plane == Planes.y:
4141
return self.y
4242
else:
43-
raise AssertionError("How could I end up here")
43+
raise AssertionError("How could I end up here")
44+
45+
@dataclass
46+
class TuneData:
47+
"""extracted tune data for the tune device
48+
"""
49+
x: float
50+
y: float
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import asyncio
2+
import itertools
3+
4+
from .logger import get_logger
5+
from ..interfaces.view_interface import ViewInterface
6+
7+
#: use a "singleton", even if the name is not unique it should still work
8+
counter = itertools.count()
9+
logger = get_logger()
10+
11+
class PeriodicPublisher:
12+
"""
13+
14+
Todo:
15+
find a better name for it?
16+
Its rather a periodic publisher proxy
17+
18+
It is expected to be triggered from outside
19+
"""
20+
def __init__(self, view: ViewInterface, name: str):
21+
"""
22+
23+
Todo:
24+
view should cohere to a protocol or interface
25+
"""
26+
self.view = view
27+
self.data = None
28+
self.name = name
29+
30+
def set_data(self, data):
31+
self.data = data
32+
33+
async def publish(self):
34+
"""
35+
Expect that publish is called in parallel, thus just return stat object
36+
"""
37+
# name=f"publish-data-{self.name}-{next(counter)}"
38+
if self.data is None:
39+
logger.warning(f"{self.__class__.__name__}(name={self.name}), self.data is None, thus not publishing data")
40+
return
41+
return await self.view.push(self.data)

src/dt4acc/core/views/shared_view.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@ def get_view_instance():
2121
server_type = os.environ.get("server", "epics").lower()
2222

2323
if server_type == "tango":
24-
from ...custom_tango.views.calculation_result_view import ResultView
24+
# todo: needs to be implemented, sorry for breaking
25+
# tango implementation Pierre
26+
from ...custom_tango.views.result_view import ResultView
2527
elif server_type == "epics":
26-
from ...custom_epics.views.calculation_result_view import ResultView
28+
from ...custom_epics.views.result_view import ResultView
2729
else:
2830
raise ValueError(f"Unsupported server type: {server_type}")
2931

src/dt4acc/custom_epics/ioc/handlers.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
from bact_twin_architecture.bl.command_rewriter import CommandRewriter
22
# from dt4acc.custom_epics.ioc.liasion_translation_manager import build_managers
3-
from p4p.client.asyncio import Context
43

54
from .liasion_translation_manager import build_managers
5+
from ..utils.context_proxy import ContextProxy
66
from ...core.accelerators.pyat_accelerator import setup_accelerator
77
from ...core.command import UpdateManager
88
from ...core.utils.logger import get_logger
99

1010
logger = get_logger()
1111

12-
ctx = Context("pva") # Create a context for EPICS PVA (PV Access)
12+
ctx = ContextProxy("pva") # Create a context for EPICS PVA (PV Access)
1313

1414
#: todo replace soon by database service
1515

src/dt4acc/custom_epics/ioc/pv_setup.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
LatticeElementPropertyID,
44
DevicePropertyID,
55
)
6-
from p4p.asLib.yacc import start
76

87
from .handlers import handle_device_update, update_manager
98
from ..data.constants import config, special_pvs, cavity_names
@@ -116,7 +115,7 @@ def add_pc_pvs(builder, pc_name, prefix):
116115
rdbk = builder.aOut(f"{pc_name}:rdbk", initial_value=start_val, PREC=2)
117116

118117
async def handle_pc_update(device_id: str, property_id: str, value: float):
119-
logger.debug("%s:%s updating setpoint val=%s", device_id, property_id, value)
118+
logger.warning("%s:%s updating setpoint val=%s", device_id, property_id, value)
120119
r = await handle_device_update(
121120
device_id=device_id, property_id=property_id, value=value
122121
)
@@ -148,6 +147,12 @@ def initialize_orbit_pvs(builder):
148147
builder.aOut(f"beam:orbit:found", initial_value=0)
149148

150149

150+
def initialize_tune_pvs(builder):
151+
for axis in ["x", "y"]:
152+
builder.aOut(f"TUNECC:{axis}", initial_value=0.0, PREC=9)
153+
builder.longOut(f"TUNECC:count", initial_value=0)
154+
155+
151156
def initialize_twiss_pvs(builder):
152157
"""
153158
Initializes PVs for Twiss parameters, which describe beam optics.

src/dt4acc/custom_epics/ioc/server.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
from softioc import softioc, builder, asyncio_dispatcher
66

7+
from ...core.utils.logger import get_logger
78
from .tasks import monitor_heartbeat
89
from .pv_setup import (
910
initialize_power_converter_pvs,
@@ -13,9 +14,12 @@
1314
initialize_bpm_pvs,
1415
initialize_orbit_object_pvs,
1516
initialize_twiss_pvs,
17+
initialize_tune_pvs,
1618
initialize_other_pvs,
1719
)
1820

21+
logger = get_logger()
22+
1923
# Create an asyncio dispatcher to handle asynchronous PV updates
2024
dispatcher = asyncio_dispatcher.AsyncioDispatcher()
2125

@@ -39,14 +43,16 @@ def startup():
3943
initialize_bpm_pvs(builder) # Initialize Beam Position Monitor PVs
4044
initialize_orbit_object_pvs(builder) # Initialize PV's of the new orbit object ... collection of bpms
4145
initialize_twiss_pvs(builder) # Initialize Twiss parameter PVs
46+
initialize_tune_pvs(builder)
47+
logger.warning("All pvs set up")
4248

4349
# Load the database of PVs defined above into the SoftIOC server
4450
builder.LoadDatabase()
4551
# Start the SoftIOC server to handle PV interactions
4652
softioc.iocInit(dispatcher)
4753

4854
# Start monitoring the heartbeat to ensure the server is running correctly
49-
asyncio.create_task(monitor_heartbeat())
55+
asyncio.create_task(monitor_heartbeat(), name="server-heartbeat-loop")
5056

5157

5258
def main():

src/dt4acc/custom_epics/ioc/tasks.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import asyncio
22
import getpass
3+
from datetime import datetime
34

4-
from p4p.client.asyncio import Context
5-
5+
from ..utils.context_proxy import ContextProxy
66
from ...core.utils.logger import get_logger
77
from ...core.views.shared_view import get_view_instance
88

9-
ctx = Context("pva")
9+
ctx = ContextProxy("pva")
1010
logger = get_logger()
1111
view = get_view_instance()
1212
heartbeat_task = None # Global variable to store the heartbeat task reference
@@ -25,7 +25,7 @@ async def monitor_heartbeat():
2525
await ctx.put(f'{getpass.getuser()}:VS3P2T5R:set', 1e-3)
2626
await ctx.put(f'{getpass.getuser()}:VS3P2T5R:set', 0e-3)
2727
logger.warning("Heartbeat loop initial start or it was terminated unexpectedly. Restarting...")
28-
heartbeat_task = asyncio.create_task(heartbeat_loop())
28+
heartbeat_task = asyncio.create_task(heartbeat_loop(), name="heart-beat-loop")
2929
await asyncio.sleep(1)
3030

3131

@@ -36,12 +36,14 @@ async def heartbeat_loop():
3636
Runs indefinitely and logs any errors encountered.
3737
"""
3838
while True:
39+
logger.info(f"Heart beat loop executing at {datetime.now()}")
3940
try:
4041
await asyncio.sleep(1) # Run every second
41-
# logger.warning("Heartbeat loop was running ...")
42+
logger.debug(f"Heart beat loop executed at {datetime.now()}")
4243
await view.heart_beat()
4344
except asyncio.CancelledError:
4445
logger.warning("Heartbeat loop was cancelled.")
4546
break # Exit cleanly on cancellation
4647
except Exception as exc:
4748
logger.error(f"Heartbeat encountered an error: {exc}")
49+
raise exc

0 commit comments

Comments
 (0)