Skip to content

Commit aceca04

Browse files
Add information about the analysers into the rsyncer api calls (#571)
This adds the length and status of any corresponding analyser queue to the information returned about an rsyncer. I've ended up breaking the one-to-one relation between endpoints in server/api/instrument.py and instrument_server/api.py, but this seemed the easiest way to combine the rsyncer and analyser information.
1 parent 1f9d448 commit aceca04

File tree

2 files changed

+59
-13
lines changed

2 files changed

+59
-13
lines changed

src/murfey/instrument_server/api.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ def restart_rsyncer(session_id: MurfeySessionID, rsyncer_source: RsyncerSource):
239239
return {"success": True}
240240

241241

242-
class RSyncerInfo(BaseModel):
242+
class ObserverInfo(BaseModel):
243243
source: str
244244
num_files_transferred: int
245245
num_files_in_queue: int
@@ -248,11 +248,11 @@ class RSyncerInfo(BaseModel):
248248

249249

250250
@router.get("/sessions/{session_id}/rsyncer_info")
251-
def get_rsyncer_info(session_id: MurfeySessionID) -> list[RSyncerInfo]:
251+
def get_rsyncer_info(session_id: MurfeySessionID) -> list[ObserverInfo]:
252252
info = []
253253
for k, v in controllers[session_id].rsync_processes.items():
254254
info.append(
255-
RSyncerInfo(
255+
ObserverInfo(
256256
source=str(k),
257257
num_files_transferred=v._files_transferred,
258258
num_files_in_queue=v.queue.qsize(),
@@ -263,6 +263,22 @@ def get_rsyncer_info(session_id: MurfeySessionID) -> list[RSyncerInfo]:
263263
return info
264264

265265

266+
@router.get("/sessions/{session_id}/analyser_info")
267+
def get_analyser_info(session_id: MurfeySessionID) -> list[ObserverInfo]:
268+
info = []
269+
for k, v in controllers[session_id].analysers.items():
270+
info.append(
271+
ObserverInfo(
272+
source=str(k),
273+
num_files_transferred=0,
274+
num_files_in_queue=v.queue.qsize(),
275+
alive=v.thread.is_alive(),
276+
stopping=v._stopping,
277+
)
278+
)
279+
return info
280+
281+
266282
class ProcessingParameters(BaseModel):
267283
gain_ref: str
268284
dose_per_frame: Optional[float] = None

src/murfey/server/api/instrument.py

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -455,8 +455,11 @@ class RSyncerInfo(BaseModel):
455455
source: str
456456
num_files_transferred: int
457457
num_files_in_queue: int
458+
num_files_to_analyse: int
458459
alive: bool
459460
stopping: bool
461+
analyser_alive: bool
462+
analyser_stopping: bool
460463
destination: str
461464
tag: str
462465
files_transferred: int
@@ -469,7 +472,8 @@ class RSyncerInfo(BaseModel):
469472
async def get_rsyncer_info(
470473
instrument_name: str, session_id: MurfeySessionID, db=murfey_db
471474
) -> List[RSyncerInfo]:
472-
data = []
475+
rsyncer_list = []
476+
analyser_list = []
473477
machine_config = get_machine_config(instrument_name=instrument_name)[
474478
instrument_name
475479
]
@@ -486,27 +490,53 @@ async def get_rsyncer_info(
486490
headers={"Authorization": f"Bearer {token}"},
487491
) as resp:
488492
if resp.status == 200:
489-
data = await resp.json()
493+
rsyncer_list = await resp.json()
490494
else:
491-
data = []
495+
rsyncer_list = []
492496
except KeyError:
493-
data = []
497+
rsyncer_list = []
494498
except Exception:
495499
log.warning(
496500
"Exception encountered gathering rsyncer info from the instrument server",
497501
exc_info=True,
498502
)
503+
504+
try:
505+
async with lock:
506+
token = instrument_server_tokens[session_id]["access_token"]
507+
async with aiohttp.ClientSession() as clientsession:
508+
async with clientsession.get(
509+
f"{machine_config.instrument_server_url}/sessions/{session_id}/analyser_info",
510+
headers={"Authorization": f"Bearer {token}"},
511+
) as resp:
512+
if resp.status == 200:
513+
analyser_list = await resp.json()
514+
else:
515+
analyser_list = []
516+
except KeyError:
517+
analyser_list = []
518+
except Exception:
519+
log.warning(
520+
"Exception encountered gathering analyser info from the instrument server",
521+
exc_info=True,
522+
)
523+
499524
combined_data = []
500-
data_source_lookup = {d["source"]: d for d in data}
525+
rsyncer_source_lookup = {d["source"]: d for d in rsyncer_list}
526+
analyser_source_lookup = {d["source"]: d for d in analyser_list}
501527
for ri in rsync_instances:
502-
d = data_source_lookup.get(ri.source, {})
528+
rsync_data = rsyncer_source_lookup.get(ri.source, {})
529+
analyser_data = analyser_source_lookup.get(ri.source, {})
503530
combined_data.append(
504531
RSyncerInfo(
505532
source=ri.source,
506-
num_files_transferred=d.get("num_files_transferred", 0),
507-
num_files_in_queue=d.get("num_files_in_queue", 0),
508-
alive=d.get("alive", False),
509-
stopping=d.get("stopping", True),
533+
num_files_transferred=rsync_data.get("num_files_transferred", 0),
534+
num_files_in_queue=rsync_data.get("num_files_in_queue", 0),
535+
num_files_to_analyse=analyser_data.get("num_files_in_queue", 0),
536+
alive=rsync_data.get("alive", False),
537+
stopping=rsync_data.get("stopping", True),
538+
analyser_alive=analyser_data.get("alive", False),
539+
analyser_stopping=analyser_data.get("stopping", True),
510540
destination=ri.destination,
511541
tag=ri.tag,
512542
files_transferred=ri.files_transferred,

0 commit comments

Comments
 (0)