diff --git a/pyproject.toml b/pyproject.toml index 31c733ec..852403e9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -109,6 +109,7 @@ GitHub = "https://github.com/DiamondLightSource/python-murfey" "clem.register_preprocessing_result" = "murfey.workflows.clem.register_preprocessing_results:run" "data_collection" = "murfey.workflows.register_data_collection:run" "data_collection_group" = "murfey.workflows.register_data_collection_group:run" +"experiment_type_update" = "murfey.workflows.register_experiment_type_update:run" "pato" = "murfey.workflows.notifications:notification_setup" "picked_particles" = "murfey.workflows.spa.picking:particles_picked" "picked_tomogram" = "murfey.workflows.tomo.picking:picked_tomogram" diff --git a/src/murfey/client/context.py b/src/murfey/client/context.py index 15cc61f0..57cc0bb3 100644 --- a/src/murfey/client/context.py +++ b/src/murfey/client/context.py @@ -3,16 +3,156 @@ import logging from importlib.metadata import entry_points from pathlib import Path -from typing import Any, Dict, List, NamedTuple +from typing import Any, List, NamedTuple -from murfey.client.instance_environment import MurfeyInstanceEnvironment +import xmltodict + +from murfey.client.instance_environment import MurfeyInstanceEnvironment, SampleInfo +from murfey.util.client import capture_post, get_machine_config_client logger = logging.getLogger("murfey.client.context") -class FutureRequest(NamedTuple): - url: str - message: Dict[str, Any] +def _atlas_destination( + environment: MurfeyInstanceEnvironment, source: Path, token: str +) -> Path: + machine_config = get_machine_config_client( + str(environment.url.geturl()), + token, + instrument_name=environment.instrument_name, + demo=environment.demo, + ) + for i, destination_part in enumerate( + Path(environment.default_destinations[source]).parts + ): + if destination_part == environment.visit: + return Path(machine_config.get("rsync_basepath", "")) / "/".join( + Path(environment.default_destinations[source]).parent.parts[: i + 1] + ) + return ( + Path(machine_config.get("rsync_basepath", "")) + / Path(environment.default_destinations[source]).parent + / environment.visit + ) + + +def ensure_dcg_exists( + collection_type: str, + metadata_source: Path, + environment: MurfeyInstanceEnvironment, + token: str, +) -> str | None: + """Create a data collection group""" + if collection_type == "tomo": + experiment_type_id = 36 + session_file = metadata_source / "Session.dm" + elif collection_type == "spa": + experiment_type_id = 37 + session_file = metadata_source / "EpuSession.dm" + for h in entry_points(group="murfey.hooks"): + try: + if h.name == "get_epu_session_metadata": + h.load()(session_file, environment=environment) + except Exception as e: + logger.warning(f"Get EPU session hook failed: {e}") + else: + logger.error(f"Unknown collection type {collection_type}") + return None + + if not session_file.is_file(): + logger.warning(f"Cannot find session file {str(session_file)}") + dcg_tag = ( + str(metadata_source).replace(f"/{environment.visit}", "").replace("//", "/") + ) + dcg_data = { + "experiment_type_id": experiment_type_id, + "tag": dcg_tag, + } + else: + with open(session_file, "r") as session_xml: + session_data = xmltodict.parse(session_xml.read()) + + if collection_type == "tomo": + windows_path = session_data["TomographySession"]["AtlasId"] + else: + windows_path = session_data["EpuSessionXml"]["Samples"]["_items"][ + "SampleXml" + ][0]["AtlasId"]["#text"] + + logger.info(f"Windows path to atlas metadata found: {windows_path}") + if not windows_path: + logger.warning("No atlas metadata path found") + return None + visit_index = windows_path.split("\\").index(environment.visit) + partial_path = "/".join(windows_path.split("\\")[visit_index + 1 :]) + logger.info("Partial Linux path successfully constructed from Windows path") + + source_visit_dir = metadata_source.parent + logger.info( + f"Looking for atlas XML file in metadata directory {str((source_visit_dir / partial_path).parent)}" + ) + atlas_xml_path = list( + (source_visit_dir / partial_path).parent.glob("Atlas_*.xml") + )[0] + logger.info(f"Atlas XML path {str(atlas_xml_path)} found") + with open(atlas_xml_path, "rb") as atlas_xml: + atlas_xml_data = xmltodict.parse(atlas_xml) + atlas_original_pixel_size = float( + atlas_xml_data["MicroscopeImage"]["SpatialScale"]["pixelSize"]["x"][ + "numericValue" + ] + ) + # need to calculate the pixel size of the downscaled image + atlas_pixel_size = atlas_original_pixel_size * 7.8 + logger.info(f"Atlas image pixel size determined to be {atlas_pixel_size}") + + for p in partial_path.split("/"): + if p.startswith("Sample"): + sample = int(p.replace("Sample", "")) + break + else: + logger.warning(f"Sample could not be identified for {metadata_source}") + return None + environment.samples[metadata_source] = SampleInfo( + atlas=Path(partial_path), sample=sample + ) + + dcg_search_dir = ( + str(metadata_source).replace(f"/{environment.visit}", "").replace("//", "/") + ) + if collection_type == "tomo": + dcg_tag = dcg_search_dir + else: + dcg_images_dirs = sorted( + Path(dcg_search_dir).glob("Images-Disc*"), + key=lambda x: x.stat().st_ctime, + ) + if not dcg_images_dirs: + logger.warning(f"Cannot find Images-Disc* in {dcg_search_dir}") + return None + dcg_tag = str(dcg_images_dirs[-1]) + + dcg_data = { + "experiment_type_id": experiment_type_id, + "tag": dcg_tag, + "atlas": str( + _atlas_destination(environment, metadata_source, token) + / environment.samples[metadata_source].atlas.parent + / atlas_xml_path.with_suffix(".jpg").name + ).replace("//", "/"), + "sample": environment.samples[metadata_source].sample, + "atlas_pixel_size": atlas_pixel_size, + } + capture_post( + base_url=str(environment.url.geturl()), + router_name="workflow.router", + function_name="register_dc_group", + token=token, + visit_name=environment.visit, + session_id=environment.murfey_session, + data=dcg_data, + ) + return dcg_tag class ProcessingParameter(NamedTuple): diff --git a/src/murfey/client/contexts/atlas.py b/src/murfey/client/contexts/atlas.py index 85460d8d..d44d3c58 100644 --- a/src/murfey/client/contexts/atlas.py +++ b/src/murfey/client/contexts/atlas.py @@ -2,9 +2,10 @@ from pathlib import Path from typing import Optional -from murfey.client.context import Context +import xmltodict + +from murfey.client.context import Context, _atlas_destination from murfey.client.contexts.spa import _get_source -from murfey.client.contexts.spa_metadata import _atlas_destination from murfey.client.instance_environment import MurfeyInstanceEnvironment from murfey.util.client import capture_post @@ -36,7 +37,7 @@ def post_transfer( source = _get_source(transferred_file, environment) if source: transferred_atlas_name = _atlas_destination( - environment, source, transferred_file, self._token + environment, source, self._token ) / transferred_file.relative_to(source.parent) capture_post( base_url=str(environment.url.geturl()), @@ -44,8 +45,60 @@ def post_transfer( function_name="make_atlas_jpg", token=self._token, session_id=environment.murfey_session, - data={"path": str(transferred_atlas_name)}, + data={"path": str(transferred_atlas_name).replace("//", "/")}, ) logger.info( f"Submitted request to create JPG image of atlas {str(transferred_atlas_name)!r}" ) + elif ( + environment + and "Atlas_" in transferred_file.stem + and transferred_file.suffix == ".xml" + ): + source = _get_source(transferred_file, environment) + if source: + atlas_mrc = transferred_file.with_suffix(".mrc") + transferred_atlas_jpg = _atlas_destination( + environment, source, self._token + ) / atlas_mrc.relative_to(source.parent).with_suffix(".jpg") + + with open(transferred_file, "rb") as atlas_xml: + atlas_xml_data = xmltodict.parse(atlas_xml) + atlas_original_pixel_size = float( + atlas_xml_data["MicroscopeImage"]["SpatialScale"]["pixelSize"][ + "x" + ]["numericValue"] + ) + + # need to calculate the pixel size of the downscaled image + atlas_pixel_size = atlas_original_pixel_size * 7.8 + + for p in transferred_file.parts: + if p.startswith("Sample"): + sample = int(p.replace("Sample", "")) + break + else: + logger.warning( + f"Sample could not be identified for {transferred_file}" + ) + return + + dcg_data = { + "experiment_type_id": 44, # Atlas + "tag": str(transferred_file.parent), + "atlas": str(transferred_atlas_jpg).replace("//", "/"), + "sample": sample, + "atlas_pixel_size": atlas_pixel_size, + } + capture_post( + base_url=str(environment.url.geturl()), + router_name="workflow.router", + function_name="register_dc_group", + token=self._token, + visit_name=environment.visit, + session_id=environment.murfey_session, + data=dcg_data, + ) + logger.info( + f"Registered data collection group for atlas {str(transferred_atlas_jpg)!r}" + ) diff --git a/src/murfey/client/contexts/spa_metadata.py b/src/murfey/client/contexts/spa_metadata.py index f8c2e54f..52785ad9 100644 --- a/src/murfey/client/contexts/spa_metadata.py +++ b/src/murfey/client/contexts/spa_metadata.py @@ -4,10 +4,10 @@ import xmltodict -from murfey.client.context import Context +from murfey.client.context import Context, ensure_dcg_exists from murfey.client.contexts.spa import _file_transferred_to, _get_source -from murfey.client.instance_environment import MurfeyInstanceEnvironment, SampleInfo -from murfey.util.client import capture_post, get_machine_config_client +from murfey.client.instance_environment import MurfeyInstanceEnvironment +from murfey.util.client import capture_post from murfey.util.spa_metadata import ( FoilHoleInfo, get_grid_square_atlas_positions, @@ -69,29 +69,6 @@ def _foil_hole_positions(xml_path: Path, grid_square: int) -> Dict[str, FoilHole return foil_holes -def _atlas_destination( - environment: MurfeyInstanceEnvironment, source: Path, file_path: Path, token: str -) -> Path: - machine_config = get_machine_config_client( - str(environment.url.geturl()), - token, - instrument_name=environment.instrument_name, - demo=environment.demo, - ) - for i, destination_part in enumerate( - Path(environment.default_destinations[source]).parts - ): - if destination_part == environment.visit: - return Path(machine_config.get("rsync_basepath", "")) / "/".join( - Path(environment.default_destinations[source]).parent.parts[: i + 1] - ) - return ( - Path(machine_config.get("rsync_basepath", "")) - / Path(environment.default_destinations[source]).parent - / environment.visit - ) - - class SPAMetadataContext(Context): def __init__(self, acquisition_software: str, basepath: Path, token: str): super().__init__("SPA_metadata", acquisition_software, token) @@ -124,82 +101,19 @@ def post_transfer( source = _get_source(transferred_file, environment) if not source: logger.warning( - f"Source could not be indentified for {str(transferred_file)}" + f"Source could not be identified for {str(transferred_file)}" ) return - source_visit_dir = source.parent - - logger.info( - f"Looking for atlas XML file in metadata directory {str((source_visit_dir / partial_path).parent)}" - ) - atlas_xml_path = list( - (source_visit_dir / partial_path).parent.glob("Atlas_*.xml") - )[0] - logger.info(f"Atlas XML path {str(atlas_xml_path)} found") - with open(atlas_xml_path, "rb") as atlas_xml: - atlas_xml_data = xmltodict.parse(atlas_xml) - atlas_original_pixel_size = float( - atlas_xml_data["MicroscopeImage"]["SpatialScale"]["pixelSize"]["x"][ - "numericValue" - ] - ) - - # need to calculate the pixel size of the downscaled image - atlas_pixel_size = atlas_original_pixel_size * 7.8 - logger.info(f"Atlas image pixel size determined to be {atlas_pixel_size}") - - for p in partial_path.split("/"): - if p.startswith("Sample"): - sample = int(p.replace("Sample", "")) - break - else: - logger.warning(f"Sample could not be identified for {transferred_file}") - return if source: - environment.samples[source] = SampleInfo( - atlas=Path(partial_path), sample=sample - ) - dcg_search_dir = "/".join( - p for p in transferred_file.parent.parts if p != environment.visit - ) - dcg_search_dir = ( - dcg_search_dir[1:] - if dcg_search_dir.startswith("//") - else dcg_search_dir - ) - dcg_images_dirs = sorted( - Path(dcg_search_dir).glob("Images-Disc*"), - key=lambda x: x.stat().st_ctime, - ) - if not dcg_images_dirs: - logger.warning(f"Cannot find Images-Disc* in {dcg_search_dir}") - return - dcg_tag = str(dcg_images_dirs[-1]) - dcg_data = { - "experiment_type_id": 37, # Single particle - "tag": dcg_tag, - "atlas": str( - _atlas_destination( - environment, source, transferred_file, self._token - ) - / environment.samples[source].atlas.parent - / atlas_xml_path.with_suffix(".jpg").name - ), - "sample": environment.samples[source].sample, - "atlas_pixel_size": atlas_pixel_size, - } - capture_post( - base_url=str(environment.url.geturl()), - router_name="workflow.router", - function_name="register_dc_group", + dcg_tag = ensure_dcg_exists( + collection_type="spa", + metadata_source=source, + environment=environment, token=self._token, - visit_name=environment.visit, - session_id=environment.murfey_session, - data=dcg_data, ) gs_pix_positions = get_grid_square_atlas_positions( - source_visit_dir / partial_path + source.parent / partial_path ) for gs, pos_data in gs_pix_positions.items(): if pos_data: @@ -228,36 +142,14 @@ def post_transfer( and environment ): # Make sure we have a data collection group before trying to register grid square - dcg_search_dir = "/".join( - p - for p in transferred_file.parent.parent.parts - if p != environment.visit - ) - dcg_search_dir = ( - dcg_search_dir[1:] - if dcg_search_dir.startswith("//") - else dcg_search_dir - ) - dcg_images_dirs = sorted( - Path(dcg_search_dir).glob("Images-Disc*"), - key=lambda x: x.stat().st_ctime, - ) - if not dcg_images_dirs: - logger.warning(f"Cannot find Images-Disc* in {dcg_search_dir}") - return - dcg_tag = str(dcg_images_dirs[-1]) - dcg_data = { - "experiment_type_id": 37, # Single particle - "tag": dcg_tag, - } - capture_post( - base_url=str(environment.url.geturl()), - router_name="workflow.router", - function_name="register_dc_group", + source = _get_source(transferred_file, environment=environment) + if source is None: + return None + ensure_dcg_exists( + collection_type="spa", + metadata_source=source, + environment=environment, token=self._token, - visit_name=environment.visit, - session_id=environment.murfey_session, - data=dcg_data, ) gs_name = int(transferred_file.stem.split("_")[1]) @@ -265,9 +157,6 @@ def post_transfer( f"Collecting foil hole positions for {str(transferred_file)} and grid square {gs_name}" ) fh_positions = _foil_hole_positions(transferred_file, gs_name) - source = _get_source(transferred_file, environment=environment) - if source is None: - return None visitless_source_search_dir = str(source).replace( f"/{environment.visit}", "" ) diff --git a/src/murfey/client/contexts/tomo.py b/src/murfey/client/contexts/tomo.py index 6c4eff7e..5df9bb1e 100644 --- a/src/murfey/client/contexts/tomo.py +++ b/src/murfey/client/contexts/tomo.py @@ -8,7 +8,7 @@ import xmltodict import murfey.util.eer -from murfey.client.context import Context, ProcessingParameter +from murfey.client.context import Context, ProcessingParameter, ensure_dcg_exists from murfey.client.instance_environment import ( MovieID, MovieTracker, @@ -101,20 +101,14 @@ def register_tomography_data_collections( ) return try: - dcg_data = { - "experiment_type_id": 36, # Tomo - "tag": str(self._basepath), - "atlas": "", - "sample": None, - } - capture_post( - base_url=str(environment.url.geturl()), - router_name="workflow.router", - function_name="register_dc_group", + metadata_source = ( + self._basepath.parent / environment.visit / self._basepath.name + ) + ensure_dcg_exists( + collection_type="tomo", + metadata_source=metadata_source, + environment=environment, token=self._token, - visit_name=environment.visit, - session_id=environment.murfey_session, - data=dcg_data, ) for tilt_series in self._tilt_series.keys(): diff --git a/src/murfey/client/contexts/tomo_metadata.py b/src/murfey/client/contexts/tomo_metadata.py index 45d22a22..7d85a6a5 100644 --- a/src/murfey/client/contexts/tomo_metadata.py +++ b/src/murfey/client/contexts/tomo_metadata.py @@ -4,39 +4,14 @@ import xmltodict -from murfey.client.context import Context +from murfey.client.context import Context, ensure_dcg_exists from murfey.client.contexts.spa import _file_transferred_to, _get_source -from murfey.client.contexts.spa_metadata import _atlas_destination -from murfey.client.instance_environment import MurfeyInstanceEnvironment, SampleInfo +from murfey.client.instance_environment import MurfeyInstanceEnvironment from murfey.util.client import capture_post logger = logging.getLogger("murfey.client.contexts.tomo_metadata") -def ensure_dcg_exists( - transferred_file: Path, environment: MurfeyInstanceEnvironment, token: str -): - # Make sure we have a data collection group - source = _get_source(transferred_file, environment=environment) - if not source: - return None - dcg_tag = str(source).replace(f"/{environment.visit}", "") - dcg_data = { - "experiment_type_id": 36, # Tomo - "tag": dcg_tag, - } - capture_post( - base_url=str(environment.url.geturl()), - router_name="workflow.router", - function_name="register_dc_group", - token=token, - visit_name=environment.visit, - session_id=environment.murfey_session, - data=dcg_data, - ) - return dcg_tag - - class TomographyMetadataContext(Context): def __init__(self, acquisition_software: str, basepath: Path, token: str): super().__init__("Tomography_metadata", acquisition_software, token) @@ -54,83 +29,33 @@ def post_transfer( **kwargs, ) - if transferred_file.name == "Session.dm" and environment: - logger.info("Tomography session metadata found") - with open(transferred_file, "r") as session_xml: - session_data = xmltodict.parse(session_xml.read()) - - windows_path = session_data["TomographySession"]["AtlasId"] - logger.info(f"Windows path to atlas metadata found: {windows_path}") - if not windows_path: - logger.warning("No atlas metadata path found") - return - visit_index = windows_path.split("\\").index(environment.visit) - partial_path = "/".join(windows_path.split("\\")[visit_index + 1 :]) - logger.info("Partial Linux path successfully constructed from Windows path") - - source = _get_source(transferred_file, environment) - if not source: - logger.warning( - f"Source could not be identified for {str(transferred_file)}" - ) - return - - source_visit_dir = source.parent + if environment is None: + logger.warning("No environment set") + return - logger.info( - f"Looking for atlas XML file in metadata directory {str((source_visit_dir / partial_path).parent)}" - ) - atlas_xml_path = list( - (source_visit_dir / partial_path).parent.glob("Atlas_*.xml") - )[0] - logger.info(f"Atlas XML path {str(atlas_xml_path)} found") - with open(atlas_xml_path, "rb") as atlas_xml: - atlas_xml_data = xmltodict.parse(atlas_xml) - atlas_pixel_size = float( - atlas_xml_data["MicroscopeImage"]["SpatialScale"]["pixelSize"]["x"][ - "numericValue" - ] - ) + metadata_source = _get_source(transferred_file, environment=environment) + if not metadata_source: + logger.warning(f"No source found for {str(transferred_file)}") + return - for p in partial_path.split("/"): - if p.startswith("Sample"): - sample = int(p.replace("Sample", "")) - break - else: - logger.warning(f"Sample could not be identified for {transferred_file}") - return - environment.samples[source] = SampleInfo( - atlas=Path(partial_path), sample=sample - ) - dcg_tag = "/".join( - p for p in transferred_file.parent.parts if p != environment.visit - ).replace("//", "/") - dcg_data = { - "experiment_type_id": 36, # Tomo - "tag": dcg_tag, - "atlas": str( - _atlas_destination( - environment, source, transferred_file, self._token - ) - / environment.samples[source].atlas.parent - / atlas_xml_path.with_suffix(".jpg").name - ), - "sample": environment.samples[source].sample, - "atlas_pixel_size": atlas_pixel_size, - } - capture_post( - base_url=str(environment.url.geturl()), - router_name="workflow.router", - function_name="register_dc_group", + if transferred_file.name == "Session.dm": + logger.info("Tomography session metadata found") + ensure_dcg_exists( + collection_type="tomo", + metadata_source=metadata_source, + environment=environment, token=self._token, - visit_name=environment.visit, - session_id=environment.murfey_session, - data=dcg_data, ) - elif transferred_file.name == "SearchMap.xml" and environment: + elif transferred_file.name == "SearchMap.xml": logger.info("Tomography session search map xml found") - dcg_tag = ensure_dcg_exists(transferred_file, environment, self._token) + + dcg_tag = ensure_dcg_exists( + collection_type="tomo", + metadata_source=metadata_source, + environment=environment, + token=self._token, + ) with open(transferred_file, "r") as sm_xml: sm_data = xmltodict.parse(sm_xml.read()) @@ -230,9 +155,14 @@ def post_transfer( }, ) - elif transferred_file.name == "SearchMap.dm" and environment: + elif transferred_file.name == "SearchMap.dm": logger.info("Tomography session search map dm found") - dcg_tag = ensure_dcg_exists(transferred_file, environment, self._token) + dcg_tag = ensure_dcg_exists( + collection_type="tomo", + metadata_source=metadata_source, + environment=environment, + token=self._token, + ) with open(transferred_file, "r") as sm_xml: sm_data = xmltodict.parse(sm_xml.read()) @@ -276,9 +206,14 @@ def post_transfer( }, ) - elif transferred_file.name == "BatchPositionsList.xml" and environment: + elif transferred_file.name == "BatchPositionsList.xml": logger.info("Tomography session batch positions list found") - dcg_tag = ensure_dcg_exists(transferred_file, environment, self._token) + dcg_tag = ensure_dcg_exists( + collection_type="tomo", + metadata_source=metadata_source, + environment=environment, + token=self._token, + ) with open(transferred_file) as xml: for_parsing = xml.read() batch_xml = xmltodict.parse(for_parsing) diff --git a/src/murfey/client/multigrid_control.py b/src/murfey/client/multigrid_control.py index 78b61614..b6e9e6c3 100644 --- a/src/murfey/client/multigrid_control.py +++ b/src/murfey/client/multigrid_control.py @@ -12,6 +12,7 @@ import murfey.client.websocket from murfey.client.analyser import Analyser +from murfey.client.context import ensure_dcg_exists from murfey.client.contexts.spa import SPAModularContext from murfey.client.contexts.tomo import TomographyContext from murfey.client.destinations import determine_default_destination @@ -610,28 +611,20 @@ def _start_dc(self, metadata_json, from_form: bool = False): log.info("Tomography processing flushed") elif isinstance(context, SPAModularContext): - dcg_data = { - "experiment_type_id": 37, # Single particle - "tag": str(source), - "atlas": ( - str(self._environment.samples[source].atlas) - if self._environment.samples.get(source) - else "" - ), - "sample": ( - self._environment.samples[source].sample - if self._environment.samples.get(source) - else None - ), - } - capture_post( - base_url=str(self._environment.url.geturl()), - router_name="workflow.router", - function_name="register_dc_group", + if self._environment.visit in source.parts: + metadata_source = source + else: + metadata_source_as_str = ( + "/".join(source.parts[:-2]) + + f"/{self._environment.visit}/" + + source.parts[-2] + ) + metadata_source = Path(metadata_source_as_str.replace("//", "/")) + ensure_dcg_exists( + collection_type="spa", + metadata_source=metadata_source, + environment=self._environment, token=self.token, - visit_name=self._environment.visit, - session_id=self.session_id, - data=dcg_data, ) if from_form: data = { diff --git a/src/murfey/client/tui/app.py b/src/murfey/client/tui/app.py index 334a4518..3ab2cb61 100644 --- a/src/murfey/client/tui/app.py +++ b/src/murfey/client/tui/app.py @@ -14,6 +14,7 @@ from textual.widgets import Button, Input from murfey.client.analyser import Analyser +from murfey.client.context import ensure_dcg_exists from murfey.client.contexts.spa import SPAModularContext from murfey.client.contexts.tomo import TomographyContext from murfey.client.destinations import determine_default_destination @@ -561,28 +562,20 @@ def _start_dc(self, metadata_json, from_form: bool = False): ) log.info("Tomography processing flushed") elif isinstance(context, SPAModularContext): - dcg_data = { - "experiment_type_id": 37, # Single particle - "tag": str(source), - "atlas": ( - str(self._environment.samples[source].atlas) - if self._environment.samples.get(source) - else "" - ), - "sample": ( - self._environment.samples[source].sample - if self._environment.samples.get(source) - else None - ), - } - capture_post( - base_url=str(self._url.geturl()), - router_name="workflow.router", - function_name="register_dc_group", - token=token, - visit_name=self._visit, - session_id=self._environment.murfey_session, - data=dcg_data, + if self._environment.visit in source.parts: + metadata_source = source + else: + metadata_source_as_str = ( + "/".join(source.parts[:-2]) + + f"/{self._environment.visit}/" + + source.parts[-2] + ) + metadata_source = Path(metadata_source_as_str.replace("//", "/")) + ensure_dcg_exists( + collection_type="spa", + metadata_source=metadata_source, + environment=self._environment, + token=self.token, ) if from_form: data = { diff --git a/src/murfey/server/api/workflow.py b/src/murfey/server/api/workflow.py index fcc793de..cd405662 100644 --- a/src/murfey/server/api/workflow.py +++ b/src/murfey/server/api/workflow.py @@ -106,11 +106,24 @@ def register_dc_group( db.exec(select(Session).where(Session.id == session_id)).one().instrument_name ) logger.info(f"Registering data collection group on microscope {instrument_name}") - if dcg_murfey := db.exec( - select(DataCollectionGroup) - .where(DataCollectionGroup.session_id == session_id) - .where(DataCollectionGroup.tag == dcg_params.tag) - ).all(): + if ( + dcg_murfey := db.exec( + select(DataCollectionGroup) + .where(DataCollectionGroup.session_id == session_id) + .where(DataCollectionGroup.tag == dcg_params.tag) + ).all() + ) or ( + ( + dcg_murfey := db.exec( + select(DataCollectionGroup) + .where(DataCollectionGroup.session_id == session_id) + .where(DataCollectionGroup.sample == dcg_params.sample) + ).all() + ) + and dcg_params.experiment_type_id == 44 + ): + # Either switching atlas for a common (atlas or processing) tag + # Or registering a new atlas-type dcg for a sample that is already present dcg_murfey[0].atlas = dcg_params.atlas or dcg_murfey[0].atlas dcg_murfey[0].sample = dcg_params.sample or dcg_murfey[0].sample dcg_murfey[0].atlas_pixel_size = ( @@ -155,6 +168,24 @@ def register_dc_group( session_id, sm.name, search_map_params, db, close_db=False ) db.close() + elif dcg_murfey := db.exec( + select(DataCollectionGroup) + .where(DataCollectionGroup.session_id == session_id) + .where(DataCollectionGroup.sample == dcg_params.sample) + ).all(): + # Case where we switch from atlas to processing + dcg_murfey[0].tag = dcg_params.tag or dcg_murfey[0].tag + if _transport_object: + _transport_object.send( + _transport_object.feedback_queue, + { + "register": "experiment_type_update", + "experiment_type_id": dcg_params.experiment_type_id, + "dcgid": dcg_murfey[0].id, + }, + ) + db.add(dcg_murfey[0]) + db.commit() else: dcg_parameters = { "start_time": str(datetime.now()), diff --git a/src/murfey/server/ispyb.py b/src/murfey/server/ispyb.py index 0085f81b..47860c89 100644 --- a/src/murfey/server/ispyb.py +++ b/src/murfey/server/ispyb.py @@ -137,6 +137,34 @@ def do_insert_data_collection_group( ) return {"success": False, "return_value": None} + def do_update_data_collection_group( + self, + record: DataCollectionGroup, + message=None, + **kwargs, + ): + try: + with ISPyBSession() as db: + dcg = ( + db.query(DataCollectionGroup) + .filter( + DataCollectionGroup.dataCollectionGroupId + == record.dataCollectionGroupId + ) + .one() + ) + dcg.experimentTypeId = record.experimentTypeId + db.add(dcg) + db.commit() + return {"success": True, "return_value": record.dataCollectionGroupId} + except ispyb.ISPyBException as e: + log.error( + "Updating Data Collection Group entry caused exception '%s'.", + e, + exc_info=True, + ) + return {"success": False, "return_value": None} + def do_insert_atlas(self, record: Atlas): try: with ISPyBSession() as db: diff --git a/src/murfey/workflows/register_experiment_type_update.py b/src/murfey/workflows/register_experiment_type_update.py new file mode 100644 index 00000000..6d70d2e2 --- /dev/null +++ b/src/murfey/workflows/register_experiment_type_update.py @@ -0,0 +1,39 @@ +import logging +import time + +import ispyb.sqlalchemy._auto_db_schema as ISPyBDB +from sqlmodel.orm.session import Session as SQLModelSession + +from murfey.server import _transport_object + +logger = logging.getLogger("murfey.workflows.register_data_collection_group") + + +def run( + message: dict, murfey_db: SQLModelSession, demo: bool = False +) -> dict[str, bool]: + # Fail immediately if no transport wrapper is found + if _transport_object is None: + logger.error("Unable to find transport manager") + return {"success": False, "requeue": False} + + logger.info(f"Updating the experiment type for data collection group: \n{message}") + + record = ISPyBDB.DataCollectionGroup( + dataCollectionGroupId=message["dcgid"], + experimentTypeId=message["experiment_type_id"], + ) + dcgid = _transport_object.do_update_data_collection_group(record).get( + "return_value", None + ) + + if dcgid is None: + time.sleep(2) + logger.error( + "Failed to update the following data collection group: \n" + f"{message} \n" + "Requeuing message" + ) + return {"success": False, "requeue": True} + + return {"success": True} diff --git a/src/murfey/workflows/tomo/tomo_metadata.py b/src/murfey/workflows/tomo/tomo_metadata.py index 2dbc25cb..23b6490f 100644 --- a/src/murfey/workflows/tomo/tomo_metadata.py +++ b/src/murfey/workflows/tomo/tomo_metadata.py @@ -215,16 +215,16 @@ def register_search_map_in_database( # Convert from metres to pixels search_map_params.height_on_atlas = int( - search_map.height * search_map.pixel_size / dcg.atlas_pixel_size + search_map.height * search_map.pixel_size / dcg.atlas_pixel_size * 7.8 ) search_map_params.width_on_atlas = int( - search_map.width * search_map.pixel_size / dcg.atlas_pixel_size + search_map.width * search_map.pixel_size / dcg.atlas_pixel_size * 7.8 ) search_map_params.x_location = float( - corrected_vector[0] / dcg.atlas_pixel_size + 2003 + corrected_vector[0] / dcg.atlas_pixel_size * 7.8 + 2003 ) search_map_params.y_location = float( - corrected_vector[1] / dcg.atlas_pixel_size + 2003 + corrected_vector[1] / dcg.atlas_pixel_size * 7.8 + 2003 ) search_map.x_location = search_map_params.x_location search_map.y_location = search_map_params.y_location diff --git a/tests/client/contexts/test_atlas.py b/tests/client/contexts/test_atlas.py new file mode 100644 index 00000000..b56f0162 --- /dev/null +++ b/tests/client/contexts/test_atlas.py @@ -0,0 +1,92 @@ +from unittest.mock import patch +from urllib.parse import urlparse + +from murfey.client.contexts.atlas import AtlasContext +from murfey.client.instance_environment import MurfeyInstanceEnvironment + + +def test_atlas_context_initialisation(tmp_path): + context = AtlasContext("tomo", tmp_path, "token") + assert context.name == "Atlas" + assert context._acquisition_software == "tomo" + assert context._basepath == tmp_path + assert context._token == "token" + + +@patch("murfey.client.contexts.atlas.capture_post") +def test_atlas_context_mrc(mock_capture_post, tmp_path): + env = MurfeyInstanceEnvironment( + url=urlparse("http://localhost:8000"), + client_id=0, + sources=[tmp_path / "cm12345-6"], + default_destinations={ + tmp_path / "cm12345-6": f"{tmp_path}/destination/cm12345-6" + }, + instrument_name="", + visit="cm12345-6", + murfey_session=1, + ) + context = AtlasContext("tomo", tmp_path, "token") + + atlas_mrc = tmp_path / "cm12345-6/Supervisor_atlas/Sample2/Atlas/Atlas_1.mrc" + atlas_mrc.parent.mkdir(parents=True) + atlas_mrc.touch() + + context.post_transfer( + atlas_mrc, + environment=env, + ) + mock_capture_post.assert_called_once_with( + base_url="http://localhost:8000", + router_name="session_control.spa_router", + function_name="make_atlas_jpg", + token="token", + session_id=1, + data={"path": f"{tmp_path}/destination/{atlas_mrc.relative_to(tmp_path)}"}, + ) + + +@patch("murfey.client.contexts.atlas.capture_post") +def test_atlas_context_xml(mock_capture_post, tmp_path): + env = MurfeyInstanceEnvironment( + url=urlparse("http://localhost:8000"), + client_id=0, + sources=[tmp_path / "cm12345-6"], + default_destinations={ + tmp_path / "cm12345-6": f"{tmp_path}/destination/cm12345-6" + }, + instrument_name="", + visit="cm12345-6", + murfey_session=1, + ) + context = AtlasContext("tomo", tmp_path, "token") + + atlas_pixel_size = 4.6 + atlas_xml = tmp_path / "cm12345-6/Supervisor_atlas/Sample2/Atlas/Atlas_1.xml" + atlas_xml.parent.mkdir(parents=True) + with open(atlas_xml, "w") as new_xml: + new_xml.write( + f"{atlas_pixel_size}" + "" + ) + + context.post_transfer( + atlas_xml, + environment=env, + ) + dcg_data = { + "experiment_type_id": 44, # Atlas + "tag": str(atlas_xml.parent), + "atlas": f"{tmp_path}/destination/{atlas_xml.relative_to(tmp_path).with_suffix('.jpg')}", + "sample": 2, + "atlas_pixel_size": atlas_pixel_size * 7.8, + } + mock_capture_post.assert_called_once_with( + base_url="http://localhost:8000", + router_name="workflow.router", + function_name="register_dc_group", + token="token", + visit_name="cm12345-6", + session_id=1, + data=dcg_data, + ) diff --git a/tests/client/contexts/test_tomo.py b/tests/client/contexts/test_tomo.py new file mode 100644 index 00000000..07713618 --- /dev/null +++ b/tests/client/contexts/test_tomo.py @@ -0,0 +1,267 @@ +from __future__ import annotations + +from pathlib import Path +from unittest.mock import patch +from urllib.parse import urlparse + +from murfey.client.contexts.tomo import TomographyContext +from murfey.client.instance_environment import MurfeyInstanceEnvironment + + +def test_tomography_context_initialisation_for_tomo(tmp_path): + context = TomographyContext("tomo", tmp_path, "") + assert not context._completed_tilt_series + assert context._acquisition_software == "tomo" + + +@patch("requests.get") +@patch("requests.post") +def test_tomography_context_add_tomo_tilt(mock_post, mock_get, tmp_path): + mock_post().status_code = 200 + + env = MurfeyInstanceEnvironment( + url=urlparse("http://localhost:8000"), + client_id=0, + sources=[tmp_path], + default_destinations={tmp_path: str(tmp_path)}, + instrument_name="", + visit="test", + murfey_session=1, + ) + context = TomographyContext("tomo", tmp_path, "") + (tmp_path / "Position_1_001_[30.0]_date_time_fractions.tiff").touch() + context.post_transfer( + tmp_path / "Position_1_001_[30.0]_date_time_fractions.tiff", + required_position_files=[], + required_strings=["fractions"], + environment=env, + ) + assert context._tilt_series == { + "Position_1": [tmp_path / "Position_1_001_[30.0]_date_time_fractions.tiff"] + } + (tmp_path / "Position_1_002_[-30.0]_date_time_fractions.tiff").touch() + context.post_transfer( + tmp_path / "Position_1_002_[-30.0]_date_time_fractions.tiff", + required_position_files=[], + required_strings=["fractions"], + environment=env, + ) + assert not context._completed_tilt_series + + # Add Position_1.mdoc, which completes this position + with open(tmp_path / "Position_1.mdoc", "w") as mdoc: + mdoc.write("[ZValue = 0]\n[ZValue = 1]\n") + context.post_transfer( + tmp_path / "Position_1.mdoc", + required_position_files=[], + required_strings=["fractions"], + environment=env, + ) + assert context._completed_tilt_series == ["Position_1"] + + # Start Position_2, this is not complete + (tmp_path / "Position_2_002_[30.0]_date_time_fractions.tiff").touch() + context.post_transfer( + tmp_path / "Position_2_002_[30.0]_date_time_fractions.tiff", + required_position_files=[], + required_strings=["fractions"], + environment=env, + ) + assert len(context._tilt_series.values()) == 2 + assert context._completed_tilt_series == ["Position_1"] + + +@patch("requests.get") +@patch("requests.post") +def test_tomography_context_add_tomo_tilt_out_of_order(mock_post, mock_get, tmp_path): + mock_post().status_code = 200 + + env = MurfeyInstanceEnvironment( + url=urlparse("http://localhost:8000"), + client_id=0, + sources=[tmp_path], + default_destinations={tmp_path: str(tmp_path)}, + instrument_name="", + visit="test", + murfey_session=1, + ) + context = TomographyContext("tomo", tmp_path, "") + (tmp_path / "Position_1_001_[30.0]_date_time_fractions.tiff").touch() + context.post_transfer( + tmp_path / "Position_1_001_[30.0]_date_time_fractions.tiff", + required_position_files=[], + required_strings=["fractions"], + environment=env, + ) + assert context._tilt_series == { + "Position_1": [tmp_path / "Position_1_001_[30.0]_date_time_fractions.tiff"] + } + (tmp_path / "Position_1_002_[-30.0]_date_time_fractions.tiff").touch() + context.post_transfer( + tmp_path / "Position_1_002_[-30.0]_date_time_fractions.tiff", + required_position_files=[], + required_strings=["fractions"], + environment=env, + ) + assert not context._completed_tilt_series + (tmp_path / "Position_2_002_[-30.0]_date_time_fractions.tiff").touch() + context.post_transfer( + tmp_path / "Position_2_002_[-30.0]_date_time_fractions.tiff", + required_position_files=[], + required_strings=["fractions"], + environment=env, + ) + assert len(context._tilt_series.values()) == 2 + assert not context._completed_tilt_series + (tmp_path / "Position_2_001_[30.0]_date_time_fractions.tiff").touch() + context.post_transfer( + tmp_path / "Position_2_001_[30.0]_date_time_fractions.tiff", + required_position_files=[], + required_strings=["fractions"], + environment=env, + ) + assert len(context._tilt_series.values()) == 2 + assert not context._completed_tilt_series + (tmp_path / "Position_3_001_[30.0]_date_time_fractions.tiff").touch() + (tmp_path / "Position_3_002_[-30.0]_date_time_fractions.tiff").touch() + context.post_transfer( + tmp_path / "Position_3_002_[-30.0]_date_time_fractions.tiff", + required_position_files=[], + required_strings=["fractions"], + environment=env, + ) + assert len(context._tilt_series.values()) == 3 + assert not context._completed_tilt_series + + # Add Position_1.mdoc, which completes this position + with open(tmp_path / "Position_1.mdoc", "w") as mdoc: + mdoc.write("[ZValue = 0]\n[ZValue = 1]\n") + context.post_transfer( + tmp_path / "Position_1.mdoc", + required_position_files=[], + required_strings=["fractions"], + environment=env, + ) + assert context._completed_tilt_series == ["Position_1"] + + # Add Position_2.mdoc, which completes this position + with open(tmp_path / "Position_2.mdoc", "w") as mdoc: + mdoc.write("[ZValue = 0]\n[ZValue = 1]\n") + context.post_transfer( + tmp_path / "Position_2.mdoc", + required_position_files=[], + required_strings=["fractions"], + environment=env, + ) + assert context._completed_tilt_series == ["Position_1", "Position_2"] + + +@patch("requests.get") +@patch("requests.post") +def test_tomography_context_add_tomo_tilt_delayed_tilt(mock_post, mock_get, tmp_path): + mock_post().status_code = 200 + + env = MurfeyInstanceEnvironment( + url=urlparse("http://localhost:8000"), + client_id=0, + sources=[tmp_path], + default_destinations={tmp_path: str(tmp_path)}, + instrument_name="", + visit="test", + murfey_session=1, + ) + context = TomographyContext("tomo", tmp_path, "") + (tmp_path / "Position_1_001_[30.0]_date_time_fractions.tiff").touch() + context.post_transfer( + tmp_path / "Position_1_001_[30.0]_date_time_fractions.tiff", + required_position_files=[], + required_strings=["fractions"], + environment=env, + ) + assert context._tilt_series == { + "Position_1": [tmp_path / "Position_1_001_[30.0]_date_time_fractions.tiff"] + } + (tmp_path / "Position_1_002_[-30.0]_date_time_fractions.tiff").touch() + context.post_transfer( + tmp_path / "Position_1_002_[-30.0]_date_time_fractions.tiff", + required_position_files=[], + required_strings=["fractions"], + environment=env, + ) + assert not context._completed_tilt_series + + # Add Position_1.mdoc, with more tilts than have been seen so far + with open(tmp_path / "Position_1.mdoc", "w") as mdoc: + mdoc.write("[ZValue = 0]\n[ZValue = 1]\n[ZValue = 2]\n") + context.post_transfer( + tmp_path / "Position_1.mdoc", + required_position_files=[], + required_strings=["fractions"], + environment=env, + ) + assert not context._completed_tilt_series + + # Now add the tilt which completes the series + (tmp_path / "Position_1_003_[60.0]_data_time_fractions.tiff").touch() + new_series = context.post_transfer( + tmp_path / "Position_1_003_[60.0]_data_time_fractions.tiff", + required_position_files=[], + required_strings=["fractions"], + environment=env, + ) + assert context._completed_tilt_series == ["Position_1"] + assert new_series == ["Position_1"] + + +def test_tomography_context_initialisation_for_serialem(tmp_path): + context = TomographyContext("serialem", tmp_path, "") + assert not context._completed_tilt_series + assert context._acquisition_software == "serialem" + + +@patch("requests.get") +@patch("requests.post") +def test_setting_tilt_series_size_and_completion_from_mdoc_parsing( + mock_post, mock_get, tmp_path +): + mock_post().status_code = 200 + + env = MurfeyInstanceEnvironment( + url=urlparse("http://localhost:8000"), + client_id=0, + sources=[tmp_path], + default_destinations={tmp_path: str(tmp_path)}, + instrument_name="", + visit="test", + murfey_session=1, + ) + context = TomographyContext("tomo", tmp_path, "") + assert len(context._tilt_series_sizes) == 0 + context.post_transfer( + Path(__file__).parent.parent.parent / "util" / "test_1.mdoc", + environment=env, + required_strings=["fractions"], + ) + assert len(context._tilt_series_sizes) == 1 + assert context._tilt_series_sizes == {"test_1": 11} + (tmp_path / "test_1.mdoc").touch() + tilt = -50 + (tmp_path / f"test_1_001_[{tilt:.1f}]_data_time_fractions.tiff").touch() + context.post_transfer( + tmp_path / f"test_1_001_[{tilt:.1f}]_data_time_fractions.tiff", + environment=env, + required_strings=["fractions"], + ) + assert context._tilt_series == { + "test_1": [tmp_path / f"test_1_001_[{tilt:.1f}]_data_time_fractions.tiff"] + } + for i, t in enumerate(range(-40, 60, 10)): + assert not context._completed_tilt_series + (tmp_path / f"test_1_{i:03}_[{t:.1f}]_data_time_fractions.tiff").touch() + context.post_transfer( + tmp_path / f"test_1_{i:03}_[{t:.1f}]_data_time_fractions.tiff", + environment=env, + required_strings=["fractions"], + ) + assert len(context._tilt_series["test_1"]) == 11 + assert context._completed_tilt_series == ["test_1"] diff --git a/tests/client/test_context.py b/tests/client/test_context.py index 4c88c938..e61a33be 100644 --- a/tests/client/test_context.py +++ b/tests/client/test_context.py @@ -1,267 +1,157 @@ -from __future__ import annotations - -from pathlib import Path from unittest.mock import patch from urllib.parse import urlparse -from murfey.client.contexts.tomo import TomographyContext +from murfey.client.context import ensure_dcg_exists from murfey.client.instance_environment import MurfeyInstanceEnvironment -def test_tomography_context_initialisation_for_tomo(tmp_path): - context = TomographyContext("tomo", tmp_path, "") - assert not context._completed_tilt_series - assert context._acquisition_software == "tomo" - - -@patch("requests.get") -@patch("requests.post") -def test_tomography_context_add_tomo_tilt(mock_post, mock_get, tmp_path): - mock_post().status_code = 200 - +@patch("murfey.client.context.capture_post") +def test_ensure_dcg_exists_tomo(mock_capture_post, tmp_path): env = MurfeyInstanceEnvironment( url=urlparse("http://localhost:8000"), client_id=0, - sources=[tmp_path], - default_destinations={tmp_path: str(tmp_path)}, + sources=[tmp_path / "cm12345-6/metadata_folder"], + default_destinations={ + tmp_path + / "cm12345-6/metadata_folder": f"{tmp_path}/destination/cm12345-6/raw" + }, instrument_name="", - visit="test", + visit="cm12345-6", murfey_session=1, ) - context = TomographyContext("tomo", tmp_path, "") - (tmp_path / "Position_1_001_[30.0]_date_time_fractions.tiff").touch() - context.post_transfer( - tmp_path / "Position_1_001_[30.0]_date_time_fractions.tiff", - required_position_files=[], - required_strings=["fractions"], - environment=env, - ) - assert context._tilt_series == { - "Position_1": [tmp_path / "Position_1_001_[30.0]_date_time_fractions.tiff"] - } - (tmp_path / "Position_1_002_[-30.0]_date_time_fractions.tiff").touch() - context.post_transfer( - tmp_path / "Position_1_002_[-30.0]_date_time_fractions.tiff", - required_position_files=[], - required_strings=["fractions"], - environment=env, - ) - assert not context._completed_tilt_series - # Add Position_1.mdoc, which completes this position - with open(tmp_path / "Position_1.mdoc", "w") as mdoc: - mdoc.write("[ZValue = 0]\n[ZValue = 1]\n") - context.post_transfer( - tmp_path / "Position_1.mdoc", - required_position_files=[], - required_strings=["fractions"], - environment=env, - ) - assert context._completed_tilt_series == ["Position_1"] + metadata_source = tmp_path / "cm12345-6/metadata_folder" + metadata_source.mkdir(parents=True) + with open(metadata_source / "Session.dm", "w") as dm_file: + dm_file.write( + "" + r"X:\cm12345-6\atlas\atlas_metadata\Sample6\Atlas\Atlas.dm" + "" + ) + + atlas_xml = tmp_path / "cm12345-6/atlas/atlas_metadata/Sample6/Atlas/Atlas_4.xml" + atlas_xml.parent.mkdir(parents=True) + with open(atlas_xml, "w") as xml_file: + xml_file.write( + "4.7" + "" + ) - # Start Position_2, this is not complete - (tmp_path / "Position_2_002_[30.0]_date_time_fractions.tiff").touch() - context.post_transfer( - tmp_path / "Position_2_002_[30.0]_date_time_fractions.tiff", - required_position_files=[], - required_strings=["fractions"], + ensure_dcg_exists( + collection_type="tomo", + metadata_source=metadata_source, environment=env, + token="token", ) - assert len(context._tilt_series.values()) == 2 - assert context._completed_tilt_series == ["Position_1"] + dcg_data = { + "experiment_type_id": 36, + "tag": f"{tmp_path}/metadata_folder", + "atlas": f"{tmp_path}/destination/{atlas_xml.relative_to(tmp_path).with_suffix('.jpg')}", + "sample": 6, + "atlas_pixel_size": 4.7 * 7.8, + } + mock_capture_post.assert_called_once_with( + base_url="http://localhost:8000", + router_name="workflow.router", + function_name="register_dc_group", + token="token", + visit_name="cm12345-6", + session_id=1, + data=dcg_data, + ) -@patch("requests.get") -@patch("requests.post") -def test_tomography_context_add_tomo_tilt_out_of_order(mock_post, mock_get, tmp_path): - mock_post().status_code = 200 +@patch("murfey.client.context.capture_post") +def test_ensure_dcg_exists_spa(mock_capture_post, tmp_path): env = MurfeyInstanceEnvironment( url=urlparse("http://localhost:8000"), client_id=0, - sources=[tmp_path], - default_destinations={tmp_path: str(tmp_path)}, + sources=[tmp_path / "cm12345-6/metadata_folder"], + default_destinations={ + tmp_path + / "cm12345-6/metadata_folder": f"{tmp_path}/destination/cm12345-6/raw", + }, instrument_name="", - visit="test", + visit="cm12345-6", murfey_session=1, ) - context = TomographyContext("tomo", tmp_path, "") - (tmp_path / "Position_1_001_[30.0]_date_time_fractions.tiff").touch() - context.post_transfer( - tmp_path / "Position_1_001_[30.0]_date_time_fractions.tiff", - required_position_files=[], - required_strings=["fractions"], - environment=env, - ) - assert context._tilt_series == { - "Position_1": [tmp_path / "Position_1_001_[30.0]_date_time_fractions.tiff"] - } - (tmp_path / "Position_1_002_[-30.0]_date_time_fractions.tiff").touch() - context.post_transfer( - tmp_path / "Position_1_002_[-30.0]_date_time_fractions.tiff", - required_position_files=[], - required_strings=["fractions"], - environment=env, - ) - assert not context._completed_tilt_series - (tmp_path / "Position_2_002_[-30.0]_date_time_fractions.tiff").touch() - context.post_transfer( - tmp_path / "Position_2_002_[-30.0]_date_time_fractions.tiff", - required_position_files=[], - required_strings=["fractions"], - environment=env, - ) - assert len(context._tilt_series.values()) == 2 - assert not context._completed_tilt_series - (tmp_path / "Position_2_001_[30.0]_date_time_fractions.tiff").touch() - context.post_transfer( - tmp_path / "Position_2_001_[30.0]_date_time_fractions.tiff", - required_position_files=[], - required_strings=["fractions"], - environment=env, - ) - assert len(context._tilt_series.values()) == 2 - assert not context._completed_tilt_series - (tmp_path / "Position_3_001_[30.0]_date_time_fractions.tiff").touch() - (tmp_path / "Position_3_002_[-30.0]_date_time_fractions.tiff").touch() - context.post_transfer( - tmp_path / "Position_3_002_[-30.0]_date_time_fractions.tiff", - required_position_files=[], - required_strings=["fractions"], - environment=env, - ) - assert len(context._tilt_series.values()) == 3 - assert not context._completed_tilt_series - - # Add Position_1.mdoc, which completes this position - with open(tmp_path / "Position_1.mdoc", "w") as mdoc: - mdoc.write("[ZValue = 0]\n[ZValue = 1]\n") - context.post_transfer( - tmp_path / "Position_1.mdoc", - required_position_files=[], - required_strings=["fractions"], - environment=env, - ) - assert context._completed_tilt_series == ["Position_1"] - - # Add Position_2.mdoc, which completes this position - with open(tmp_path / "Position_2.mdoc", "w") as mdoc: - mdoc.write("[ZValue = 0]\n[ZValue = 1]\n") - context.post_transfer( - tmp_path / "Position_2.mdoc", - required_position_files=[], - required_strings=["fractions"], - environment=env, - ) - assert context._completed_tilt_series == ["Position_1", "Position_2"] + metadata_source = tmp_path / "cm12345-6/metadata_folder" + metadata_source.mkdir(parents=True) + with open(metadata_source / "EpuSession.dm", "w") as dm_file: + dm_file.write( + "<_items>" + r"X:\cm12345-6\atlas\atlas_metadata\Sample6\Atlas\Atlas.dm" + "" + ) -@patch("requests.get") -@patch("requests.post") -def test_tomography_context_add_tomo_tilt_delayed_tilt(mock_post, mock_get, tmp_path): - mock_post().status_code = 200 + # Make data location + (tmp_path / "metadata_folder/Images-Disc1").mkdir(parents=True) - env = MurfeyInstanceEnvironment( - url=urlparse("http://localhost:8000"), - client_id=0, - sources=[tmp_path], - default_destinations={tmp_path: str(tmp_path)}, - instrument_name="", - visit="test", - murfey_session=1, - ) - context = TomographyContext("tomo", tmp_path, "") - (tmp_path / "Position_1_001_[30.0]_date_time_fractions.tiff").touch() - context.post_transfer( - tmp_path / "Position_1_001_[30.0]_date_time_fractions.tiff", - required_position_files=[], - required_strings=["fractions"], - environment=env, - ) - assert context._tilt_series == { - "Position_1": [tmp_path / "Position_1_001_[30.0]_date_time_fractions.tiff"] - } - (tmp_path / "Position_1_002_[-30.0]_date_time_fractions.tiff").touch() - context.post_transfer( - tmp_path / "Position_1_002_[-30.0]_date_time_fractions.tiff", - required_position_files=[], - required_strings=["fractions"], - environment=env, - ) - assert not context._completed_tilt_series + atlas_xml = tmp_path / "cm12345-6/atlas/atlas_metadata/Sample6/Atlas/Atlas_4.xml" + atlas_xml.parent.mkdir(parents=True) + with open(atlas_xml, "w") as xml_file: + xml_file.write( + "4.7" + "" + ) - # Add Position_1.mdoc, with more tilts than have been seen so far - with open(tmp_path / "Position_1.mdoc", "w") as mdoc: - mdoc.write("[ZValue = 0]\n[ZValue = 1]\n[ZValue = 2]\n") - context.post_transfer( - tmp_path / "Position_1.mdoc", - required_position_files=[], - required_strings=["fractions"], + ensure_dcg_exists( + collection_type="spa", + metadata_source=metadata_source, environment=env, + token="token", ) - assert not context._completed_tilt_series - # Now add the tilt which completes the series - (tmp_path / "Position_1_003_[60.0]_data_time_fractions.tiff").touch() - new_series = context.post_transfer( - tmp_path / "Position_1_003_[60.0]_data_time_fractions.tiff", - required_position_files=[], - required_strings=["fractions"], - environment=env, + dcg_data = { + "experiment_type_id": 37, + "tag": f"{tmp_path}/metadata_folder/Images-Disc1", + "atlas": f"{tmp_path}/destination/{atlas_xml.relative_to(tmp_path).with_suffix('.jpg')}", + "sample": 6, + "atlas_pixel_size": 4.7 * 7.8, + } + mock_capture_post.assert_called_once_with( + base_url="http://localhost:8000", + router_name="workflow.router", + function_name="register_dc_group", + token="token", + visit_name="cm12345-6", + session_id=1, + data=dcg_data, ) - assert context._completed_tilt_series == ["Position_1"] - assert new_series == ["Position_1"] - - -def test_tomography_context_initialisation_for_serialem(tmp_path): - context = TomographyContext("serialem", tmp_path, "") - assert not context._completed_tilt_series - assert context._acquisition_software == "serialem" -@patch("requests.get") -@patch("requests.post") -def test_setting_tilt_series_size_and_completion_from_mdoc_parsing( - mock_post, mock_get, tmp_path -): - mock_post().status_code = 200 - +@patch("murfey.client.context.capture_post") +def test_ensure_dcg_exists_spa_missing_xml(mock_capture_post, tmp_path): env = MurfeyInstanceEnvironment( url=urlparse("http://localhost:8000"), client_id=0, sources=[tmp_path], default_destinations={tmp_path: str(tmp_path)}, instrument_name="", - visit="test", + visit="cm12345-6", murfey_session=1, ) - context = TomographyContext("tomo", tmp_path, "") - assert len(context._tilt_series_sizes) == 0 - context.post_transfer( - Path(__file__).parent.parent / "util" / "test_1.mdoc", - environment=env, - required_strings=["fractions"], - ) - assert len(context._tilt_series_sizes) == 1 - assert context._tilt_series_sizes == {"test_1": 11} - (tmp_path / "test_1.mdoc").touch() - tilt = -50 - (tmp_path / f"test_1_001_[{tilt:.1f}]_data_time_fractions.tiff").touch() - context.post_transfer( - tmp_path / f"test_1_001_[{tilt:.1f}]_data_time_fractions.tiff", + + metadata_source = tmp_path / "cm12345-6/metadata_folder" + ensure_dcg_exists( + collection_type="spa", + metadata_source=metadata_source, environment=env, - required_strings=["fractions"], + token="token", ) - assert context._tilt_series == { - "test_1": [tmp_path / f"test_1_001_[{tilt:.1f}]_data_time_fractions.tiff"] + + dcg_data = { + "experiment_type_id": 37, + "tag": f"{tmp_path}/metadata_folder", } - for i, t in enumerate(range(-40, 60, 10)): - assert not context._completed_tilt_series - (tmp_path / f"test_1_{i:03}_[{t:.1f}]_data_time_fractions.tiff").touch() - context.post_transfer( - tmp_path / f"test_1_{i:03}_[{t:.1f}]_data_time_fractions.tiff", - environment=env, - required_strings=["fractions"], - ) - assert len(context._tilt_series["test_1"]) == 11 - assert context._completed_tilt_series == ["test_1"] + mock_capture_post.assert_called_once_with( + base_url="http://localhost:8000", + router_name="workflow.router", + function_name="register_dc_group", + token="token", + visit_name="cm12345-6", + session_id=1, + data=dcg_data, + ) diff --git a/tests/server/api/test_workflow.py b/tests/server/api/test_workflow.py new file mode 100644 index 00000000..01ebffe6 --- /dev/null +++ b/tests/server/api/test_workflow.py @@ -0,0 +1,324 @@ +from unittest import mock + +from sqlmodel import Session, select + +from murfey.server.api.workflow import DCGroupParameters, register_dc_group +from murfey.util.db import DataCollectionGroup, SearchMap +from tests.conftest import ExampleVisit + + +@mock.patch("murfey.server.api.workflow._transport_object") +def test_register_dc_group_new_dcg(mock_transport, murfey_db_session: Session): + """Test the request for a completely new data collection group""" + mock_transport.feedback_queue = "mock_feedback_queue" + + # Request new dcg registration + dcg_params = DCGroupParameters( + experiment_type_id=44, + tag="atlas_tag", + atlas="/path/to/Atlas_1.jpg", + sample=10, + atlas_pixel_size=1e-5, + ) + register_dc_group( + visit_name="cm12345-6", + session_id=ExampleVisit.murfey_session_id, + dcg_params=dcg_params, + db=murfey_db_session, + ) + + # Check request for registering dcg in ispyb and murfey + mock_transport.send.assert_called_once_with( + "mock_feedback_queue", + { + "register": "data_collection_group", + "start_time": mock.ANY, + "experiment_type_id": 44, + "tag": "atlas_tag", + "session_id": ExampleVisit.murfey_session_id, + "atlas": "/path/to/Atlas_1.jpg", + "sample": 10, + "atlas_pixel_size": 1e-5, + "microscope": "", + "proposal_code": ExampleVisit.proposal_code, + "proposal_number": str(ExampleVisit.proposal_number), + "visit_number": str(ExampleVisit.visit_number), + }, + ) + + +@mock.patch("murfey.server.api.workflow._transport_object") +def test_register_dc_group_atlas_to_processing( + mock_transport, murfey_db_session: Session +): + """ + Test the request to update an existing data collection group + from atlas type 44 to a processing type with a different tag + """ + mock_transport.feedback_queue = "mock_feedback_queue" + + # Make sure dcg is present + dcg = DataCollectionGroup( + id=1, + session_id=ExampleVisit.murfey_session_id, + tag="atlas_tag", + atlas_id=90, + atlas_pixel_size=1e-5, + sample=10, + atlas="/path/to/Atlas_1.jpg", + ) + murfey_db_session.add(dcg) + murfey_db_session.commit() + + # Request new dcg registration with processing experiment type and tag + dcg_params = DCGroupParameters( + experiment_type_id=36, + tag="processing_tag", + atlas="/path/to/Atlas_1.jpg", + sample=10, + atlas_pixel_size=1e-5, + ) + register_dc_group( + visit_name="cm12345-6", + session_id=ExampleVisit.murfey_session_id, + dcg_params=dcg_params, + db=murfey_db_session, + ) + + # Check request to ispyb for updating the experiment type + mock_transport.send.assert_called_once_with( + "mock_feedback_queue", + { + "register": "experiment_type_update", + "experiment_type_id": 36, + "dcgid": 1, + }, + ) + + # Check that the tag of the data collection group was updated + new_dcg = murfey_db_session.exec( + select(DataCollectionGroup).where(DataCollectionGroup.id == 1) + ).one() + assert new_dcg.tag == "processing_tag" + + +@mock.patch("murfey.server.api.workflow._transport_object") +def test_register_dc_group_processing_to_atlas( + mock_transport, murfey_db_session: Session +): + """ + Test the request to update an existing data collection group + of processing type with a new atlas type 44, which should leave the tag unchanged + """ + mock_transport.feedback_queue = "mock_feedback_queue" + + # Make sure dcg is present + dcg = DataCollectionGroup( + id=1, + session_id=ExampleVisit.murfey_session_id, + tag="processing_tag", + atlas_id=90, + atlas_pixel_size=1e-5, + sample=10, + atlas="/path/to/Atlas_1.jpg", + ) + murfey_db_session.add(dcg) + murfey_db_session.commit() + + # Request new dcg registration with atlas experiment type and tag + dcg_params = DCGroupParameters( + experiment_type_id=44, + tag="atlas_tag", + atlas="/path/to/Atlas_2.jpg", + sample=10, + atlas_pixel_size=1e-4, + ) + register_dc_group( + visit_name="cm12345-6", + session_id=ExampleVisit.murfey_session_id, + dcg_params=dcg_params, + db=murfey_db_session, + ) + + # Check request to ispyb for updating the experiment type + mock_transport.send.assert_called_once_with( + "mock_feedback_queue", + { + "register": "atlas_update", + "atlas_id": 90, + "atlas": "/path/to/Atlas_2.jpg", + "sample": 10, + "atlas_pixel_size": 1e-4, + "dcgid": 1, + "session_id": ExampleVisit.murfey_session_id, + }, + ) + + # Check the data collection group atlas was updated + new_dcg = murfey_db_session.exec( + select(DataCollectionGroup).where(DataCollectionGroup.id == 1) + ).one() + assert new_dcg.atlas == "/path/to/Atlas_2.jpg" + assert new_dcg.atlas_pixel_size == 1e-4 + # Check the tag of the data collection group was not updated + assert new_dcg.tag != "atlas_tag" + + +@mock.patch("murfey.server.api.workflow._transport_object") +def test_register_dc_group_new_atlas(mock_transport, murfey_db_session: Session): + """ + Test the request to update an existing data collection group + by adding an atlas, using the same tag + """ + mock_transport.feedback_queue = "mock_feedback_queue" + mock_transport.do_insert_atlas.return_value = {"return_value": 5} + + # Make sure dcg is present without an atlas id + dcg = DataCollectionGroup( + id=1, + session_id=ExampleVisit.murfey_session_id, + tag="processing_tag", + ) + murfey_db_session.add(dcg) + murfey_db_session.commit() + + # Request new dcg registration with atlas and exisiting tag + dcg_params = DCGroupParameters( + experiment_type_id=36, + tag="processing_tag", + atlas="/path/to/Atlas_2.jpg", + sample=10, + atlas_pixel_size=1e-4, + ) + register_dc_group( + visit_name="cm12345-6", + session_id=ExampleVisit.murfey_session_id, + dcg_params=dcg_params, + db=murfey_db_session, + ) + + # Check no sends are made by the transport object + mock_transport.send.assert_not_called() + + # Check the call to insert the atlas into ispyb + atlas_args = mock_transport.do_insert_atlas.call_args_list + assert len(atlas_args) == 1 + assert atlas_args[0][0][0].dataCollectionGroupId == 1 + assert atlas_args[0][0][0].atlasImage == "/path/to/Atlas_2.jpg" + assert atlas_args[0][0][0].pixelSize == 1e-4 + assert atlas_args[0][0][0].cassetteSlot == 10 + + # Check the data collection group atlas was updated + new_dcg = murfey_db_session.exec( + select(DataCollectionGroup).where(DataCollectionGroup.id == 1) + ).one() + assert new_dcg.atlas == "/path/to/Atlas_2.jpg" + assert new_dcg.sample == 10 + assert new_dcg.atlas_pixel_size == 1e-4 + assert new_dcg.tag == "processing_tag" + assert new_dcg.atlas_id == 5 + + +@mock.patch("murfey.server.api.workflow._transport_object") +@mock.patch("murfey.server.api.workflow.register_search_map_in_database") +def test_register_dc_group_new_atlas_with_searchmaps( + mock_register_search_map, mock_transport, murfey_db_session: Session +): + """ + Test the request to update an existing data collection group + by adding an atlas, using the same tag, and also update search maps + """ + mock_transport.feedback_queue = "mock_feedback_queue" + + # Make sure dcg is present with an atlas id + dcg = DataCollectionGroup( + id=1, + session_id=ExampleVisit.murfey_session_id, + tag="processing_tag", + atlas_id=90, + atlas_pixel_size=1e-5, + sample=10, + atlas="/path/to/Atlas_1.jpg", + ) + murfey_db_session.add(dcg) + murfey_db_session.commit() + + # Add some search maps with the dcg tag and one with a different tag + sm1 = SearchMap( + id=1, + session_id=ExampleVisit.murfey_session_id, + tag="processing_tag", + name="searchmap1", + ) + sm2 = SearchMap( + id=2, + session_id=ExampleVisit.murfey_session_id, + tag="processing_tag", + name="searchmap2", + ) + sm3 = SearchMap( + id=3, + session_id=ExampleVisit.murfey_session_id, + tag="different_tag", + name="searchmap3", + ) + murfey_db_session.add(sm1) + murfey_db_session.add(sm2) + murfey_db_session.add(sm3) + murfey_db_session.commit() + + # Request new dcg registration with new atlas tag and sample + dcg_params = DCGroupParameters( + experiment_type_id=37, + tag="processing_tag", + atlas="/path/to/Atlas_2.jpg", + sample=12, + atlas_pixel_size=1e-4, + ) + register_dc_group( + visit_name="cm12345-6", + session_id=ExampleVisit.murfey_session_id, + dcg_params=dcg_params, + db=murfey_db_session, + ) + + # Check request to ispyb for updating the experiment type + mock_transport.send.assert_called_once_with( + "mock_feedback_queue", + { + "register": "atlas_update", + "atlas_id": 90, + "atlas": "/path/to/Atlas_2.jpg", + "sample": 12, + "atlas_pixel_size": 1e-4, + "dcgid": 1, + "session_id": ExampleVisit.murfey_session_id, + }, + ) + + # Check the data collection group atlas was updated + new_dcg = murfey_db_session.exec( + select(DataCollectionGroup).where(DataCollectionGroup.id == dcg.id) + ).one() + assert new_dcg.atlas == "/path/to/Atlas_2.jpg" + assert new_dcg.sample == 12 + assert new_dcg.atlas_pixel_size == 1e-4 + assert new_dcg.tag == "processing_tag" + assert new_dcg.atlas_id == 90 + + # Check the search map update calls + assert mock_register_search_map.call_count == 2 + mock_register_search_map.assert_any_call( + ExampleVisit.murfey_session_id, + "searchmap1", + mock.ANY, + murfey_db_session, + close_db=False, + ) + mock_register_search_map.assert_any_call( + ExampleVisit.murfey_session_id, + "searchmap2", + mock.ANY, + murfey_db_session, + close_db=False, + ) diff --git a/tests/server/test_ispyb.py b/tests/server/test_ispyb.py index d60ada7b..58f2f2bb 100644 --- a/tests/server/test_ispyb.py +++ b/tests/server/test_ispyb.py @@ -1,10 +1,12 @@ -from ispyb.sqlalchemy import BLSession, Proposal +from unittest import mock + +from ispyb.sqlalchemy import BLSession, DataCollectionGroup, Proposal from pytest import mark from sqlalchemy import select from sqlalchemy.orm import Session -from murfey.server.ispyb import get_proposal_id, get_session_id -from tests.conftest import ExampleVisit +from murfey.server.ispyb import TransportManager, get_proposal_id, get_session_id +from tests.conftest import ExampleVisit, get_or_create_db_entry def test_get_session_id( @@ -67,3 +69,44 @@ def test_get_sub_samples_from_visit(): @mark.skip def test_get_all_ongoing_visits(): pass + + +@mock.patch("workflows.transport.pika_transport.PikaTransport") +def test_update_data_collection_group(mock_transport, ispyb_db_session: Session): + # Manually get the BLSession ID for comparison + bl_session_id = ( + ispyb_db_session.execute( + select(BLSession) + .join(Proposal) + .where(BLSession.proposalId == Proposal.proposalId) + .where(BLSession.beamLineName == ExampleVisit.instrument_name) + .where(Proposal.proposalCode == ExampleVisit.proposal_code) + .where(Proposal.proposalNumber == str(ExampleVisit.proposal_number)) + .where(BLSession.visit_number == ExampleVisit.visit_number) + ) + .scalar_one() + .sessionId + ) + # Insert data collection group + get_or_create_db_entry( + session=ispyb_db_session, + table=DataCollectionGroup, + insert_kwargs={ + "dataCollectionGroupId": 1, + "sessionId": bl_session_id, + "experimentTypeId": 1, + }, + ) + + transport_manager = TransportManager("PikaTransport") + with mock.patch("murfey.server.ispyb.ISPyBSession", return_value=ispyb_db_session): + transport_manager.do_update_data_collection_group( + record=DataCollectionGroup(dataCollectionGroupId=1, experimentTypeId=2) + ) + + final_dcg_entry = get_or_create_db_entry( + session=ispyb_db_session, + table=DataCollectionGroup, + lookup_kwargs={"dataCollectionGroupId": 1}, + ) + assert final_dcg_entry.experimentTypeId == 2 diff --git a/tests/workflows/test_register_experiment_type_update.py b/tests/workflows/test_register_experiment_type_update.py new file mode 100644 index 00000000..3d159dd7 --- /dev/null +++ b/tests/workflows/test_register_experiment_type_update.py @@ -0,0 +1,46 @@ +from unittest.mock import MagicMock + +import pytest +from pytest_mock import MockerFixture + +from murfey.workflows.register_experiment_type_update import run + +register_experiment_type_update_matrix = (0, 1, None) + + +@pytest.mark.parametrize("insert_dcg", register_experiment_type_update_matrix) +def test_run( + mocker: MockerFixture, + insert_dcg: int | None, +): + # Mock the transport object functions + mock_transport_object = mocker.patch( + "murfey.workflows.register_experiment_type_update._transport_object" + ) + mock_transport_object.do_update_data_collection_group.return_value = { + "return_value": insert_dcg, + } + mock_ispyb = mocker.patch( + "murfey.workflows.register_experiment_type_update.ISPyBDB" + ) + mock_ispyb.DataCollectionGroup.return_value = "ispyb_dcg" + + # Mock the Murfey database + mock_murfey_db = MagicMock() + + # Run the function and check the results and calls + message = { + "dcgid": 1, + "experiment_type_id": 0, + } + result = run(message=message, murfey_db=mock_murfey_db) + mock_ispyb.DataCollectionGroup.assert_called_once_with( + dataCollectionGroupId=1, experimentTypeId=0 + ) + mock_transport_object.do_update_data_collection_group.assert_called_once_with( + "ispyb_dcg" + ) + if insert_dcg is not None: + assert result == {"success": True} + else: + assert result == {"success": False, "requeue": True} diff --git a/tests/workflows/tomo/test_tomo_metadata.py b/tests/workflows/tomo/test_tomo_metadata.py index c1c45822..61e94ff2 100644 --- a/tests/workflows/tomo/test_tomo_metadata.py +++ b/tests/workflows/tomo/test_tomo_metadata.py @@ -135,12 +135,16 @@ def test_register_search_map_update_with_all_parameters( assert sm_final_parameters.y_location is not None # Check this would have updated ispyb - mock_transport.do_update_search_map.assert_called_with(1, new_parameters) - new_parameters.x_location = sm_final_parameters.x_location - new_parameters.y_location = sm_final_parameters.y_location - new_parameters.height_on_atlas = 40 - new_parameters.width_on_atlas = 20 - mock_transport.do_update_search_map.assert_called_with(1, new_parameters) + update_args = mock_transport.do_update_search_map.call_args_list + assert len(update_args) == 2 + assert update_args[0][0][0] == 1 + assert update_args[1][0][0] == 1 + assert update_args[0][0][1].x_stage_position == 0.3 + assert update_args[0][0][1].y_stage_position == 0.4 + assert update_args[1][0][1].x_location == sm_final_parameters.x_location + assert update_args[1][0][1].y_location == sm_final_parameters.y_location + assert update_args[1][0][1].height_on_atlas == 311 + assert update_args[1][0][1].width_on_atlas == 155 @mock.patch("murfey.workflows.tomo.tomo_metadata._transport_object")