Skip to content

Commit 0ab98fd

Browse files
committed
Threading overhaul
1 parent 9c90f99 commit 0ab98fd

File tree

1 file changed

+45
-2
lines changed

1 file changed

+45
-2
lines changed

src/atip/simulator.py

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import asyncio
44
import concurrent
55
import logging
6+
import time
67
from dataclasses import dataclass
78
from warnings import warn
89

@@ -263,6 +264,35 @@ async def cancel_calculation_task(self, timeout=10):
263264
for task in tasks:
264265
await task.cancel()
265266

267+
async def _queue_wrangler(self):
268+
# The time we wait before giving up waiting for new items in queue
269+
interval = 0.05
270+
# The min time we spend getting data from queue. If this is small, then we will recalculate before all items from the queue have been emptied
271+
min_time = 0
272+
init = time.time()
273+
start = 0
274+
end = 0
275+
276+
queue_warning = True
277+
# while item in queue, there was recently an item in queue, or we havnt had
278+
# our minimum waiting time, keep waiting for queue
279+
while (
280+
not self._queue.empty()
281+
or end - start < interval
282+
or not (end - init > min_time)
283+
):
284+
start = time.time()
285+
try:
286+
await asyncio.wait_for(self._gather_one_sample(), timeout=interval)
287+
end = time.time()
288+
except TimeoutError:
289+
end = time.time()
290+
if end - init > min_time:
291+
logging.info(f"No new changes seen in {interval} seconds")
292+
queue_warning = False
293+
break
294+
return queue_warning
295+
266296
async def _recalculate_phys_data(self, callback):
267297
"""Run as a never ending asyncio task. Recalculates the physics
268298
data dependent on the status of the '_paused' flag and the length of
@@ -288,8 +318,21 @@ async def _recalculate_phys_data(self, callback):
288318
logging.debug("Starting recalculation loop")
289319
while not self._quit_thread.is_set():
290320
await self._gather_one_sample()
291-
while not self._queue.empty():
292-
await self._gather_one_sample()
321+
322+
# The max time we spend getting data from queue
323+
max_time = 1
324+
try:
325+
logging.info(f"Starting queue wrangling with max time = {max_time}")
326+
queue_warning = await asyncio.wait_for(
327+
self._queue_wrangler(), timeout=max_time
328+
)
329+
except TimeoutError:
330+
logging.info("Too long since last recalculation, forcing update!")
331+
queue_warning = True
332+
333+
if queue_warning:
334+
print("Recalculation occurred with changes still in queue!")
335+
293336
logging.debug("Recaulculating simulation with new setpoints.")
294337
if not self._paused.is_set():
295338
async with self._new_data_lock:

0 commit comments

Comments
 (0)