@@ -140,6 +140,10 @@ class ATSimulator:
140140 for new changes to the AT
141141 lattice and recalculate the
142142 physics data upon a change.
143+ _new_data_lock (asyncio.Lock): A lock which can be taken
144+ to stop new caput callbacks
145+ being added to the queue while
146+ held.
143147 """
144148
145149 _loop : asyncio .BaseEventLoop
@@ -148,6 +152,7 @@ class ATSimulator:
148152 _quit_thread : asyncio .Event
149153 _up_to_date : asyncio .Event
150154 _calculation_task : asyncio .Task
155+ _new_data_lock : asyncio .Lock
151156
152157 @classmethod
153158 async def create (
@@ -207,6 +212,7 @@ async def create(
207212 self ._quit_thread = asyncio .Event ()
208213 self ._up_to_date = asyncio .Event ()
209214 self ._up_to_date .set ()
215+ self ._new_data_lock = asyncio .Lock ()
210216
211217 self ._calculation_task = asyncio .create_task (
212218 self ._recalculate_phys_data (callback )
@@ -221,11 +227,13 @@ async def queue_set(self, func, field, value):
221227 field (str): The field to be changed.
222228 value (float): The value to be set.
223229 """
224- await self ._queue .put ((func , field , value ))
225- # If this flag gets cleared while we are recalculating, then it can cause
226- # everything to lock, so we setup a lock between this function and the
227- # recalculate function
228- logging .debug (f"Added task to async queue. qsize={ self ._queue .qsize ()} " )
230+ async with self ._new_data_lock :
231+ await self ._queue .put ((func , field , value ))
232+ # If this flag gets cleared while we are recalculating, then it can cause
233+ # everything to lock, so we setup a lock between this function and the
234+ # recalculate function
235+ self ._up_to_date .clear ()
236+ logging .debug (f"Added task to async queue. qsize={ self ._queue .qsize ()} " )
229237
230238 async def _gather_one_sample (self ):
231239 """If the queue is empty Wait() yields until an item is added. When the
@@ -306,14 +314,22 @@ async def _recalculate_phys_data(self, callback):
306314 # callback checks the flag.
307315 self ._up_to_date .set ()
308316 logging .debug ("Simulation up to date." )
309- if callback is not None :
310- logging .debug (f"Executing callback function: { callback .__name__ } " )
311- await callback ()
312- logging .debug ("Callback completed." )
317+ async with self ._new_data_lock :
318+ if callback is not None :
319+ logging .debug (
320+ f"Executing callback function: { callback .__name__ } "
321+ )
322+ # For Virtac this function calls update_pvs() which gets data
323+ # from the pytac datasource to update the softioc pvs with. The
324+ # data source is sim_data_sources.py and its get_value()
325+ # function waits on the wait_for_calculation() function which waits for the
326+ # up_to_date flag which currently will always be set, so this
327+ # process is pointless.
328+ await callback ()
329+ logging .debug ("Callback completed." )
313330 # After this point we assume new setpoints have made the data stale. We
314331 # cant clear this flag in queue_set() as the callbacks can depend on
315332 # this being set.
316- self ._up_to_date .clear ()
317333
318334 def toggle_calculations (self ):
319335 """Pause or unpause the physics calculations by setting or clearing the
0 commit comments